Source code for doot.workflow._interface

  1"""
Tasks are the main abstractions managed by Doot

  - JOBS create tasks
  - TASKS have actions
  - ACTIONS are individual atomic steps of a task, given the detailed information necessary to perform the step.

Jobs, as they can control reification order, can add setup and teardown tasks.
This can allow interleaving, or grouping.

  Communication paths:
  Job  -> Task   : by creation
  Task -> Action : by creation
  Action -> Task : by return value, updating task state dict
  Task -> Job    : by reference to the job

  Task -> Task     = Postboxes
  Action -> Action = Action -> Task State -> Action

 19
 20"""
 21# Imports:
 22from __future__ import annotations
 23
 24# ##-- stdlib imports
 25import datetime
 26import enum
 27import functools as ftz
 28import itertools as itz
 29import logging as logmod
 30from uuid import UUID, uuid1
 31
 32# ##-- end stdlib imports
 33
 34# ##-- 3rd party imports
 35from jgdv.structs.strang._interface import Strang_p
 36from jgdv.structs.locator._interface import Location_p
 37# ##-- end 3rd party imports
 38
 39# ##-- 1st party imports
 40import doot
 41import doot.errors
 42# ##-- end 1st party imports
 43
 44# ##-- types
 45# isort: off
 46import abc
 47import collections.abc
 48from typing import TYPE_CHECKING, cast, assert_type, assert_never
 49from typing import Generic, NewType, Any
 50# Protocols:
 51from typing import Protocol, runtime_checkable
 52# Typing Decorators:
 53from typing import no_type_check, final, override, overload
 54# Other:
 55from jgdv._abstract.protocols.general import SpecStruct_p, Buildable_p
 56
 57if TYPE_CHECKING:
 58    from jgdv.structs.chainguard import ChainGuard
 59    from jgdv.structs.strang import CodeReference
 60    import pathlib as pl
 61    from jgdv import Maybe, Func
 62    from typing import Final
 63    from typing import ClassVar, LiteralString
 64    from typing import Never, Self, Literal
 65    from typing import TypeGuard
 66    from collections.abc import Iterable, Iterator, Callable, Generator
 67    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 68    from jgdv.cli._interface import ParamSpec_p
 69    from jgdv._abstract.protocols.general import StubStruct_p
 70
 71    from doot.workflow import ActionSpec, TaskName
 72    type ActionReturn = Maybe[dict|bool|ActionResponse_e]
 73    type RelationTarget  = TaskName_p|Artifact_i
 74    type RelationMark    = RelationMeta_e
 75
 76# isort: on
 77# ##-- end types
 78
 79##-- logging
 80logging = logmod.getLogger(__name__)
 81##-- end logging
 82
 83##--| Vars
 84CLI_K             : Final[str]        = "cli"
 85DASH_S            : Final[str]        = "-"
 86DEFAULT_JOB       : Final[str]        = "job"
 87GROUP_K           : Final[str]        = "group"
 88META_K            : Final[str]        = "meta"
 89MUST_INJECT_K     : Final[str]        = "must_inject"
 90NAME_K            : Final[str]        = "name"
 91NONE_S            : Final[str]        = "None"
 92SPECIAL_KEYS      : Final[list[str]]  = [CLI_K, MUST_INJECT_K]
 93SUFFIX_K          : Final[str]        = "_add_suffix"
 94USCORE_S          : Final[str]        = "_"
 95DEFAULT_PRIORITY  : Final[int]        = 10
 96##--| Enum Protocols:
 97
class Status_ep(Protocol):
    """ Protocol for status enums: a default value plus three named sets.

    All four members are classmethod stubs without an implementation.
    """

    @classmethod
    def default(cls) -> Any:
        """ The default status value. """

    @classmethod
    def pre_set(cls) -> set:
        """ The 'pre' set of statuses. """

    @classmethod
    def success_set(cls) -> set:
        """ The 'success' set of statuses. """

    @classmethod
    def fail_set(cls) -> set:
        """ The 'fail' set of statuses. """
