Source code for doot.control.tracker.registry

  1 #!/usr/bin/env python3
  2"""
  3
  4"""
  5# # mypy: disable-error-code="attr-defined"
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19import weakref
 20from collections import defaultdict
 21from uuid import UUID, uuid1
 22
 23# ##-- end stdlib imports
 24
 25# ##-- 3rd party imports
 26from jgdv import Mixin, Proto
 27
 28# ##-- end 3rd party imports
 29
 30# ##-- 1st party imports
 31import doot
 32import doot.errors
 33from doot.workflow import DootTask, TaskArtifact, TaskName
 34from doot.workflow import _interface as S_API#  noqa: N812
 35from doot.workflow._interface import (ActionSpec_i, ArtifactStatus_e, RelationMeta_e,
 36                                      InjectSpec_i, RelationSpec_i, TaskMeta_e,
 37                                      TaskName_p, TaskSpec_i, TaskStatus_e, Task_p, Artifact_i)
 38# ##-- end 1st party imports
 39
 40# ##-| Local
 41from . import _interface as API # noqa: N812
 42
 43# # End of Imports.
 44
 45# ##-- types
 46# isort: off
 47import abc
 48import collections.abc
 49from typing import TYPE_CHECKING, cast, assert_type, assert_never
 50from typing import Generic, NewType
 51# Protocols:
 52from typing import Protocol, runtime_checkable
 53# Typing Decorators:
 54from typing import no_type_check, final, override, overload
 55
 56if TYPE_CHECKING:
 57    from jgdv import Maybe
 58    from typing import Final
 59    from typing import ClassVar, Any, LiteralString
 60    from typing import Never, Self, Literal
 61    from typing import TypeGuard
 62    from collections.abc import Iterable, Iterator, Callable, Generator
 63    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 64
 65    from jgdv.structs.chainguard import ChainGuard
 66    type Abstract[T] = T
 67    type Concrete[T] = T
 68    type ActionElem  = ActionSpec_i|RelationSpec_i
 69    type ActionGroup = list[ActionElem]
 70##--|
 71##
 72from doot.workflow._interface import Task_i
 73# isort: on
 74# ##-- end types
 75
 76##-- logging
 77logging          = logmod.getLogger(__name__)
 78logging.disabled = False
 79##-- end logging
 80
 81##--|
 82
