Working base cli webui

This commit is contained in:
2023-10-23 01:18:58 +02:00
parent e5c0bc7fd4
commit 805efb7ec7
83 changed files with 9 additions and 6373 deletions

View File

@@ -1,31 +0,0 @@
import pytest
class KeyPair:
def __init__(self, pubkey: str, privkey: str) -> None:
self.pubkey = pubkey
self.privkey = privkey
KEYS = [
KeyPair(
"age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c",
"AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK",
),
KeyPair(
"age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62",
"AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ",
),
KeyPair(
"age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp",
"AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF",
),
]
@pytest.fixture
def age_keys() -> list[KeyPair]:
"""
Root directory of the tests
"""
return KEYS

View File

@@ -13,12 +13,8 @@ pytest_plugins = [
"api",
"temporary_dir",
"root",
"age_keys",
"sshd",
"command",
"ports",
"host_group",
"fixtures_flakes",
]

View File

@@ -1,23 +0,0 @@
secret-key: ENC[AES256_GCM,data:gjX4OmCUdd3TlA4p,iv:3yZVpyd6FqkITQY0nU2M1iubmzvkR6PfkK2m/s6nQh8=,tag:Abgp9xkiFFylZIyAlap6Ew==,type:str]
nested:
secret-key: ENC[AES256_GCM,data:iUMgDhhIjwvd7wL4,iv:jiJIrh12dSu/sXX+z9ITVoEMNDMjwIlFBnyv40oN4LE=,tag:G9VmAa66Km1sc7JEhW5AvA==,type:str]
sops:
kms: []
gcp_kms: []
azure_kv: []
hc_vault: []
age:
- recipient: age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62
enc: |
-----BEGIN AGE ENCRYPTED FILE-----
YWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSA0eWdRVjlydXlXOVZFQ3lO
bzU1eG9Iam5Ka29Sdlo0cHJ4b1R6bjdNSzBjCkgwRndCbWZQWHlDU0x1cWRmaGVt
N29lbjR6UjN0L2RhaXEzSG9zQmRsZGsKLS0tIEdsdWgxSmZwU3BWUDVxVWRSSC9M
eVZ6bjgwZnR2TTM5MkRYZWNFSFplQWsKmSzv12/dftL9jx2y35UZUGVK6xWdatE8
BGJiCvMlp0BQNrh2s/+YaEaBa48w8LL79U/XJnEZ+ZUwxmlbSTn6Hg==
-----END AGE ENCRYPTED FILE-----
lastmodified: "2023-08-08T14:27:20Z"
mac: ENC[AES256_GCM,data:iRWWX+L5Q5nKn3fBCLaWoz/mvqGnNnRd93gJmYXDZbRjFoHa9IFJZst5QDIDa1ZRYUe6G0/+lV5SBi+vwRm1pHysJ3c0ZWYjBP+e1jw3jLXxLV5gACsDC8by+6rFUCho0Xgu+Nqu2ehhNenjQQnCvDH5ivWbW70KFT5ynNgR9Tw=,iv:RYnnbLMC/hNfMwWPreMq9uvY0khajwQTZENO/P34ckY=,tag:Xi1PS5vM1c+sRkroHkPn1Q==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3

View File

@@ -1,7 +0,0 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACDonlRWMYxHTtnOeeiurKA1j26EfVZWeozuqSrtCYScFwAAAJje9J1V3vSd
VQAAAAtzc2gtZWQyNTUxOQAAACDonlRWMYxHTtnOeeiurKA1j26EfVZWeozuqSrtCYScFw
AAAEBxDpEXwhlJB/f6ZJOT9BbSqXeLy9S6qeuc25hXu5kpbuieVFYxjEdO2c556K6soDWP
boR9VlZ6jO6pKu0JhJwXAAAAE2pvZXJnQHR1cmluZ21hY2hpbmUBAg==
-----END OPENSSH PRIVATE KEY-----

View File

@@ -1 +0,0 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIOieVFYxjEdO2c556K6soDWPboR9VlZ6jO6pKu0JhJwX joerg@turingmachine

View File

@@ -1,7 +0,0 @@
HostKey $host_key
LogLevel DEBUG3
# In the nix build sandbox we don't get any meaningful PATH after login
MaxStartups 64:30:256
AuthorizedKeysFile $host_key.pub
AcceptEnv REALPATH
PasswordAuthentication no

View File

@@ -1,115 +0,0 @@
import fileinput
import logging
import shutil
import tempfile
from pathlib import Path
from typing import Iterator, NamedTuple
import pytest
from root import CLAN_CORE
from clan_cli.dirs import nixpkgs_source
from clan_cli.types import FlakeName
log = logging.getLogger(__name__)
# substitutes string sin a file.
# This can be used on the flake.nix or default.nix of a machine
def substitute(
file: Path,
clan_core_flake: Path | None = None,
flake: Path = Path(__file__).parent,
) -> None:
sops_key = str(flake.joinpath("sops.key"))
for line in fileinput.input(file, inplace=True):
line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
if clan_core_flake:
line = line.replace("__CLAN_CORE__", str(clan_core_flake))
line = line.replace("__CLAN_SOPS_KEY_PATH__", sops_key)
line = line.replace("__CLAN_SOPS_KEY_DIR__", str(flake))
print(line, end="")
class FlakeForTest(NamedTuple):
name: FlakeName
path: Path
def create_flake(
monkeypatch: pytest.MonkeyPatch,
temporary_dir: Path,
flake_name: FlakeName,
clan_core_flake: Path | None = None,
machines: list[str] = [],
remote: bool = False,
) -> Iterator[FlakeForTest]:
"""
Creates a flake with the given name and machines.
The machine names map to the machines in ./test_machines
"""
template = Path(__file__).parent / flake_name
# copy the template to a new temporary location
home = Path(temporary_dir)
flake = home / ".local/state/clan/flake" / flake_name
shutil.copytree(template, flake)
# lookup the requested machines in ./test_machines and include them
if machines:
(flake / "machines").mkdir(parents=True, exist_ok=True)
for machine_name in machines:
machine_path = Path(__file__).parent / "machines" / machine_name
shutil.copytree(machine_path, flake / "machines" / machine_name)
substitute(flake / "machines" / machine_name / "default.nix", flake)
# in the flake.nix file replace the string __CLAN_URL__ with the the clan flake
# provided by get_test_flake_toplevel
flake_nix = flake / "flake.nix"
# this is where we would install the sops key to, when updating
substitute(flake_nix, clan_core_flake, flake)
if remote:
with tempfile.TemporaryDirectory() as workdir:
monkeypatch.chdir(workdir)
monkeypatch.setenv("HOME", str(home))
yield FlakeForTest(flake_name, flake)
else:
monkeypatch.chdir(flake)
monkeypatch.setenv("HOME", str(home))
yield FlakeForTest(flake_name, flake)
@pytest.fixture
def test_flake(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]:
yield from create_flake(monkeypatch, temporary_home, FlakeName("test_flake"))
@pytest.fixture
def test_flake_with_core(
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists():
raise Exception(
"clan-core flake not found. This test requires the clan-core flake to be present"
)
yield from create_flake(
monkeypatch, temporary_dir, FlakeName("test_flake_with_core"), CLAN_CORE
)
@pytest.fixture
def test_flake_with_core_and_pass(
monkeypatch: pytest.MonkeyPatch,
temporary_dir: Path,
) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists():
raise Exception(
"clan-core flake not found. This test requires the clan-core flake to be present"
)
yield from create_flake(
monkeypatch,
temporary_dir,
FlakeName("test_flake_with_core_and_pass"),
CLAN_CORE,
)

View File

@@ -1,23 +0,0 @@
import os
import pwd
import pytest
from sshd import Sshd
from clan_cli.ssh import Host, HostGroup, HostKeyCheck
@pytest.fixture
def host_group(sshd: Sshd) -> HostGroup:
login = pwd.getpwuid(os.getuid()).pw_name
return HostGroup(
[
Host(
"127.0.0.1",
port=sshd.port,
user=login,
key=sshd.key,
host_key_check=HostKeyCheck.NONE,
)
]
)

View File

@@ -1,20 +0,0 @@
{ lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clanCore.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.virtualisation.graphics = false;
clan.networking.zerotier.controller.enable = true;
networking.useDHCP = false;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
}

View File

@@ -1,20 +0,0 @@
{ lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clanCore.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.virtualisation.graphics = false;
clan.networking.zerotier.controller.enable = true;
networking.useDHCP = false;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
}

View File

@@ -1,17 +0,0 @@
{ lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
system.stateVersion = lib.version;
clan.virtualisation.graphics = false;
networking.useDHCP = false;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
}

View File

@@ -1,136 +0,0 @@
import os
import shutil
import string
import subprocess
import time
from pathlib import Path
from sys import platform
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Iterator
import pytest
if TYPE_CHECKING:
from command import Command
from ports import PortFunction
class Sshd:
def __init__(self, port: int, proc: subprocess.Popen[str], key: str) -> None:
self.port = port
self.proc = proc
self.key = key
class SshdConfig:
def __init__(
self, path: Path, login_shell: Path, key: str, preload_lib: Path
) -> None:
self.path = path
self.login_shell = login_shell
self.key = key
self.preload_lib = preload_lib
@pytest.fixture(scope="session")
def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
# we use .direnv instead since it's already in .gitignore
with TemporaryDirectory() as _dir:
dir = Path(_dir)
host_key = test_root / "data" / "ssh_host_ed25519_key"
host_key.chmod(0o600)
template = (test_root / "data" / "sshd_config").read_text()
content = string.Template(template).substitute(dict(host_key=host_key))
config = dir / "sshd_config"
config.write_text(content)
login_shell = dir / "shell"
bash = shutil.which("bash")
path = os.environ["PATH"]
assert bash is not None
login_shell.write_text(
f"""#!{bash}
if [[ -f /etc/profile ]]; then
source /etc/profile
fi
if [[ -n "$REALPATH" ]]; then
export PATH="$REALPATH:${path}"
else
export PATH="${path}"
fi
exec {bash} -l "${{@}}"
"""
)
login_shell.chmod(0o755)
lib_path = None
assert (
platform == "linux"
), "we do not support the ld_preload trick on non-linux just now"
# This enforces a login shell by overriding the login shell of `getpwnam(3)`
lib_path = dir / "libgetpwnam-preload.so"
subprocess.run(
[
os.environ.get("CC", "cc"),
"-shared",
"-o",
lib_path,
str(test_root / "getpwnam-preload.c"),
],
check=True,
)
yield SshdConfig(config, login_shell, str(host_key), lib_path)
@pytest.fixture
def sshd(
sshd_config: SshdConfig,
command: "Command",
unused_tcp_port: "PortFunction",
monkeypatch: pytest.MonkeyPatch,
) -> Iterator[Sshd]:
import subprocess
port = unused_tcp_port()
sshd = shutil.which("sshd")
assert sshd is not None, "no sshd binary found"
env = {}
env = dict(
LD_PRELOAD=str(sshd_config.preload_lib),
LOGIN_SHELL=str(sshd_config.login_shell),
)
proc = command.run(
[sshd, "-f", str(sshd_config.path), "-D", "-p", str(port)], extra_env=env
)
monkeypatch.delenv("SSH_AUTH_SOCK", raising=False)
while True:
print(sshd_config.path)
if (
subprocess.run(
[
"ssh",
"-o",
"StrictHostKeyChecking=no",
"-o",
"UserKnownHostsFile=/dev/null",
"-i",
sshd_config.key,
"localhost",
"-p",
str(port),
"true",
],
).returncode
== 0
):
yield Sshd(port, proc, sshd_config.key)
return
else:
rc = proc.poll()
if rc is not None:
raise Exception(f"sshd processes was terminated with {rc}")
time.sleep(0.1)

View File

@@ -1,10 +0,0 @@
import pytest
from cli import Cli
def test_help(capsys: pytest.CaptureFixture) -> None:
cli = Cli()
with pytest.raises(SystemExit):
cli.run(["--help"])
captured = capsys.readouterr()
assert captured.out.startswith("usage:")

View File

@@ -1,229 +0,0 @@
import json
import tempfile
from pathlib import Path
from typing import Any, Optional
import pytest
from cli import Cli
from clan_cli import config
from clan_cli.config import parsing
from clan_cli.errors import ClanError
from fixtures_flakes import FlakeForTest
example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
# use pytest.parametrize
@pytest.mark.parametrize(
"args,expected",
[
(["name", "DavHau"], {"name": "DavHau"}),
(
["kernelModules", "foo", "bar", "baz"],
{"kernelModules": ["foo", "bar", "baz"]},
),
(["services.opt", "test"], {"services": {"opt": "test"}}),
(["userIds.DavHau", "42"], {"userIds": {"DavHau": 42}}),
],
)
def test_set_some_option(
args: list[str],
expected: dict[str, Any],
test_flake: FlakeForTest,
) -> None:
# create temporary file for out_file
with tempfile.NamedTemporaryFile() as out_file:
with open(out_file.name, "w") as f:
json.dump({}, f)
cli = Cli()
cli.run(
[
"config",
"--quiet",
"--options-file",
example_options,
"--settings-file",
out_file.name,
]
+ args
+ [test_flake.name]
)
json_out = json.loads(open(out_file.name).read())
assert json_out == expected
def test_configure_machine(
test_flake: FlakeForTest,
temporary_home: Path,
capsys: pytest.CaptureFixture,
monkeypatch: pytest.MonkeyPatch,
) -> None:
cli = Cli()
cli.run(["config", "-m", "machine1", "clan.jitsi.enable", "true", test_flake.name])
# clear the output buffer
capsys.readouterr()
# read a option value
cli.run(["config", "-m", "machine1", "clan.jitsi.enable", test_flake.name])
# read the output
assert capsys.readouterr().out == "true\n"
def test_walk_jsonschema_all_types() -> None:
schema = dict(
type="object",
properties=dict(
array=dict(
type="array",
items=dict(
type="string",
),
),
boolean=dict(type="boolean"),
integer=dict(type="integer"),
number=dict(type="number"),
string=dict(type="string"),
),
)
expected = {
"array": list[str],
"boolean": bool,
"integer": int,
"number": float,
"string": str,
}
assert config.parsing.options_types_from_schema(schema) == expected
def test_walk_jsonschema_nested() -> None:
schema = dict(
type="object",
properties=dict(
name=dict(
type="object",
properties=dict(
first=dict(type="string"),
last=dict(type="string"),
),
),
age=dict(type="integer"),
),
)
expected = {
"age": int,
"name.first": str,
"name.last": str,
}
assert config.parsing.options_types_from_schema(schema) == expected
# test walk_jsonschema with dynamic attributes (e.g. "additionalProperties")
def test_walk_jsonschema_dynamic_attrs() -> None:
schema = dict(
type="object",
properties=dict(
age=dict(type="integer"),
users=dict(
type="object",
additionalProperties=dict(type="string"),
),
),
)
expected = {
"age": int,
"users.<name>": str, # <name> is a placeholder for any string
}
assert config.parsing.options_types_from_schema(schema) == expected
def test_type_from_schema_path_simple() -> None:
schema = dict(
type="boolean",
)
assert parsing.type_from_schema_path(schema, []) == bool
def test_type_from_schema_path_nested() -> None:
schema = dict(
type="object",
properties=dict(
name=dict(
type="object",
properties=dict(
first=dict(type="string"),
last=dict(type="string"),
),
),
age=dict(type="integer"),
),
)
assert parsing.type_from_schema_path(schema, ["age"]) == int
assert parsing.type_from_schema_path(schema, ["name", "first"]) == str
def test_type_from_schema_path_dynamic_attrs() -> None:
schema = dict(
type="object",
properties=dict(
age=dict(type="integer"),
users=dict(
type="object",
additionalProperties=dict(type="string"),
),
),
)
assert parsing.type_from_schema_path(schema, ["age"]) == int
assert parsing.type_from_schema_path(schema, ["users", "foo"]) == str
def test_map_type() -> None:
with pytest.raises(ClanError):
config.map_type("foo")
assert config.map_type("string") == str
assert config.map_type("integer") == int
assert config.map_type("boolean") == bool
assert config.map_type("attribute set of string") == dict[str, str]
assert config.map_type("attribute set of integer") == dict[str, int]
assert config.map_type("null or string") == Optional[str]
# test the cast function with simple types
def test_cast() -> None:
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
assert (
config.cast(value=["null"], type=Optional[str], opt_description="foo-option")
is None
)
assert (
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option")
== "bar"
)
@pytest.mark.parametrize(
"option,value,options,expected",
[
("foo.bar", ["baz"], {"foo.bar": {"type": "str"}}, ("foo.bar", ["baz"])),
("foo.bar", ["baz"], {"foo": {"type": "attrs"}}, ("foo", {"bar": ["baz"]})),
(
"users.users.my-user.name",
["my-name"],
{"users.users.<name>.name": {"type": "str"}},
("users.users.<name>.name", ["my-name"]),
),
(
"foo.bar.baz.bum",
["val"],
{"foo.<name>.baz": {"type": "attrs"}},
("foo.<name>.baz", {"bum": ["val"]}),
),
(
"userIds.DavHau",
["42"],
{"userIds": {"type": "attrs"}},
("userIds", {"DavHau": ["42"]}),
),
],
)
def test_find_option(option: str, value: list, options: dict, expected: tuple) -> None:
assert config.find_option(option, value, options) == expected

View File

@@ -1,83 +0,0 @@
import json
import subprocess
from pathlib import Path
import pytest
from api import TestClient
from cli import Cli
from clan_cli.dirs import clan_flakes_dir
from clan_cli.flakes.create import DEFAULT_URL
@pytest.fixture
def cli() -> Cli:
return Cli()
@pytest.mark.impure
def test_create_flake_api(
monkeypatch: pytest.MonkeyPatch, api: TestClient, temporary_home: Path
) -> None:
monkeypatch.chdir(clan_flakes_dir())
flake_name = "flake_dir"
flake_dir = clan_flakes_dir() / flake_name
response = api.post(
"/api/flake/create",
json=dict(
dest=str(flake_dir),
url=str(DEFAULT_URL),
),
)
assert response.status_code == 201, f"Failed to create flake {response.text}"
assert (flake_dir / ".clan-flake").exists()
assert (flake_dir / "flake.nix").exists()
@pytest.mark.impure
def test_create_flake(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture,
temporary_home: Path,
cli: Cli,
) -> None:
monkeypatch.chdir(clan_flakes_dir())
flake_name = "flake_dir"
flake_dir = clan_flakes_dir() / flake_name
cli.run(["flakes", "create", flake_name])
assert (flake_dir / ".clan-flake").exists()
monkeypatch.chdir(flake_dir)
cli.run(["machines", "create", "machine1", flake_name])
capsys.readouterr() # flush cache
cli.run(["machines", "list", flake_name])
assert "machine1" in capsys.readouterr().out
flake_show = subprocess.run(
["nix", "flake", "show", "--json"],
check=True,
capture_output=True,
text=True,
)
flake_outputs = json.loads(flake_show.stdout)
try:
flake_outputs["nixosConfigurations"]["machine1"]
except KeyError:
pytest.fail("nixosConfigurations.machine1 not found in flake outputs")
# configure machine1
capsys.readouterr()
cli.run(
["config", "--machine", "machine1", "services.openssh.enable", "", flake_name]
)
capsys.readouterr()
cli.run(
[
"config",
"--machine",
"machine1",
"services.openssh.enable",
"true",
flake_name,
]
)

View File

@@ -1,22 +0,0 @@
from pathlib import Path
import pytest
from clan_cli.dirs import _get_clan_flake_toplevel
from clan_cli.errors import ClanError
def test_get_clan_flake_toplevel(
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
) -> None:
monkeypatch.chdir(temporary_dir)
with pytest.raises(ClanError):
print(_get_clan_flake_toplevel())
(temporary_dir / ".git").touch()
assert _get_clan_flake_toplevel() == temporary_dir
subdir = temporary_dir / "subdir"
subdir.mkdir()
monkeypatch.chdir(subdir)
(subdir / ".clan-flake").touch()
assert _get_clan_flake_toplevel() == subdir

View File

@@ -1,33 +0,0 @@
{
# this placeholder is replaced by the path to nixpkgs
inputs.nixpkgs.url = "__NIXPKGS__";
outputs = inputs: {
nixosConfigurations.machine1 = inputs.nixpkgs.lib.nixosSystem {
modules = [
./nixosModules/machine1.nix
(if builtins.pathExists ./machines/machine1/settings.json
then builtins.fromJSON (builtins.readFile ./machines/machine1/settings.json)
else { })
({ lib, options, pkgs, ... }: {
config = {
nixpkgs.hostPlatform = "x86_64-linux";
# speed up by not instantiating nixpkgs twice and disable documentation
nixpkgs.pkgs = inputs.nixpkgs.legacyPackages.x86_64-linux;
documentation.enable = false;
};
options.clanCore.optionsNix = lib.mkOption {
type = lib.types.raw;
internal = true;
readOnly = true;
default = (pkgs.nixosOptionsDoc { inherit options; }).optionsNix;
defaultText = "optionsNix";
description = ''
This is to export nixos options used for `clan config`
'';
};
})
];
};
};
}

View File

@@ -1,7 +0,0 @@
{ lib, ... }: {
options.clan.jitsi.enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Enable jitsi on this machine";
};
}

View File

@@ -1,51 +0,0 @@
import json
from pathlib import Path
import pytest
from api import TestClient
@pytest.mark.impure
def test_inspect_ok(api: TestClient, test_flake_with_core: Path) -> None:
params = {"url": str(test_flake_with_core)}
response = api.get(
"/api/flake/attrs",
params=params,
)
assert response.status_code == 200, "Failed to inspect vm"
data = response.json()
print("Data: ", data)
assert data.get("flake_attrs") == ["vm1"]
@pytest.mark.impure
def test_inspect_err(api: TestClient) -> None:
params = {"url": "flake-parts"}
response = api.get(
"/api/flake/attrs",
params=params,
)
assert response.status_code != 200, "Succeed to inspect vm but expected to fail"
data = response.json()
print("Data: ", data)
assert data.get("detail")
@pytest.mark.impure
def test_inspect_flake(api: TestClient, test_flake_with_core: Path) -> None:
params = {"url": str(test_flake_with_core)}
response = api.get(
"/api/flake",
params=params,
)
assert response.status_code == 200, "Failed to inspect vm"
data = response.json()
print("Data: ", json.dumps(data, indent=2))
assert data.get("content") is not None
actions = data.get("actions")
assert actions is not None
assert len(actions) == 2
assert actions[0].get("id") == "vms/inspect"
assert actions[0].get("uri") == "api/vms/inspect"
assert actions[1].get("id") == "vms/create"
assert actions[1].get("uri") == "api/vms/create"

View File

@@ -1,40 +0,0 @@
{
# Use this path to our repo root e.g. for UI test
# inputs.clan-core.url = "../../../../.";
# this placeholder is replaced by the path to nixpkgs
inputs.clan-core.url = "__CLAN_CORE__";
outputs = { self, clan-core }:
let
clan = clan-core.lib.buildClan {
directory = self;
clanName = "test_with_core_clan";
machines = {
vm1 = { lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
system.stateVersion = lib.version;
sops.age.keyFile = "__CLAN_SOPS_KEY_PATH__";
clanCore.secretsUploadDirectory = "__CLAN_SOPS_KEY_DIR__";
clan.virtualisation.graphics = false;
clan.networking.zerotier.controller.enable = true;
networking.useDHCP = false;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
};
};
};
in
{
inherit (clan) nixosConfigurations clanInternals;
};
}

View File

@@ -1,38 +0,0 @@
{
# Use this path to our repo root e.g. for UI test
# inputs.clan-core.url = "../../../../.";
# this placeholder is replaced by the path to clan-core
inputs.clan-core.url = "__CLAN_CORE__";
outputs = { self, clan-core }:
let
clan = clan-core.lib.buildClan {
directory = self;
clanName = "test_with_core_and_pass_clan";
machines = {
vm1 = { lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";
system.stateVersion = lib.version;
clanCore.secretStore = "password-store";
clanCore.secretsUploadDirectory = lib.mkForce "__CLAN_SOPS_KEY_DIR__/secrets";
clan.networking.zerotier.controller.enable = true;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
};
};
};
in
{
inherit (clan) nixosConfigurations clanInternals;
};
}

View File

@@ -1,25 +0,0 @@
{
# Use this path to our repo root e.g. for UI test
# inputs.clan-core.url = "../../../../.";
# this placeholder is replaced by the path to nixpkgs
inputs.clan-core.url = "__CLAN_CORE__";
outputs = { self, clan-core }:
let
clan = clan-core.lib.buildClan {
directory = self;
clanName = "core_dynamic_machine_clan";
machines =
let
machineModules = builtins.readDir (self + "/machines");
in
builtins.mapAttrs
(name: _type: import (self + "/machines/${name}"))
machineModules;
};
in
{
inherit (clan) nixosConfigurations clanInternals;
};
}

View File

@@ -1,69 +0,0 @@
import subprocess
import tempfile
from pathlib import Path
import pytest
from clan_cli import git
from clan_cli.errors import ClanError
def test_commit_file(git_repo: Path) -> None:
# create a file in the git repo
(git_repo / "test.txt").touch()
# commit the file
git.commit_file((git_repo / "test.txt"), git_repo, "test commit")
# check that the repo directory does in fact contain the file
assert (git_repo / "test.txt").exists()
# check that the working tree is clean
assert not subprocess.check_output(["git", "status", "--porcelain"], cwd=git_repo)
# check that the latest commit message is correct
assert (
subprocess.check_output(
["git", "log", "-1", "--pretty=%B"], cwd=git_repo
).decode("utf-8")
== "test commit\n\n"
)
def test_commit_file_outside_git_raises_error(git_repo: Path) -> None:
# create a file outside the git (a temporary file)
with tempfile.NamedTemporaryFile() as tmp:
# commit the file
with pytest.raises(ClanError):
git.commit_file(Path(tmp.name), git_repo, "test commit")
# this should not fail but skip the commit
git.commit_file(Path(tmp.name), git_repo, "test commit")
def test_commit_file_not_existing_raises_error(git_repo: Path) -> None:
# commit a file that does not exist
with pytest.raises(ClanError):
git.commit_file(Path("test.txt"), git_repo, "test commit")
def test_clan_flake_in_subdir(git_repo: Path, monkeypatch: pytest.MonkeyPatch) -> None:
# create a clan_flake subdirectory
(git_repo / "clan_flake").mkdir()
# create a .clan-flake file
(git_repo / "clan_flake" / ".clan-flake").touch()
# change to the clan_flake subdirectory
monkeypatch.chdir(git_repo / "clan_flake")
# commit files to git
subprocess.run(["git", "add", "."], cwd=git_repo)
subprocess.run(["git", "commit", "-m", "init"], cwd=git_repo)
# add a new file under ./clan_flake
(git_repo / "clan_flake" / "test.txt").touch()
# commit the file
git.commit_file(git_repo / "clan_flake" / "test.txt", git_repo, "test commit")
# check that the repo directory does in fact contain the file
assert (git_repo / "clan_flake" / "test.txt").exists()
# check that the working tree is clean
assert not subprocess.check_output(["git", "status", "--porcelain"], cwd=git_repo)
# check that the latest commit message is correct
assert (
subprocess.check_output(
["git", "log", "-1", "--pretty=%B"], cwd=git_repo
).decode("utf-8")
== "test commit\n\n"
)

View File

@@ -1,47 +0,0 @@
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from cli import Cli
if TYPE_CHECKING:
from age_keys import KeyPair
def test_import_sops(
test_root: Path,
test_flake: Path,
capsys: pytest.CaptureFixture,
monkeypatch: pytest.MonkeyPatch,
age_keys: list["KeyPair"],
) -> None:
cli = Cli()
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey)
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey])
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey])
cli.run(["secrets", "groups", "add-user", "group1", "user1"])
cli.run(["secrets", "groups", "add-user", "group1", "user2"])
# To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cli.run(
[
"secrets",
"import-sops",
"--group",
"group1",
"--machine",
"machine1",
str(test_root.joinpath("data", "secrets.yaml")),
]
)
capsys.readouterr()
cli.run(["secrets", "users", "list"])
users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"]
capsys.readouterr()
cli.run(["secrets", "get", "secret-key"])
assert capsys.readouterr().out == "secret-value"

