1#!/usr/bin/env python3
2"""
3A Utility implementation of most of what a task needs
4"""
5# mypy: disable-error-code="attr-defined"
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import enum
12import functools as ftz
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18from copy import deepcopy
19from uuid import UUID, uuid1
20from weakref import ref
21
22# ##-- end stdlib imports
23
24# ##-- 3rd party imports
25from jgdv import Mixin, Proto
26from jgdv.cli import ParamSpecMaker_m
27from jgdv.structs.strang import CodeReference
28
29# ##-- end 3rd party imports
30
31# ##-- 1st party imports
32import doot
33import doot.errors
34from doot.errors import StructLoadError, TaskError
35
36# ##-- end 1st party imports
37
38# ##-| Local
39from ._interface import (Action_p, Job_p, QueueMeta_e, Task_p, RelationSpec_i,
40 TaskMeta_e, TaskStatus_e, TaskSpec_i, TaskName_p, ActionSpec_i)
41from .structs import RelationSpec, TaskArtifact, ActionSpec, TaskName
42
43# # End of Imports.
44
45# ##-- types
46# isort: off
47import abc
48import collections.abc
49from typing import TYPE_CHECKING, cast, assert_type, assert_never
50from typing import Generic, NewType
51# Protocols:
52from typing import Protocol, runtime_checkable
53# Typing Decorators:
54from typing import no_type_check, final, override, overload
55from types import LambdaType
56
57if TYPE_CHECKING:
58 from jgdv import Maybe, Lambda
59 from typing import Final
60 from typing import ClassVar, Any, LiteralString
61 from typing import Never, Self, Literal
62 from typing import TypeGuard
63 from collections.abc import Iterable, Iterator, Callable, Generator
64 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
65
66 from jgdv._abstract.protocols.general import SpecStruct_p
67 from jgdv.cli import ParamSpec
68 from doot.cmds.structs.task_stub import TaskStub
69 from . import TaskSpec
70
71# isort: on
72# ##-- end types
73
74##-- logging
75logging = logmod.getLogger(__name__)
76##-- end logging
77
# Module-level values read once, at import time, from doot's loaded configuration.

# Task alias table. `_TaskStubbing_m.stub_class` scans it for an entry whose
# first element is "task".
# NOTE(review): the identifier is spelled ALISES (sic); renaming it means
# updating every use in this module as well.
78TASK_ALISES = doot.aliases.task
# Not referenced by any definition visible in this part of the module.
79PRINT_LOCATIONS = doot.constants.printer.PRINT_LOCATIONS
# Key under which `DootTask.__init__` stores the task's name in its internal state.
80STATE_TASK_NAME_K : Final[str] = doot.constants.patterns.STATE_TASK_NAME_K
81
[docs]
class _TaskActionPrep_m:
    """Mixin that binds a callable to each action spec of a task."""

    def prepare_actions(self) -> None:
        """Walk the spec's action group elements and make sure each action has a function.

        Per element:
        - RelationSpec instances are skipped.
        - ActionSpecs that already have `fun` set are skipped.
        - ActionSpecs with a `do` value call `set_function()` with no arguments;
          a StructError or ImportError from that call is collected, not raised.
        - ActionSpecs with neither are given this task's `action_ctor`.
        - Anything else is collected as a TaskError.

        Raises:
            doot.errors.TaskError: once the whole group has been walked, if any
                failure was collected. The collected failures are passed as the
                final argument.
        """
        logging.debug("Preparing Actions: %s", self.name)
        problems : list[Exception] = []

        for element in self.spec.action_group_elements():
            if isinstance(element, RelationSpec):
                continue
            if not isinstance(element, ActionSpec):
                problems.append(doot.errors.TaskError("Unknown element in action group: ", element, self.name))
                continue
            if element.fun is not None:
                # already bound, nothing to do
                continue
            if element.do is None:
                # nothing named on the spec: fall back to the task-wide ctor
                element.set_function(fun=self.action_ctor)
                continue
            try:
                element.set_function()
            except (doot.errors.StructError, ImportError) as err:
                problems.append(err)

        if problems:
            raise doot.errors.TaskError("Action Spec preparation failures", self.name[:], list(problems))
