Source code for doot.workflow.factory

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# mypy: disable-error-code="attr-defined"
  6# ruff: noqa: N812
  7# Imports:
  8from __future__ import annotations
  9
 10# ##-- stdlib imports
 11import datetime
 12import functools as ftz
 13import importlib
 14import itertools as itz
 15import logging as logmod
 16import pathlib as pl
 17import re
 18import time
 19import types
 20import typing
 21import weakref
 22from importlib.metadata import EntryPoint
 23from uuid import UUID, uuid1
 24
 25# ##-- end stdlib imports
 26
 27# ##-- 3rd party imports
 28import jgdv.structs.strang.errors as StrangErrs
 29from jgdv import Mixin, Proto
 30from jgdv._abstract.protocols.general import Buildable_p, SpecStruct_p
 31from jgdv._abstract.protocols.pydantic import ProtocolModelMeta
 32from jgdv.cli import ParamSpec, ParamSpecMaker_m
 33from jgdv.structs.chainguard import ChainGuard
 34from jgdv.structs.dkey import DKey
 35from jgdv.structs.locator import Location
 36from jgdv.structs.strang import CodeReference
 37
 38# ##-- end 3rd party imports
 39
 40# ##-- 1st party imports
 41import doot
 42import doot.errors
 43from doot.workflow import (ActionSpec, DootJob, DootTask, InjectSpec,
 44                           RelationSpec, TaskArtifact, TaskName, TaskSpec)
 45from doot.workflow import _interface as API
 46from doot.workflow._interface import (ActionSpec_i, InjectSpec_i, Job_p,
 47                                      RelationMeta_e, RelationSpec_i, Task_i,
 48                                      Task_p, TaskMeta_e, TaskName_p,
 49                                      TaskSpec_i, Artifact_i)
 50
 51# ##-- end 1st party imports
 52
 53# ##-| Local
 54from ._interface import SubTaskFactory_p, TaskFactory_p, DelayedSpec
 55# # End of Imports.
 56
 57# ##-- types
 58# isort: off
 59import abc
 60import collections.abc
 61from typing import TYPE_CHECKING, cast, assert_type, assert_never
 62from typing import Generic, NewType, Any, Annotated, override
 63# Protocols:
 64from typing import Protocol, runtime_checkable
 65# Typing Decorators:
 66from typing import no_type_check, final, overload
 67from dataclasses import _MISSING_TYPE, InitVar, dataclass, field, fields
 68from pydantic import (BaseModel, BeforeValidator, Field, ValidationError,
 69                      ValidationInfo, ValidatorFunctionWrapHandler, ConfigDict,
 70                      WrapValidator, field_validator, model_validator)
 71
 72if TYPE_CHECKING:
 73    from jgdv import Maybe
 74    import enum
 75    from typing import Final
 76    from typing import ClassVar, LiteralString
 77    from typing import Never, Self, Literal, _SpecialType
 78    from typing import TypeGuard
 79    from collections.abc import Iterable, Iterator, Callable, Generator
 80    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 81
 82    type SpecialType = _SpecialType
 83
 84# isort: on
 85# ##-- end types
 86
 87##-- logging
 88logging = logmod.getLogger(__name__)
 89##-- end logging
 90
 91##--| Consts
# Alias of the default task constructor, read from doot's entrypoint constants.
DEFAULT_ALIAS     : Final[str]             = doot.constants.entrypoints.DEFAULT_TASK_CTOR_ALIAS
# Group names "required_for" and "on_fail".
# NOTE(review): presumably the groups treated as blocking; not referenced in this module as shown - confirm.
DEFAULT_BLOCKING  : Final[tuple[str, ...]] = ("required_for", "on_fail")
# Default relation kind, used as the default `relation` argument of SubTaskFactory._raw_data_to_specs.
DEFAULT_RELATION   : Final[RelationMeta_e] = RelationMeta_e.default()
 95##--| Utils
 96
 97##--|
 98