View File

@@ -1,63 +0,0 @@
from pathlib import Path
from api import TestClient
def test_machines(api: TestClient, test_flake: Path) -> None:
response = api.get("/api/machines")
assert response.status_code == 200
assert response.json() == {"machines": []}
response = api.post("/api/machines", json={"name": "test"})
assert response.status_code == 201
assert response.json() == {"machine": {"name": "test", "status": "unknown"}}
response = api.get("/api/machines/test")
assert response.status_code == 200
assert response.json() == {"machine": {"name": "test", "status": "unknown"}}
response = api.get("/api/machines")
assert response.status_code == 200
assert response.json() == {"machines": [{"name": "test", "status": "unknown"}]}
def test_configure_machine(api: TestClient, test_flake: Path) -> None:
# ensure error 404 if machine does not exist when accessing the config
response = api.get("/api/machines/machine1/config")
assert response.status_code == 404
# ensure error 404 if machine does not exist when writing to the config
response = api.put("/api/machines/machine1/config", json={})
assert response.status_code == 404
# create the machine
response = api.post("/api/machines", json={"name": "machine1"})
assert response.status_code == 201
# ensure an empty config is returned by default for a new machine
response = api.get("/api/machines/machine1/config")
assert response.status_code == 200
assert response.json() == {"config": {}}
# get jsonschema for machine
response = api.get("/api/machines/machine1/schema")
assert response.status_code == 200
json_response = response.json()
assert "schema" in json_response and "properties" in json_response["schema"]
# set some config
response = api.put(
"/api/machines/machine1/config",
json=dict(
clan=dict(
jitsi=True,
)
),
)
assert response.status_code == 200
assert response.json() == {"config": {"clan": {"jitsi": True}}}
# get the config again
response = api.get("/api/machines/machine1/config")
assert response.status_code == 200
assert response.json() == {"config": {"clan": {"jitsi": True}}}