113
[docs]
class _TaskProperties_m:
    """Mixin supplying the cli parameter specs and simple read-only task properties.

    `param_specs` relies on a `build_param` classmethod being present on the
    final class; `name` and `doc` read `self.spec`, and `doc` also reads
    `self._help`.
    """

    @classmethod
    def param_specs(cls) -> list[ParamSpec]:
        """ make class parameter specs

        Returns three implicit params built with `cls.build_param`:
        --help, --debug (both default False) and --verbose (int, default 0).
        """
        return [
            cls.build_param(name="--help",    default=False,       implicit=True),
            cls.build_param(name="--debug",   default=False,       implicit=True),
            cls.build_param(name="--verbose", default=0, type=int, implicit=True),
        ]

    @property
    def name(self) -> TaskName:
        """ The name held by this task's spec """
        return self.spec.name

    @property
    def short_doc(self) -> str:
        """ Generate Job Class 1 line help string

        The result is always ":: " followed by the first line of the class
        docstring that has visible text (stripped). A class whose docstring is
        missing, not a string, empty, or only whitespace gives the bare ":: ".
        """
        prefix = ":: "
        raw = self.__class__.__doc__
        if not isinstance(raw, str):
            return prefix
        # Strip before testing so indentation-only lines are not taken as the summary
        summary = next((text for line in raw.split("\n") if (text := line.strip())), "")
        return prefix + summary

    @property
    def doc(self) -> list[str]:
        """ The spec's doc lines, falling back to the class `_help` when those are empty """
        return self.spec.doc or self._help

    @property
    def is_stale(self) -> bool:
        """ Always False here """
        return False
147
[docs]
class _TaskStubbing_m:
    """Mixin that fills in a TaskStub from the task class and from a task instance."""

    @classmethod
    def stub_class(cls, stub:TaskStub) -> TaskStub:
        """ Create a basic toml stub for this task

        Sets the stub's ctor, the priorities of the `required_for` and
        `depends_on` entries, and the defaults/comments of the `priority`,
        `queue_behaviour` and `flags` entries. The same stub is returned.
        """
        # NOTE(review): this expects iterating TASK_ALISES to yield items whose
        # first element is the alias name (eg: pairs). If it yields plain string
        # keys, x[0] is a single character and this never matches - confirm.
        if any(x[0] == "task" for x in TASK_ALISES):
            stub.ctor = "task"
        else:
            stub.ctor = cls

        # Come first
        # (presumably lower priorities are rendered earlier in the stub - confirm)
        stub['required_for'].priority = -90
        stub['depends_on'].priority   = -100

        stub['priority'].default        = 10
        stub['queue_behaviour'].default = "default"
        # Join in enum definition order. Joining a set of names gave an order
        # that changed from run to run with string hash randomisation.
        stub['queue_behaviour'].comment = " | ".join(x.name for x in QueueMeta_e)
        stub['flags'].comment           = " | ".join(x.name for x in TaskMeta_e)
        return stub

    def stub_instance(self, stub:TaskStub) -> TaskStub:
        """ extend the class toml stub with details from this instance

        Writes the de-uniqued task name, a copy of the doc lines (or an empty
        list when there are none) and the spec's flags as defaults.
        The same stub is returned.
        """
        stub['name'].default  = self.name.de_uniq()
        doc_lines             = self.doc
        stub['doc'].default   = doc_lines[:] if doc_lines else []
        stub['flags'].default = self.spec.flags

        return stub
178
[docs]
class _TaskHelp_m:
    """Mixin producing the help text for a task class."""

    @classmethod
    def class_help(cls) -> list[str]:
        """ Task *class* help.

        Returns a new list of lines: a title with the class qualname and
        `_version`, a blank line, the class MRO joined with " -> ",
        a blank line, then every entry of the class `_help`.
        """
        lineage = " -> ".join(klass.__name__ for klass in cls.mro())
        return [
            f"Task : {cls.__qualname__} v{cls._version}",
            "",
            f"Task MRO: {lineage}",
            "",
            *cls._help,
        ]
