1#!/usr/bin/env python3
2"""
3
4"""
5# ruff: noqa: N812
6# : disable-error-code="attr-defined"
7# Imports:
8from __future__ import annotations
9
10# ##-- stdlib imports
11import datetime
12import enum
13import functools as ftz
14import itertools as itz
15import logging as logmod
16import pathlib as pl
17import re
18import time
19import types
20from collections import defaultdict
21from contextlib import nullcontext
22from uuid import UUID, uuid1
23
24# ##-- end stdlib imports
25
26# ##-- 3rd party imports
27import networkx as nx
28from jgdv import Mixin, Proto
29from jgdv.debugging import NullHandler, SignalHandler
30
31# ##-- end 3rd party imports
32
33# ##-- 1st party imports
34import doot
35import doot.errors
36from doot.control.runner._interface import WorkflowRunner_p
37from doot.workflow import (ActionSpec, RelationSpec, TaskArtifact, TaskName, TaskSpec)
38from doot.workflow._interface import ActionResponse_e as ActRE
39from doot.workflow._interface import Job_p, Task_p, TaskName_p, TaskSpec_i, ActionSpec_i, RelationSpec_i
40
41# ##-- end 1st party imports
42
43# ##-| Local
44from . import _interface as API # noqa: N812
45from . import util as RU
46
47# # End of Imports.
48
49# ##-- types
50# isort: off
51import abc
52import collections.abc
53from typing import TYPE_CHECKING, cast, assert_type, assert_never
54from typing import Generic, NewType
55# Protocols:
56from typing import Protocol, runtime_checkable
57# Typing Decorators:
58from typing import no_type_check, final, override, overload
59
60if TYPE_CHECKING:
61 from doot.control.tracker._interface import WorkflowTracker_p
62 from doot.workflow._interface import Artifact_i, DelayedSpec
63 from jgdv import Maybe
64 from typing import Final
65 from typing import ClassVar, Any, LiteralString
66 from typing import Never, Self, Literal
67 from typing import TypeGuard
68 from collections.abc import Iterable, Iterator, Callable, Generator
69 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
70
71##--|
72from typing import ContextManager
73# isort: on
74# ##-- end types
75
76##-- logging
77logging = logmod.getLogger(__name__)
78##-- end logging
79
80##--| Vars
81skip_msg : Final[str] = doot.constants.printer.skip_by_condition_msg
82max_steps : Final[int] = doot.config.on_fail(100_000).commands.run.max_steps()
83hide_empty_cleanup : Final[bool] = doot.config.on_fail(False).commands.run.hide_empty_cleanup() # noqa: FBT003
84
85SETUP_GROUP : Final[str] = "setup"
86ACTION_GROUP : Final[str] = "actions"
87FAIL_GROUP : Final[str] = "on_fail"
88DEPENDS_GROUP : Final[str] = "depends_on"
89
90##--|
91
[docs]
92class ActionExecutor:
93 """ An internal object handling the logic of running action(groups) of a task """
94
[docs]
95 def execute_action_group(self, task:Task_p, *, group:str, large_step:int) -> Maybe[tuple[int, ActRE, list]]:
96 """ Execute a group of actions, possibly queue any task specs they produced,
97 and return a count of the actions run + the result
98 """
99 to_queue : list[TaskName_p|TaskSpec_i|DelayedSpec]
100 group_result : ActRE
101 actions : Iterable[ActionSpec_i]
102 executed_count : int
103 ##--|
104 actions = task.get_action_group(group)
105
106 if not bool(actions):
107 return None
108
109 group_result = ActRE.SUCCESS
110 to_queue = []
111 executed_count = 0
112
113 for action in self.skip_relation_specs(actions):
114 match self.execute_action(large_step, executed_count, action, task, group=group):
115 case True | None:
116 continue
117 case list() as result:
118 to_queue += result
119 case False:
120 group_result = ActRE.FAIL
121 break
122 case ActRE.SKIP:
123 doot.report.wf.act("skip", skip_msg)
124 group_result = ActRE.SKIP
125 break
126
127 executed_count += 1
128
129 else: # no break.
130 pass
131
132 return executed_count, group_result, to_queue
133
[docs]
134 def skip_relation_specs(self, actions:Iterable) -> Iterator:
135 """ return of True signals the action is a relationspec, so is to be ignored """
136 for action in actions:
137 match action:
138 case RelationSpec():
139 pass
140 case ActionSpec() as act:
141 yield act
142 case _:
143 raise doot.errors.TaskError("Task Failed: Bad Action: %s", repr(action))
144
[docs]
145 def execute_action(self, large_step:int, count:int, action:ActionSpec_i, task:Task_p, group:Maybe[str]=None) -> ActRE|list:
146 """ Run the given action of a specific task.
147
148 returns either a list of specs to (potentially) queue,
149 or an ActRE describing the action result.
150
151 """
152 result : ActRE|list
153 ##--|
154 task.internal_state['_action_step'] = count
155 match group:
156 case str():
157 doot.report.wf.act(f"{large_step}.{group}.{count}", str(action.do))
158 case None:
159 doot.report.wf.act(f"{large_step}._.{count}", str(action.do))
160
161 logging.debug("Action Executing for Task: %s", task.name)
162 logging.debug("Action State: %s.%s: args=%s kwargs=%s. state(size)=%s", large_step, count, action.args, dict(action.kwargs), len(task.internal_state.keys()))
163 response = action(task.internal_state)
164 match response:
165 case None | True:
166 result = ActRE.SUCCESS
167 case False | ActRE.FAIL:
168 raise doot.errors.TaskFailed("Task %s: Action Failed: %s", task.name, action.do, task=task.spec)
169 case ActRE.SKIP as result:
170 # result will be returned, and expand_job/execute_task will handle it
171 pass
172 case dict() as data: # update the task's state
173 task.internal_state.update({str(k):v for k,v in data.items()})
174 result = ActRE.SUCCESS
175 case list() as data if isinstance(task, Job_p):
176 result = data
177 case x:
178 raise doot.errors.TaskError("Task %s: Action %s Failed: Returned an unplanned for value: %s", task.name, action.do, x, task=task.spec)
179
180 return result
181
[docs]
182 def test_conditions(self, task:Task_p, *, large_step:int) -> bool:
183 """ run a task's depends_on group, coercing to a bool
184 returns False if the runner should skip the rest of the task
185 """
186 match self.execute_action_group(task, group=DEPENDS_GROUP, large_step=large_step):
187 case None:
188 return True
189 case _, ActRE.SKIP | ActRE.FAIL, _:
190 return False
191 case _:
192 return True
193##--|
194
[docs]
195@Proto(WorkflowRunner_p, check=False)
196@Mixin(RU._RunnerCtx_m, RU._RunnerHandlers_m, None)
197class DootRunner:
198 """ The simplest single threaded task runner """
199
200 large_step : int
201 tracker : WorkflowTracker_p
202 teardown_list : list
203 executor : ActionExecutor
204
205 def __init__(self:Self, *, tracker:WorkflowTracker_p, executor:Maybe[ActionExecutor]=None):
206 super().__init__()
207 self.large_step = 0
208 self.tracker = tracker
209 self.executor = executor or ActionExecutor()
210 self.teardown_list = [] # list of tasks to teardown
211
212 def __call__(self, *tasks:str, handler:Maybe[API.Handler]=None): #noqa: ARG002
213 """ tasks are initial targets to run.
214 so loop on the tracker, getting the next task,
215 running its actions,
216 and repeating,
217 until done
218
219 if task is a job, it is expanded and added into the tracker
220 """
221 match handler:
222 case True:
223 handler = SignalHandler()
224 case False:
225 handler = NullHandler()
226 case type() as x:
227 handler = x()
228 case x if hasattr(x, "__enter__"):
229 handler = x
230 case _:
231 handler = nullcontext()
232
233 assert(isinstance(handler, ContextManager))
234 with handler:
235 while bool(self.tracker) and self.large_step < max_steps:
236 self.run_next_task()
237 else:
238 pass
239
[docs]
240 def run_next_task(self) -> None:
241 """
242 Get the next task from the tracker, expand/run it,
243 and handle the result/failure
244 """
245 task : Maybe[Task_p|Artifact_i] = None
246 try:
247 match (task:=self.tracker.next_for()):
248 case None:
249 pass
250 case TaskArtifact():
251 self.notify_artifact(task)
252 case Job_p():
253 self.expand_job(task)
254 case Task_p():
255 self.execute_task(task)
256 case x:
257 doot.report.gen.error("Unknown Value provided to runner: %s", x)
258 except doot.errors.TaskError as err:
259 err.task = task
260 self.handle_failure(err)
261 except doot.errors.DootError as err:
262 self.handle_failure(err)
263 except Exception as err:
264 doot.report.wf.fail(info="Exception", msg=str(err))
265 self.tracker.clear()
266 raise
267 else:
268 self.handle_success(task)
269 self.sleep_after(task)
270 self.large_step += 1
271
[docs]
272 def expand_job(self, job:Job_p) -> None:
273 """ turn a job into all of its tasks, including teardowns """
274 logmod.debug("-- Expanding Job %s: %s", self.large_step, job.name)
275 assert(isinstance(job, Job_p))
276 try:
277 doot.report.wf.branch(job.spec.name, info=f"Job {self.large_step}")
278 if not self.executor.test_conditions(job, large_step=self.large_step):
279 return
280
281 self.executor.execute_action_group(job, group=SETUP_GROUP, large_step=self.large_step)
282 match self.executor.execute_action_group(job, group=ACTION_GROUP, large_step=self.large_step):
283 case None:
284 pass
285 case int(), ActRE(), [*xs]:
286 self._queue_more_tasks(job.name, xs)
287 except doot.errors.DootError as err:
288 self.executor.execute_action_group(job, group=FAIL_GROUP, large_step=self.large_step)
289 raise
290
[docs]
291 def execute_task(self, task:Task_p) -> None:
292 """ execute a single task's actions """
293 logmod.debug("-- Expanding Task %s: %s", self.large_step, task.name)
294 assert(not isinstance(task, Job_p))
295 try:
296 doot.report.wf.branch(task.spec.name, info=f"Task {self.large_step}")
297 if not self.executor.test_conditions(task, large_step=self.large_step):
298 return
299
300 self.executor.execute_action_group(task, group=SETUP_GROUP, large_step=self.large_step)
301 self.executor.execute_action_group(task, group=ACTION_GROUP, large_step=self.large_step)
302 except doot.errors.DootError as err:
303 self.executor.execute_action_group(task, group=FAIL_GROUP, large_step=self.large_step)
304 raise
305
[docs]
306 def _queue_more_tasks(self, source:TaskName_p, new_tasks:list) -> None:
307 """ When 'allowed', an action group can queue more tasks in the tracker,
308 can return a new ActRE to describe the result status of this group
309 """
310 new_nodes : list[TaskName_p] = []
311 failures = []
312 for spec in new_tasks:
313 match self.tracker.queue(spec):
314 case None:
315 failures.append(spec.name)
316 case TaskName_p() as x:
317 new_nodes.append(x)
318
319 if bool(failures):
320 raise doot.errors.JobExpansionError("Queuing generated specs failed", source, failures)
321
322 if bool(new_nodes):
323 self.tracker.build(sources=new_nodes) # type: ignore[arg-type]
324
325 ##--| handlers
326
[docs]
327 def handle_success[T:Task_p|Artifact_i](self, task:Maybe[T]) -> Maybe[T]:
328 raise NotImplementedError()
329
[docs]
330 def handle_failure(self, failure:Exception) -> None:
331 raise NotImplementedError()
332
[docs]
333 def sleep_after[T:Task_p|Artifact_i](self, task:Maybe[T]) -> None:
334 raise NotImplementedError()