Source code for doot.workflow.task

  1#!/usr/bin/env python3
  2"""
  3A Utility implementation of most of what a task needs
  4"""
  5# mypy: disable-error-code="attr-defined"
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18from copy import deepcopy
 19from uuid import UUID, uuid1
 20from weakref import ref
 21
 22# ##-- end stdlib imports
 23
 24# ##-- 3rd party imports
 25from jgdv import Mixin, Proto
 26from jgdv.cli import ParamSpecMaker_m
 27from jgdv.structs.strang import CodeReference
 28
 29# ##-- end 3rd party imports
 30
 31# ##-- 1st party imports
 32import doot
 33import doot.errors
 34from doot.errors import StructLoadError, TaskError
 35
 36# ##-- end 1st party imports
 37
 38# ##-| Local
 39from ._interface import (Action_p, Job_p, QueueMeta_e, Task_p, RelationSpec_i,
 40                         TaskMeta_e, TaskStatus_e, TaskSpec_i, TaskName_p, ActionSpec_i)
 41from .structs import RelationSpec, TaskArtifact, ActionSpec, TaskName
 42
 43# # End of Imports.
 44
 45# ##-- types
 46# isort: off
 47import abc
 48import collections.abc
 49from typing import TYPE_CHECKING, cast, assert_type, assert_never
 50from typing import Generic, NewType
 51# Protocols:
 52from typing import Protocol, runtime_checkable
 53# Typing Decorators:
 54from typing import no_type_check, final, override, overload
 55from types import LambdaType
 56
 57if TYPE_CHECKING:
 58    from jgdv import Maybe, Lambda
 59    from typing import Final
 60    from typing import ClassVar, Any, LiteralString
 61    from typing import Never, Self, Literal
 62    from typing import TypeGuard
 63    from collections.abc import Iterable, Iterator, Callable, Generator
 64    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 65
 66    from jgdv._abstract.protocols.general import SpecStruct_p
 67    from jgdv.cli import ParamSpec
 68    from doot.cmds.structs.task_stub import TaskStub
 69    from . import TaskSpec
 70
 71# isort: on
 72# ##-- end types
 73
##-- logging
# Module-level logger named after this module. Despite its name this is a
# Logger instance; the stdlib logging module itself is imported as `logmod`.
logging = logmod.getLogger(__name__)
##-- end logging

# Value of doot.aliases.task; stub_class scans it for an entry whose first
# element is "task".
# NOTE(review): the name is spelled ALISES (sic). It is kept as-is because it is
# referenced by that name elsewhere in this module.
TASK_ALISES                    = doot.aliases.task
# Copied from doot's printer constants; not referenced in the visible part of this module.
PRINT_LOCATIONS                = doot.constants.printer.PRINT_LOCATIONS
# Key under which DootTask.__init__ stores the task's name in its internal state.
STATE_TASK_NAME_K : Final[str] = doot.constants.patterns.STATE_TASK_NAME_K
 81
[docs] 82class _TaskActionPrep_m: 83
[docs] 84 def prepare_actions(self) -> None: 85 """ if the task/action spec requires particular action ctors, load them. 86 if the action spec doesn't have a ctor, use the task's action_ctor 87 88 collects any action errors together, then raises them as a task error 89 """ 90 logging.debug("Preparing Actions: %s", self.name) 91 failed : list[Exception] = [] 92 for action_spec in self.spec.action_group_elements(): 93 match action_spec: 94 case RelationSpec(): 95 pass 96 case ActionSpec() if action_spec.fun is not None: 97 pass 98 case ActionSpec() if action_spec.do is not None: 99 try: 100 action_spec.set_function() 101 except (doot.errors.StructError, ImportError) as err: 102 failed.append(err) 103 case ActionSpec(): 104 action_spec.set_function(fun=self.action_ctor) 105 case _: 106 failed.append(doot.errors.TaskError("Unknown element in action group: ", action_spec, self.name)) 107 else: 108 match failed: 109 case []: 110 pass 111 case [*xs]: 112 raise doot.errors.TaskError("Action Spec preparation failures", self.name[:], xs)
113
class _TaskProperties_m:
    """ Mixin providing a task's cli parameter specs and its descriptive, read-only properties. """

    @classmethod
    def param_specs(cls) -> list[ParamSpec]:
        """ make class parameter specs

        Returns the --help, --debug and --verbose params, built with `cls.build_param`,
        which this mixin does not define itself and expects the host class to provide.
        """
        return [
            cls.build_param(name="--help",    default=False,       implicit=True),
            cls.build_param(name="--debug",   default=False,       implicit=True),
            cls.build_param(name="--verbose", default=0, type=int, implicit=True),
        ]

    @property
    def name(self) -> TaskName:
        """ The name held by the task's spec """
        return self.spec.name

    @property
    def short_doc(self) -> str:
        """ Generate Job Class 1 line help string

        Returns ":: " followed by the first non-blank line of the class docstring,
        stripped of surrounding whitespace.
        A missing, empty, or entirely blank docstring gives just ":: ",
        so the result always carries the same prefix.
        """
        docstring = self.__class__.__doc__
        if not docstring:
            return ":: "

        stripped_lines = (line.strip() for line in docstring.splitlines())
        summary        = next((line for line in stripped_lines if line), "")
        return ":: " + summary

    @property
    def doc(self) -> list[str]:
        """ The spec's doc if it has one, otherwise the `_help` of the task """
        return self.spec.doc or self._help

    @property
    def is_stale(self) -> bool:
        """ Always False in this mixin """
        return False