View File

@@ -1,21 +0,0 @@
from pathlib import Path
import pytest
from cli import Cli
def test_machine_subcommands(test_flake: Path, capsys: pytest.CaptureFixture) -> None:
cli = Cli()
cli.run(["machines", "create", "machine1"])
capsys.readouterr()
cli.run(["machines", "list"])
out = capsys.readouterr()
assert "machine1\n" == out.out
cli.run(["machines", "remove", "machine1"])
capsys.readouterr()
cli.run(["machines", "list"])
out = capsys.readouterr()
assert "" == out.out

View File

@@ -1,8 +0,0 @@
from fixtures_flakes import FlakeForTest
from clan_cli.config import machine
def test_schema_for_machine(test_flake: FlakeForTest) -> None:
schema = machine.schema_for_machine(test_flake.name, "machine1")
assert "properties" in schema

View File

@@ -1,240 +0,0 @@
import logging
import os
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator
import pytest
from cli import Cli
from fixtures_flakes import FlakeForTest
from clan_cli.errors import ClanError
if TYPE_CHECKING:
from age_keys import KeyPair
log = logging.getLogger(__name__)
def _test_identities(
what: str,
test_flake: FlakeForTest,
capsys: pytest.CaptureFixture,
age_keys: list["KeyPair"],
) -> None:
cli = Cli()
sops_folder = test_flake.path / "sops"
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
assert (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError):
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
cli.run(
[
"secrets",
what,
"add",
"-f",
"foo",
age_keys[0].privkey,
test_flake.name,
]
)
capsys.readouterr() # empty the buffer
cli.run(["secrets", what, "get", "foo", test_flake.name])
out = capsys.readouterr() # empty the buffer
assert age_keys[0].pubkey in out.out
capsys.readouterr() # empty the buffer
cli.run(["secrets", what, "list", test_flake.name])
out = capsys.readouterr() # empty the buffer
assert "foo" in out.out
cli.run(["secrets", what, "remove", "foo", test_flake.name])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run(["secrets", what, "remove", "foo", test_flake.name])
capsys.readouterr()
cli.run(["secrets", what, "list", test_flake.name])
out = capsys.readouterr()
assert "foo" not in out.out
def test_users(
test_flake: FlakeForTest, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
_test_identities("users", test_flake, capsys, age_keys)
def test_machines(
test_flake: FlakeForTest, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
_test_identities("machines", test_flake, capsys, age_keys)
def test_groups(
test_flake: FlakeForTest, capsys: pytest.CaptureFixture, age_keys: list["KeyPair"]
) -> None:
cli = Cli()
capsys.readouterr() # empty the buffer
cli.run(["secrets", "groups", "list", test_flake.name])
assert capsys.readouterr().out == ""
with pytest.raises(ClanError): # machine does not exist yet
cli.run(
["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name]
)
with pytest.raises(ClanError): # user does not exist yet
cli.run(["secrets", "groups", "add-user", "groupb1", "user1", test_flake.name])
cli.run(
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
)
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
# Should this fail?
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "groups", "list", test_flake.name])
out = capsys.readouterr().out
assert "user1" in out
assert "machine1" in out
cli.run(["secrets", "groups", "remove-user", "group1", "user1", test_flake.name])
cli.run(
["secrets", "groups", "remove-machine", "group1", "machine1", test_flake.name]
)
groups = os.listdir(test_flake.path / "sops" / "groups")
assert len(groups) == 0
@contextmanager
def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
old_key = os.environ["SOPS_AGE_KEY_FILE"]
monkeypatch.delenv("SOPS_AGE_KEY_FILE")
monkeypatch.setenv("SOPS_AGE_KEY", key)
try:
yield
finally:
monkeypatch.delenv("SOPS_AGE_KEY")
monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key)
def test_secrets(
test_flake: FlakeForTest,
capsys: pytest.CaptureFixture,
monkeypatch: pytest.MonkeyPatch,
age_keys: list["KeyPair"],
) -> None:
cli = Cli()
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", test_flake.name])
assert capsys.readouterr().out == ""
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
cli.run(["secrets", "key", "generate"])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "key", "show"])
key = capsys.readouterr().out
assert key.startswith("age1")
cli.run(["secrets", "users", "add", "testuser", key, test_flake.name])
with pytest.raises(ClanError): # does not exist yet
cli.run(["secrets", "get", "nonexisting", test_flake.name])
cli.run(["secrets", "set", "initialkey", test_flake.name])
capsys.readouterr()
cli.run(["secrets", "get", "initialkey", test_flake.name])
assert capsys.readouterr().out == "foo"
capsys.readouterr()
cli.run(["secrets", "users", "list", test_flake.name])
users = capsys.readouterr().out.rstrip().split("\n")
assert len(users) == 1, f"users: {users}"
owner = users[0]
monkeypatch.setenv("EDITOR", "cat")
cli.run(["secrets", "set", "--edit", "initialkey", test_flake.name])
monkeypatch.delenv("EDITOR")
cli.run(["secrets", "rename", "initialkey", "key", test_flake.name])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", test_flake.name])
assert capsys.readouterr().out == "key\n"
cli.run(
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
)
cli.run(["secrets", "machines", "add-secret", "machine1", "key", test_flake.name])
capsys.readouterr()
cli.run(["secrets", "machines", "list", test_flake.name])
assert capsys.readouterr().out == "machine1\n"
with use_key(age_keys[0].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
assert capsys.readouterr().out == "foo"
cli.run(
["secrets", "machines", "remove-secret", "machine1", "key", test_flake.name]
)
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
cli.run(["secrets", "users", "add-secret", "user1", "key", test_flake.name])
capsys.readouterr()
with use_key(age_keys[1].privkey, monkeypatch):
cli.run(["secrets", "get", "key", test_flake.name])
assert capsys.readouterr().out == "foo"
cli.run(["secrets", "users", "remove-secret", "user1", "key", test_flake.name])
with pytest.raises(ClanError): # does not exist yet
cli.run(
["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name]
)
cli.run(["secrets", "groups", "add-user", "admin-group", "user1", test_flake.name])
cli.run(["secrets", "groups", "add-user", "admin-group", owner, test_flake.name])
cli.run(["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "set", "--group", "admin-group", "key2", test_flake.name])
with use_key(age_keys[1].privkey, monkeypatch):
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
assert capsys.readouterr().out == "foo"
# extend group will update secrets
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
cli.run(["secrets", "groups", "add-user", "admin-group", "user2", test_flake.name])
with use_key(age_keys[2].privkey, monkeypatch): # user2
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
assert capsys.readouterr().out == "foo"
cli.run(
["secrets", "groups", "remove-user", "admin-group", "user2", test_flake.name]
)
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
# user2 is not in the group anymore
capsys.readouterr()
cli.run(["secrets", "get", "key", test_flake.name])
print(capsys.readouterr().out)
cli.run(
["secrets", "groups", "remove-secret", "admin-group", "key", test_flake.name]
)
cli.run(["secrets", "remove", "key", test_flake.name])
cli.run(["secrets", "remove", "key2", test_flake.name])
capsys.readouterr() # empty the buffer
cli.run(["secrets", "list", test_flake.name])
assert capsys.readouterr().out == ""

View File

@@ -1,56 +0,0 @@
from typing import TYPE_CHECKING
import pytest
from cli import Cli
from fixtures_flakes import FlakeForTest
from clan_cli.machines.facts import machine_get_fact
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.secrets import has_secret
if TYPE_CHECKING:
from age_keys import KeyPair
@pytest.mark.impure
def test_generate_secret(
monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
cli.run(["secrets", "generate", "vm1"])
has_secret(test_flake_with_core.name, "vm1-age.key")
has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret")
network_id = machine_get_fact(
test_flake_with_core.name, "vm1", "zerotier-network-id"
)
assert len(network_id) == 16
age_key = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-age.key")
.joinpath("secret")
)
identity_secret = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-zerotier-identity-secret")
.joinpath("secret")
)
age_key_mtime = age_key.lstat().st_mtime_ns
secret1_mtime = identity_secret.lstat().st_mtime_ns
# test idempotency
cli.run(["secrets", "generate", "vm1"])
assert age_key.lstat().st_mtime_ns == age_key_mtime
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
machine_path = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-zerotier-identity-secret")
.joinpath("machines")
.joinpath("vm1")
)
assert machine_path.exists()

View File

@@ -1,65 +0,0 @@
import subprocess
from pathlib import Path
import pytest
from cli import Cli
from fixtures_flakes import FlakeForTest
from clan_cli.machines.facts import machine_get_fact
from clan_cli.nix import nix_shell
from clan_cli.ssh import HostGroup
@pytest.mark.impure
def test_upload_secret(
monkeypatch: pytest.MonkeyPatch,
test_flake_with_core_and_pass: FlakeForTest,
temporary_dir: Path,
host_group: HostGroup,
) -> None:
monkeypatch.chdir(test_flake_with_core_and_pass.path)
gnupghome = temporary_dir / "gpg"
gnupghome.mkdir(mode=0o700)
monkeypatch.setenv("GNUPGHOME", str(gnupghome))
monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_dir / "pass"))
gpg_key_spec = temporary_dir / "gpg_key_spec"
gpg_key_spec.write_text(
"""
Key-Type: 1
Key-Length: 1024
Name-Real: Root Superuser
Name-Email: test@local
Expire-Date: 0
%no-protection
"""
)
cli = Cli()
subprocess.run(
nix_shell(["gnupg"], ["gpg", "--batch", "--gen-key", str(gpg_key_spec)]),
check=True,
)
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
cli.run(["secrets", "generate", "vm1"])
network_id = machine_get_fact(
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
)
assert len(network_id) == 16
identity_secret = (
temporary_dir / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg"
)
secret1_mtime = identity_secret.lstat().st_mtime_ns
# test idempotency
cli.run(["secrets", "generate", "vm1"])
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
host = host_group.hosts[0]
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1"])
zerotier_identity_secret = (
test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
)
assert zerotier_identity_secret.exists()

