Source code for doot.workflow.structs.task_spec

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# mypy: disable-error-code="attr-defined"
  6# ruff: noqa: N812
  7# Imports:
  8from __future__ import annotations
  9
 10# ##-- stdlib imports
 11import datetime
 12import functools as ftz
 13import importlib
 14import itertools as itz
 15import logging as logmod
 16import pathlib as pl
 17import re
 18import time
 19import types
 20import typing
 21import weakref
 22from importlib.metadata import EntryPoint
 23from uuid import UUID, uuid1
 24
 25# ##-- end stdlib imports
 26
 27# ##-- 3rd party imports
 28from jgdv import Proto, Mixin
 29from jgdv._abstract.protocols.general import (Buildable_p, SpecStruct_p)
 30from jgdv._abstract.protocols.pydantic import ProtocolModelMeta
 31from jgdv.cli import ParamSpec, ParamSpecMaker_m
 32from jgdv.structs.chainguard import ChainGuard
 33from jgdv.structs.dkey import DKey
 34from jgdv.structs.locator import Location
 35from jgdv.structs.strang import CodeReference
 36import jgdv.structs.strang.errors as StrangErrs
 37# ##-- end 3rd party imports
 38
 39# ##-- 1st party imports
 40import doot
 41import doot.errors
 42
 43# ##-- end 1st party imports
 44
 45from .. import _interface as API
 46from .._interface import TaskMeta_e, TaskName_p
 47from .action_spec import ActionSpec
 48from .inject_spec import InjectSpec
 49from .artifact import TaskArtifact
 50from .relation_spec import RelationMeta_e, RelationSpec
 51from .task_name import TaskName
 52from .._interface import Job_p, Artifact_i, RelationSpec_i, ActionSpec_i
 53
 54# ##-- types
 55# isort: off
 56import abc
 57import collections.abc
 58from typing import TYPE_CHECKING, cast, assert_type, assert_never
 59from typing import Generic, NewType, Any, Annotated, override
 60# Protocols:
 61from typing import Protocol, runtime_checkable
 62# Typing Decorators:
 63from typing import no_type_check, final, overload
 64from dataclasses import _MISSING_TYPE, InitVar, dataclass, field, fields
 65from pydantic import (BaseModel, BeforeValidator, Field, ValidationError,
 66                      ValidationInfo, ValidatorFunctionWrapHandler, ConfigDict,
 67                      WrapValidator, field_validator, model_validator)
 68# need to be outside of TYPE_CHECKING for pydantic
 69from jgdv import Maybe
 70if TYPE_CHECKING:
 71    import enum
 72    from typing import Final
 73    from typing import ClassVar, LiteralString
 74    from typing import Never, Self, Literal, _SpecialType
 75    from typing import TypeGuard
 76    from collections.abc import Iterable, Iterator, Callable, Generator
 77    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 78
 79    from .._interface import TaskSpec_i, Task_p, Task_i
 80    type SpecialType = _SpecialType
 81
 82# isort: on
 83# ##-- end types
 84
 85##-- logging
 86logging = logmod.getLogger(__name__)
 87##-- end logging
 88
 89##--| Consts
 90DEFAULT_ALIAS     : Final[str]             = doot.constants.entrypoints.DEFAULT_TASK_CTOR_ALIAS
 91DEFAULT_BLOCKING  : Final[tuple[str, ...]] = tuple(["required_for", "on_fail"])
 92DEFAULT_RELATION   : Final[RelationMeta_e] = RelationMeta_e.default()
 93##--| Utils
 94
