Merge pull request 'secrets generator + zerotier module' (#188) from secrets-module into main

This commit is contained in:
clan-bot
2023-08-29 15:43:15 +00:00
13 changed files with 352 additions and 98 deletions

View File

@@ -4,7 +4,7 @@ import sys
from types import ModuleType
from typing import Optional
from . import admin, config, machines, secrets, webui
from . import admin, config, machines, secrets, webui, zerotier
from .errors import ClanError
from .ssh import cli as ssh_cli
@@ -41,6 +41,9 @@ def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
parser_webui = subparsers.add_parser("webui", help="start webui")
webui.register_parser(parser_webui)
parser_zerotier = subparsers.add_parser("zerotier", help="create zerotier network")
zerotier.register_parser(parser_zerotier)
if argcomplete:
argcomplete.autocomplete(parser)

View File

@@ -1,6 +1,7 @@
# !/usr/bin/env python3
import argparse
from .generate import register_generate_parser
from .groups import register_groups_parser
from .import_sops import register_import_sops_parser
from .machines import register_machines_parser
@@ -29,4 +30,9 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
import_sops_parser = subparser.add_parser("import-sops", help="import a sops file")
register_import_sops_parser(import_sops_parser)
parser_generate = subparser.add_parser(
"generate", help="generate secrets for machines if they don't exist yet"
)
register_generate_parser(parser_generate)
register_secrets_parser(subparser)

View File

@@ -0,0 +1,51 @@
import argparse
import subprocess
import sys
from clan_cli.errors import ClanError
def get_secret_script(machine: str) -> None:
proc = subprocess.run(
[
"nix",
"build",
"--impure",
"--print-out-paths",
"--expr",
"let f = builtins.getFlake (toString ./.); in "
f"(f.nixosConfigurations.{machine}.extendModules "
"{ modules = [{ clanCore.clanDir = toString ./.; }]; })"
".config.system.clan.generateSecrets",
],
check=True,
capture_output=True,
text=True,
)
if proc.returncode != 0:
print(proc.stderr, file=sys.stderr)
raise ClanError(f"failed to generate secrets:\n{proc.stderr}")
secret_generator_script = proc.stdout.strip()
print(secret_generator_script)
secret_generator = subprocess.run(
[secret_generator_script],
check=True,
)
if secret_generator.returncode != 0:
raise ClanError("failed to generate secrets")
else:
print("successfully generated secrets")
def generate_command(args: argparse.Namespace) -> None:
get_secret_script(args.machine)
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"machine",
help="The machine to generate secrets for",
)
parser.set_defaults(func=generate_command)

View File

@@ -1,3 +1,4 @@
import argparse
import json
import socket
import subprocess
@@ -42,7 +43,8 @@ class ZerotierController:
def __init__(self, port: int, home: Path) -> None:
self.port = port
self.home = home
self.secret = (home / "authtoken.secret").read_text()
self.authtoken = (home / "authtoken.secret").read_text()
self.secret = (home / "identity.secret").read_text()
def _http_request(
self,
@@ -56,7 +58,7 @@ class ZerotierController:
if data is not None:
body = json.dumps(data).encode("ascii")
headers["Content-Type"] = "application/json"
headers["X-ZT1-AUTH"] = self.secret
headers["X-ZT1-AUTH"] = self.authtoken
url = f"http://127.0.0.1:{self.port}{path}"
req = urllib.request.Request(url, headers=headers, method=method, data=body)
resp = urllib.request.urlopen(req)
@@ -75,11 +77,6 @@ class ZerotierController:
def get_network(self, id: str) -> dict[str, Any]:
return self._http_request(f"/controller/network/{id}")
def update_network(self, id: str, new_config: dict[str, Any]) -> dict[str, Any]:
return self._http_request(
f"/controller/network/{id}", method="POST", data=new_config
)
@contextmanager
def zerotier_controller() -> Iterator[ZerotierController]:
@@ -117,6 +114,7 @@ def zerotier_controller() -> Iterator[ZerotierController]:
"/proc",
"--dev",
"/dev",
"--unshare-user",
"--uid",
"0",
"--gid",
@@ -151,19 +149,28 @@ def zerotier_controller() -> Iterator[ZerotierController]:
p.wait()
class ZerotierNetwork:
def __init__(self, network_id: str) -> None:
self.network_id = network_id
# TODO: allow merging more network configuration here
def create_network(private: bool = False) -> ZerotierNetwork:
def create_network() -> dict:
with zerotier_controller() as controller:
network = controller.create_network()
network_id = network["nwid"]
network = controller.get_network(network_id)
network["private"] = private
network["v6AssignMode"]["rfc4193"] = True
controller.update_network(network_id, network)
# TODO: persist home into sops?
return ZerotierNetwork(network_id)
return {
"secret": controller.secret,
"networkid": network["nwid"],
}
def main(args: argparse.Namespace) -> None:
zerotier = create_network()
outpath = Path(args.outpath)
outpath.mkdir(parents=True, exist_ok=True)
with open(outpath / "network.id", "w+") as nwid_file:
nwid_file.write(zerotier["networkid"])
with open(outpath / "identity.secret", "w+") as secret_file:
secret_file.write(zerotier["secret"])
def register_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--outpath", help="directory to put the secret file to", required=True
)
parser.set_defaults(func=main)