View File

@@ -1,41 +0,0 @@
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from cli import Cli
from clan_cli.ssh import HostGroup
if TYPE_CHECKING:
from age_keys import KeyPair
@pytest.mark.impure
def test_secrets_upload(
monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: Path,
host_group: HostGroup,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.chdir(test_flake_with_core)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
cli.run(["secrets", "machines", "add", "vm1", age_keys[1].pubkey])
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
cli.run(["secrets", "set", "vm1-age.key"])
flake = test_flake_with_core.joinpath("flake.nix")
host = host_group.hosts[0]
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1"])
# the flake defines this path as the location where the sops key should be installed
sops_key = test_flake_with_core.joinpath("key.txt")
assert sops_key.exists()
assert sops_key.read_text() == age_keys[0].privkey

View File

@@ -1,85 +0,0 @@
import os
import sys
from typing import Union
import pytest
import pytest_subprocess.fake_process
from pytest_subprocess import utils
import clan_cli
from clan_cli.ssh import cli
def test_no_args(
capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr(sys, "argv", ["", "ssh"])
with pytest.raises(SystemExit):
clan_cli.main()
captured = capsys.readouterr()
assert captured.err.startswith("usage:")
# using fp fixture from pytest-subprocess
def test_ssh_no_pass(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[Union[str, utils.Any]] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"torify",
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
)
assert fp.call_count(cmd) == 1
def test_ssh_with_pass(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[Union[str, utils.Any]] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"torify",
"sshpass",
"-p",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
password="XXX",
)
assert fp.call_count(cmd) == 1
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
cmd: list[Union[str, utils.Any]] = [fp.any()]
fp.register(cmd, stdout="https://test.test")
result = cli.qrcode_scan("test.png")
assert result == "https://test.test"

View File

@@ -1,97 +0,0 @@
import subprocess
from clan_cli.ssh import Host, HostGroup, run
def test_run() -> None:
p = run("echo hello")
assert p.stdout is None
def test_run_failure() -> None:
p = run("exit 1", check=False)
assert p.returncode == 1
try:
p = run("exit 1")
except Exception:
pass
else:
assert False, "Command should have raised an error"
hosts = HostGroup([Host("some_host")])
def test_run_environment() -> None:
p1 = run("echo $env_var", stdout=subprocess.PIPE, extra_env=dict(env_var="true"))
assert p1.stdout == "true\n"
p2 = hosts.run_local(
"echo $env_var", extra_env=dict(env_var="true"), stdout=subprocess.PIPE
)
assert p2[0].result.stdout == "true\n"
p3 = hosts.run_local(
["env"], extra_env=dict(env_var="true"), stdout=subprocess.PIPE
)
assert "env_var=true" in p3[0].result.stdout
def test_run_non_shell() -> None:
p = run(["echo", "$hello"], stdout=subprocess.PIPE)
assert p.stdout == "$hello\n"
def test_run_stderr_stdout() -> None:
p = run("echo 1; echo 2 >&2", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert p.stdout == "1\n"
assert p.stderr == "2\n"
def test_run_local() -> None:
hosts.run_local("echo hello")
def test_timeout() -> None:
try:
hosts.run_local("sleep 10", timeout=0.01)
except Exception:
pass
else:
assert False, "should have raised TimeoutExpired"
def test_run_function() -> None:
def some_func(h: Host) -> bool:
p = h.run_local("echo hello", stdout=subprocess.PIPE)
return p.stdout == "hello\n"
res = hosts.run_function(some_func)
assert res[0].result
def test_run_exception() -> None:
try:
hosts.run_local("exit 1")
except Exception:
pass
else:
assert False, "should have raised Exception"
def test_run_function_exception() -> None:
def some_func(h: Host) -> None:
h.run_local("exit 1")
try:
hosts.run_function(some_func)
except Exception:
pass
else:
assert False, "should have raised Exception"
def test_run_local_non_shell() -> None:
p2 = hosts.run_local(["echo", "1"], stdout=subprocess.PIPE)
assert p2[0].result.stdout == "1\n"

View File

@@ -1,64 +0,0 @@
import subprocess
from clan_cli.ssh import Host, HostGroup
def test_run(host_group: HostGroup) -> None:
proc = host_group.run("echo hello", stdout=subprocess.PIPE)
assert proc[0].result.stdout == "hello\n"
def test_run_environment(host_group: HostGroup) -> None:
p1 = host_group.run(
"echo $env_var", stdout=subprocess.PIPE, extra_env=dict(env_var="true")
)
assert p1[0].result.stdout == "true\n"
p2 = host_group.run(["env"], stdout=subprocess.PIPE, extra_env=dict(env_var="true"))
assert "env_var=true" in p2[0].result.stdout
def test_run_no_shell(host_group: HostGroup) -> None:
proc = host_group.run(["echo", "$hello"], stdout=subprocess.PIPE)
assert proc[0].result.stdout == "$hello\n"
def test_run_function(host_group: HostGroup) -> None:
def some_func(h: Host) -> bool:
p = h.run("echo hello", stdout=subprocess.PIPE)
return p.stdout == "hello\n"
res = host_group.run_function(some_func)
assert res[0].result
def test_timeout(host_group: HostGroup) -> None:
try:
host_group.run_local("sleep 10", timeout=0.01)
except Exception:
pass
else:
assert False, "should have raised TimeoutExpired"
def test_run_exception(host_group: HostGroup) -> None:
r = host_group.run("exit 1", check=False)
assert r[0].result.returncode == 1
try:
host_group.run("exit 1")
except Exception:
pass
else:
assert False, "should have raised Exception"
def test_run_function_exception(host_group: HostGroup) -> None:
def some_func(h: Host) -> subprocess.CompletedProcess[str]:
return h.run_local("exit 1")
try:
host_group.run_function(some_func)
except Exception:
pass
else:
assert False, "should have raised Exception"

View File

@@ -1,30 +0,0 @@
from pathlib import Path
import pytest
from api import TestClient
@pytest.mark.impure
def test_inspect(api: TestClient, test_flake_with_core: Path) -> None:
response = api.post(
"/api/vms/inspect",
json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"),
)
assert response.status_code == 200, f"Failed to inspect vm: {response.text}"
config = response.json()["config"]
assert config.get("flake_attr") == "vm1"
assert config.get("cores") == 1
assert config.get("memory_size") == 1024
assert config.get("graphics") is False
def test_incorrect_uuid(api: TestClient) -> None:
uuid_endpoints = [
"/api/vms/{}/status",
"/api/vms/{}/logs",
]
for endpoint in uuid_endpoints:
response = api.get(endpoint.format("1234"))
assert response.status_code == 422, f"Failed to get vm status: {response.text}"

View File

@@ -1,114 +0,0 @@
import os
from pathlib import Path
from typing import TYPE_CHECKING, Iterator
import pytest
from api import TestClient
from cli import Cli
from fixtures_flakes import FlakeForTest, create_flake
from httpx import SyncByteStream
from root import CLAN_CORE
from clan_cli.types import FlakeName
if TYPE_CHECKING:
from age_keys import KeyPair
@pytest.fixture
def flake_with_vm_with_secrets(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]:
yield from create_flake(
monkeypatch,
temporary_home,
FlakeName("test_flake_with_core_dynamic_machines"),
CLAN_CORE,
machines=["vm_with_secrets"],
)
@pytest.fixture
def remote_flake_with_vm_without_secrets(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]:
yield from create_flake(
monkeypatch,
temporary_home,
FlakeName("test_flake_with_core_dynamic_machines"),
CLAN_CORE,
machines=["vm_without_secrets"],
remote=True,
)
@pytest.fixture
def create_user_with_age_key(
monkeypatch: pytest.MonkeyPatch,
test_flake: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
def generic_create_vm_test(api: TestClient, flake: Path, vm: str) -> None:
print(f"flake_url: {flake} ")
response = api.post(
"/api/vms/create",
json=dict(
flake_url=str(flake),
flake_attr=vm,
cores=1,
memory_size=1024,
graphics=False,
),
)
assert response.status_code == 200, "Failed to create vm"
uuid = response.json()["uuid"]
assert len(uuid) == 36
assert uuid.count("-") == 4
response = api.get(f"/api/vms/{uuid}/status")
assert response.status_code == 200, "Failed to get vm status"
response = api.get(f"/api/vms/{uuid}/logs")
print("=========VM LOGS==========")
assert isinstance(response.stream, SyncByteStream)
for line in response.stream:
print(line.decode("utf-8"))
print("=========END LOGS==========")
assert response.status_code == 200, "Failed to get vm logs"
print("Get /api/vms/{uuid}/status")
response = api.get(f"/api/vms/{uuid}/status")
print("Finished Get /api/vms/{uuid}/status")
assert response.status_code == 200, "Failed to get vm status"
data = response.json()
assert (
data["status"] == "FINISHED"
), f"Expected to be finished, but got {data['status']} ({data})"
@pytest.mark.skipif(not os.path.exists("/dev/kvm"), reason="Requires KVM")
@pytest.mark.impure
def test_create_local(
api: TestClient,
monkeypatch: pytest.MonkeyPatch,
flake_with_vm_with_secrets: FlakeForTest,
create_user_with_age_key: None,
) -> None:
generic_create_vm_test(api, flake_with_vm_with_secrets.path, "vm_with_secrets")
@pytest.mark.skipif(not os.path.exists("/dev/kvm"), reason="Requires KVM")
@pytest.mark.impure
def test_create_remote(
api: TestClient,
monkeypatch: pytest.MonkeyPatch,
remote_flake_with_vm_without_secrets: FlakeForTest,
) -> None:
generic_create_vm_test(
api, remote_flake_with_vm_without_secrets.path, "vm_without_secrets"
)

View File

@@ -1,33 +0,0 @@
import os
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from cli import Cli
if TYPE_CHECKING:
from age_keys import KeyPair
no_kvm = not os.path.exists("/dev/kvm")
@pytest.mark.impure
def test_inspect(test_flake_with_core: Path, capsys: pytest.CaptureFixture) -> None:
cli = Cli()
cli.run(["vms", "inspect", "vm1"])
out = capsys.readouterr() # empty the buffer
assert "Cores" in out.out
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@pytest.mark.impure
def test_create(
monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: Path,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.chdir(test_flake_with_core)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
cli.run(["vms", "create", "vm1"])

View File

@@ -10,12 +10,12 @@ from ports import PortFunction
@pytest.mark.timeout(10)
def test_start_server(unused_tcp_port: PortFunction, temporary_dir: Path) -> None:
def test_start_server(unused_tcp_port: PortFunction, temporary_home: Path) -> None:
port = unused_tcp_port()
fifo = temporary_dir / "fifo"
fifo = temporary_home / "fifo"
os.mkfifo(fifo)
notify_script = temporary_dir / "firefox"
notify_script = temporary_home / "firefox"
bash = shutil.which("bash")
assert bash is not None
notify_script.write_text(
@@ -27,8 +27,8 @@ echo "1" > {fifo}
notify_script.chmod(0o700)
env = os.environ.copy()
print(str(temporary_dir.absolute()))
env["PATH"] = ":".join([str(temporary_dir.absolute())] + env["PATH"].split(":"))
print(str(temporary_home.absolute()))
env["PATH"] = ":".join([str(temporary_home.absolute())] + env["PATH"].split(":"))
with subprocess.Popen(
[sys.executable, "-m", "clan_cli.webui", "--port", str(port)], env=env
) as p: