1#!/usr/bin/env python3
2"""
3
4"""
5# mypy: disable-error-code="attr-defined"
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import enum
12import functools as ftz
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18import types
19import weakref
20from collections import defaultdict
21from uuid import UUID, uuid1
22
23# ##-- end stdlib imports
24
25# ##-- 3rd party imports
26from jgdv.debugging import SignalHandler
27# ##-- end 3rd party imports
28
29# ##-- 1st party imports
30import doot
31import doot.errors
32from doot.workflow._interface import ActionResponse_e as ActRE
33from doot.workflow._interface import TaskStatus_e
34from doot.workflow import ActionSpec, TaskSpec, TaskArtifact
35from doot.workflow._interface import Task_p, Artifact_i
36
37# ##-- end 1st party imports
38
39# ##-- types
40# isort: off
41import abc
42import collections.abc
43from typing import TYPE_CHECKING, cast, assert_type, assert_never
44from typing import Generic, NewType
45# Protocols:
46from typing import Protocol, runtime_checkable
47# Typing Decorators:
48from typing import no_type_check, final, override, overload
49
50if TYPE_CHECKING:
51 from jgdv import Maybe, Traceback
52 from typing import Final
53 from typing import ClassVar, Any, LiteralString
54 from typing import Never, Self, Literal
55 from typing import TypeGuard
56 from collections.abc import Iterable, Iterator, Callable, Generator
57 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
58
59 from ._interface import WorkflowRunner_p
60
61
62##--|
63# isort: on
64# ##-- end types
65
66##-- logging
67logging = logmod.getLogger(__name__)
68##-- end logging
69
# Runner-wide settings, resolved once at import time from doot's args/config/constants.

# Dry-run flag of the invoked command; `False` is supplied as the on_fail fallback.
dry_run: Final[bool] = doot.args.on_fail(False).cmd.args.dry_run()  # noqa: FBT003
# Step ceiling: the runner warns in `_finish` once `large_step` reaches this value.
max_steps: Final[int] = doot.config.on_fail(100_000).commands.run.max_steps()
# Prefix placed at the front of failure messages emitted by the handlers.
fail_prefix: Final[str] = doot.constants.printer.fail_prefix
# Lines reported when the runner's `with` block is entered / left.
loop_entry_msg: Final[str] = doot.constants.printer.loop_entry
loop_exit_msg: Final[str] = doot.constants.printer.loop_exit

# Pause used after a task when its spec does not provide its own `sleep` value.
DEFAULT_SLEEP_LENGTH: Final[int|float] = doot.config.on_fail(0.2, int|float).commands.run.sleep.task()
77##--|
78
[docs]
class _RunnerCtx_m:
    """ Mixin that lets a runner be used as a context manager.

    Entering builds and validates the tracker's task network;
    leaving delegates to `_finish`, which reports the end of the run and
    re-raises any failure recorded in `_signal_failure`.
    """

    # The failure recorded during the run (if any), re-raised by `_finish`.
    _signal_failure : Maybe[doot.errors.DootError]

    def __init__(self, *args, **kwargs) -> None: # noqa: ANN002, ANN003
        """ Forward all arguments up the MRO, then set the mixin's own state. """
        super().__init__(*args, **kwargs) # type: ignore[safe-super]
        self._signal_failure = None
        self._enter_msg = loop_entry_msg
        self._exit_msg = loop_exit_msg

    def __enter__(self:WorkflowRunner_p) -> WorkflowRunner_p:
        """ Build, then validate, the task network, reporting each stage.

        Returns:
            The runner itself, for use as the `with ... as` target.
        """
        logging.info("Entering Runner Control")

        # Stage 1: construct the network.
        doot.report.gen.trace("Building Task Network...")
        doot.report.gen.gap()
        self.tracker.build()
        doot.report.gen.trace("Task Network Built.")

        # Summarise what was built before validating it.
        network = self.tracker.network
        node_count = len(network.nodes)
        edge_count = len(network.edges)
        root_edge_count = len(network.pred[self.tracker._root_node])
        doot.report.gen.detail("Network Composition: %s Nodes, %s Edges, %s Edges from Root.",
                               node_count,
                               edge_count,
                               root_edge_count)

        # Stage 2: validate the network.
        doot.report.gen.trace("Validating Task Network...")
        doot.report.gen.gap()
        self.tracker.validate()
        doot.report.gen.trace("Validation Complete.")

        # Announce the start of the run proper.
        doot.report.gen.line(self._enter_msg)
        doot.report.wf.root()
        return self

    def __exit__(self:WorkflowRunner_p, exc_type:type[Exception], exc_value:Exception, exc_traceback:Traceback) -> Literal[False]:
        """ Finish the run on the way out of the `with` block.

        The exception arguments are currently unused.

        Returns:
            Always False, so an exception raised inside the block is not suppressed.
        """
        logging.info("Exiting Runner Control")
        # TODO handle exc_types?
        self._finish()
        return False

    def _finish(self:WorkflowRunner_p) -> None:
        """ Report the end of the run, then surface any recorded failure.

        Kept apart from `__exit__` so that subclasses can override it.

        Raises:
            doot.errors.DootError: the error stored in `_signal_failure`, if one was recorded.
        """
        logging.info("Running Completed")
        step_limit_reached = self.large_step >= max_steps
        if step_limit_reached:
            doot.report.gen.warn("Runner Hit the Step Limit: %s", max_steps)

        doot.report.wf.finished().gap().line(self._exit_msg)

        # Nothing recorded: a clean finish.
        if self._signal_failure is None:
            return
        if isinstance(self._signal_failure, doot.errors.DootError):
            raise self._signal_failure