147
class _TaskStubbing_m:
    """ Mixin which fills in a TaskStub from a task class, and from a task instance. """

    @classmethod
    def stub_class(cls, stub:TaskStub) -> TaskStub:
        """ Create a basic toml stub for this task

        The stub's ctor is the string "task" when TASK_ALISES has an entry whose
        first element is "task", otherwise it is this class.
        The comments listing the available queue behaviours and flags are written
        in enum definition order, so the generated stub is the same on every run.

        Returns the stub which was passed in, modified in place.
        """
        if any(alias[0] == "task" for alias in TASK_ALISES):
            stub.ctor = "task"
        else:
            stub.ctor = cls

        # Come first
        stub['required_for'].priority    = -90
        stub['depends_on'].priority      = -100

        stub['priority'].default         = 10
        stub['queue_behaviour'].default  = "default"
        # Joined straight from the enums: iterating a set of names here would
        # give an order that changes with string hash randomisation.
        stub['queue_behaviour'].comment  = " | ".join(x.name for x in QueueMeta_e)
        stub['flags'].comment            = " | ".join(x.name for x in TaskMeta_e)
        return stub

    def stub_instance(self, stub:TaskStub) -> TaskStub:
        """ extend the class toml stub with details from this instance

        Sets the defaults of the 'name', 'doc' and 'flags' entries from this task.
        The doc is copied by slicing, and an absent or empty doc becomes an empty list.

        Returns the stub which was passed in, modified in place.
        """
        stub['name'].default      = self.name.de_uniq()
        if bool(self.doc):
            stub['doc'].default   = self.doc[:]
        else:
            stub['doc'].default   = []
        stub['flags'].default     = self.spec.flags

        return stub
178
class _TaskHelp_m:
    """ Mixin which builds the help text describing a task class. """

    @classmethod
    def class_help(cls) -> list[str]:
        """ Task *class* help.

        The lines are, in order: a title holding the class qualname and `_version`,
        a blank line, the class MRO joined with arrows, a blank line,
        and then each entry of the class `_help`.
        """
        title   = f"Task : {cls.__qualname__} v{cls._version}"
        lineage = " -> ".join(klass.__name__ for klass in cls.mro())
        return [
            title,
            "",
            f"Task MRO: {lineage}",
            "",
            *cls._help,
        ]