111 112##--| Enums 113
class QueueMeta_e(enum.Enum):
    """ Available ways a task can be activated for running.

    default         : activates only if a user queues the task, or it is a dependencyOf
    onRegister/auto : activates automatically when added to the task network
    reactive        : activates if an adjacent node completes

    """
    # NOTE(review): reactiveFail has no description in the original docs;
    # presumably the failure counterpart of `reactive` - confirm.

    default      = enum.auto()
    onRegister   = enum.auto() # noqa: N815
    reactive     = enum.auto()
    reactiveFail = enum.auto() # noqa: N815

    # Alias: same member as onRegister, not a new value
    auto         = onRegister
128
class RelationMeta_e(enum.Enum):
    """
      The kinds (and synonyms) of relation between tasks,
      read as: Obj {rel} Y.

      eg: cake dependsOn baking.
      or: baking requirementFor cake.
      or: eatingCake conflictsWith givingCake
    """
    needs  = enum.auto()
    blocks = enum.auto()
    # excludes = enum.auto() # noqa: ERA001

    @classmethod
    def default(cls) -> RelationMeta_e:
        """ The relation used by default: `needs`. """
        return cls.needs
145
[docs] 146class TaskMeta_e(enum.StrEnum): 147 """ 148 Flags describing properties of a task, 149 stored in the Task_p instance itself. 150 """ 151 152 TASK = enum.auto() 153 JOB = enum.auto() 154 TRANSFORMER = enum.auto() 155 156 INTERNAL = enum.auto() 157 JOB_HEAD = enum.auto() 158 CONCRETE = enum.auto() 159 DISABLED = enum.auto() 160 161 EPHEMERAL = enum.auto() 162 IDEMPOTENT = enum.auto() 163 REQ_TEARDOWN = enum.auto() 164 REQ_SETUP = enum.auto() 165 IS_TEARDOWN = enum.auto() 166 IS_SETUP = enum.auto() 167 THREAD_SAFE = enum.auto() 168 STATEFUL = enum.auto() 169 STATELESS = enum.auto() 170 VERSIONED = enum.auto() 171
[docs] 172 @classmethod 173 def default(cls) -> Maybe: 174 return cls.TASK
175
class TaskStatus_e(enum.Enum):
    """
      The different states a task/artifact can be in.
      The state is stored in the task object itself.

      Before a task object has been created, the tracker
      provides the status according to what specs exist for the task name.

    """
    # Pre-Task Object Creation statuses:
    NAMED    = enum.auto() # A Name, missing a spec
    DECLARED = enum.auto() # Abstract Spec Exists

    DEFINED  = enum.auto() # Spec has been instantiated into the dependency network

    # Task Object Exists
    DISABLED = enum.auto() # Artificial state for if a spec or task has been disabled.
    INIT     = enum.auto() # Task Object has been created.
    WAIT     = enum.auto() # Task is awaiting dependency check and pass
    READY    = enum.auto() # Dependencies are done, ready to execute/expand.
    RUNNING  = enum.auto() # Has been given to the runner, waiting for a status update.
    SKIPPED  = enum.auto() # Runner has signaled the task was skipped.
    HALTED   = enum.auto() # Task has reached minimum priority, timing out.
    FAILED   = enum.auto() # Runner has signaled Failure.
    SUCCESS  = enum.auto() # Runner has signaled success.
    TEARDOWN = enum.auto() # Task is ready to be killed
    DEAD     = enum.auto() # Task is done.

    @classmethod
    def default(cls) -> TaskStatus_e:
        """ The default status: NAMED. """
        return cls.NAMED

    @classmethod
    def pre_set(cls) -> set:
        """ The statuses NAMED, DECLARED and DEFINED. """
        return {
            cls.NAMED,
            cls.DECLARED,
            cls.DEFINED,
        }

    @classmethod
    def success_set(cls) -> set:
        """ The statuses SUCCESS, TEARDOWN and DEAD. """
        return {
            cls.SUCCESS,
            cls.TEARDOWN,
            cls.DEAD,
        }

    @classmethod
    def fail_set(cls) -> set:
        """ The statuses SKIPPED, HALTED and FAILED. """
        return {
            cls.SKIPPED,
            cls.HALTED,
            cls.FAILED,
        }
