Source code for doot.cmds.run_cmd

  1 #!/usr/bin/env python3
  2"""
  3
  4"""
  5# Imports:
  6from __future__ import annotations
  7
  8# ##-- stdlib imports
  9import datetime
 10import enum
 11import functools as ftz
 12import itertools as itz
 13import logging as logmod
 14import pathlib as pl
 15import re
 16import time
 17from uuid import UUID, uuid1
 18
 19# ##-- end stdlib imports
 20
 21# ##-- 3rd party imports
 22from jgdv import Proto
 23from jgdv.debugging.timing import TimeCtx
 24from jgdv.structs.strang import CodeReference
 25from jgdv.util.plugins.selector import plugin_selector
 26
 27# ##-- end 3rd party imports
 28
 29# ##-- 1st party imports
 30import doot
 31from doot.control.runner.step_runner import DootStepRunner
 32from doot.workflow.check_locs import CheckLocsTask
 33
 34# ##-- end 1st party imports
 35
 36# ##-| Local
 37from ._base import BaseCommand
 38from ._interface import Command_p
 39
 40# # End of Imports.
 41
 42# ##-- types
 43# isort: off
 44import abc
 45import collections.abc
 46from typing import TYPE_CHECKING, cast, assert_type, assert_never
 47from typing import Generic, NewType
 48# Protocols:
 49from typing import Protocol, runtime_checkable
 50# Typing Decorators:
 51from typing import no_type_check, final, override, overload
 52
 53if TYPE_CHECKING:
 54    from jgdv import Maybe
 55    from typing import Final
 56    from typing import ClassVar, Any, LiteralString
 57    from typing import Never, Self, Literal, ContextManager
 58    from typing import TypeGuard
 59    from collections.abc import Iterable, Iterator, Callable, Generator
 60    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 61    from jgdv.structs.chainguard import ChainGuard
 62    from doot.control.runner._inteface import WorkflowRunner_p
 63    from doot.control.tracker._interface import WorkflowTracker_p
 64
 65# isort: on
 66# ##-- end types
 67
 68##-- logging
 69logging = logmod.getLogger(__name__)
 70##-- end logging
 71
 72# TODO make a decorator to register these onto the cmd
 73tracker_target     : Final       = doot.config.on_fail("default", str).settings.commands.run.tracker()
 74runner_target      : Final       = doot.config.on_fail("default", str).settings.commands.run.runner()
 75reporter_target    : Final       = doot.config.on_fail("default", str).settings.commands.run.reporter()
 76interrupt_handler  : Final       = doot.config.on_fail("jgdv.debugging:SignalHandler", bool|str).settings.commands.run.interrupt()
 77check_locs         : Final       = doot.config.on_fail(False).settings.commands.run.location_check.active()  # noqa: FBT003
 78
 79CONFIRM            : Final[str]  = "Y"
 80##--|
 81
