From 718f6477742a2f3fe6b05cd69f6178379f29fcf9 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Sat, 14 Oct 2023 14:57:36 +0200 Subject: [PATCH] Added flake_name:str argument everywhere, nix fmt doesn't complain anymore --- pkgs/clan-cli/clan_cli/config/__init__.py | 21 +++-- pkgs/clan-cli/clan_cli/config/machine.py | 4 +- pkgs/clan-cli/clan_cli/dirs.py | 10 +-- pkgs/clan-cli/clan_cli/flakes/create.py | 4 +- pkgs/clan-cli/clan_cli/flakes/list.py | 4 +- pkgs/clan-cli/clan_cli/machines/create.py | 4 +- pkgs/clan-cli/clan_cli/machines/install.py | 4 +- pkgs/clan-cli/clan_cli/machines/machines.py | 8 +- pkgs/clan-cli/clan_cli/machines/update.py | 10 ++- pkgs/clan-cli/clan_cli/secrets/folders.py | 12 +-- pkgs/clan-cli/clan_cli/secrets/generate.py | 8 +- pkgs/clan-cli/clan_cli/secrets/groups.py | 86 +++++++++++-------- pkgs/clan-cli/clan_cli/secrets/import_sops.py | 5 +- pkgs/clan-cli/clan_cli/secrets/machines.py | 80 ++++++++++++----- pkgs/clan-cli/clan_cli/secrets/secrets.py | 84 ++++++++++++------ pkgs/clan-cli/clan_cli/secrets/sops.py | 16 ++-- .../clan_cli/secrets/sops_generate.py | 39 ++++++--- pkgs/clan-cli/clan_cli/secrets/upload.py | 8 +- pkgs/clan-cli/clan_cli/secrets/users.py | 58 +++++++++---- pkgs/clan-cli/clan_cli/vms/create.py | 9 +- pkgs/clan-cli/clan_cli/vms/inspect.py | 9 +- pkgs/clan-cli/clan_cli/webui/api_inputs.py | 4 +- pkgs/clan-cli/tests/fixtures_flakes.py | 27 +++--- pkgs/clan-cli/tests/test_dirs.py | 8 +- pkgs/clan-cli/tests/test_machines_config.py | 6 +- pkgs/clan-cli/tests/test_secrets_generate.py | 24 ++++-- .../tests/test_secrets_password_store.py | 13 +-- pkgs/clan-cli/tests/test_vms_api_create.py | 6 +- 28 files changed, 365 insertions(+), 206 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/config/__init__.py b/pkgs/clan-cli/clan_cli/config/__init__.py index b134dcc..e5dc749 100644 --- a/pkgs/clan-cli/clan_cli/config/__init__.py +++ b/pkgs/clan-cli/clan_cli/config/__init__.py @@ -9,7 +9,7 @@ import sys from pathlib import Path from typing import Any, Optional, Tuple, get_origin -from clan_cli.dirs import get_clan_flake_toplevel, machine_settings_file +from clan_cli.dirs import machine_settings_file, specific_flake_dir from clan_cli.errors import ClanError from clan_cli.git import commit_file from clan_cli.nix import nix_eval @@ -106,8 +106,10 @@ def cast(value: Any, type: Any, opt_description: str) -> Any: ) -def options_for_machine(machine_name: str, show_trace: bool = False) -> dict: - clan_dir = get_clan_flake_toplevel() +def options_for_machine( + flake_name: str, machine_name: str, show_trace: bool = False +) -> dict: + clan_dir = specific_flake_dir(flake_name) flags = [] if show_trace: flags.append("--show-trace") @@ -128,9 +130,9 @@ def options_for_machine(machine_name: str, show_trace: bool = False) -> dict: def read_machine_option_value( - machine_name: str, option: str, show_trace: bool = False + flake_name: str, machine_name: str, option: str, show_trace: bool = False ) -> str: - clan_dir = get_clan_flake_toplevel() + clan_dir = specific_flake_dir(flake_name) # use nix eval to read from .#nixosConfigurations.default.config.{option} # this will give us the evaluated config with the options attribute cmd = nix_eval( @@ -163,19 +165,19 @@ def get_or_set_option(args: argparse.Namespace) -> None: # load options if args.options_file is None: options = options_for_machine( - machine_name=args.machine, show_trace=args.show_trace + args.flake, machine_name=args.machine, show_trace=args.show_trace ) else: with open(args.options_file) as f: options = json.load(f) # compute settings json file location if args.settings_file is None: - get_clan_flake_toplevel() settings_file = machine_settings_file(args.flake, args.machine) else: settings_file = args.settings_file # set the option with the given value set_option( + flake_name=args.flake, option=args.option, value=args.value, options=options, @@ -184,7 +186,7 @@ def get_or_set_option(args: argparse.Namespace) -> None: show_trace=args.show_trace, ) if not args.quiet: - new_value = read_machine_option_value(args.machine, args.option) + new_value = read_machine_option_value(args.flake, args.machine, args.option) print(f"New Value for {args.option}:") print(new_value) @@ -241,6 +243,7 @@ def find_option( def set_option( + flake_name: str, option: str, value: Any, options: dict, @@ -289,7 +292,7 @@ def set_option( json.dump(new_config, f, indent=2) print(file=f) # add newline at the end of the file to make git happy - if settings_file.resolve().is_relative_to(get_clan_flake_toplevel()): + if settings_file.resolve().is_relative_to(specific_flake_dir(flake_name)): commit_file(settings_file, commit_message=f"Set option {option_description}") diff --git a/pkgs/clan-cli/clan_cli/config/machine.py b/pkgs/clan-cli/clan_cli/config/machine.py index 113978f..8ff6b60 100644 --- a/pkgs/clan-cli/clan_cli/config/machine.py +++ b/pkgs/clan-cli/clan_cli/config/machine.py @@ -6,9 +6,9 @@ from pathlib import Path from fastapi import HTTPException from clan_cli.dirs import ( - get_flake_path, machine_settings_file, nixpkgs_source, + specific_flake_dir, specific_machine_dir, ) from clan_cli.git import commit_file, find_git_repo_root @@ -47,7 +47,7 @@ def set_config_for_machine(flake_name: str, machine_name: str, config: dict) -> def schema_for_machine(flake_name: str, machine_name: str) -> dict: - flake = get_flake_path(flake_name) + flake = specific_flake_dir(flake_name) # use nix eval to lib.evalModules .#nixosModules.machine-{machine_name} proc = subprocess.run( diff --git a/pkgs/clan-cli/clan_cli/dirs.py b/pkgs/clan-cli/clan_cli/dirs.py index 8ada2f1..1ea596b 100644 --- a/pkgs/clan-cli/clan_cli/dirs.py +++ b/pkgs/clan-cli/clan_cli/dirs.py @@ -6,7 +6,7 @@ from typing import Optional from .errors import ClanError -def get_clan_flake_toplevel() -> Path: +def _get_clan_flake_toplevel() -> Path: return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"]) @@ -61,22 +61,22 @@ def clan_config_dir() -> Path: return path.resolve() -def clan_flake_dir() -> Path: +def clan_flakes_dir() -> Path: path = clan_data_dir() / "flake" if not path.exists(): path.mkdir() return path.resolve() -def get_flake_path(name: str) -> Path: - flake_dir = clan_flake_dir() / name +def specific_flake_dir(name: str) -> Path: + flake_dir = clan_flakes_dir() / name if not flake_dir.exists(): raise ClanError(f"Flake {name} does not exist") return flake_dir def machines_dir(flake_name: str) -> Path: - return get_flake_path(flake_name) / "machines" + return specific_flake_dir(flake_name) / "machines" def specific_machine_dir(flake_name: str, machine: str) -> Path: diff --git a/pkgs/clan-cli/clan_cli/flakes/create.py b/pkgs/clan-cli/clan_cli/flakes/create.py index 343d33a..e35626d 100644 --- a/pkgs/clan-cli/clan_cli/flakes/create.py +++ b/pkgs/clan-cli/clan_cli/flakes/create.py @@ -7,7 +7,7 @@ from pydantic import AnyUrl from pydantic.tools import parse_obj_as from ..async_cmd import CmdOut, run, runforcli -from ..dirs import clan_flake_dir +from ..dirs import clan_flakes_dir from ..nix import nix_command, nix_shell DEFAULT_URL: AnyUrl = parse_obj_as( @@ -54,7 +54,7 @@ async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]: def create_flake_command(args: argparse.Namespace) -> None: - flake_dir = clan_flake_dir() / args.name + flake_dir = clan_flakes_dir() / args.name runforcli(create_flake, flake_dir, DEFAULT_URL) diff --git a/pkgs/clan-cli/clan_cli/flakes/list.py b/pkgs/clan-cli/clan_cli/flakes/list.py index 8aa211a..bec34ef 100644 --- a/pkgs/clan-cli/clan_cli/flakes/list.py +++ b/pkgs/clan-cli/clan_cli/flakes/list.py @@ -2,13 +2,13 @@ import argparse import logging import os -from ..dirs import clan_flake_dir +from ..dirs import clan_flakes_dir log = logging.getLogger(__name__) def list_flakes() -> list[str]: - path = clan_flake_dir() + path = clan_flakes_dir() log.debug(f"Listing machines in {path}") if not path.exists(): return [] diff --git a/pkgs/clan-cli/clan_cli/machines/create.py b/pkgs/clan-cli/clan_cli/machines/create.py index 9575ace..f96beea 100644 --- a/pkgs/clan-cli/clan_cli/machines/create.py +++ b/pkgs/clan-cli/clan_cli/machines/create.py @@ -3,7 +3,7 @@ import logging from typing import Dict from ..async_cmd import CmdOut, run, runforcli -from ..dirs import get_flake_path, specific_machine_dir +from ..dirs import specific_flake_dir, specific_machine_dir from ..errors import ClanError from ..nix import nix_shell @@ -35,7 +35,7 @@ async def create_machine(flake_name: str, machine_name: str) -> Dict[str, CmdOut def create_command(args: argparse.Namespace) -> None: try: - flake_dir = get_flake_path(args.flake) + flake_dir = specific_flake_dir(args.flake) runforcli(create_machine, flake_dir, args.machine) except ClanError as e: print(e) diff --git a/pkgs/clan-cli/clan_cli/machines/install.py b/pkgs/clan-cli/clan_cli/machines/install.py index 2969caf..1881b32 100644 --- a/pkgs/clan-cli/clan_cli/machines/install.py +++ b/pkgs/clan-cli/clan_cli/machines/install.py @@ -3,7 +3,7 @@ import subprocess from pathlib import Path from tempfile import TemporaryDirectory -from ..dirs import get_flake_path +from ..dirs import specific_flake_dir from ..machines.machines import Machine from ..nix import nix_shell from ..secrets.generate import generate_secrets @@ -40,7 +40,7 @@ def install_nixos(machine: Machine) -> None: def install_command(args: argparse.Namespace) -> None: - machine = Machine(args.machine, flake_dir=get_flake_path(args.flake)) + machine = Machine(args.machine, flake_dir=specific_flake_dir(args.flake)) machine.deployment_address = args.target_host install_nixos(machine) diff --git a/pkgs/clan-cli/clan_cli/machines/machines.py b/pkgs/clan-cli/clan_cli/machines/machines.py index 9224fcc..db6b974 100644 --- a/pkgs/clan-cli/clan_cli/machines/machines.py +++ b/pkgs/clan-cli/clan_cli/machines/machines.py @@ -5,7 +5,6 @@ import sys from pathlib import Path from typing import Optional -from ..dirs import get_clan_flake_toplevel from ..nix import nix_build, nix_config, nix_eval from ..ssh import Host, parse_deployment_address @@ -31,7 +30,7 @@ class Machine: def __init__( self, name: str, - flake_dir: Optional[Path] = None, + flake_dir: Path, machine_data: Optional[dict] = None, ) -> None: """ @@ -41,10 +40,7 @@ class Machine: @machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data """ self.name = name - if flake_dir is None: - self.flake_dir = get_clan_flake_toplevel() - else: - self.flake_dir = flake_dir + self.flake_dir = flake_dir if machine_data is None: self.machine_data = build_machine_data(name, self.flake_dir) diff --git a/pkgs/clan-cli/clan_cli/machines/update.py b/pkgs/clan-cli/clan_cli/machines/update.py index c3dc3e2..c5c7d0d 100644 --- a/pkgs/clan-cli/clan_cli/machines/update.py +++ b/pkgs/clan-cli/clan_cli/machines/update.py @@ -4,7 +4,7 @@ import os import subprocess from pathlib import Path -from ..dirs import get_flake_path +from ..dirs import specific_flake_dir from ..machines.machines import Machine from ..nix import nix_build, nix_command, nix_config from ..secrets.generate import generate_secrets @@ -95,7 +95,11 @@ def get_all_machines(clan_dir: Path) -> HostGroup: host = parse_deployment_address( name, machine_data["deploymentAddress"], - meta={"machine": Machine(name=name, machine_data=machine_data)}, + meta={ + "machine": Machine( + name=name, flake_dir=clan_dir, machine_data=machine_data + ) + }, ) hosts.append(host) return HostGroup(hosts) @@ -111,7 +115,7 @@ def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGrou # FIXME: we want some kind of inventory here. def update(args: argparse.Namespace) -> None: - flake_dir = get_flake_path(args.flake) + flake_dir = specific_flake_dir(args.flake) if len(args.machines) == 1 and args.target_host is not None: machine = Machine(name=args.machines[0], flake_dir=flake_dir) machine.deployment_address = args.target_host diff --git a/pkgs/clan-cli/clan_cli/secrets/folders.py b/pkgs/clan-cli/clan_cli/secrets/folders.py index f9e8d31..4659a7f 100644 --- a/pkgs/clan-cli/clan_cli/secrets/folders.py +++ b/pkgs/clan-cli/clan_cli/secrets/folders.py @@ -3,17 +3,17 @@ import shutil from pathlib import Path from typing import Callable -from ..dirs import get_clan_flake_toplevel +from ..dirs import specific_flake_dir from ..errors import ClanError -def get_sops_folder() -> Path: - return get_clan_flake_toplevel() / "sops" +def get_sops_folder(flake_name: str) -> Path: + return specific_flake_dir(flake_name) / "sops" -def gen_sops_subfolder(subdir: str) -> Callable[[], Path]: - def folder() -> Path: - return get_clan_flake_toplevel() / "sops" / subdir +def gen_sops_subfolder(subdir: str) -> Callable[[str], Path]: + def folder(flake_name: str) -> Path: + return specific_flake_dir(flake_name) / "sops" / subdir return folder diff --git a/pkgs/clan-cli/clan_cli/secrets/generate.py b/pkgs/clan-cli/clan_cli/secrets/generate.py index 0dea6e3..d299299 100644 --- a/pkgs/clan-cli/clan_cli/secrets/generate.py +++ b/pkgs/clan-cli/clan_cli/secrets/generate.py @@ -6,6 +6,7 @@ import sys from clan_cli.errors import ClanError +from ..dirs import specific_flake_dir from ..machines.machines import Machine log = logging.getLogger(__name__) @@ -29,7 +30,7 @@ def generate_secrets(machine: Machine) -> None: def generate_command(args: argparse.Namespace) -> None: - machine = Machine(args.machine) + machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake)) generate_secrets(machine) @@ -38,4 +39,9 @@ def register_generate_parser(parser: argparse.ArgumentParser) -> None: "machine", help="The machine to generate secrets for", ) + parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) parser.set_defaults(func=generate_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/groups.py b/pkgs/clan-cli/clan_cli/secrets/groups.py index de6a98c..4e5faf0 100644 --- a/pkgs/clan-cli/clan_cli/secrets/groups.py +++ b/pkgs/clan-cli/clan_cli/secrets/groups.py @@ -20,24 +20,27 @@ from .types import ( ) -def machines_folder(group: str) -> Path: - return sops_groups_folder() / group / "machines" +def machines_folder(flake_name: str, group: str) -> Path: + return sops_groups_folder(flake_name) / group / "machines" -def users_folder(group: str) -> Path: - return sops_groups_folder() / group / "users" +def users_folder(flake_name: str, group: str) -> Path: + return sops_groups_folder(flake_name) / group / "users" class Group: - def __init__(self, name: str, machines: list[str], users: list[str]) -> None: + def __init__( + self, flake_name: str, name: str, machines: list[str], users: list[str] + ) -> None: self.name = name self.machines = machines self.users = users + self.flake_name = flake_name -def list_groups() -> list[Group]: +def list_groups(flake_name: str) -> list[Group]: groups: list[Group] = [] - folder = sops_groups_folder() + folder = sops_groups_folder(flake_name) if not folder.exists(): return groups @@ -45,24 +48,24 @@ def list_groups() -> list[Group]: group_folder = folder / name if not group_folder.is_dir(): continue - machines_path = machines_folder(name) + machines_path = machines_folder(flake_name, name) machines = [] if machines_path.is_dir(): for f in machines_path.iterdir(): if validate_hostname(f.name): machines.append(f.name) - users_path = users_folder(name) + users_path = users_folder(flake_name, name) users = [] if users_path.is_dir(): for f in users_path.iterdir(): if VALID_USER_NAME.match(f.name): users.append(f.name) - groups.append(Group(name, machines, users)) + groups.append(Group(flake_name, name, machines, users)) return groups def list_command(args: argparse.Namespace) -> None: - for group in list_groups(): + for group in list_groups(args.flake): print(group.name) if group.machines: print("machines:") @@ -84,9 +87,9 @@ def list_directory(directory: Path) -> str: return msg -def update_group_keys(group: str) -> None: - for secret_ in secrets.list_secrets(): - secret = sops_secrets_folder() / secret_ +def update_group_keys(flake_name: str, group: str) -> None: + for secret_ in secrets.list_secrets(flake_name): + secret = sops_secrets_folder(flake_name) / secret_ if (secret / "groups" / group).is_symlink(): update_keys( secret, @@ -94,7 +97,9 @@ def update_group_keys(group: str) -> None: ) -def add_member(group_folder: Path, source_folder: Path, name: str) -> None: +def add_member( + flake_name: str, group_folder: Path, source_folder: Path, name: str +) -> None: source = source_folder / name if not source.exists(): msg = f"{name} does not exist in {source_folder}: " @@ -109,10 +114,10 @@ def add_member(group_folder: Path, source_folder: Path, name: str) -> None: ) os.remove(user_target) user_target.symlink_to(os.path.relpath(source, user_target.parent)) - update_group_keys(group_folder.parent.name) + update_group_keys(flake_name, group_folder.parent.name) -def remove_member(group_folder: Path, name: str) -> None: +def remove_member(flake_name: str, group_folder: Path, name: str) -> None: target = group_folder / name if not target.exists(): msg = f"{name} does not exist in group in {group_folder}: " @@ -121,7 +126,7 @@ def remove_member(group_folder: Path, name: str) -> None: os.remove(target) if len(os.listdir(group_folder)) > 0: - update_group_keys(group_folder.parent.name) + update_group_keys(flake_name, group_folder.parent.name) if len(os.listdir(group_folder)) == 0: os.rmdir(group_folder) @@ -130,56 +135,65 @@ def remove_member(group_folder: Path, name: str) -> None: os.rmdir(group_folder.parent) -def add_user(group: str, name: str) -> None: - add_member(users_folder(group), sops_users_folder(), name) +def add_user(flake_name: str, group: str, name: str) -> None: + add_member( + flake_name, users_folder(flake_name, group), sops_users_folder(flake_name), name + ) def add_user_command(args: argparse.Namespace) -> None: - add_user(args.group, args.user) + add_user(args.flake, args.group, args.user) -def remove_user(group: str, name: str) -> None: - remove_member(users_folder(group), name) +def remove_user(flake_name: str, group: str, name: str) -> None: + remove_member(flake_name, users_folder(flake_name, group), name) def remove_user_command(args: argparse.Namespace) -> None: - remove_user(args.group, args.user) + remove_user(args.flake, args.group, args.user) -def add_machine(group: str, name: str) -> None: - add_member(machines_folder(group), sops_machines_folder(), name) +def add_machine(flake_name: str, group: str, name: str) -> None: + add_member( + flake_name, + machines_folder(flake_name, group), + sops_machines_folder(flake_name), + name, + ) def add_machine_command(args: argparse.Namespace) -> None: - add_machine(args.group, args.machine) + add_machine(args.flake, args.group, args.machine) -def remove_machine(group: str, name: str) -> None: - remove_member(machines_folder(group), name) +def remove_machine(flake_name: str, group: str, name: str) -> None: + remove_member(flake_name, machines_folder(flake_name, group), name) def remove_machine_command(args: argparse.Namespace) -> None: - remove_machine(args.group, args.machine) + remove_machine(args.flake, args.group, args.machine) def add_group_argument(parser: argparse.ArgumentParser) -> None: parser.add_argument("group", help="the name of the secret", type=group_name_type) -def add_secret(group: str, name: str) -> None: - secrets.allow_member(secrets.groups_folder(name), sops_groups_folder(), group) +def add_secret(flake_name: str, group: str, name: str) -> None: + secrets.allow_member( + secrets.groups_folder(flake_name, name), sops_groups_folder(flake_name), group + ) def add_secret_command(args: argparse.Namespace) -> None: - add_secret(args.group, args.secret) + add_secret(args.flake, args.group, args.secret) -def remove_secret(group: str, name: str) -> None: - secrets.disallow_member(secrets.groups_folder(name), group) +def remove_secret(flake_name: str, group: str, name: str) -> None: + secrets.disallow_member(secrets.groups_folder(flake_name, name), group) def remove_secret_command(args: argparse.Namespace) -> None: - remove_secret(args.group, args.secret) + remove_secret(args.flake, args.group, args.secret) def register_groups_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/import_sops.py b/pkgs/clan-cli/clan_cli/secrets/import_sops.py index bfac456..698a618 100644 --- a/pkgs/clan-cli/clan_cli/secrets/import_sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/import_sops.py @@ -36,14 +36,15 @@ def import_sops(args: argparse.Namespace) -> None: file=sys.stderr, ) continue - if (sops_secrets_folder() / k / "secret").exists(): + if (sops_secrets_folder(args.flake) / k / "secret").exists(): print( f"WARNING: {k} already exists, skipping", file=sys.stderr, ) continue encrypt_secret( - sops_secrets_folder() / k, + args.flake, + sops_secrets_folder(args.flake) / k, v, add_groups=args.group, add_machines=args.machine, diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 24da93a..b78d52d 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -7,65 +7,67 @@ from .sops import read_key, write_key from .types import public_or_private_age_key_type, secret_name_type -def add_machine(name: str, key: str, force: bool) -> None: - write_key(sops_machines_folder() / name, key, force) +def add_machine(flake_name: str, name: str, key: str, force: bool) -> None: + write_key(sops_machines_folder(flake_name) / name, key, force) -def remove_machine(name: str) -> None: - remove_object(sops_machines_folder(), name) +def remove_machine(flake_name: str, name: str) -> None: + remove_object(sops_machines_folder(flake_name), name) -def get_machine(name: str) -> str: - return read_key(sops_machines_folder() / name) +def get_machine(flake_name: str, name: str) -> str: + return read_key(sops_machines_folder(flake_name) / name) -def has_machine(name: str) -> bool: - return (sops_machines_folder() / name / "key.json").exists() +def has_machine(flake_name: str, name: str) -> bool: + return (sops_machines_folder(flake_name) / name / "key.json").exists() -def list_machines() -> list[str]: - path = sops_machines_folder() +def list_machines(flake_name: str) -> list[str]: + path = sops_machines_folder(flake_name) def validate(name: str) -> bool: - return validate_hostname(name) and has_machine(name) + return validate_hostname(name) and has_machine(flake_name, name) return list_objects(path, validate) -def add_secret(machine: str, secret: str) -> None: +def add_secret(flake_name: str, machine: str, secret: str) -> None: secrets.allow_member( - secrets.machines_folder(secret), sops_machines_folder(), machine + secrets.machines_folder(flake_name, secret), + sops_machines_folder(flake_name), + machine, ) -def remove_secret(machine: str, secret: str) -> None: - secrets.disallow_member(secrets.machines_folder(secret), machine) +def remove_secret(flake_name: str, machine: str, secret: str) -> None: + secrets.disallow_member(secrets.machines_folder(flake_name, secret), machine) def list_command(args: argparse.Namespace) -> None: - lst = list_machines() + lst = list_machines(args.flake) if len(lst) > 0: print("\n".join(lst)) def add_command(args: argparse.Namespace) -> None: - add_machine(args.machine, args.key, args.force) + add_machine(args.flake, args.machine, args.key, args.force) def get_command(args: argparse.Namespace) -> None: - print(get_machine(args.machine)) + print(get_machine(args.flake, args.machine)) def remove_command(args: argparse.Namespace) -> None: - remove_machine(args.machine) + remove_machine(args.flake, args.machine) def add_secret_command(args: argparse.Namespace) -> None: - add_secret(args.machine, args.secret) + add_secret(args.flake, args.machine, args.secret) def remove_secret_command(args: argparse.Namespace) -> None: - remove_secret(args.machine, args.secret) + remove_secret(args.flake, args.machine, args.secret) def register_machines_parser(parser: argparse.ArgumentParser) -> None: @@ -75,9 +77,16 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: help="the command to run", required=True, ) + # Parser list_parser = subparser.add_parser("list", help="list machines") + list_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) list_parser.set_defaults(func=list_command) + # Parser add_parser = subparser.add_parser("add", help="add a machine") add_parser.add_argument( "-f", @@ -86,6 +95,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: action="store_true", default=False, ) + add_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) add_parser.add_argument( "machine", help="the name of the machine", type=machine_name_type ) @@ -96,21 +110,39 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: ) add_parser.set_defaults(func=add_command) + # Parser get_parser = subparser.add_parser("get", help="get a machine public key") get_parser.add_argument( "machine", help="the name of the machine", type=machine_name_type ) + get_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) get_parser.set_defaults(func=get_command) + # Parser remove_parser = subparser.add_parser("remove", help="remove a machine") + remove_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) remove_parser.add_argument( "machine", help="the name of the machine", type=machine_name_type ) remove_parser.set_defaults(func=remove_command) + # Parser add_secret_parser = subparser.add_parser( "add-secret", help="allow a machine to access a secret" ) + add_secret_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) add_secret_parser.add_argument( "machine", help="the name of the machine", type=machine_name_type ) @@ -119,9 +151,15 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: ) add_secret_parser.set_defaults(func=add_secret_command) + # Parser remove_secret_parser = subparser.add_parser( "remove-secret", help="remove a group's access to a secret" ) + remove_secret_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) remove_secret_parser.add_argument( "machine", help="the name of the group", type=machine_name_type ) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index f607c3a..62065fd 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -53,62 +53,79 @@ def collect_keys_for_path(path: Path) -> set[str]: def encrypt_secret( + flake_name: str, secret: Path, value: IO[str] | str | None, add_users: list[str] = [], add_machines: list[str] = [], add_groups: list[str] = [], ) -> None: - key = ensure_sops_key() + key = ensure_sops_key(flake_name) keys = set([]) for user in add_users: - allow_member(users_folder(secret.name), sops_users_folder(), user, False) + allow_member( + users_folder(flake_name, secret.name), + sops_users_folder(flake_name), + user, + False, + ) for machine in add_machines: allow_member( - machines_folder(secret.name), sops_machines_folder(), machine, False + machines_folder(flake_name, secret.name), + sops_machines_folder(flake_name), + machine, + False, ) for group in add_groups: - allow_member(groups_folder(secret.name), sops_groups_folder(), group, False) + allow_member( + groups_folder(flake_name, secret.name), + sops_groups_folder(flake_name), + group, + False, + ) keys = collect_keys_for_path(secret) if key.pubkey not in keys: keys.add(key.pubkey) allow_member( - users_folder(secret.name), sops_users_folder(), key.username, False + users_folder(flake_name, secret.name), + sops_users_folder(flake_name), + key.username, + False, ) encrypt_file(secret / "secret", value, list(sorted(keys))) -def remove_secret(secret: str) -> None: - path = sops_secrets_folder() / secret +def remove_secret(flake_name: str, secret: str) -> None: + path = sops_secrets_folder(flake_name) / secret if not path.exists(): raise ClanError(f"Secret '{secret}' does not exist") shutil.rmtree(path) def remove_command(args: argparse.Namespace) -> None: - remove_secret(args.secret) + remove_secret(args.flake, args.secret) def add_secret_argument(parser: argparse.ArgumentParser) -> None: parser.add_argument("secret", help="the name of the secret", type=secret_name_type) -def machines_folder(group: str) -> Path: - return sops_secrets_folder() / group / "machines" +def machines_folder(flake_name: str, group: str) -> Path: + return sops_secrets_folder(flake_name) / group / "machines" -def users_folder(group: str) -> Path: - return sops_secrets_folder() / group / "users" +def users_folder(flake_name: str, group: str) -> Path: + return sops_secrets_folder(flake_name) / group / "users" -def groups_folder(group: str) -> Path: - return sops_secrets_folder() / group / "groups" +def groups_folder(flake_name: str, group: str) -> Path: + return sops_secrets_folder(flake_name) / group / "groups" def list_directory(directory: Path) -> str: @@ -171,35 +188,37 @@ def disallow_member(group_folder: Path, name: str) -> None: ) -def has_secret(secret: str) -> bool: - return (sops_secrets_folder() / secret / "secret").exists() +def has_secret(flake_name: str, secret: str) -> bool: + return (sops_secrets_folder(flake_name) / secret / "secret").exists() -def list_secrets() -> list[str]: - path = sops_secrets_folder() +def list_secrets(flake_name: str) -> list[str]: + path = sops_secrets_folder(flake_name) def validate(name: str) -> bool: - return VALID_SECRET_NAME.match(name) is not None and has_secret(name) + return VALID_SECRET_NAME.match(name) is not None and has_secret( + flake_name, name + ) return list_objects(path, validate) def list_command(args: argparse.Namespace) -> None: - lst = list_secrets() + lst = list_secrets(args.flake) if len(lst) > 0: print("\n".join(lst)) -def decrypt_secret(secret: str) -> str: - ensure_sops_key() - secret_path = sops_secrets_folder() / secret / "secret" +def decrypt_secret(flake_name: str, secret: str) -> str: + ensure_sops_key(flake_name) + secret_path = sops_secrets_folder(flake_name) / secret / "secret" if not secret_path.exists(): raise ClanError(f"Secret '{secret}' does not exist") return decrypt_file(secret_path) def get_command(args: argparse.Namespace) -> None: - print(decrypt_secret(args.secret), end="") + print(decrypt_secret(args.flake, args.secret), end="") def set_command(args: argparse.Namespace) -> None: @@ -212,7 +231,8 @@ def set_command(args: argparse.Namespace) -> None: elif tty.is_interactive(): secret_value = getpass.getpass(prompt="Paste your secret: ") encrypt_secret( - sops_secrets_folder() / args.secret, + args.flake, + sops_secrets_folder(args.flake) / args.secret, secret_value, args.user, args.machine, @@ -221,8 +241,8 @@ def set_command(args: argparse.Namespace) -> None: def rename_command(args: argparse.Namespace) -> None: - old_path = sops_secrets_folder() / args.secret - new_path = sops_secrets_folder() / args.new_name + old_path = sops_secrets_folder(args.flake) / args.secret + new_path = sops_secrets_folder(args.flake) / args.new_name if not old_path.exists(): raise ClanError(f"Secret '{args.secret}' does not exist") if new_path.exists(): @@ -237,9 +257,19 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None: parser_get = subparser.add_parser("get", help="get a secret") add_secret_argument(parser_get) parser_get.set_defaults(func=get_command) + parser_get.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) parser_set = subparser.add_parser("set", help="set a secret") add_secret_argument(parser_set) + parser_set.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) parser_set.add_argument( "--group", type=str, diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index b79b41c..c9a389b 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -51,7 +51,7 @@ def generate_private_key() -> tuple[str, str]: raise ClanError("Failed to generate private sops key") from e -def get_user_name(user: str) -> str: +def get_user_name(flake_name: str, user: str) -> str: """Ask the user for their name until a unique one is provided.""" while True: name = input( @@ -59,14 +59,14 @@ def get_user_name(user: str) -> str: ) if name: user = name - if not (sops_users_folder() / user).exists(): + if not (sops_users_folder(flake_name) / user).exists(): return user - print(f"{sops_users_folder() / user} already exists") + print(f"{sops_users_folder(flake_name) / user} already exists") -def ensure_user_or_machine(pub_key: str) -> SopsKey: +def ensure_user_or_machine(flake_name: str, pub_key: str) -> SopsKey: key = SopsKey(pub_key, username="") - folders = [sops_users_folder(), sops_machines_folder()] + folders = [sops_users_folder(flake_name), sops_machines_folder(flake_name)] for folder in folders: if folder.exists(): for user in folder.iterdir(): @@ -90,13 +90,13 @@ def default_sops_key_path() -> Path: return user_config_dir() / "sops" / "age" / "keys.txt" -def ensure_sops_key() -> SopsKey: +def ensure_sops_key(flake_name: str) -> SopsKey: key = os.environ.get("SOPS_AGE_KEY") if key: - return ensure_user_or_machine(get_public_key(key)) + return ensure_user_or_machine(flake_name, get_public_key(key)) path = default_sops_key_path() if path.exists(): - return ensure_user_or_machine(get_public_key(path.read_text())) + return ensure_user_or_machine(flake_name, get_public_key(path.read_text())) else: raise ClanError( "No sops key found. Please generate one with 'clan secrets key generate'." diff --git a/pkgs/clan-cli/clan_cli/secrets/sops_generate.py b/pkgs/clan-cli/clan_cli/secrets/sops_generate.py index 73fd1dd..f6657a5 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops_generate.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops_generate.py @@ -9,7 +9,7 @@ from typing import Any from clan_cli.nix import nix_shell -from ..dirs import get_clan_flake_toplevel +from ..dirs import specific_flake_dir from ..errors import ClanError from .folders import sops_secrets_folder from .machines import add_machine, has_machine @@ -17,21 +17,29 @@ from .secrets import decrypt_secret, encrypt_secret, has_secret from .sops import generate_private_key -def generate_host_key(machine_name: str) -> None: - if has_machine(machine_name): +def generate_host_key(flake_name: str, machine_name: str) -> None: + if has_machine(flake_name, machine_name): return priv_key, pub_key = generate_private_key() - encrypt_secret(sops_secrets_folder() / f"{machine_name}-age.key", priv_key) - add_machine(machine_name, pub_key, False) + encrypt_secret( + flake_name, + sops_secrets_folder(flake_name) / f"{machine_name}-age.key", + priv_key, + ) + add_machine(flake_name, machine_name, pub_key, False) def generate_secrets_group( - secret_group: str, machine_name: str, tempdir: Path, secret_options: dict[str, Any] + flake_name: str, + secret_group: str, + machine_name: str, + tempdir: Path, + secret_options: dict[str, Any], ) -> None: - clan_dir = get_clan_flake_toplevel() + clan_dir = specific_flake_dir(flake_name) secrets = secret_options["secrets"] needs_regeneration = any( - not has_secret(f"{machine_name}-{secret['name']}") + not has_secret(flake_name, f"{machine_name}-{secret['name']}") for secret in secrets.values() ) generator = secret_options["generator"] @@ -62,7 +70,8 @@ export secrets={shlex.quote(str(secrets_dir))} msg += text raise ClanError(msg) encrypt_secret( - sops_secrets_folder() / f"{machine_name}-{secret['name']}", + flake_name, + sops_secrets_folder(flake_name) / f"{machine_name}-{secret['name']}", secret_file.read_text(), add_machines=[machine_name], ) @@ -79,17 +88,18 @@ export secrets={shlex.quote(str(secrets_dir))} # this is called by the sops.nix clan core module def generate_secrets_from_nix( + flake_name: str, machine_name: str, secret_submodules: dict[str, Any], ) -> None: - generate_host_key(machine_name) + generate_host_key(flake_name, machine_name) errors = {} with TemporaryDirectory() as d: # if any of the secrets are missing, we regenerate all connected facts/secrets for secret_group, secret_options in secret_submodules.items(): try: generate_secrets_group( - secret_group, machine_name, Path(d), secret_options + flake_name, secret_group, machine_name, Path(d), secret_options ) except ClanError as e: errors[secret_group] = e @@ -102,12 +112,15 @@ def generate_secrets_from_nix( # this is called by the sops.nix clan core module def upload_age_key_from_nix( + flake_name: str, machine_name: str, ) -> None: secret_name = f"{machine_name}-age.key" - if not has_secret(secret_name): # skip uploading the secret, not managed by us + if not has_secret( + flake_name, secret_name + ): # skip uploading the secret, not managed by us return - secret = decrypt_secret(secret_name) + secret = decrypt_secret(flake_name, secret_name) secrets_dir = Path(os.environ["SECRETS_DIR"]) (secrets_dir / "key.txt").write_text(secret) diff --git a/pkgs/clan-cli/clan_cli/secrets/upload.py b/pkgs/clan-cli/clan_cli/secrets/upload.py index 5e31a95..46c217d 100644 --- a/pkgs/clan-cli/clan_cli/secrets/upload.py +++ b/pkgs/clan-cli/clan_cli/secrets/upload.py @@ -4,6 +4,7 @@ import subprocess from pathlib import Path from tempfile import TemporaryDirectory +from ..dirs import specific_flake_dir from ..machines.machines import Machine from ..nix import nix_shell @@ -37,7 +38,7 @@ def upload_secrets(machine: Machine) -> None: def upload_command(args: argparse.Namespace) -> None: - machine = Machine(args.machine) + machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake)) upload_secrets(machine) @@ -46,4 +47,9 @@ def register_upload_parser(parser: argparse.ArgumentParser) -> None: "machine", help="The machine to upload secrets to", ) + parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) parser.set_defaults(func=upload_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 2faaf64..a653f29 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -11,20 +11,20 @@ from .types import ( ) -def add_user(name: str, key: str, force: bool) -> None: - write_key(sops_users_folder() / name, key, force) +def add_user(flake_name: str, name: str, key: str, force: bool) -> None: + write_key(sops_users_folder(flake_name) / name, key, force) -def remove_user(name: str) -> None: - remove_object(sops_users_folder(), name) +def remove_user(flake_name: str, name: str) -> None: + remove_object(sops_users_folder(flake_name), name) -def get_user(name: str) -> str: - return read_key(sops_users_folder() / name) +def get_user(flake_name: str, name: str) -> str: + return read_key(sops_users_folder(flake_name) / name) -def list_users() -> list[str]: - path = sops_users_folder() +def list_users(flake_name: str) -> list[str]: + path = sops_users_folder(flake_name) def validate(name: str) -> bool: return ( @@ -35,38 +35,40 @@ def list_users() -> list[str]: return list_objects(path, validate) -def add_secret(user: str, secret: str) -> None: - secrets.allow_member(secrets.users_folder(secret), sops_users_folder(), user) +def add_secret(flake_name: str, user: str, secret: str) -> None: + secrets.allow_member( + secrets.users_folder(flake_name, secret), sops_users_folder(flake_name), user + ) -def remove_secret(user: str, secret: str) -> None: - secrets.disallow_member(secrets.users_folder(secret), user) +def remove_secret(flake_name: str, user: str, secret: str) -> None: + secrets.disallow_member(secrets.users_folder(flake_name, secret), user) def list_command(args: argparse.Namespace) -> None: - lst = list_users() + lst = list_users(args.flake) if len(lst) > 0: print("\n".join(lst)) def add_command(args: argparse.Namespace) -> None: - add_user(args.user, args.key, args.force) + add_user(args.flake, args.user, args.key, args.force) def get_command(args: argparse.Namespace) -> None: - print(get_user(args.user)) + print(get_user(args.flake, args.user)) def remove_command(args: argparse.Namespace) -> None: - remove_user(args.user) + remove_user(args.flake, args.user) def add_secret_command(args: argparse.Namespace) -> None: - add_secret(args.user, args.secret) + add_secret(args.flake, args.user, args.secret) def remove_secret_command(args: argparse.Namespace) -> None: - remove_secret(args.user, args.secret) + remove_secret(args.flake, args.user, args.secret) def register_users_parser(parser: argparse.ArgumentParser) -> None: @@ -77,6 +79,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: required=True, ) list_parser = subparser.add_parser("list", help="list users") + list_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) list_parser.set_defaults(func=list_command) add_parser = subparser.add_parser("add", help="add a user") @@ -90,14 +97,29 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: type=public_or_private_age_key_type, ) add_parser.set_defaults(func=add_command) + add_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) get_parser = subparser.add_parser("get", help="get a user public key") get_parser.add_argument("user", help="the name of the user", type=user_name_type) get_parser.set_defaults(func=get_command) + get_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) remove_parser = subparser.add_parser("remove", help="remove a user") remove_parser.add_argument("user", help="the name of the user", type=user_name_type) remove_parser.set_defaults(func=remove_command) + remove_parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) add_secret_parser = subparser.add_parser( "add-secret", help="allow a user to access a secret" diff --git a/pkgs/clan-cli/clan_cli/vms/create.py b/pkgs/clan-cli/clan_cli/vms/create.py index 1f78608..2df7b9a 100644 --- a/pkgs/clan-cli/clan_cli/vms/create.py +++ b/pkgs/clan-cli/clan_cli/vms/create.py @@ -9,7 +9,7 @@ from pathlib import Path from typing import Iterator from uuid import UUID -from ..dirs import get_clan_flake_toplevel +from ..dirs import specific_flake_dir from ..nix import nix_build, nix_config, nix_shell from ..task_manager import BaseTask, Command, create_task from .inspect import VmConfig, inspect_vm @@ -147,7 +147,7 @@ def create_vm(vm: VmConfig) -> BuildVmTask: def create_command(args: argparse.Namespace) -> None: - clan_dir = get_clan_flake_toplevel() + clan_dir = specific_flake_dir(args.flake) vm = asyncio.run(inspect_vm(flake_url=clan_dir, flake_attr=args.machine)) task = create_vm(vm) @@ -157,4 +157,9 @@ def create_command(args: argparse.Namespace) -> None: def register_create_parser(parser: argparse.ArgumentParser) -> None: parser.add_argument("machine", type=str) + parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) parser.set_defaults(func=create_command) diff --git a/pkgs/clan-cli/clan_cli/vms/inspect.py b/pkgs/clan-cli/clan_cli/vms/inspect.py index 417dd71..e74382d 100644 --- a/pkgs/clan-cli/clan_cli/vms/inspect.py +++ b/pkgs/clan-cli/clan_cli/vms/inspect.py @@ -6,7 +6,7 @@ from pathlib import Path from pydantic import AnyUrl, BaseModel from ..async_cmd import run -from ..dirs import get_clan_flake_toplevel +from ..dirs import specific_flake_dir from ..nix import nix_config, nix_eval @@ -33,7 +33,7 @@ async def inspect_vm(flake_url: AnyUrl | Path, flake_attr: str) -> VmConfig: def inspect_command(args: argparse.Namespace) -> None: - clan_dir = get_clan_flake_toplevel() + clan_dir = specific_flake_dir(args.flake) res = asyncio.run(inspect_vm(flake_url=clan_dir, flake_attr=args.machine)) print("Cores:", res.cores) print("Memory size:", res.memory_size) @@ -42,4 +42,9 @@ def inspect_command(args: argparse.Namespace) -> None: def register_inspect_parser(parser: argparse.ArgumentParser) -> None: parser.add_argument("machine", type=str) + parser.add_argument( + "flake", + type=str, + help="name of the flake to create machine for", + ) parser.set_defaults(func=inspect_command) diff --git a/pkgs/clan-cli/clan_cli/webui/api_inputs.py b/pkgs/clan-cli/clan_cli/webui/api_inputs.py index 618a6bd..d3a9545 100644 --- a/pkgs/clan-cli/clan_cli/webui/api_inputs.py +++ b/pkgs/clan-cli/clan_cli/webui/api_inputs.py @@ -4,7 +4,7 @@ from typing import Any from pydantic import AnyUrl, BaseModel, validator -from ..dirs import clan_data_dir, clan_flake_dir +from ..dirs import clan_data_dir, clan_flakes_dir from ..flakes.create import DEFAULT_URL log = logging.getLogger(__name__) @@ -38,7 +38,7 @@ class ClanFlakePath(BaseModel): @validator("dest") def check_dest(cls: Any, v: Path) -> Path: # noqa - return validate_path(clan_flake_dir(), v) + return validate_path(clan_flakes_dir(), v) class FlakeCreateInput(ClanFlakePath): diff --git a/pkgs/clan-cli/tests/fixtures_flakes.py b/pkgs/clan-cli/tests/fixtures_flakes.py index 0320270..e488440 100644 --- a/pkgs/clan-cli/tests/fixtures_flakes.py +++ b/pkgs/clan-cli/tests/fixtures_flakes.py @@ -2,7 +2,7 @@ import fileinput import shutil import tempfile from pathlib import Path -from typing import Iterator +from typing import Iterator, NamedTuple import pytest from root import CLAN_CORE @@ -27,22 +27,27 @@ def substitute( print(line, end="") +class TestFlake(NamedTuple): + name: str + path: Path + + def create_flake( monkeypatch: pytest.MonkeyPatch, - name: str, + flake_name: str, clan_core_flake: Path | None = None, machines: list[str] = [], remote: bool = False, -) -> Iterator[Path]: +) -> Iterator[TestFlake]: """ Creates a flake with the given name and machines. The machine names map to the machines in ./test_machines """ - template = Path(__file__).parent / name + template = Path(__file__).parent / flake_name # copy the template to a new temporary location with tempfile.TemporaryDirectory() as tmpdir_: home = Path(tmpdir_) - flake = home / name + flake = home / flake_name shutil.copytree(template, flake) # lookup the requested machines in ./test_machines and include them if machines: @@ -60,20 +65,20 @@ def create_flake( with tempfile.TemporaryDirectory() as workdir: monkeypatch.chdir(workdir) monkeypatch.setenv("HOME", str(home)) - yield flake + yield TestFlake(flake_name, flake) else: monkeypatch.chdir(flake) monkeypatch.setenv("HOME", str(home)) - yield flake + yield TestFlake(flake_name, flake) @pytest.fixture -def test_flake(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: +def test_flake(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestFlake]: yield from create_flake(monkeypatch, "test_flake") @pytest.fixture -def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: +def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestFlake]: if not (CLAN_CORE / "flake.nix").exists(): raise Exception( "clan-core flake not found. This test requires the clan-core flake to be present" @@ -82,7 +87,9 @@ def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: @pytest.fixture -def test_flake_with_core_and_pass(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: +def test_flake_with_core_and_pass( + monkeypatch: pytest.MonkeyPatch, +) -> Iterator[TestFlake]: if not (CLAN_CORE / "flake.nix").exists(): raise Exception( "clan-core flake not found. This test requires the clan-core flake to be present" diff --git a/pkgs/clan-cli/tests/test_dirs.py b/pkgs/clan-cli/tests/test_dirs.py index c621ff1..5b412e0 100644 --- a/pkgs/clan-cli/tests/test_dirs.py +++ b/pkgs/clan-cli/tests/test_dirs.py @@ -2,7 +2,7 @@ from pathlib import Path import pytest -from clan_cli.dirs import get_clan_flake_toplevel +from clan_cli.dirs import _get_clan_flake_toplevel from clan_cli.errors import ClanError @@ -11,12 +11,12 @@ def test_get_clan_flake_toplevel( ) -> None: monkeypatch.chdir(temporary_dir) with pytest.raises(ClanError): - print(get_clan_flake_toplevel()) + print(_get_clan_flake_toplevel()) (temporary_dir / ".git").touch() - assert get_clan_flake_toplevel() == temporary_dir + assert _get_clan_flake_toplevel() == temporary_dir subdir = temporary_dir / "subdir" subdir.mkdir() monkeypatch.chdir(subdir) (subdir / ".clan-flake").touch() - assert get_clan_flake_toplevel() == subdir + assert _get_clan_flake_toplevel() == subdir diff --git a/pkgs/clan-cli/tests/test_machines_config.py b/pkgs/clan-cli/tests/test_machines_config.py index 4d3a0a7..a7ab422 100644 --- a/pkgs/clan-cli/tests/test_machines_config.py +++ b/pkgs/clan-cli/tests/test_machines_config.py @@ -1,8 +1,8 @@ -from pathlib import Path +from fixtures_flakes import TestFlake from clan_cli.config import machine -def test_schema_for_machine(test_flake: Path) -> None: - schema = machine.schema_for_machine("machine1", test_flake) +def test_schema_for_machine(test_flake: TestFlake) -> None: + schema = machine.schema_for_machine(test_flake.name, "machine1") assert "properties" in schema diff --git a/pkgs/clan-cli/tests/test_secrets_generate.py b/pkgs/clan-cli/tests/test_secrets_generate.py index 4ddf87d..2263231 100644 --- a/pkgs/clan-cli/tests/test_secrets_generate.py +++ b/pkgs/clan-cli/tests/test_secrets_generate.py @@ -1,8 +1,8 @@ -from pathlib import Path from typing import TYPE_CHECKING import pytest from cli import Cli +from fixtures_flakes import TestFlake from clan_cli.machines.facts import machine_get_fact from clan_cli.secrets.folders import sops_secrets_folder @@ -15,21 +15,27 @@ if TYPE_CHECKING: @pytest.mark.impure def test_generate_secret( monkeypatch: pytest.MonkeyPatch, - test_flake_with_core: Path, + test_flake_with_core: TestFlake, age_keys: list["KeyPair"], ) -> None: - monkeypatch.chdir(test_flake_with_core) + monkeypatch.chdir(test_flake_with_core.path) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey) cli = Cli() cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey]) cli.run(["secrets", "generate", "vm1"]) - has_secret("vm1-age.key") - has_secret("vm1-zerotier-identity-secret") - network_id = machine_get_fact("vm1", "zerotier-network-id") + has_secret(test_flake_with_core.name, "vm1-age.key") + has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret") + network_id = machine_get_fact( + test_flake_with_core.name, "vm1", "zerotier-network-id" + ) assert len(network_id) == 16 - age_key = sops_secrets_folder().joinpath("vm1-age.key").joinpath("secret") + age_key = ( + sops_secrets_folder(test_flake_with_core.name) + .joinpath("vm1-age.key") + .joinpath("secret") + ) identity_secret = ( - sops_secrets_folder() + sops_secrets_folder(test_flake_with_core.name) .joinpath("vm1-zerotier-identity-secret") .joinpath("secret") ) @@ -42,7 +48,7 @@ def test_generate_secret( assert identity_secret.lstat().st_mtime_ns == secret1_mtime machine_path = ( - sops_secrets_folder() + sops_secrets_folder(test_flake_with_core.name) .joinpath("vm1-zerotier-identity-secret") .joinpath("machines") .joinpath("vm1") diff --git a/pkgs/clan-cli/tests/test_secrets_password_store.py b/pkgs/clan-cli/tests/test_secrets_password_store.py index 3ea459c..999f08a 100644 --- a/pkgs/clan-cli/tests/test_secrets_password_store.py +++ b/pkgs/clan-cli/tests/test_secrets_password_store.py @@ -3,6 +3,7 @@ from pathlib import Path import pytest from cli import Cli +from fixtures_flakes import TestFlake from clan_cli.machines.facts import machine_get_fact from clan_cli.nix import nix_shell @@ -12,11 +13,11 @@ from clan_cli.ssh import HostGroup @pytest.mark.impure def test_upload_secret( monkeypatch: pytest.MonkeyPatch, - test_flake_with_core_and_pass: Path, + test_flake_with_core_and_pass: TestFlake, temporary_dir: Path, host_group: HostGroup, ) -> None: - monkeypatch.chdir(test_flake_with_core_and_pass) + monkeypatch.chdir(test_flake_with_core_and_pass.path) gnupghome = temporary_dir / "gpg" gnupghome.mkdir(mode=0o700) monkeypatch.setenv("GNUPGHOME", str(gnupghome)) @@ -39,7 +40,9 @@ def test_upload_secret( ) subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True) cli.run(["secrets", "generate", "vm1"]) - network_id = machine_get_fact("vm1", "zerotier-network-id") + network_id = machine_get_fact( + test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id" + ) assert len(network_id) == 16 identity_secret = ( temporary_dir / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg" @@ -50,13 +53,13 @@ def test_upload_secret( cli.run(["secrets", "generate", "vm1"]) assert identity_secret.lstat().st_mtime_ns == secret1_mtime - flake = test_flake_with_core_and_pass.joinpath("flake.nix") + flake = test_flake_with_core_and_pass.path.joinpath("flake.nix") host = host_group.hosts[0] addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}" new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr) flake.write_text(new_text) cli.run(["secrets", "upload", "vm1"]) zerotier_identity_secret = ( - test_flake_with_core_and_pass / "secrets" / "zerotier-identity-secret" + test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret" ) assert zerotier_identity_secret.exists() diff --git a/pkgs/clan-cli/tests/test_vms_api_create.py b/pkgs/clan-cli/tests/test_vms_api_create.py index ef9a40a..25bedf2 100644 --- a/pkgs/clan-cli/tests/test_vms_api_create.py +++ b/pkgs/clan-cli/tests/test_vms_api_create.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Iterator import pytest from api import TestClient from cli import Cli -from fixtures_flakes import create_flake +from fixtures_flakes import TestFlake, create_flake from httpx import SyncByteStream from root import CLAN_CORE @@ -14,7 +14,7 @@ if TYPE_CHECKING: @pytest.fixture -def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: +def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestFlake]: yield from create_flake( monkeypatch, "test_flake_with_core_dynamic_machines", @@ -26,7 +26,7 @@ def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path @pytest.fixture def remote_flake_with_vm_without_secrets( monkeypatch: pytest.MonkeyPatch, -) -> Iterator[Path]: +) -> Iterator[TestFlake]: yield from create_flake( monkeypatch, "test_flake_with_core_dynamic_machines",