CLI: Restructured TaskManager and log collection

This commit is contained in:
Qubasa
2023-10-02 18:36:50 +02:00
parent af17c1bd7a
commit 7e180d2f12
3 changed files with 136 additions and 97 deletions

View File

@@ -5,19 +5,72 @@ import select
import shlex
import subprocess
import threading
from typing import Any
from typing import Any, Iterable, Iterator
from uuid import UUID, uuid4
class CmdState:
def __init__(self, proc: subprocess.Popen) -> None:
global LOOP
self.proc: subprocess.Popen = proc
def __init__(self, log: logging.Logger) -> None:
self.log: logging.Logger = log
self.p: subprocess.Popen = None
self.stdout: list[str] = []
self.stderr: list[str] = []
self.output: queue.SimpleQueue = queue.SimpleQueue()
self._output: queue.SimpleQueue = queue.SimpleQueue()
self.returncode: int | None = None
self.done: bool = False
self.running: bool = False
self.cmd_str: str | None = None
self.workdir: str | None = None
def close_queue(self) -> None:
if self.p is not None:
self.returncode = self.p.returncode
self._output.put(None)
self.running = False
self.done = True
def run(self, cmd: list[str]) -> None:
self.running = True
try:
self.cmd_str = shlex.join(cmd)
self.workdir = os.getcwd()
self.log.debug(f"Working directory: {self.workdir}")
self.log.debug(f"Running command: {shlex.join(cmd)}")
self.p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
cwd=self.workdir,
)
while self.p.poll() is None:
# Check if stderr is ready to be read from
rlist, _, _ = select.select([self.p.stderr, self.p.stdout], [], [], 0)
if self.p.stderr in rlist:
assert self.p.stderr is not None
line = self.p.stderr.readline()
if line != "":
line = line.strip('\n')
self.stderr.append(line)
self.log.debug("stderr: %s", line)
self._output.put(line)
if self.p.stdout in rlist:
assert self.p.stdout is not None
line = self.p.stdout.readline()
if line != "":
line = line.strip('\n')
self.stdout.append(line)
self.log.debug("stdout: %s", line)
self._output.put(line)
if self.p.returncode != 0:
raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}")
self.log.debug("Successfully ran command")
finally:
self.close_queue()
class BaseTask(threading.Thread):
@@ -31,64 +84,52 @@ class BaseTask(threading.Thread):
self.procs: list[CmdState] = []
self.failed: bool = False
self.finished: bool = False
self.logs_lock = threading.Lock()
def run(self) -> None:
try:
self.task_run()
except Exception as e:
for proc in self.procs:
proc.close_queue()
self.failed = True
self.log.exception(e)
finally:
self.finished = True
self.log.exception(e)
def task_run(self) -> None:
raise NotImplementedError
def run_cmd(self, cmd: list[str]) -> CmdState:
cwd = os.getcwd()
self.log.debug(f"Working directory: {cwd}")
self.log.debug(f"Running command: {shlex.join(cmd)}")
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
# shell=True,
cwd=cwd,
)
self.procs.append(CmdState(p))
p_state = self.procs[-1]
## TODO: If two clients are connected to the same task,
def logs_iter(self) -> Iterator[str]:
with self.logs_lock:
for proc in self.procs:
if self.finished:
self.log.debug("log iter: Task is finished")
break
if proc.done:
for line in proc.stderr:
yield line
for line in proc.stdout:
yield line
continue
while True:
out = proc._output
line = out.get()
if line is None:
break
yield line
while p.poll() is None:
# Check if stderr is ready to be read from
rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0)
if p.stderr in rlist:
assert p.stderr is not None
line = p.stderr.readline()
if line != "":
p_state.stderr.append(line.strip("\n"))
self.log.debug(f"stderr: {line}")
p_state.output.put(line)
def register_cmds(self, num_cmds: int) -> Iterable[CmdState]:
for i in range(num_cmds):
cmd = CmdState(self.log)
self.procs.append(cmd)
if p.stdout in rlist:
assert p.stdout is not None
line = p.stdout.readline()
if line != "":
p_state.stdout.append(line.strip("\n"))
self.log.debug(f"stdout: {line}")
p_state.output.put(line)
p_state.returncode = p.returncode
p_state.output.put(None)
p_state.done = True
if p.returncode != 0:
raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}")
self.log.debug("Successfully ran command")
return p_state
for cmd in self.procs:
yield cmd
# TODO: We need to test concurrency
class TaskPool:
def __init__(self) -> None:
self.lock: threading.RLock = threading.RLock()