[docs] 99@Proto(TaskFactory_p) 100class TaskFactory: 101 """ 102 Factory to create task specs, instantiate them, and make tasks 103 104 build : data -> spec 105 delay : data -> delayed -> spec 106 instantiate : spec -> spec(name=name[uuid]) 107 reify : spec,partial -> spec 108 over : orig,plus -> spec(plus<orig, name..<+>[uuid]) 109 under : orig,plus -> spec(orig<plus, name..<+>[uuid]) 110 make : spec -> task 111 112 """ 113 spec_ctor : type[TaskSpec_i] 114 task_ctor : type[Task_p] 115 job_ctor : type[Job_p] 116 117 def __init__(self, *, spec_ctor:Maybe[type]=None, task_ctor:Maybe[type]=None, job_ctor:Maybe[type]=None): 118 x : type[Any] 119 match spec_ctor: 120 case None: 121 match CodeReference(doot.aliases.task.spec)(): 122 case type() as ref: 123 self.spec_ctor = ref 124 case Exception() as err: 125 raise err 126 case type() as x: 127 self.spec_ctor = x 128 129 match task_ctor: 130 case None: 131 match CodeReference(doot.aliases.task.task)(): 132 case type() as ref: 133 self.task_ctor = ref 134 case Exception() as err: 135 raise err 136 case type() as x: 137 self.task_ctor = x 138 139 match job_ctor: 140 case None: 141 match CodeReference(doot.aliases.task.job)(): 142 case type() as ref: 143 self.job_ctor = ref 144 case Exception() as err: 145 raise err 146 case type() as x: 147 self.job_ctor = x 148 149 ##--| Spec manipulation 150
[docs] 151 def build(self, data:ChainGuard|dict|TaskName_p|str) -> TaskSpec_i: 152 result : TaskSpec_i 153 match data: 154 case TaskSpec_i(): 155 result = data 156 case ChainGuard() | dict() if "source" in data: 157 raise ValueError("source is deprecated, use 'sources'", data) 158 case ChainGuard() | dict(): 159 result = self.spec_ctor(**data) 160 case TaskName(): 161 result = self.spec_ctor(name=data) # type: ignore[call-arg] 162 case str(): 163 result = self.spec_ctor(name=TaskName(data)) # type: ignore[call-arg] 164 case x: 165 raise TypeError(type(x)) 166 167 return result
168
[docs] 169 def delay(self, *, base:TaskName_p, target:TaskName_p, inject:Maybe[InjectSpec_i]=None, applied:Maybe[dict]=None, overrides:dict) -> DelayedSpec: 170 """ 171 Build data structure that the registry will process into a full spec 172 """ 173 result : DelayedSpec = DelayedSpec(base=TaskName(base), 174 target=TaskName(target), 175 inject=inject, 176 applied=applied, 177 overrides=overrides, 178 ) 179 180 return result
181
[docs] 182 def instantiate(self, obj:TaskSpec_i, *, suffix:Maybe[bool|str]=None, extra:Maybe[Mapping]=None) -> TaskSpec_i: 183 """ 184 Return this spec, copied with a uniq name 185 """ 186 result : TaskSpec_i 187 instance : TaskSpec_i 188 # TODO use model_copy(update={...}) 189 instance = obj.model_copy() 190 instance.generated_names.clear() 191 match extra: 192 case None: 193 result = instance 194 case dict(): 195 result = self.merge(bot=instance, top=extra) 196 case x: 197 raise TypeError(type(x)) 198 199 result.name = self._prep_name(obj.name, suffix=suffix or False).to_uniq() 200 assert(result.name.uuid()) 201 return result
202
[docs] 203 def merge(self, top:dict|TaskSpec_i, bot:dict|TaskSpec_i, *, suffix:Maybe[str|Literal[False]]=None, name:Maybe[TaskName_p]=None) -> TaskSpec_i: 204 """ bot + top -> TaskSpec """ 205 result : dict 206 base_name : TaskName_p 207 top_data : dict 208 bot_data : dict 209 ##--| 210 if bot is top: 211 raise doot.errors.TrackingError("Tried to apply a spec over itself", top, bot) 212 213 ##--| prepare 214 match bot: 215 case dict(): 216 bot_data = bot 217 case TaskSpec_i(): 218 bot_data = dict(bot) 219 match top: 220 case dict(): 221 top_data = top 222 case TaskSpec_i(): 223 top_data = dict(top) 224 ##--| 225 result = self._specialize_merge(bot=bot_data, top=top_data) 226 match name: 227 case TaskName_p(): 228 result['name'] = name 229 case _: 230 base_name = top_data.get('name', None) or bot_data['name'] 231 result['name'] = self._prep_name(base_name, suffix=suffix) 232 ##--| 233 return self.build(result)
234 235 ##--| Task construction 236
[docs] 237 def make(self, obj:TaskSpec_i, ensure:Any=None) -> Task_p: 238 """ Create actual task instance 239 240 if no spec_ctor has been specified, uses the default spec_ctor for job/task 241 """ 242 task : Task_p 243 ctor : type 244 match obj.ctor: # Get the ctor 245 case None if TaskMeta_e.JOB in obj.meta: 246 ctor = self.job_ctor 247 case None: 248 ctor = self.task_ctor 249 case CodeReference() as x: 250 match x(check=ensure): 251 case type() as val: 252 ctor = val 253 case Exception() as err: 254 raise err 255 case x: 256 raise TypeError(type(x)) 257 258 assert(ctor is not None) 259 task = ctor(obj) 260 261 return task
262 263 ##--| utils 264
[docs] 265 def get_source_names(self, obj:TaskSpec_i) -> list[TaskName_p]: 266 """ Get from the spec's sources just its source tasks """ 267 val = [x for x in obj.sources if isinstance(x, TaskName)] 268 return cast("list[TaskName_p]", val)
269
[docs] 270 def action_groups(self, obj:TaskSpec_i) -> Iterable[Iterable]: 271 return [obj.depends_on, obj.setup, obj.actions, obj.cleanup, obj.on_fail]
272
[docs] 273 def action_group_elements(self, obj:TaskSpec_i) -> Iterable[ActionSpec_i|RelationSpec_i]: 274 """ Get the elements of: depends_on, setup, actions, and require_for. 275 """ 276 groups : Iterable[Iterable] = [obj.depends_on, obj.setup, obj.actions, obj.required_for] 277 for group in groups: 278 yield from group
279
[docs] 280 def _specialize_merge(self, *, bot:dict, top:dict) -> dict: 281 """ 282 Apply top over the top of bot 283 284 Combines, rather than overrides, particular values. 285 286 """ 287 x : Any 288 y : Any 289 specialized : dict 290 sources : set = set() 291 merge_keys : list = ["actions", "depends_on", "required_for", "cleanup", "on_fail", "setup"] 292 293 specialized = dict(bot) 294 specialized |= dict(top) 295 if 'name' in specialized: 296 del specialized['name'] 297 298 # Extend sources 299 match bot.get('sources', []), top.get('sources', []): 300 case x, y if len(x) < len(y): 301 sources.update(y) 302 case x, _: 303 sources.update(x) 304 305 if 'name' in bot: 306 sources.add(bot['name']) 307 if 'name' in top: 308 sources.add(top['name']) 309 specialized['sources'] = list(sources) 310 # Merge action groups 311 for x in merge_keys: 312 specialized[x] = [*bot.get(x, []), *top.get(x, [])] 313 314 # Internal is only for initial specs, to control listing 315 specialized[API.META_K] = set() 316 specialized[API.META_K].update(bot.get('meta', set())) 317 specialized[API.META_K].update(top.get('meta', set())) 318 specialized[API.META_K].difference_update({TaskMeta_e.INTERNAL}) 319 320 return specialized
321
[docs] 322 def _prep_name(self, base:TaskName_p, *, suffix:Maybe[int|str|Literal[False]]=None) -> TaskName_p: 323 result : TaskName_p 324 ##--| 325 match suffix: 326 case bool() | 0: 327 result = cast("TaskName_p", base) 328 case None: 329 result = base.push(TaskName.Marks.customised) # type: ignore[assignment] 330 case str(): 331 result = base.push(suffix) # type: ignore[assignment] 332 case int() as x if x > 0: 333 result = cast("TaskName_p", base.push(str(x))) 334 case x: 335 raise TypeError(type(x)) 336 ##--| 337 return result.de_uniq()
338
[docs] 339@Proto(SubTaskFactory_p) 340class SubTaskFactory: 341 """ Additional factory for generating related tasks of an instantiated spec """ 342
[docs] 343 def generate_names(self, obj:TaskSpec_i) -> list[TaskName_p]: 344 result : list[Maybe[TaskName_p]] = [ 345 self._job_head_p(obj), 346 self._cleanup_p(obj), 347 ] 348 return [x for x in result if x is not None]
349
[docs] 350 def generate_specs(self, obj:TaskSpec_i|Artifact_i|DelayedSpec) -> list[dict]: 351 result : list[dict] = [] 352 if not isinstance(obj, TaskSpec_i): 353 return result 354 355 logging.debug("[Task.Factory.Generate] : %s (%s)", obj.name, len(obj.generated_names)) 356 # Jobs generate their head 357 result += self._gen_job_head(obj) 358 result += self._gen_cleanup_task(obj) 359 360 obj.generated_names.update([x['name'] for x in result]) 361 return result
362
[docs] 363 def _gen_job_head(self, obj:TaskSpec_i) -> list[dict]: 364 """ 365 Generate a top spec for a job, taking the jobs cleanup actions 366 and using them as the head's main action. 367 Cleanup relations are turning into the head's dependencies 368 Depends on the job, and its reactively queued. 369 370 Equivalent to: 371 await job.depends_on() 372 await job.setup() 373 subtasks = job.actions() 374 await subtasks 375 job.head() 376 await job.cleanup() 377 """ 378 job_head : TaskName_p 379 match self._job_head_p(obj): 380 case None: 381 return [] 382 case TaskName_p() as job_head: 383 pass 384 385 tasks = [] 386 head_section = self._raw_data_to_specs(obj.extra.on_fail([], list).head_actions(), relation=RelationMeta_e.needs) 387 head_dependencies = [x for x in head_section if isinstance(x, RelationSpec_i) and x.target != job_head] 388 head_actions = [x for x in head_section if not isinstance(x, RelationSpec_i)] 389 ctor = obj.extra.on_fail(None).sub_ctor() 390 391 # build $head$ 392 head : dict = { 393 "name" : job_head, 394 "ctor" : ctor, 395 "sources" : [*obj.sources[:], obj.name, None], 396 "queue_behaviour" : API.QueueMeta_e.reactive, 397 "depends_on" : [obj.name, *head_dependencies], 398 "required_for" : obj.required_for[:], 399 "cleanup" : obj.cleanup[:], 400 "meta" : (obj.meta | {TaskMeta_e.JOB_HEAD}) - {TaskMeta_e.JOB}, 401 "actions" : head_actions, 402 **obj.extra, 403 } 404 assert(TaskMeta_e.JOB not in head['meta']) 405 tasks.append(head) 406 return tasks
407
[docs] 408 def _gen_cleanup_task(self, obj:TaskSpec_i) -> list[dict]: 409 """ Generate a cleanup task, shifting the 'cleanup' actions and dependencies 410 to 'depends_on' and 'actions' 411 """ 412 cleanup_name : TaskName_p 413 match self._cleanup_p(obj): 414 case None: 415 return [] 416 case TaskName_p() as cleanup_name: 417 pass 418 419 base_deps = [obj.name] + [x for x in obj.cleanup if isinstance(x, RelationSpec_i) and x.target != cleanup_name] 420 actions = [x for x in obj.cleanup if isinstance(x, ActionSpec_i)] 421 sources = [obj.name] 422 423 cleanup : dict = { 424 "name" : cleanup_name, 425 "ctor" : obj.ctor, 426 "sources" : sources, 427 "queue_behaviour" : API.QueueMeta_e.reactive, 428 "depends_on" : base_deps, 429 "actions" : actions, 430 "cleanup" : [], 431 "meta" : (obj.meta | {TaskMeta_e.TASK}) - {TaskMeta_e.JOB}, 432 } 433 assert(not bool(cleanup['cleanup'])) 434 return [cleanup]
435
[docs] 436 def _raw_data_to_specs(self, deps:list[str|dict], *, relation:RelationMeta_e=DEFAULT_RELATION) -> list[ActionSpec_i|RelationSpec_i]: 437 """ Convert toml provided raw data (str's, dicts) of specs into ActionSpec and RelationSpec object""" 438 results : list[ActionSpec_i|RelationSpec_i] = [] 439 for x in deps: 440 match x: 441 case ActionSpec_i() | RelationSpec_i(): 442 results.append(x) 443 case { "do": action } as d: 444 assert(isinstance(d, dict)) 445 results.append(ActionSpec.build(d)) 446 case _: 447 results.append(RelationSpec.build(x, relation=relation)) 448 449 return results
450
[docs] 451 def _job_head_p(self, obj:TaskSpec_i) -> Maybe[TaskName_p]: 452 # if not obj.name.uuid(): 453 # return None 454 if TaskMeta_e.JOB not in obj.meta: 455 return None 456 if obj.name.is_head(): 457 return None 458 459 return obj.name.with_head()
460 461
[docs] 462 def _cleanup_p(self, obj:TaskSpec_i) -> Maybe[TaskName_p]: 463 # if not obj.name.uuid(): 464 # return None 465 if self._job_head_p(obj) or obj.name.is_cleanup(): 466 return None 467 468 return obj.name.with_cleanup()