128
[docs]
129class _RunnerHandlers_m:
130 """ Mixin for runners with default handlers """
131
132 _signal_failure : Maybe[doot.errors.DootError]
133
[docs]
134 def handle_success[T:Maybe[Task_p|TaskArtifact]](self:WorkflowRunner_p, task:T) -> T:
135 """ The basic success handler. just informs the tracker of the success """
136 match task:
137 case None:
138 pass
139 case TaskArtifact() as art:
140 doot.report.wf.result([str(art.path)], info="Success")
141 case Task_p():
142 doot.report.wf.result([task.name[:]], info="Success")
143 self.tracker.set_status(task.name, TaskStatus_e.SUCCESS)
144 return task
145
[docs]
146 def handle_failure(self:WorkflowRunner_p, failure:Exception) -> None:
147 """ The basic failure handler.
148 Triggers a breakpoint on Interrupt,
149 otherwise informs the tracker of the failure.
150
151 Halts any failed or errored tasks, which propagates to any successors
152 Fails any DootErrors, TrackingErrors, and non-doot errors
153
154 the tracker handle's clearing itself and shutting down
155 """
156 match failure:
157 case doot.errors.Interrupt():
158 breakpoint()
159 pass
160 case doot.errors.TaskFailed() as err:
161 self._signal_failure = err
162 doot.report.gen.warn("%s Halting: %s", fail_prefix, err)
163 self.tracker.set_status(err.task, TaskStatus_e.HALTED)
164 case doot.errors.TaskError() as err:
165 doot.report.wf.fail()
166 self._signal_failure = err
167 self.tracker.set_status(err.task, TaskStatus_e.FAILED)
168 raise err
169 case doot.errors.TrackingError() as err:
170 doot.report.wf.fail()
171 self._signal_failure = err
172 raise err
173 case doot.errors.DootError() as err:
174 doot.report.wf.fail()
175 self._signal_failure = err
176 raise err
177 case err:
178 doot.report.wf.fail()
179 self._signal_failure = doot.errors.DootError("Unknown Failure")
180 doot.report.gen.error("%s Unknown failure occurred: %s", fail_prefix, failure)
181 raise err
182
[docs]
183 def notify_artifact(self:WorkflowRunner_p, art:TaskArtifact) -> None:
184 """ A No-op for when the tracker gives an artifact """
185 doot.report.wf.result(["Artifact: %s", art])
186 raise doot.errors.StateError("Artifact resolutely does not exist", art)
187
[docs]
188 def sleep_after(self:WorkflowRunner_p, task:Maybe[Task_p|Artifact_i]) -> None:
189 """
190 The runner's sleep method, which spaces out tasks
191 """
192 match task:
193 case None:
194 return
195 case TaskArtifact():
196 return
197
198 assert(isinstance(task, Task_p))
199 sleep_len = task.spec.extra.on_fail(DEFAULT_SLEEP_LENGTH, int|float).sleep()
200 doot.report.gen.detail("[Sleeping (%s)...]", sleep_len, extra={"colour":"white"})
201 time.sleep(sleep_len)