cli: fix broken type checks in config module
This commit is contained in:
@@ -1,8 +1,10 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from . import admin, config, secrets, ssh
|
from . import admin, config, secrets, ssh
|
||||||
from .errors import ClanError
|
from .errors import ClanError
|
||||||
|
from .tty import warn
|
||||||
|
|
||||||
has_argcomplete = True
|
has_argcomplete = True
|
||||||
try:
|
try:
|
||||||
@@ -20,7 +22,10 @@ def main() -> None:
|
|||||||
admin.register_parser(parser_admin)
|
admin.register_parser(parser_admin)
|
||||||
|
|
||||||
parser_config = subparsers.add_parser("config")
|
parser_config = subparsers.add_parser("config")
|
||||||
config.register_parser(parser_config)
|
try:
|
||||||
|
config.register_parser(parser_config)
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
warn("The config command does not in the nix sandbox")
|
||||||
|
|
||||||
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
|
parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine")
|
||||||
ssh.register_parser(parser_ssh)
|
ssh.register_parser(parser_ssh)
|
||||||
|
|||||||
@@ -4,12 +4,14 @@ import json
|
|||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Optional, Union
|
from typing import Any, Optional, Type, Union
|
||||||
|
|
||||||
|
from clan_cli.errors import ClanError
|
||||||
|
|
||||||
|
|
||||||
class Kwargs:
|
class Kwargs:
|
||||||
def __init__(self):
|
def __init__(self) -> None:
|
||||||
self.type = None
|
self.type: Optional[Type] = None
|
||||||
self.default: Any = None
|
self.default: Any = None
|
||||||
self.required: bool = False
|
self.required: bool = False
|
||||||
self.help: Optional[str] = None
|
self.help: Optional[str] = None
|
||||||
@@ -19,7 +21,7 @@ class Kwargs:
|
|||||||
|
|
||||||
def schema_from_module_file(
|
def schema_from_module_file(
|
||||||
file: Union[str, Path] = "./tests/config/example-interface.nix",
|
file: Union[str, Path] = "./tests/config/example-interface.nix",
|
||||||
) -> dict:
|
) -> dict[str, Any]:
|
||||||
absolute_path = Path(file).absolute()
|
absolute_path = Path(file).absolute()
|
||||||
# define a nix expression that loads the given module file using lib.evalModules
|
# define a nix expression that loads the given module file using lib.evalModules
|
||||||
nix_expr = f"""
|
nix_expr = f"""
|
||||||
@@ -37,22 +39,31 @@ def schema_from_module_file(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# takes a (sub)parser and configures it
|
|
||||||
def register_parser(
|
def register_parser(
|
||||||
parser: Optional[argparse.ArgumentParser] = None,
|
parser: argparse.ArgumentParser,
|
||||||
schema: Union[dict, str, Path] = "./tests/config/example-interface.nix",
|
file: Path = Path("./tests/config/example-interface.nix"),
|
||||||
) -> dict:
|
) -> None:
|
||||||
|
if file.name.endswith(".nix"):
|
||||||
|
schema = schema_from_module_file(file)
|
||||||
|
else:
|
||||||
|
schema = json.loads(file.read_text())
|
||||||
|
return _register_parser(parser, schema)
|
||||||
|
|
||||||
|
|
||||||
|
# takes a (sub)parser and configures it
|
||||||
|
def _register_parser(
|
||||||
|
parser: Optional[argparse.ArgumentParser],
|
||||||
|
schema: dict[str, Any],
|
||||||
|
) -> None:
|
||||||
# check if schema is a .nix file and load it in that case
|
# check if schema is a .nix file and load it in that case
|
||||||
if isinstance(schema, str) and schema.endswith(".nix"):
|
if "type" not in schema:
|
||||||
schema = schema_from_module_file(schema)
|
raise ClanError("Schema has no type")
|
||||||
elif not isinstance(schema, dict):
|
if schema["type"] != "object":
|
||||||
with open(str(schema)) as f:
|
raise ClanError("Schema is not an object")
|
||||||
schema: dict = json.load(f)
|
|
||||||
assert "type" in schema and schema["type"] == "object"
|
|
||||||
|
|
||||||
required_set = set(schema.get("required", []))
|
required_set = set(schema.get("required", []))
|
||||||
|
|
||||||
type_map = {
|
type_map: dict[str, Type] = {
|
||||||
"array": list,
|
"array": list,
|
||||||
"boolean": bool,
|
"boolean": bool,
|
||||||
"integer": int,
|
"integer": int,
|
||||||
@@ -60,8 +71,7 @@ def register_parser(
|
|||||||
"string": str,
|
"string": str,
|
||||||
}
|
}
|
||||||
|
|
||||||
if parser is None:
|
parser = argparse.ArgumentParser(description=schema.get("description"))
|
||||||
parser = argparse.ArgumentParser(description=schema.get("description"))
|
|
||||||
|
|
||||||
subparsers = parser.add_subparsers(
|
subparsers = parser.add_subparsers(
|
||||||
title="more options",
|
title="more options",
|
||||||
@@ -72,11 +82,12 @@ def register_parser(
|
|||||||
|
|
||||||
for name, value in schema.get("properties", {}).items():
|
for name, value in schema.get("properties", {}).items():
|
||||||
assert isinstance(value, dict)
|
assert isinstance(value, dict)
|
||||||
|
type_ = value.get("type")
|
||||||
|
|
||||||
# TODO: add support for nested objects
|
# TODO: add support for nested objects
|
||||||
if value.get("type") == "object":
|
if type_ == "object":
|
||||||
subparser = subparsers.add_parser(name, help=value.get("description"))
|
subparser = subparsers.add_parser(name, help=value.get("description"))
|
||||||
register_parser(parser=subparser, schema=value)
|
_register_parser(parser=subparser, schema=value)
|
||||||
continue
|
continue
|
||||||
# elif value.get("type") == "array":
|
# elif value.get("type") == "array":
|
||||||
# subparser = parser.add_subparsers(dest=name)
|
# subparser = parser.add_subparsers(dest=name)
|
||||||
@@ -92,22 +103,25 @@ def register_parser(
|
|||||||
|
|
||||||
if "enum" in value:
|
if "enum" in value:
|
||||||
enum_list = value["enum"]
|
enum_list = value["enum"]
|
||||||
assert len(enum_list) > 0, "Enum List is Empty"
|
if len(enum_list) == 0:
|
||||||
|
raise ClanError("Enum List is Empty")
|
||||||
arg_type = type(enum_list[0])
|
arg_type = type(enum_list[0])
|
||||||
assert all(
|
if not all(arg_type is type(item) for item in enum_list):
|
||||||
arg_type is type(item) for item in enum_list
|
raise ClanError(f"Items in [{enum_list}] with Different Types")
|
||||||
), f"Items in [{enum_list}] with Different Types"
|
|
||||||
|
|
||||||
kwargs.type = arg_type
|
kwargs.type = arg_type
|
||||||
kwargs.choices = enum_list
|
kwargs.choices = enum_list
|
||||||
else:
|
elif type_ in type_map:
|
||||||
kwargs.type = type_map[value.get("type")]
|
kwargs.type = type_map[type_]
|
||||||
del kwargs.choices
|
del kwargs.choices
|
||||||
|
else:
|
||||||
|
raise ClanError(f"Unsupported Type '{type_}' in schema")
|
||||||
|
|
||||||
name = f"--{name}"
|
name = f"--{name}"
|
||||||
|
|
||||||
if kwargs.type is bool:
|
if kwargs.type is bool:
|
||||||
assert not kwargs.default, "boolean have to be False in default"
|
if kwargs.default:
|
||||||
|
raise ClanError("Boolean have to be False in default")
|
||||||
kwargs.default = False
|
kwargs.default = False
|
||||||
kwargs.action = "store_true"
|
kwargs.action = "store_true"
|
||||||
del kwargs.type
|
del kwargs.type
|
||||||
@@ -117,7 +131,7 @@ def register_parser(
|
|||||||
parser.add_argument(name, **vars(kwargs))
|
parser.add_argument(name, **vars(kwargs))
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main() -> None:
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"schema",
|
"schema",
|
||||||
@@ -125,8 +139,7 @@ def main():
|
|||||||
type=str,
|
type=str,
|
||||||
)
|
)
|
||||||
args = parser.parse_args(sys.argv[1:2])
|
args = parser.parse_args(sys.argv[1:2])
|
||||||
schema = args.schema
|
register_parser(parser, args.schema)
|
||||||
register_parser(schema=schema, parser=parser)
|
|
||||||
parser.parse_args(sys.argv[2:])
|
parser.parse_args(sys.argv[2:])
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user