219
class ArtifactStatus_e(enum.Enum):
    """ The states an artifact can be in. """
    DECLARED = enum.auto() # doesn't exist or not checked
    STALE    = enum.auto() # Exists, but is old
    TOCLEAN  = enum.auto() # May exist, needs to be deleted
    EXISTS   = enum.auto() # Exists
226
class ActionResponse_e(enum.Enum):
    """
      Describes how an Action went.
    """

    SUCCEED    = enum.auto()
    FAIL       = enum.auto()
    SKIP       = enum.auto()
    SKIP_GROUP = enum.auto()
    SKIP_TASK  = enum.auto()

    # Aliases (same member, second name)
    SUCCESS    = SUCCEED
240 241##--| data 242
[docs] 243class DelayedSpec: 244 __slots__ = ("applied", "base", "inject", "overrides", "target") 245 246 base : TaskName_p 247 target : TaskName_p 248 # For from_spec injection 249 inject : list[InjectSpec_i] 250 # injection values applied from the creator 251 applied : dict 252 # Raw data applied over source 253 overrides : dict 254 255 def __init__(self, **kwargs:Any) -> None: 256 self.base = kwargs.pop("base") 257 self.target = kwargs.pop("target") 258 self.inject = [] 259 self.applied = kwargs.pop("applied", None) or {} 260 self.overrides = kwargs.pop("overrides") 261 match kwargs.pop("inject", []): 262 case None: 263 pass 264 case [*xs]: 265 self.inject += xs 266 case x: 267 self.inject.append(x) 268 assert(not bool(kwargs))
269##--| Spec Interfaces 270
@runtime_checkable
class ActionSpec_i(Buildable_p, Protocol):
    """ Interface of an action spec: a buildable, callable description of one action. """
    do      : Maybe[CodeReference]
    args    : list[Any]
    kwargs  : ChainGuard
    fun     : Maybe[Func]

    def __call__(self, *args:Any, **kwargs:Any) -> ActionReturn:
        """ Call the spec with arbitrary arguments, producing an ActionReturn. """
279
@runtime_checkable
class InjectSpec_i(Buildable_p, Protocol):
    """ Interface of an injection spec.

    Holds four dict fields (from_spec, from_state, from_target, literal)
    and an optional suffix, plus methods to apply and validate them.
    """
    from_spec    : dict
    from_state   : dict
    from_target  : dict
    literal      : dict
    with_suffix  : Maybe[str]

    def apply_from_spec(self, parent:dict|TaskSpec_i|Task_p) -> dict:
        """ Produce a dict from `parent` (a dict, spec or task). """

    def apply_from_state(self, parent:dict|Task_p) -> dict:
        """ Produce a dict from `parent` (a dict or task). """

    def apply_literal(self, val:Any) -> dict:
        """ Produce a dict from the given value. """

    def validate(self, control:Task_p|TaskSpec_i, target:Task_p|TaskSpec_i, *, only_spec:bool=False) -> bool:
        """ Check `control` against `target`, giving a bool verdict. """

    def validate_details(self, control:Task_p|TaskSpec_i, target:Task_p|TaskSpec_i, *, only_spec:bool=False) -> dict:
        """ Same arguments as `validate`, but the result is a dict. """
