1 #!/usr/bin/env python3
2"""
3
4"""
5# Imports:
6from __future__ import annotations
7
8# ##-- stdlib imports
9import datetime
10import enum
11import functools as ftz
12import itertools as itz
13import logging as logmod
14import pathlib as pl
15import re
16import time
17from uuid import UUID, uuid1
18
19# ##-- end stdlib imports
20
21# ##-- 3rd party imports
22from jgdv import Proto
23from jgdv.debugging.timing import TimeCtx
24from jgdv.structs.strang import CodeReference
25from jgdv.util.plugins.selector import plugin_selector
26
27# ##-- end 3rd party imports
28
29# ##-- 1st party imports
30import doot
31from doot.control.runner.step_runner import DootStepRunner
32from doot.workflow.check_locs import CheckLocsTask
33
34# ##-- end 1st party imports
35
36# ##-| Local
37from ._base import BaseCommand
38from ._interface import Command_p
39
40# # End of Imports.
41
42# ##-- types
43# isort: off
44import abc
45import collections.abc
46from typing import TYPE_CHECKING, cast, assert_type, assert_never
47from typing import Generic, NewType
48# Protocols:
49from typing import Protocol, runtime_checkable
50# Typing Decorators:
51from typing import no_type_check, final, override, overload
52
53if TYPE_CHECKING:
54 from jgdv import Maybe
55 from typing import Final
56 from typing import ClassVar, Any, LiteralString
57 from typing import Never, Self, Literal, ContextManager
58 from typing import TypeGuard
59 from collections.abc import Iterable, Iterator, Callable, Generator
60 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
61 from jgdv.structs.chainguard import ChainGuard
62 from doot.control.runner._inteface import WorkflowRunner_p
63 from doot.control.tracker._interface import WorkflowTracker_p
64
65# isort: on
66# ##-- end types
67
##-- logging
# Module-level logger. NOTE: the name `logging` is bound to a Logger instance
# here; the stdlib logging module itself is imported as `logmod`.
logging = logmod.getLogger(__name__)
##-- end logging

# TODO make a decorator to register these onto the cmd
# Run-command settings, read once at import time from the
# `settings.commands.run` section of the doot config.
# NOTE(review): the first argument to `on_fail` is presumably the value used
# when the key is missing -- confirm against the ChainGuard documentation.
# Selector target passed to `plugin_selector` when picking the tracker plugin.
tracker_target : Final = doot.config.on_fail("default", str).settings.commands.run.tracker()
# Selector target passed to `plugin_selector` when picking the runner plugin.
runner_target : Final = doot.config.on_fail("default", str).settings.commands.run.runner()
# Target passed to `doot.load_reporter` at the start of each run.
reporter_target : Final = doot.config.on_fail("default", str).settings.commands.run.reporter()
# Interrupt handler setting (bool or str). RunCmd._choose_interrupt_handler
# returns True for `True`, loads a str through CodeReference, and returns
# None for anything else.
interrupt_handler : Final = doot.config.on_fail("jgdv.debugging:SignalHandler", bool|str).settings.commands.run.interrupt()
# When truthy, RunCmd._register_specs may queue a CheckLocsTask.
check_locs : Final = doot.config.on_fail(False).settings.commands.run.location_check.active() # noqa: FBT003