191 192##--| 193
[docs] 194@Proto(Task_p, check=True) 195@Mixin(ParamSpecMaker_m, _TaskProperties_m, _TaskStubbing_m, _TaskHelp_m, _TaskActionPrep_m) 196class DootTask: 197 """ 198 The simplest task, which can import action classes. 199 eg: 200 actions = [ {do = "doot.workflow.actions.shell_action:DootShellAction", args = ["echo", "this is a test"] } ] 201 202 Actions are imported upon task creation. 203 """ 204 Flags : ClassVar[type[TaskMeta_e]] = TaskMeta_e 205 INITIAL_STATE : ClassVar[TaskStatus_e] = TaskStatus_e.INIT 206 COMPLETE_STATES : ClassVar[set[TaskStatus_e]] = {TaskStatus_e.SUCCESS} 207 _default_flags : ClassVar = {TaskMeta_e.TASK} 208 action_ctor : type 209 _help : tuple[str, ...] = tuple(["The Simplest Task"]) 210 _version : str = "0.1" 211 _internal_state : dict 212 213 def __init__(self, spec:TaskSpec_i, *, action_ctor:Maybe[Callable]=None, **kwargs:Any): # noqa: ARG002 214 self.flags = TaskMeta_e.TASK 215 self._internal_state = dict(spec.extra) 216 self._spec = spec 217 self._priority = self.spec.priority 218 self._status = DootTask.INITIAL_STATE 219 220 self._internal_state[STATE_TASK_NAME_K] = self.spec.name 221 self._internal_state['_action_step'] = 0 222 223 match action_ctor: 224 case None: 225 from .actions import DootBaseAction # noqa: PLC0415 226 self.action_ctor = DootBaseAction 227 case type() as x: 228 self.action_ctor = x 229 case x: 230 raise TypeError(type(x)) 231 232 233 ##--| dunders 234 235 @override 236 def __repr__(self) -> str: 237 cls = self.__class__.__qualname__ 238 return f"<{cls}: {self.name.de_uniq()}>" 239 240 def __bool__(self) -> bool: 241 return self.status in DootTask.COMPLETE_STATES 242 243 @override 244 def __hash__(self) -> int: 245 return hash(self.name) 246 247 def __lt__(self, other:TaskName_p|Task_p) -> bool: 248 """ Task A < Task B if B ∈ A.depends_on """ 249 match other: 250 case TaskName_p(): 251 name = other.name 252 case Task_p(): 253 name = other.spec.name 254 case x: 255 raise TypeError(type(x)) 256 return any(name in 
x.target for x in self.spec.depends_on if isinstance(x, RelationSpec_i)) 257 258 @override 259 def __eq__(self, other:object) -> bool: 260 match other: 261 case str() | TaskName(): 262 return self.name == other 263 case Task_p(): 264 return self.name == other.name 265 case _: 266 return False 267 268 ##--| properties 269
[docs] 270 @property 271 def name(self) -> TaskName_p: 272 return self.spec.name
273
[docs] 274 @property 275 def spec(self) -> TaskSpec_i: 276 return self._spec
277 278 @property 279 def status(self) -> TaskStatus_e: 280 return self._status 281 282
[docs] 283 @status.setter 284 def status(self, val:TaskStatus_e) -> None: 285 self._status = val
286 287 @property 288 def priority(self) -> int: 289 return self._priority 290
[docs] 291 @priority.setter 292 def priority(self, val:int) -> None: 293 self._priority = val
294
[docs] 295 @property 296 def internal_state(self) -> dict: 297 return self._internal_state
298 ##--| methods 299
[docs] 300 def prepare_actions(self) -> None: 301 """ if the task/action spec requires particular action ctors, load them. 302 if the action spec doesn't have a ctor, use the task's action_ctor 303 304 collects any action errors together, then raises them as a task error 305 """ 306 logging.debug("Preparing Actions: %s", self.name) 307 failed : list[Exception] = [] 308 for action_spec in self.spec.action_group_elements(): 309 match action_spec: 310 case RelationSpec_i(): 311 pass 312 case ActionSpec_i() if action_spec.fun is not None: 313 pass 314 case ActionSpec_i() if action_spec.do is not None: 315 try: 316 action_spec.set_function() 317 except (doot.errors.StructError, ImportError) as err: 318 failed.append(err) 319 case ActionSpec_i(): 320 action_spec.set_function(fun=self.action_ctor) 321 case _: 322 failed.append(doot.errors.TaskError("Unknown element in action group: ", action_spec, self.name)) 323 else: 324 match failed: 325 case []: 326 pass 327 case [*xs]: 328 raise doot.errors.TaskError("Action Spec preparation failures", self.name[:], xs)
329
[docs] 330 def log(self, msg:str|Lambda|list, level:int=logmod.DEBUG, prefix:Maybe[str]=None) -> None: 331 """ 332 utility method to log a message, useful as tasks are running 333 """ 334 prefix = prefix or "" 335 assert(prefix is not None) 336 lines : list[str] = [] 337 match msg: 338 case str(): 339 lines.append(msg) 340 case LambdaType(): 341 lines.append(msg()) 342 case [LambdaType()]: 343 lines += msg[0]() 344 case list(): 345 lines += msg 346 347 for line in lines: 348 logging.log(level, prefix + str(line))
349
[docs] 350 def get_action_group(self, group_name:str) -> list[ActionSpec_i]: 351 if not bool(group_name): 352 raise TaskError("Tried to retrieve an empty groupname") 353 if hasattr(self, group_name): 354 return getattr(self, group_name) 355 if hasattr(self.spec, group_name): 356 return getattr(self.spec, group_name) 357 358 logging.warning("Unknown Groupname: %s", group_name) 359 return []