[docs] 83class _Registration_m(API.Registry_d): 84
[docs] 85 def register_spec(self, spec:TaskSpec_i) -> None: 86 """ Register task specs, abstract or concrete 87 88 Does *not* handle any taskspec generation logic 89 """ 90 x : Any 91 ##--| 92 if TaskMeta_e.DISABLED in spec.meta: 93 logging.info("[Disabled] task: %s", spec.name[:]) 94 return 95 96 match spec.name: 97 case TaskName_p() as x if x in self.specs: 98 if self.specs[x].spec is not spec: 99 raise ValueError("Tried to overwrite a spec", spec.name) 100 return 101 case TaskName_p() as x if TaskName.Marks.partial in x: 102 raise ValueError("By this point a partial spec should have been reified", x) 103 104 case TaskName_p() as x if (x.is_head() or x.is_cleanup()): 105 logging.info("[+.generated] : %s", spec.name) 106 if (gen_base:=x.de_uniq()) in self.specs: 107 # an explicitly registered abstract head/cleanup 108 self.specs[gen_base].related.add(spec.name) 109 if x.uuid() and (originator:=x.pop_generated()) in self.specs: 110 self.specs[originator].related.add(spec.name) 111 self.specs[spec.name] = API.SpecMeta_d(spec=spec) 112 self._register_spec_artifacts(spec) 113 self._register_blocking_relations(spec) 114 self._register_delayed_blockers(spec) 115 self._register_implicit_tasks(spec) 116 case TaskName_p() if x.uuid(): 117 logging.info("[+.Concrete] : %s", spec.name) 118 self.concrete.add(spec.name.de_uniq()) 119 self.specs[spec.name] = API.SpecMeta_d(spec=spec) 120 self._register_spec_artifacts(spec) 121 self._register_blocking_relations(spec) 122 self._register_delayed_blockers(spec) 123 self._register_implicit_tasks(spec) 124 self.specs[spec.name.de_uniq()].related.add(spec.name) 125 case TaskName_p(): 126 logging.info("[+.Abstract] : %s", spec.name) 127 self.abstract.add(spec.name) 128 self.specs[spec.name] = API.SpecMeta_d(spec=spec) 129 self._register_spec_artifacts(spec) 130 self._register_blocking_relations(spec) 131 self._register_delayed_blockers(spec) 132 self._register_implicit_tasks(spec) 133 case x: 134 raise TypeError(type(x))
135 136
[docs] 137 def _register_artifact(self, art:Artifact_i, *tasks:TaskName_p, relation:Maybe[S_API.RelationMeta_e]=None) -> None: 138 logging.info("[+] Artifact: %s, %s", art, tasks) 139 obj : API.ArtifactMeta_d 140 141 match self.artifacts.get(art, None): 142 case API.ArtifactMeta_d() as obj: 143 pass 144 case None: 145 obj = API.ArtifactMeta_d(artifact=art) 146 self.artifacts[art] = obj 147 148 # Add it to the relevant abstract/concrete set 149 match art.is_concrete(): 150 case True: 151 self.concrete.add(art) 152 case False: 153 self.abstract.add(art) 154 155 match relation: 156 case None: 157 pass 158 case S_API.RelationMeta_e.needs: 159 obj.consumers.update(tasks) 160 case S_API.RelationMeta_e.blocks: 161 obj.builders.update(tasks)
162
[docs] 163 def _register_spec_artifacts(self, spec:TaskSpec_i) -> None: 164 """ Register the artifacts a spec produces """ 165 assert(hasattr(self._tracker, "_factory")) 166 for rel in self._tracker._factory.action_group_elements(spec): 167 match rel: 168 case RelationSpec_i(target=Artifact_i() as art, relation=reltype): 169 self._register_artifact(art, spec.name, relation=reltype) 170 case _: 171 pass
172
[docs] 173 def _register_blocking_relations(self, spec:TaskSpec_i) -> None: 174 """ a Task[required_for=[x,y,z] blocks x,y,z, 175 but if you just look at x,y,z, you can't know that. 176 This is the reverse mapping to allow for that 177 178 """ 179 # assert(not spec.name.uuid()) 180 assert(hasattr(self._tracker, "_factory")) 181 # Register Indirect dependencies: 182 # So if spec blocks target, 183 # record that target needs spec 184 for rel in self._tracker._factory.action_group_elements(spec): 185 match rel: 186 case RelationSpec_i(target=TaskName_p() as target, relation=RelationMeta_e.blocks) if target in self.specs: # type: ignore[attr-defined] 187 logging.info("[Requirement]: %s : %s", target, spec.name) 188 self.specs[target].blocked_by.add(spec.name) 189 case RelationSpec_i(target=Artifact_i() as target, relation=RelationMeta_e.blocks) if target in self.artifacts: # type: ignore[attr-defined] 190 logging.info("[Requirement]: %s : %s", target, spec.name) 191 self.artifacts[target].blocked_by.add(spec.name) 192 case RelationSpec_i(target=target, relation=RelationMeta_e.blocks): 193 logging.info("[Delayed.Requirement]: %s : %s", target, spec.name) 194 self._delayed_blockers[target].append(spec.name) 195 case _: # Ignore action specs and non blockers 196 pass 197 else: 198 return
199
[docs] 200 def _register_delayed_blockers(self, spec:TaskSpec_i) -> None: 201 simple = spec.name.de_uniq() 202 updates = set() 203 updates.update(self._delayed_blockers[spec.name]) 204 updates.update(self._delayed_blockers[simple]) 205 if not bool(updates): 206 return 207 logging.info("[Applying.Delayed.Requirements]: %s", spec.name) 208 self.specs[spec.name].blocked_by.update(updates) 209 if spec.name in self._delayed_blockers: 210 del self._delayed_blockers[spec.name] 211 if simple in self._delayed_blockers: 212 del self._delayed_blockers[simple]
213
[docs] 214 def _register_late_injection(self, task:TaskName_p, inject:InjectSpec_i, parent:TaskName_p) -> None: 215 """ Register an injection to run on task initialisation, 216 using the state injection's from its parent 217 """ 218 logging.info("[Injection] Registering: %s <- %s", task, parent) 219 assert(parent in self.specs) 220 assert(task in self.specs) 221 assert(parent.uuid()) 222 assert(task.uuid()) 223 self.specs[task].injection_source = (parent, inject)
224
[docs] 225 def _register_implicit_tasks(self, spec:TaskSpec_i) -> None: 226 for data in self._tracker._subfactory.generate_specs(spec): # type: ignore[attr-defined] 227 logging.debug("[Implicit]: %s -> %s", spec.name, data["name"]) 228 implicit = self._tracker._factory.build(data) # type: ignore[attr-defined] 229 if implicit.name not in self.specs: 230 self._tracker.register(implicit)
231 232
[docs] 233class _Instantiation_m(API.Registry_d): 234
[docs] 235 def instantiate_spec(self, name:Abstract[TaskName_p], *, force:Maybe[int|bool]=None, extra:Maybe[dict|ChainGuard]=None) -> Maybe[Concrete[TaskName_p]]: 236 """ Convert an Asbtract Spec into a Concrete Spec, 237 Reuses a existing concrete spec if possible. 238 239 If force=True, forces a new instance to be made 240 if force=False, blocks new instances from being made 241 """ 242 meta : API.SpecMeta_d 243 spec : TaskSpec_i 244 instance : TaskSpec_i 245 ##--| 246 assert(hasattr(self._tracker, "_factory")) 247 match force: 248 case None|False|0 if name.uuid() and name in self.specs: # Re-use existing instance 249 if bool(extra): 250 raise ValueError("tried to instance a spec, while disallowing new specs, but providing extra values") 251 self._instantiate_implicit_tasks(name) 252 return name 253 case _: 254 pass 255 256 assert(not name.uuid()), name 257 meta = self.specs[name] 258 match list(meta.related), force: 259 case _, True: # disallow reuse 260 pass 261 case _, None if extra: # extra data provided 262 pass 263 case [x, *xs], None|False: # reuse 264 logging.info("[Instance.Concrete] : %s", x) 265 self._instantiate_implicit_tasks(x) 266 return x 267 268 spec = meta.spec 269 instance = self._tracker._factory.instantiate(spec, suffix=force, extra=extra) 270 assert(instance is not None) 271 assert(instance.name.uuid()) 272 logging.debug("[Instance.new] %s into %s", name, instance.name) 273 # register the actual concrete spec 274 self._tracker.register(instance) # type: ignore[attr-defined] 275 assert(instance.name in self.specs) 276 assert(instance.name in meta.related) 277 self._instantiate_implicit_tasks(instance.name) 278 return instance.name
279
[docs] 280 def instantiate_relation(self, rel:RelationSpec_i, *, control:Concrete[TaskName_p]) -> Concrete[TaskName_p]: # noqa: PLR0912, PLR0915 281 """ find a matching relation according to constraints, 282 or create a new instance if theres no constraints/no match 283 284 returns the concrete TaskName_p of the instanced target of the relation 285 """ 286 x : Any 287 control_meta : API.SpecMeta_d 288 control_obj : Task_p | TaskSpec_i 289 target : TaskName_p 290 instance : Maybe[TaskName_p] 291 existing : TaskName_p 292 potentials : list[TaskName_p] 293 ##--| 294 logging.info("[Instance.Relation] : %s -> %s -> %s", control, rel.relation.name, rel.target) 295 ##--| guards 296 if control not in self.specs: 297 raise doot.errors.TrackingError("Unknown control used in relation", control, rel) 298 match rel.target: 299 case TaskName_p() as targ if targ.uuid() and targ in self.specs: 300 logging.debug("[Instance.Relation.Exists] : %s", rel.target) 301 return rel.target 302 case TaskName_p() as targ if targ in self.specs: 303 target = targ 304 case TaskName_p() as targ if targ.uuid() and targ.de_uniq() in self.specs: 305 target = targ.de_uniq() 306 case TaskName_p() as targ if targ.pop(top=False) in self.specs: 307 target = cast("TaskName_p", targ.pop()) 308 case TaskName_p() as target: 309 raise doot.errors.TrackingError("Unknown target declared in Constrained Relation", control, target) 310 311 assert(isinstance(target, TaskName_p)) 312 ##--| 313 match self.specs[control]: 314 case API.SpecMeta_d(task=Task_i() as _task) as control_meta: 315 control_obj = _task 316 case API.SpecMeta_d(spec=_spec) as control_meta: 317 control_obj = _spec 318 ##--| reuse 319 potentials = list(self.specs[target].related) 320 for existing in potentials: 321 match self.specs[existing]: 322 case API.SpecMeta_d(task=Task_i() as _task) if not rel.accepts(control_obj, _task): 323 continue 324 case API.SpecMeta_d(spec=_spec) if not rel.accepts(control_obj, _spec): 325 continue 326 case _: 327 
logging.debug("[Instance.Relation.Match] : %s", existing) 328 return existing 329 else: 330 # make a new rel.target instance 331 match rel.inject: 332 case InjectSpec_i() as inj: 333 pass 334 case _: 335 instance = self._tracker._instantiate(target, force=True) 336 assert(instance is not None) 337 logging.debug("[Instance.Relation.Basic] : %s", instance) 338 return instance 339 340 # Early injections applied here, so constrained relations can use them 341 match inj.apply_from_spec(control_obj): 342 case dict() as x if not bool(x): 343 instance = self._tracker._instantiate(target, force=True) 344 case x: 345 instance = self._tracker._instantiate(target, extra=x) 346 347 assert(instance is not None) 348 assert(instance not in potentials), instance 349 if instance and not inj.validate(control_obj, self.specs[instance].spec, only_spec=True): 350 raise doot.errors.TrackingError("Injection did not succeed", inj.validate_details(control_obj, self.specs[instance].spec, only_spec=True)) 351 352 self._register_late_injection(instance, inj, control) # type: ignore[attr-defined] 353 logging.debug("[Instance.Relation.Inject] : %s", instance) 354 return instance
355
[docs] 356 def make_task(self, name:Concrete[TaskName_p], *, task_obj:Maybe[Task_i]=None) -> Concrete[TaskName_p]: 357 """ Build a Concrete Spec's Task object, then register it 358 if a task_obj is provided, store that instead 359 360 return the name of the task 361 """ 362 assert(hasattr(self._tracker, "_factory")) 363 assert(isinstance(name, TaskName_p)) 364 task : Task_p 365 meta : API.SpecMeta_d 366 ##--| guards 367 match self.specs[name], task_obj: 368 case _, _ if not name.uuid(): 369 raise doot.errors.TrackingError("Tried to build a task using a non-concrete spec", name) 370 case None, _: 371 raise doot.errors.TrackingError("Tried to make a task from a non-existent spec name", name) 372 case API.SpecMeta_d(task=Task_p()), Task_p() as obj: 373 raise doot.errors.TrackingError("Tried to provide a task object for already existing task", name) 374 case API.SpecMeta_d(task=TaskStatus_e.DEFINED), Task_p() as obj: 375 self.specs[name].task = obj 376 return name 377 case API.SpecMeta_d(task=Task_p()), None: 378 return name 379 case API.SpecMeta_d(task=TaskStatus_e()), None: 380 logging.debug("[Instance] Task Object: %s", name) 381 meta = self.specs[name] 382 task = self._tracker._factory.make(meta.spec, ensure=Task_i) 383 # Store it 384 meta.task = task 385 return name 386 case x: 387 raise TypeError(type(x))
388
[docs] 389 def _instantiate_implicit_tasks(self, name:TaskName_p) -> None: 390 spec = self.specs[name].spec 391 for data in self._tracker._subfactory.generate_specs(spec): # type: ignore[attr-defined] 392 implicit = self._tracker._factory.build(data) # type: ignore[attr-defined] 393 if implicit.name not in self.specs: 394 self._tracker.register(implicit) 395 self._tracker._instantiate(implicit.name)
396
class _Verification_m(API.Registry_d):
    """ Consistency checks over the registry's stores. """

    def verify(self, *, strict:bool=True) -> bool:
        """ Check that every entry of `concrete` is also in `abstract`,
        or is a key of `artifacts`.

        Returns True when nothing is missing.
        On failure: raises ValueError if strict, else logs a warning and returns False.
        """
        missing  = self.concrete - self.abstract - self.artifacts.keys()
        failures = [f"Abstract Spec {k} is missing" for k in missing]

        # TODO Add more verify heuristics
        if not failures:
            return True
        if strict:
            raise ValueError("Registry Failed Validation", failures)

        logging.warning("Registry Failed Validation: %s", failures)
        return False