191
192##--|
193
[docs]
194@Proto(Task_p, check=True)
195@Mixin(ParamSpecMaker_m, _TaskProperties_m, _TaskStubbing_m, _TaskHelp_m, _TaskActionPrep_m)
196class DootTask:
197 """
198 The simplest task, which can import action classes.
199 eg:
200 actions = [ {do = "doot.workflow.actions.shell_action:DootShellAction", args = ["echo", "this is a test"] } ]
201
202 Actions are imported upon task creation.
203 """
204 Flags : ClassVar[type[TaskMeta_e]] = TaskMeta_e
205 INITIAL_STATE : ClassVar[TaskStatus_e] = TaskStatus_e.INIT
206 COMPLETE_STATES : ClassVar[set[TaskStatus_e]] = {TaskStatus_e.SUCCESS}
207 _default_flags : ClassVar = {TaskMeta_e.TASK}
208 action_ctor : type
209 _help : tuple[str, ...] = tuple(["The Simplest Task"])
210 _version : str = "0.1"
211 _internal_state : dict
212
213 def __init__(self, spec:TaskSpec_i, *, action_ctor:Maybe[Callable]=None, **kwargs:Any): # noqa: ARG002
214 self.flags = TaskMeta_e.TASK
215 self._internal_state = dict(spec.extra)
216 self._spec = spec
217 self._priority = self.spec.priority
218 self._status = DootTask.INITIAL_STATE
219
220 self._internal_state[STATE_TASK_NAME_K] = self.spec.name
221 self._internal_state['_action_step'] = 0
222
223 match action_ctor:
224 case None:
225 from .actions import DootBaseAction # noqa: PLC0415
226 self.action_ctor = DootBaseAction
227 case type() as x:
228 self.action_ctor = x
229 case x:
230 raise TypeError(type(x))
231
232
233 ##--| dunders
234
235 @override
236 def __repr__(self) -> str:
237 cls = self.__class__.__qualname__
238 return f"<{cls}: {self.name.de_uniq()}>"
239
240 def __bool__(self) -> bool:
241 return self.status in DootTask.COMPLETE_STATES
242
243 @override
244 def __hash__(self) -> int:
245 return hash(self.name)
246
247 def __lt__(self, other:TaskName_p|Task_p) -> bool:
248 """ Task A < Task B if B ∈ A.depends_on """
249 match other:
250 case TaskName_p():
251 name = other.name
252 case Task_p():
253 name = other.spec.name
254 case x:
255 raise TypeError(type(x))
256 return any(name in x.target for x in self.spec.depends_on if isinstance(x, RelationSpec_i))
257
258 @override
259 def __eq__(self, other:object) -> bool:
260 match other:
261 case str() | TaskName():
262 return self.name == other
263 case Task_p():
264 return self.name == other.name
265 case _:
266 return False
267
268 ##--| properties
269
[docs]
270 @property
271 def name(self) -> TaskName_p:
272 return self.spec.name
273
[docs]
274 @property
275 def spec(self) -> TaskSpec_i:
276 return self._spec
277
278 @property
279 def status(self) -> TaskStatus_e:
280 return self._status
281
282
[docs]
283 @status.setter
284 def status(self, val:TaskStatus_e) -> None:
285 self._status = val
286
287 @property
288 def priority(self) -> int:
289 return self._priority
290
[docs]
291 @priority.setter
292 def priority(self, val:int) -> None:
293 self._priority = val
294
[docs]
295 @property
296 def internal_state(self) -> dict:
297 return self._internal_state
298 ##--| methods
299
[docs]
300 def prepare_actions(self) -> None:
301 """ if the task/action spec requires particular action ctors, load them.
302 if the action spec doesn't have a ctor, use the task's action_ctor
303
304 collects any action errors together, then raises them as a task error
305 """
306 logging.debug("Preparing Actions: %s", self.name)
307 failed : list[Exception] = []
308 for action_spec in self.spec.action_group_elements():
309 match action_spec:
310 case RelationSpec_i():
311 pass
312 case ActionSpec_i() if action_spec.fun is not None:
313 pass
314 case ActionSpec_i() if action_spec.do is not None:
315 try:
316 action_spec.set_function()
317 except (doot.errors.StructError, ImportError) as err:
318 failed.append(err)
319 case ActionSpec_i():
320 action_spec.set_function(fun=self.action_ctor)
321 case _:
322 failed.append(doot.errors.TaskError("Unknown element in action group: ", action_spec, self.name))
323 else:
324 match failed:
325 case []:
326 pass
327 case [*xs]:
328 raise doot.errors.TaskError("Action Spec preparation failures", self.name[:], xs)
329
[docs]
330 def log(self, msg:str|Lambda|list, level:int=logmod.DEBUG, prefix:Maybe[str]=None) -> None:
331 """
332 utility method to log a message, useful as tasks are running
333 """
334 prefix = prefix or ""
335 assert(prefix is not None)
336 lines : list[str] = []
337 match msg:
338 case str():
339 lines.append(msg)
340 case LambdaType():
341 lines.append(msg())
342 case [LambdaType()]:
343 lines += msg[0]()
344 case list():
345 lines += msg
346
347 for line in lines:
348 logging.log(level, prefix + str(line))
349
[docs]
350 def get_action_group(self, group_name:str) -> list[ActionSpec_i]:
351 if not bool(group_name):
352 raise TaskError("Tried to retrieve an empty groupname")
353 if hasattr(self, group_name):
354 return getattr(self, group_name)
355 if hasattr(self.spec, group_name):
356 return getattr(self.spec, group_name)
357
358 logging.warning("Unknown Groupname: %s", group_name)
359 return []