add option to import sops secrets with groups,users,machines,prefixes
This commit is contained in:
@@ -9,50 +9,52 @@ from typing import IO, Union
|
||||
|
||||
from .. import tty
|
||||
from ..errors import ClanError
|
||||
from .folders import list_objects, sops_secrets_folder, sops_users_folder
|
||||
from .folders import (
|
||||
list_objects,
|
||||
sops_groups_folder,
|
||||
sops_machines_folder,
|
||||
sops_secrets_folder,
|
||||
sops_users_folder,
|
||||
)
|
||||
from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_keys
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
list_objects(
|
||||
sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None
|
||||
)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
secret: str = args.secret
|
||||
ensure_sops_key()
|
||||
secret_path = sops_secrets_folder() / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
print(decrypt_file(secret_path), end="")
|
||||
|
||||
|
||||
def encrypt_secret(secret: Path, value: Union[IO[str], str]) -> None:
|
||||
def encrypt_secret(
|
||||
secret: Path,
|
||||
value: Union[IO[str], str],
|
||||
add_users: list[str] = [],
|
||||
add_machines: list[str] = [],
|
||||
add_groups: list[str] = [],
|
||||
) -> None:
|
||||
key = ensure_sops_key()
|
||||
keys = set([key.pubkey])
|
||||
keys = set([])
|
||||
|
||||
for user in add_users:
|
||||
allow_member(users_folder(secret.name), sops_users_folder(), user, False)
|
||||
|
||||
for machine in add_machines:
|
||||
allow_member(
|
||||
machines_folder(secret.name), sops_machines_folder(), machine, False
|
||||
)
|
||||
|
||||
for group in add_groups:
|
||||
allow_member(groups_folder(secret.name), sops_groups_folder(), group, False)
|
||||
|
||||
for kind in ["users", "machines", "groups"]:
|
||||
if not (sops_secrets_folder() / kind).is_dir():
|
||||
continue
|
||||
k = read_key(sops_secrets_folder() / kind)
|
||||
keys.add(k)
|
||||
|
||||
if key.pubkey not in keys:
|
||||
keys.add(key.pubkey)
|
||||
allow_member(
|
||||
users_folder(secret.name), sops_users_folder(), key.username, False
|
||||
)
|
||||
|
||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||
|
||||
# make sure we add ourselves to the key
|
||||
allow_member(users_folder(secret.name), sops_users_folder(), key.username)
|
||||
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
secret_value = os.environ.get("SOPS_NIX_SECRET")
|
||||
if secret_value:
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret_value))
|
||||
elif tty.is_interactive():
|
||||
secret = getpass.getpass(prompt="Paste your secret: ")
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret))
|
||||
else:
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, sys.stdin)
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
secret: str = args.secret
|
||||
@@ -111,7 +113,9 @@ def collect_keys_for_path(path: Path) -> list[str]:
|
||||
return keys
|
||||
|
||||
|
||||
def allow_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
def allow_member(
|
||||
group_folder: Path, source_folder: Path, name: str, do_update_keys: bool = True
|
||||
) -> None:
|
||||
source = source_folder / name
|
||||
if not source.exists():
|
||||
raise ClanError(f"{name} does not exist in {source_folder}")
|
||||
@@ -125,7 +129,8 @@ def allow_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
os.remove(user_target)
|
||||
|
||||
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
||||
update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent))
|
||||
if do_update_keys:
|
||||
update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent))
|
||||
|
||||
|
||||
def disallow_member(group_folder: Path, name: str) -> None:
|
||||
@@ -150,6 +155,32 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
||||
update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent))
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
list_objects(
|
||||
sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None
|
||||
)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
secret: str = args.secret
|
||||
ensure_sops_key()
|
||||
secret_path = sops_secrets_folder() / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
print(decrypt_file(secret_path), end="")
|
||||
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
secret_value = os.environ.get("SOPS_NIX_SECRET")
|
||||
if secret_value:
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret_value))
|
||||
elif tty.is_interactive():
|
||||
secret = getpass.getpass(prompt="Paste your secret: ")
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, StringIO(secret))
|
||||
else:
|
||||
encrypt_secret(sops_secrets_folder() / args.secret, sys.stdin)
|
||||
|
||||
|
||||
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
parser_list = subparser.add_parser("list", help="list secrets")
|
||||
parser_list.set_defaults(func=list_command)
|
||||
|
||||
Reference in New Issue
Block a user