297
@runtime_checkable
class RelationSpec_i(Protocol):
    """ Interface of a relation spec: a `relation` linking a `target` and an optional `object`. """

    Marks        : ClassVar[type[enum.Enum]]
    ##--|
    target       : TaskName_p|Artifact_i
    relation     : RelationMeta_e
    object       : Maybe[TaskName_p|Artifact_i]
    constraints  : dict[str, str]
    inject       : Maybe[InjectSpec_i]

    def __contains__(self, query:str|enum.Enum|TaskName_p|Artifact_i) -> bool:
        """ Membership test for a string, enum value, task name or artifact. """

    def to_ordered_pair(self, obj:RelationTarget, *, target:Maybe[TaskName_p]=None) -> tuple[Maybe[RelationTarget], Maybe[RelationTarget]]:
        """ Give a 2-tuple of (optional) relation targets for `obj`. """

    def instantiate(self, *, obj:Maybe[RelationTarget]=None, target:Maybe[RelationTarget]=None) -> RelationSpec_i:
        """ Give a relation spec for the supplied obj and/or target. """

    def forward_dir_p(self) -> bool:
        """ Predicate on the direction of the relation. """

    def accepts(self, control:Task_i|TaskSpec_i, target:Task_i|TaskSpec_i) -> bool:
        """ Predicate over a control and a target task/spec. """
318
@runtime_checkable
class TaskSpec_i(Protocol):
    """
      The data spec of a task. It is created from TOML data.
    """

    # task specific extras to use in state
    _default_ctor    : ClassVar[str]
    # Action Groups that are depended on, rather than are dependencies of, this task:
    _blocking_groups : ClassVar[tuple[str, ...]]
    # NOTE(review): RelationSpec_i declares Marks as ClassVar[type[enum.Enum]];
    # confirm whether this should also be the enum class.
    Marks            : ClassVar[enum.Enum]

    ##--| Core Instance Data
    name             : TaskName_p
    doc              : Maybe[list[str]]
    sources          : list[Maybe[TaskName_p|pl.Path]]

    ##--| Default Action Groups
    actions          : list[ActionSpec_i]
    required_for     : list[ActionSpec_i|RelationSpec_i]
    depends_on       : list[ActionSpec_i|RelationSpec_i]
    setup            : list[ActionSpec_i|RelationSpec_i]
    cleanup          : list[ActionSpec_i|RelationSpec_i]
    on_fail          : list[ActionSpec_i|RelationSpec_i]

    ##--| Any additional information:
    version          : str
    priority         : int
    ctor             : CodeReference
    queue_behaviour  : QueueMeta_e
    meta             : set[TaskMeta_e]

    @property
    def extra(self) -> ChainGuard:
        """ Read-only ChainGuard of extra data. """

    def param_specs(self) -> list:
        """ Give the list of param specs for this task spec. """
355
@runtime_checkable
class Action_p(Protocol):
    """
      Holds individual action information and state, and executes it.
    """

    def __call__(self, spec:ActionSpec_i, task_state:dict) -> ActionReturn:
        """ Run against an action spec and the task state dict. """
        ...
364
@runtime_checkable
class Artifact_i(Location_p, Protocol):
    """ Interface of an artifact: a location with a priority and a status. """
    priority : int

    @override
    def __contains__(self, other:object) -> bool:
        """ Membership test against any object. """

    def get_status(self) -> ArtifactStatus_e:
        """ Give the artifact's ArtifactStatus_e. """

    def reify(self, other:pl.Path|Location_p) -> Maybe[Artifact_i]:
        """ Given a path or location, give an artifact, or nothing. """
375##--| 376
@runtime_checkable
class TaskName_p(Strang_p, Protocol):
    """ Interface of a task name: a Strang with head / cleanup helpers. """

    def with_head(self) -> Self:
        """ Give the 'head' form of this name. """

    def is_head(self) -> bool:
        """ Predicate for the 'head' form. """

    def with_cleanup(self) -> Self:
        """ Give the 'cleanup' form of this name. """

    def is_cleanup(self) -> bool:
        """ Predicate for the 'cleanup' form. """

    def pop_generated(self) -> Self:
        """ Give a name of the same type, with generated parts popped. """