[docs] 95def _action_group_sort_key(val:ActionSpec_i|RelationSpec_i) -> Any: 96 match val: 97 case ActionSpec_i(): # Don't change ActionSpec ordering 98 return (1,) 99 case RelationSpec_i(target=Artifact_i() as target): 100 return (-1,) 101 case RelationSpec_i(target=TaskName_p() as target): 102 return (0, target) 103 case x: 104 raise TypeError(type(x))
105
[docs] 106def _raw_data_to_specs(deps:list[str|dict], *, relation:RelationMeta_e=DEFAULT_RELATION) -> list[ActionSpec|RelationSpec]: 107 """ Convert toml provided raw data (str's, dicts) of specs into ActionSpec and RelationSpec object""" 108 results = [] 109 for x in deps: 110 match x: 111 case ActionSpec() | RelationSpec(): 112 results.append(x) 113 case { "do": action } as d: 114 assert(isinstance(d, dict)) 115 results.append(ActionSpec.build(d)) 116 case _: 117 results.append(RelationSpec.build(x, relation=relation)) 118 119 return results
120
[docs] 121def _prepare_action_group(group:Maybe[list[str]], handler:ValidatorFunctionWrapHandler, info:ValidationInfo) -> list[RelationSpec|ActionSpec]: 122 """ 123 Builds, Expands, Sorts, and Validates action/relation groups, 124 converting toml specified strings, list, and dicts to Artifacts (ie:files), Task Names, ActionSpecs 125 126 As a wrap handler, it has the context of what field is being processed, 127 this allows it to set the correct RelationMeta_e type 128 129 # TODO handle callables? 130 """ 131 results : list[RelationSpec|ActionSpec] 132 rel_root : TaskName 133 relation_type : RelationMeta_e 134 ##--| 135 relation_type = RelationMeta_e.blocks if info.field_name in TaskSpec._blocking_groups else RelationMeta_e.needs 136 match group: 137 case None | []: 138 return [] 139 case _: 140 pass 141 142 # Build initial Relation/Action Specs 143 results = _raw_data_to_specs(cast("list[str|dict]", group), relation=relation_type) 144 145 action_order = [x for x in results if isinstance(x, ActionSpec)] 146 res = sorted(results, key=_action_group_sort_key) 147 sorted_action_order = [x for x in res if isinstance(x, ActionSpec)] 148 assert(x is y for x,y in zip(action_order, sorted_action_order, strict=True)), "Sorting Action Specs modifed the order" 149 return handler(res)
150 151##--| 152ActionGroup = Annotated[list[ActionSpec|RelationSpec], WrapValidator(_prepare_action_group)] 153##--| 154
[docs] 155class _TransformerUtils_m: 156 """Utilities for artifact transformers""" 157
[docs] 158 def instantiate_transformer(self:TaskSpec_i, target:Artifact_i|tuple[Artifact_i, Artifact_i]) -> Maybe[TaskSpec_i]: 159 """ Create an instantiated transformer spec. 160 ie : ?.txt -> spec -> ?.blah 161 becomes: a.txt -> spec -> a.blah 162 163 can be given one artifact, which will be used for matching on pre and post, 164 or a tuple, which specifies an exact transform 165 166 TODO: handle ?/?.txt, */?.txt, blah/*/?.txt, path/blah.? 167 """ 168 pre : Artifact_i 169 post : Artifact_i 170 match target: 171 case Artifact_i(): 172 pre, post = target, target 173 case (Artifact_i() as pre, Artifact_i() as post): 174 pass 175 176 assert(pre.is_concrete() or post.is_concrete()) 177 instance = self.instantiate() 178 match self.transformer_of(): 179 case None: 180 raise doot.errors.TrackingError("Tried to transformer to_uniq a non-transformer", self.name) 181 case (x, y) if pre in x.target or post in y.target: 182 # exact transform 183 # replace x with pre in depends_on 184 instance.depends_on.remove(x) 185 instance.depends_on.append(x.instantiate(target=pre)) 186 # replace y with post in required_for 187 instance.required_for.remove(y) 188 instance.required_for.append(y.instantiate(target=post)) 189 case _: 190 return None 191 192 return instance
193
[docs] 194 def transformer_of(self:TaskSpec_i) -> Maybe[tuple[RelationSpec_i, RelationSpec_i]]: # noqa: PLR0911, PLR0912 195 """ If this spec can transform an artifact, 196 return those relations. 197 198 Transformers have file relations of a single solo abstract artifact 199 so: 'file:>a/path/?.txt' -> 'file:>b/path/?.bib' 200 (other relations can exist as well, but to be a transformer there needs to 201 be only 1 in, 1 out solo file relation 202 203 """ 204 x : Any 205 y : Any 206 match self._transform: 207 case False: 208 return None 209 case (x,y): 210 return cast("tuple[RelationSpec, RelationSpec]", self._transform) 211 case None: 212 pass 213 214 assert(TaskMeta_e.TRANSFORMER in self.meta) 215 216 pre, post = None, None 217 for x in self.depends_on: 218 match x: 219 case RelationSpec(target=TaskArtifact() as target) if not target.is_concrete(): 220 if pre is not None: 221 # If theres more than one applicable, its not a tranformer 222 self._transform = False 223 return None 224 pre = x 225 case _: 226 pass 227 228 for y in self.required_for: 229 match y: 230 case RelationSpec(target=TaskArtifact() as target) if Location.Marks.glob in target: 231 pass 232 case RelationSpec(target=TaskArtifact() as target) if not target.is_concrete(): 233 if post is not None: 234 self._transform = False 235 return None 236 post = y 237 case _: 238 pass 239 240 match pre, post: 241 case None, _: 242 self._transform = False 243 return None 244 case _, None: 245 self._transform = False 246 return None 247 case RelationSpec(), RelationSpec(): 248 self._transform = (pre, post) 249 return self._transform 250 251 raise ValueError("This shouldn't be possible")
252 253##--| 254
[docs] 255@Proto(API.TaskSpec_i, check=True) 256class TaskSpec(BaseModel, arbitrary_types_allowed=True, extra="allow"): # type: ignore[call-arg] 257 """ The information needed to describe a generic task. 258 Optional things are shoved into 'extra', so things can use .on_fail on the chainguard 259 260 the cli parser can understand cli=[{}] specs 261 actions : list[ [args] | {do='', args=[], **kwargs} ] 262 263 Notes: 264 sources = [root, ... grandparent, parent]. 'None' indicates halt on climbing source chain 265 266 """ 267 268 ##--| 269 _default_ctor : ClassVar[str] = DEFAULT_ALIAS 270 _blocking_groups : ClassVar[tuple[str, ...]] = DEFAULT_BLOCKING 271 Marks : ClassVar[type[enum.Enum]] = TaskMeta_e 272 ##--| core 273 name : TaskName_p = Field() 274 doc : Maybe[list[str]] = Field(default_factory=list) 275 sources : list[Maybe[TaskName|pl.Path]] = Field(default_factory=list) 276 277 # Action Groups: 278 ##--| action groups 279 actions : ActionGroup = Field(default_factory=list) 280 required_for : ActionGroup = Field(default_factory=list) 281 depends_on : ActionGroup = Field(default_factory=list) 282 setup : ActionGroup = Field(default_factory=list) 283 cleanup : ActionGroup = Field(default_factory=list) 284 on_fail : ActionGroup = Field(default_factory=list) 285 286 # Any additional 287 ##--| additional 288 version : str = Field(default=doot.__version__) # TODO: make dict? 289 priority : int = Field(default=10) 290 ctor : Maybe[CodeReference] = Field(default=None, validate_default=True) 291 queue_behaviour : API.QueueMeta_e = Field(default=API.QueueMeta_e.default) 292 meta : set[TaskMeta_e] = Field(default_factory=set) 293 generated_names : set[TaskName] = Field(init=False, default_factory=set) 294 295 # task specific estate 296 ##--| 297 _transform : Maybe[Literal[False]|tuple[RelationSpec, RelationSpec]] = None 298 299 ##--| validators 300
[docs] 301 @model_validator(mode="before") 302 def _convert_toml_keys(cls, data:dict) -> dict: 303 """ converts a-key into a_key, and joins group+name """ 304 cleaned : dict 305 sep : Maybe[str] = TaskName.section(0).end 306 assert(sep is not None) 307 308 cleaned = {k.replace(API.DASH_S, API.USCORE_S) : v for k,v in data.items()} 309 if API.GROUP_K in cleaned and sep not in cleaned[API.GROUP_K]: 310 cleaned[API.NAME_K] = sep.join([cleaned[API.GROUP_K], cleaned[API.NAME_K]]) 311 del cleaned[API.GROUP_K] 312 return cleaned
313
[docs] 314 @field_validator("name", mode="before") 315 def _validate_name(cls, val:str|TaskName) -> TaskName: 316 match val: 317 case TaskName(): 318 return val 319 case str(): 320 try: 321 name = TaskName(val) 322 except StrangErrs.StrangError as err: 323 raise ValueError(*err.args) from err 324 else: 325 return name 326 case _: 327 raise TypeError("A TaskSpec Name should be a str or TaskName", val)
328
[docs] 329 @field_validator("meta", mode="before") 330 def _validate_meta(cls, val:str|list|set|TaskMeta_e) -> set[str]: 331 vals : Iterable[str] 332 match val: 333 case TaskMeta_e(): 334 return {val} 335 case str(): 336 vals = [val] 337 case set() | list(): 338 vals = val 339 case x: 340 raise TypeError(type(x)) 341 342 return {x if isinstance(x, TaskMeta_e) else TaskMeta_e[x] for x in vals}
343
[docs] 344 @field_validator("ctor", mode="before") 345 def _validate_ctor(cls, val:Maybe[str|CodeReference]) -> Maybe[CodeReference]: 346 match val: 347 case None: 348 return None 349 case EntryPoint(): 350 return CodeReference(val) 351 case CodeReference(): 352 return val 353 case type()|str(): 354 return CodeReference(val) 355 case _: 356 return CodeReference(val)
357
[docs] 358 @field_validator("queue_behaviour", mode="before") 359 def _validate_queue_behaviour(cls, val:str|API.QueueMeta_e) -> API.QueueMeta_e: 360 match val: 361 case API.QueueMeta_e(): 362 return val 363 case str(): 364 return API.QueueMeta_e(val) 365 case _: 366 raise ValueError("Queue Behaviour needs to be a str or a QueueMeta_e enum", val)
367
[docs] 368 @field_validator("sources", mode="before") 369 def _validate_sources(cls, val:list[Maybe[str|TaskName]]) -> list[Maybe[str|TaskName|pl.Path]]: 370 """ builds the soures list, converting strings to task names, 371 372 """ 373 result : list[Maybe[str|TaskName|pl.Path]] = [] 374 for x in val: 375 match x: 376 case API.NONE_S | None: 377 result.append(None) 378 case TaskName() as x if TaskName.Marks.partial in x: 379 raise ValueError("A TaskSpec can not rely on a partial spec", x) 380 case TaskName() | pl.Path(): 381 result.append(x) 382 case str(): 383 try: 384 name = TaskName(x) 385 if TaskName.Marks.partial in name: 386 raise ValueError("A TaskSpec can not rely on a partial spec", x) 387 result.append(name) 388 except (StrangErrs.StrangError, ValidationError): 389 result.append(pl.Path(x)) 390 case x: 391 raise TypeError("Bad Typed Source", x) 392 393 return result
394
[docs] 395 @model_validator(mode="after") 396 def _validate_metadata(self) -> Self: 397 """ General object validator, mainly for metadata processing 398 399 """ 400 base_meta : set[TaskMeta_e] = self.meta.copy() 401 # Basic metadata from the spec: 402 if self.extra.on_fail(False).disabled(): # noqa: FBT003 403 base_meta.add(TaskMeta_e.DISABLED) 404 405 if TaskName.Marks.extend in self.name and not self.name.is_head(): 406 base_meta.add(TaskMeta_e.JOB) 407 408 match self.ctor and self.ctor(): 409 case None: 410 pass 411 case ImportError() as err: 412 logging.warning("Ctor Import Failed for: %s : %s", self.name, self.ctor) 413 base_meta.add(TaskMeta_e.DISABLED) 414 case type() as x if TaskMeta_e.JOB in base_meta and not isinstance(x, Job_p): 415 logging.warning("Ctor Not a Job for: %s : %s", self.name, self.ctor) 416 base_meta.add(TaskMeta_e.DISABLED) 417 case type() as x if hasattr(x, "_default_flags"): 418 base_meta.update(x._default_flags) 419 case x: 420 raise TypeError(type(x)) 421 422 # Validate 423 if TaskName.Marks.partial in self.name and not bool(self.sources): 424 raise ValueError("Tried to create a partial spec with no base source", self.name) 425 426 if TaskMeta_e.TRANSFORMER not in base_meta: 427 self._transform = False 428 429 # Update the spec 430 if not bool(base_meta) and (default:=TaskMeta_e.default()): 431 base_meta.add(default) 432 self.meta.update(base_meta) 433 434 return self
435 436 ##--| dunders 437 @override 438 def __hash__(self) -> int: 439 return hash(str(self.name)) 440 441 ##--| properties
[docs] 442 @property 443 def extra(self) -> ChainGuard: 444 return ChainGuard(self.model_extra)
445
[docs] 446 @property 447 def action_groups(self) -> list[list]: 448 return [self.depends_on, self.setup, self.actions, self.cleanup, self.on_fail]
449
[docs] 450 @property 451 def params(self) -> dict: 452 return self.model_extra
453
[docs] 454 @property 455 def args(self) -> list: 456 return []
457
[docs] 458 @property 459 def kwargs(self) -> dict: 460 return self.model_extra
461 462 ##--| methods
[docs] 463 def action_group_elements(self) -> Iterable[ActionSpec|RelationSpec]: 464 """ Get the elements of: depends_on, setup, actions, and require_for. 465 *never* cleanup, which generates its own task 466 """ 467 queue = [self.depends_on, self.setup, self.actions, self.required_for] 468 469 for group in queue: 470 yield from group
471
[docs] 472 def param_specs(self) -> list: 473 result = [] 474 for x in self.extra.on_fail([]).cli(): 475 result.append(ParamSpecMaker_m.build_param(**x)) 476 else: 477 return result