412 413##--| 414
[docs] 415@Proto(API.Registry_p) 416@Mixin(_Registration_m, _Instantiation_m, _Verification_m) 417class TrackRegistry(API.Registry_d): 418 """ Stores and manipulates specs, tasks, and artifacts """ 419 420 def __init__(self, *args, **kwargs): 421 super().__init__(*args, **kwargs) 422 self._delayed_blockers = defaultdict(list) 423 424
[docs] 425 def get_status(self, target:Concrete[TaskName_p|Artifact_i]) -> tuple[TaskStatus_e|ArtifactStatus_e, int]: 426 """ Get the status of a target or artifact """ 427 assert(hasattr(self._tracker, "_declare_priority")) 428 assert(hasattr(self._tracker, "_root_node")) 429 if isinstance(target, Artifact_i): 430 return target.get_status(), target.priority 431 432 assert(isinstance(target, TaskName_p)) 433 match self.specs.get(target, None): 434 case None if target == self._tracker._root_node: 435 return TaskStatus_e.NAMED, self._tracker._declare_priority 436 case None if target.uuid() and target.de_uniq() in self.specs: 437 return TaskStatus_e.DECLARED, self._tracker._declare_priority 438 case API.SpecMeta_d(task=TaskStatus_e() as status): 439 return status, self._tracker._declare_priority 440 case API.SpecMeta_d(task=Task_p() as _target): 441 return _target.status, _target.priority 442 case _: 443 return TaskStatus_e.NAMED, self._tracker._declare_priority
444
[docs] 445 def set_status(self, target:Concrete[TaskName_p|Artifact_i], status:TaskStatus_e|ArtifactStatus_e) -> bool: 446 """ update the state of a task in the dependency graph 447 Returns True on status update, 448 False on no task or artifact to update. 449 """ 450 x : Any 451 instance : TaskName_p 452 ##--| 453 logging.debug("[Status.=] : %s : %s", target, status) 454 match target: 455 case Artifact_i() as x: 456 return False 457 case TaskName_p() as x: 458 instance = x 459 case x: 460 raise TypeError(type(x)) 461 462 assert(isinstance(status, TaskStatus_e)) 463 match self.specs.get(instance, None): 464 case None: 465 return False 466 case API.SpecMeta_d(task=TaskStatus_e()) as _meta: 467 _meta.task = status 468 return False 469 case API.SpecMeta_d(task=Task_p() as _task): 470 _task.status = status 471 return True 472 case x: 473 raise TypeError(type(x))