391 392##--| Factory protocols 393
class TaskFactory_p(Protocol):
    """ Interface of a factory that builds, instantiates, merges and makes task specs/tasks. """

    def __init__(self, *, spec_ctor:Maybe[type]=None, task_ctor:Maybe[type]=None, job_ctor:Maybe[type]=None):
        """ Accepts optional constructors for specs, tasks and jobs. """

    def build(self, data:ChainGuard|dict|TaskName_p|str) -> TaskSpec_i:
        """ Give a task spec for the provided data. """

    def instantiate(self, obj:TaskSpec_i, *, extra:Maybe[Mapping|bool]=None) -> TaskSpec_i:
        """ Give a task spec from an existing spec, with optional extra data. """

    def merge(self, *, bot:dict|TaskSpec_i, top:dict|TaskSpec_i, suffix:Maybe[str|Literal[False]]=None) -> TaskSpec_i:
        """ Combine `bot` and `top` into one task spec. """

    def make(self, obj:TaskSpec_i, *, ensure:Maybe=None, inject:Maybe[tuple[InjectSpec_i, Task_i]]=None, parent:Maybe[Task_i]=None) -> Task_i:
        """ Give a task for the given spec. """

    def action_group_elements(self, obj:TaskSpec_i) -> Iterable[ActionSpec_i|RelationSpec_i]:
        """ Give the action and relation specs of the given spec. """
407
@runtime_checkable
class SubTaskFactory_p(Protocol):
    """ Interface of a factory that generates names and raw spec dicts. """

    def generate_names(self, obj:TaskSpec_i) -> list[TaskName_p]:
        """ Give a list of task names for the given spec. """

    def generate_specs(self, obj:TaskSpec_i|Artifact_i|DelayedSpec) -> list[dict]:
        """ Give a list of dicts for a spec, artifact or delayed spec. """
414 415##--| 416
[docs] 417@runtime_checkable 418class Task_p(Protocol): 419 420 def __init__(self, spec:TaskSpec_i) -> None: ... 421 422 ##--| dunders 423 424 @override 425 def __hash__(self): ... 426 427 def __lt__(self, other:TaskName_p|Task_p) -> bool: ... 428 """ Task A < Task B iff A ∈ B.run_after """ 429 430 @override 431 def __eq__(self, other:object) -> bool: ... 432 433 ##--| properties 434
[docs] 435 @property 436 def name(self) -> TaskName_p: ...
437
[docs] 438 @property 439 def spec(self) -> TaskSpec_i: ...
440 441 @property 442 def status(self) -> TaskStatus_e: ... 443
[docs] 444 @status.setter 445 def status(self, val:TaskStatus_e) -> None: ...
446 447 @property 448 def priority(self) -> int: ... 449
[docs] 450 @priority.setter 451 def priority(self, val:int) -> None: ...
452
[docs] 453 @property 454 def internal_state(self) -> dict: ...
455 ##--| other 456
[docs] 457 def log(self, msg:str, level:int=logmod.DEBUG, prefix:Maybe[str]=None) -> None: ...
458 """ utility method to log a message, useful as tasks are running """ 459
[docs] 460 def prepare_actions(self) -> None: ...
461
[docs] 462 def get_action_group(self, group_name:str) -> list[ActionSpec_i]: ...
463
@runtime_checkable
class Job_p(Task_p, Protocol):
    """
      A task that builds tasks.
    """

    def expand_job(self) -> list:
        """ Give a list as the result of expanding the job. """
        ...
472
[docs] 473@runtime_checkable 474class Task_i(Task_p, Protocol): 475 """ 476 Meta information for a task 477 """ 478 _default_flags : ClassVar[set[TaskMeta_e]] 479 480 _version : str 481 _help : tuple[str, ...] 482 doc : tuple[str, ...]