[docs] 82@Proto(Command_p) 83class RunCmd(BaseCommand): 84 _name = "run" 85 _help = tuple(["Will perform the tasks/jobs targeted.", 86 "Can be parameterized in a commands.run block with:", 87 "tracker(str), runner(str)", 88 ]) 89
[docs] 90 @override 91 def param_specs(self) -> list: 92 return [ 93 *super().param_specs(), 94 self.build_param(name="--interrupt", default=False, type=bool, desc="Activate interrupt handler"), 95 self.build_param(name="--step", default=False, type=bool, desc="Interrupt between workflow step"), 96 self.build_param(name="--dry-run", default=False, type=bool, desc="Don't perform actions"), 97 self.build_param(name="--confirm", default=False, type=bool, desc="Confirm the expected workflow plan"), 98 ]
99 100 def __call__(self, *, idx:int, tasks:ChainGuard, plugins:ChainGuard): 101 tracker : WorkflowTracker_p 102 runner : WorkflowRunner_p 103 interrupt : Maybe[bool|type[ContextManager]|ContextManager] 104 ##--| 105 doot.load_reporter(target=reporter_target) 106 107 doot.report.active_level(logmod.INFO) 108 doot.report.gen.gap() 109 doot.report.gen.line(f"Starting Run Cmd ({idx})", char="=") 110 tracker, runner = self._create_tracker_and_runner(idx, plugins) 111 interrupt = self._choose_interrupt_handler(idx) 112 113 self._register_specs(idx, tracker, tasks) 114 self._queue_tasks(idx, tracker) 115 116 logging.info("---- Starting Runner") 117 with (TimeCtx(logger=logging, 118 level=21) as timer, 119 runner, 120 ): 121 if not self._confirm_plan(idx, runner): 122 return 123 runner(handler=interrupt) 124 125 logging.info("---- Runner took: %s seconds", timer.total_s) 126
[docs] 127 def _create_tracker_and_runner(self, idx:int, plugins:ChainGuard) -> tuple[WorkflowTracker_p, WorkflowRunner_p]: 128 # Note the final parens to construct: 129 trackers = plugins.on_fail([], list).tracker() 130 runners = plugins.on_fail([], list).runner() 131 132 match plugin_selector(trackers, target=tracker_target): 133 case type() as x: 134 tracker = x() 135 case x: 136 raise TypeError(type(x)) 137 138 match plugin_selector(runners, target=runner_target): 139 case _ if doot.args.on_fail(False).cmd[self.name][idx].args.step(): # noqa: FBT003 140 runner = DootStepRunner(tracker=tracker) 141 case type() as x: 142 runner = x(tracker=tracker) 143 case x: 144 raise TypeError(type(x)) 145 146 return tracker, runner
147
[docs] 148 def _choose_interrupt_handler(self, idx:int) -> Maybe[bool|type|ContextManager]: 149 match interrupt_handler: 150 case _ if not doot.args.on_fail(False).cmd[self.name][idx].args.interrupt(): # noqa: FBT003 151 return None 152 case None: 153 return None 154 case True: 155 doot.report.gen.trace("Setting default interrupt handler") 156 return True 157 case str(): 158 doot.report.gen.trace("Loading custom interrupt handler") 159 ref = CodeReference(interrupt_handler) 160 return ref(raise_error=True) 161 case _: 162 return None
163 164
[docs] 165 def _register_specs(self, idx:int, tracker:WorkflowTracker_p, tasks:ChainGuard) -> None: 166 doot.report.gen.trace("Registering Task Specs: %s", len(tasks)) 167 for task in tasks.values(): 168 tracker.register(task) 169 170 match CheckLocsTask(): 171 case x if bool(x.spec.actions) and check_locs: 172 tracker.queue(CheckLocsTask(), from_user=True) 173 case _: 174 pass
175
[docs] 176 def _queue_tasks(self, idx:int, tracker:WorkflowTracker_p) -> None: 177 doot.report.gen.trace("Queuing Initial Tasks...") 178 doot.report.gen.gap() 179 180 for sub, calls in doot.args.on_fail({}).subs().items(): 181 for i,_ in enumerate(calls, 1): 182 try: 183 tracker.queue(sub, from_user=i) 184 except doot.errors.TrackingError as err: 185 logging.exception("Failed to Queue Target: %s : %s", sub, err.args, exc_info=None) 186 return 187 else: 188 doot.report.gen.trace("%s Tasks Queued", len(tracker.active))
189
[docs] 190 def _confirm_plan(self, idx:int, runner:WorkflowRunner_p) -> bool: 191 """ Generate and Confirm the plan from the tracker""" 192 if not doot.args.on_fail(False).cmd[self.name][idx].args.confirm(): # noqa: FBT003 193 return True 194 195 tracker = runner.tracker 196 plan = tracker.generate_plan() 197 for i,(depth,node,_desc) in enumerate(plan): 198 doot.report.gen.trace("(D:%s) Step %-4s: %s", depth, i, node) 199 else: 200 match input("Confirm Execution Plan (Y/*): "): 201 case str() as x if x == CONFIRM: 202 return True 203 case _: 204 doot.report.gen.trace("Cancelling") 205 return False
206
[docs] 207 def _accept_subcmds(self) -> Literal[True]: 208 return True