generated from Luis/nextjs-python-web-template
don't add user to a secret if they already can access the secret
If the user is part of a group we don't need to add them explicitly
This commit is contained in:
@@ -19,6 +19,39 @@ from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
|
||||
|
||||
def collect_keys_for_type(folder: Path) -> set[str]:
|
||||
if not folder.exists():
|
||||
return set()
|
||||
keys = set()
|
||||
for p in folder.iterdir():
|
||||
if not p.is_symlink():
|
||||
continue
|
||||
try:
|
||||
target = p.resolve()
|
||||
except FileNotFoundError:
|
||||
tty.warn(f"Ignoring broken symlink {p}")
|
||||
continue
|
||||
kind = target.parent.name
|
||||
if folder.name != kind:
|
||||
tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}")
|
||||
continue
|
||||
keys.add(read_key(target))
|
||||
return keys
|
||||
|
||||
|
||||
def collect_keys_for_path(path: Path) -> set[str]:
|
||||
keys = set([])
|
||||
keys.update(collect_keys_for_type(path / "machines"))
|
||||
keys.update(collect_keys_for_type(path / "users"))
|
||||
groups = path / "groups"
|
||||
if not groups.is_dir():
|
||||
return keys
|
||||
for group in groups.iterdir():
|
||||
keys.update(collect_keys_for_type(group / "machines"))
|
||||
keys.update(collect_keys_for_type(group / "users"))
|
||||
return keys
|
||||
|
||||
|
||||
def encrypt_secret(
|
||||
secret: Path,
|
||||
value: Union[IO[str], str],
|
||||
@@ -40,11 +73,7 @@ def encrypt_secret(
|
||||
for group in add_groups:
|
||||
allow_member(groups_folder(secret.name), sops_groups_folder(), group, False)
|
||||
|
||||
for kind in ["users", "machines", "groups"]:
|
||||
if not (sops_secrets_folder() / kind).is_dir():
|
||||
continue
|
||||
k = read_key(sops_secrets_folder() / kind)
|
||||
keys.add(k)
|
||||
keys = collect_keys_for_path(secret)
|
||||
|
||||
if key.pubkey not in keys:
|
||||
keys.add(key.pubkey)
|
||||
@@ -52,6 +81,7 @@ def encrypt_secret(
|
||||
users_folder(secret.name), sops_users_folder(), key.username, False
|
||||
)
|
||||
|
||||
breakpoint()
|
||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||
|
||||
|
||||
@@ -79,39 +109,6 @@ def groups_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "groups"
|
||||
|
||||
|
||||
def collect_keys_for_type(folder: Path) -> list[str]:
|
||||
if not folder.exists():
|
||||
return []
|
||||
keys = []
|
||||
for p in folder.iterdir():
|
||||
if not p.is_symlink():
|
||||
continue
|
||||
try:
|
||||
target = p.resolve()
|
||||
except FileNotFoundError:
|
||||
tty.warn(f"Ignoring broken symlink {p}")
|
||||
continue
|
||||
kind = target.parent.name
|
||||
if folder.name != kind:
|
||||
tty.warn(f"Expected {p} to point to {folder} but points to {target.parent}")
|
||||
continue
|
||||
keys.append(read_key(target))
|
||||
return keys
|
||||
|
||||
|
||||
def collect_keys_for_path(path: Path) -> list[str]:
|
||||
keys = []
|
||||
keys += collect_keys_for_type(path / "machines")
|
||||
keys += collect_keys_for_type(path / "users")
|
||||
groups = path / "groups"
|
||||
if not groups.is_dir():
|
||||
return keys
|
||||
for group in groups.iterdir():
|
||||
keys += collect_keys_for_type(group / "machines")
|
||||
keys += collect_keys_for_type(group / "users")
|
||||
return keys
|
||||
|
||||
|
||||
def allow_member(
|
||||
group_folder: Path, source_folder: Path, name: str, do_update_keys: bool = True
|
||||
) -> None:
|
||||
@@ -129,7 +126,10 @@ def allow_member(
|
||||
|
||||
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
||||
if do_update_keys:
|
||||
update_keys(group_folder.parent, collect_keys_for_path(group_folder.parent))
|
||||
update_keys(
|
||||
group_folder.parent,
|
||||
list(sorted(collect_keys_for_path(group_folder.parent))),
|
||||
)
|
||||
|
||||
|
||||
def disallow_member(group_folder: Path, name: str) -> None:
|
||||
@@ -151,7 +151,9 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
||||
if len(os.listdir(group_folder.parent)) == 0:
|
||||
os.rmdir(group_folder.parent)
|
||||
|
||||
update_keys(target.parent.parent, collect_keys_for_path(group_folder.parent))
|
||||
update_keys(
|
||||
target.parent.parent, list(sorted(collect_keys_for_path(group_folder.parent)))
|
||||
)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -138,9 +138,7 @@ def encrypt_file(
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
||||
with NamedTemporaryFile(delete=False) as dummy_manifest_file, NamedTemporaryFile(
|
||||
delete=False
|
||||
) as f:
|
||||
with sops_manifest(keys) as manifest, NamedTemporaryFile(delete=False) as f:
|
||||
try:
|
||||
with open(f.name, "w") as fd:
|
||||
if isinstance(content, str):
|
||||
@@ -148,9 +146,7 @@ def encrypt_file(
|
||||
else:
|
||||
shutil.copyfileobj(content, fd)
|
||||
# we pass an empty manifest to pick up existing configuration of the user
|
||||
args = ["sops", "--config", dummy_manifest_file.name]
|
||||
for key in keys:
|
||||
args.extend(["--age", key])
|
||||
args = ["sops", "--config", str(manifest)]
|
||||
args.extend(["-i", "--encrypt", str(f.name)])
|
||||
cmd = nix_shell(["sops"], args)
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
Reference in New Issue
Block a user