# Exact (case-sensitive) answer that confirms the plan in RunCmd._confirm_plan.
CONFIRM : Final[str] = "Y"
80##--|
81
[docs]
82@Proto(Command_p)
83class RunCmd(BaseCommand):
84 _name = "run"
85 _help = tuple(["Will perform the tasks/jobs targeted.",
86 "Can be parameterized in a commands.run block with:",
87 "tracker(str), runner(str)",
88 ])
89
[docs]
90 @override
91 def param_specs(self) -> list:
92 return [
93 *super().param_specs(),
94 self.build_param(name="--interrupt", default=False, type=bool, desc="Activate interrupt handler"),
95 self.build_param(name="--step", default=False, type=bool, desc="Interrupt between workflow step"),
96 self.build_param(name="--dry-run", default=False, type=bool, desc="Don't perform actions"),
97 self.build_param(name="--confirm", default=False, type=bool, desc="Confirm the expected workflow plan"),
98 ]
99
100 def __call__(self, *, idx:int, tasks:ChainGuard, plugins:ChainGuard):
101 tracker : WorkflowTracker_p
102 runner : WorkflowRunner_p
103 interrupt : Maybe[bool|type[ContextManager]|ContextManager]
104 ##--|
105 doot.load_reporter(target=reporter_target)
106
107 doot.report.active_level(logmod.INFO)
108 doot.report.gen.gap()
109 doot.report.gen.line(f"Starting Run Cmd ({idx})", char="=")
110 tracker, runner = self._create_tracker_and_runner(idx, plugins)
111 interrupt = self._choose_interrupt_handler(idx)
112
113 self._register_specs(idx, tracker, tasks)
114 self._queue_tasks(idx, tracker)
115
116 logging.info("---- Starting Runner")
117 with (TimeCtx(logger=logging,
118 level=21) as timer,
119 runner,
120 ):
121 if not self._confirm_plan(idx, runner):
122 return
123 runner(handler=interrupt)
124
125 logging.info("---- Runner took: %s seconds", timer.total_s)
126
[docs]
127 def _create_tracker_and_runner(self, idx:int, plugins:ChainGuard) -> tuple[WorkflowTracker_p, WorkflowRunner_p]:
128 # Note the final parens to construct:
129 trackers = plugins.on_fail([], list).tracker()
130 runners = plugins.on_fail([], list).runner()
131
132 match plugin_selector(trackers, target=tracker_target):
133 case type() as x:
134 tracker = x()
135 case x:
136 raise TypeError(type(x))
137
138 match plugin_selector(runners, target=runner_target):
139 case _ if doot.args.on_fail(False).cmd[self.name][idx].args.step(): # noqa: FBT003
140 runner = DootStepRunner(tracker=tracker)
141 case type() as x:
142 runner = x(tracker=tracker)
143 case x:
144 raise TypeError(type(x))
145
146 return tracker, runner
147
[docs]
148 def _choose_interrupt_handler(self, idx:int) -> Maybe[bool|type|ContextManager]:
149 match interrupt_handler:
150 case _ if not doot.args.on_fail(False).cmd[self.name][idx].args.interrupt(): # noqa: FBT003
151 return None
152 case None:
153 return None
154 case True:
155 doot.report.gen.trace("Setting default interrupt handler")
156 return True
157 case str():
158 doot.report.gen.trace("Loading custom interrupt handler")
159 ref = CodeReference(interrupt_handler)
160 return ref(raise_error=True)
161 case _:
162 return None
163
164
[docs]
165 def _register_specs(self, idx:int, tracker:WorkflowTracker_p, tasks:ChainGuard) -> None:
166 doot.report.gen.trace("Registering Task Specs: %s", len(tasks))
167 for task in tasks.values():
168 tracker.register(task)
169
170 match CheckLocsTask():
171 case x if bool(x.spec.actions) and check_locs:
172 tracker.queue(CheckLocsTask(), from_user=True)
173 case _:
174 pass
175
[docs]
176 def _queue_tasks(self, idx:int, tracker:WorkflowTracker_p) -> None:
177 doot.report.gen.trace("Queuing Initial Tasks...")
178 doot.report.gen.gap()
179
180 for sub, calls in doot.args.on_fail({}).subs().items():
181 for i,_ in enumerate(calls, 1):
182 try:
183 tracker.queue(sub, from_user=i)
184 except doot.errors.TrackingError as err:
185 logging.exception("Failed to Queue Target: %s : %s", sub, err.args, exc_info=None)
186 return
187 else:
188 doot.report.gen.trace("%s Tasks Queued", len(tracker.active))
189
[docs]
190 def _confirm_plan(self, idx:int, runner:WorkflowRunner_p) -> bool:
191 """ Generate and Confirm the plan from the tracker"""
192 if not doot.args.on_fail(False).cmd[self.name][idx].args.confirm(): # noqa: FBT003
193 return True
194
195 tracker = runner.tracker
196 plan = tracker.generate_plan()
197 for i,(depth,node,_desc) in enumerate(plan):
198 doot.report.gen.trace("(D:%s) Step %-4s: %s", depth, i, node)
199 else:
200 match input("Confirm Execution Plan (Y/*): "):
201 case str() as x if x == CONFIRM:
202 return True
203 case _:
204 doot.report.gen.trace("Cancelling")
205 return False
206
[docs]
207 def _accept_subcmds(self) -> Literal[True]:
208 return True