import json import socket import subprocess import time import urllib.request from contextlib import contextmanager from pathlib import Path from tempfile import TemporaryDirectory from typing import Any, Iterator, Optional from ..nix import nix_shell def try_bind_port(port: int) -> bool: tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) with tcp, udp: try: tcp.bind(("127.0.0.1", port)) udp.bind(("127.0.0.1", port)) return True except socket.error: return False def try_connect_port(port: int) -> bool: sock = socket.socket(socket.AF_INET) result = sock.connect_ex(("127.0.0.1", port)) sock.close() return result == 0 def find_free_port(port_range: range) -> int: for port in port_range: if try_bind_port(port): return port raise Exception("cannot find a free port") class ZerotierController: def __init__(self, port: int, home: Path) -> None: self.port = port self.home = home self.secret = (home / "authtoken.secret").read_text() def _http_request( self, path: str, method: str = "GET", headers: dict[str, str] = {}, data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: body = None headers = headers.copy() if data is not None: body = json.dumps(data).encode("ascii") headers["Content-Type"] = "application/json" headers["X-ZT1-AUTH"] = self.secret url = f"http://127.0.0.1:{self.port}{path}" req = urllib.request.Request(url, headers=headers, method=method, data=body) resp = urllib.request.urlopen(req) return json.load(resp) def status(self) -> dict[str, Any]: # pragma: no cover return self._http_request("/status") def create_network(self, data: dict[str, Any] = {}) -> dict[str, Any]: identity = (self.home / "identity.public").read_text() node_id = identity.split(":")[0] return self._http_request( f"/controller/network/{node_id}______", method="POST", data=data ) def get_network(self, id: str) -> dict[str, Any]: return self._http_request(f"/controller/network/{id}") def update_network( self, id: str, new_config: dict[str, Any] ) -> dict[str, Any]: # pragma: no cover return self._http_request( f"/controller/network/{id}", method="POST", data=new_config ) @contextmanager def zerotier_controller() -> Iterator[ZerotierController]: # This check could be racy but it's unlikely in practice controller_port = find_free_port(range(10000, 65535)) cmd = nix_shell(["bash", "zerotierone"], ["bash", "-c", "command -v zerotier-one"]) res = subprocess.run( cmd, check=True, text=True, stdout=subprocess.PIPE, ) zerotier_exe = res.stdout.strip() if zerotier_exe is None: raise Exception("cannot find zerotier-one executable") if not zerotier_exe.startswith("/nix/store"): raise Exception( f"zerotier-one executable needs to come from /nix/store: {zerotier_exe}" ) with TemporaryDirectory() as d: tempdir = Path(d) home = tempdir / "zerotier-one" home.mkdir() cmd = nix_shell( ["bubblewrap"], [ "bwrap", "--proc", "/proc", "--dev", "/dev", "--uid", "0", "--gid", "0", "--ro-bind", "/nix", "/nix", "--bind", str(home), "/var/lib/zerotier-one", zerotier_exe, f"-p{controller_port}", ], ) with subprocess.Popen(cmd) as p: try: print( f"wait for controller to be started on 127.0.0.1:{controller_port}...", ) while not try_connect_port(controller_port): status = p.poll() if status is not None: raise Exception( f"zerotier-one has been terminated unexpected with {status}" ) time.sleep(0.1) print() yield ZerotierController(controller_port, home) finally: p.kill() p.wait() class ZerotierNetwork: def __init__(self, network_id: str) -> None: self.network_id = network_id # TODO: allow merging more network configuration here def create_network(private: bool = False) -> ZerotierNetwork: with zerotier_controller() as controller: network = controller.create_network() network_id = network["nwid"] network = controller.get_network(network_id) network["private"] = private network["v6AssignMode"]["rfc4193"] = True controller.update_network(network_id, network) # TODO: persist home into sops? return ZerotierNetwork(network_id)