1 #!/usr/bin/env python3
2"""
3
4"""
5# # mypy: disable-error-code="attr-defined"
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import enum
12import functools as ftz
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18import types
19import weakref
20from collections import defaultdict
21from uuid import UUID, uuid1
22
23# ##-- end stdlib imports
24
25# ##-- 3rd party imports
26from jgdv import Mixin, Proto
27
28# ##-- end 3rd party imports
29
30# ##-- 1st party imports
31import doot
32import doot.errors
33from doot.workflow import DootTask, TaskArtifact, TaskName
34from doot.workflow import _interface as S_API# noqa: N812
35from doot.workflow._interface import (ActionSpec_i, ArtifactStatus_e, RelationMeta_e,
36 InjectSpec_i, RelationSpec_i, TaskMeta_e,
37 TaskName_p, TaskSpec_i, TaskStatus_e, Task_p, Artifact_i)
38# ##-- end 1st party imports
39
40# ##-| Local
41from . import _interface as API # noqa: N812
42
43# # End of Imports.
44
45# ##-- types
46# isort: off
47import abc
48import collections.abc
49from typing import TYPE_CHECKING, cast, assert_type, assert_never
50from typing import Generic, NewType
51# Protocols:
52from typing import Protocol, runtime_checkable
53# Typing Decorators:
54from typing import no_type_check, final, override, overload
55
56if TYPE_CHECKING:
57 from jgdv import Maybe
58 from typing import Final
59 from typing import ClassVar, Any, LiteralString
60 from typing import Never, Self, Literal
61 from typing import TypeGuard
62 from collections.abc import Iterable, Iterator, Callable, Generator
63 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
64
65 from jgdv.structs.chainguard import ChainGuard
66 type Abstract[T] = T
67 type Concrete[T] = T
68 type ActionElem = ActionSpec_i|RelationSpec_i
69 type ActionGroup = list[ActionElem]
70##--|
71##
72from doot.workflow._interface import Task_i
73# isort: on
74# ##-- end types
75
76##-- logging
77logging = logmod.getLogger(__name__)
78logging.disabled = False
79##-- end logging
80
81##--|
82
[docs]
83class _Registration_m(API.Registry_d):
84
[docs]
85 def register_spec(self, spec:TaskSpec_i) -> None:
86 """ Register task specs, abstract or concrete
87
88 Does *not* handle any taskspec generation logic
89 """
90 x : Any
91 ##--|
92 if TaskMeta_e.DISABLED in spec.meta:
93 logging.info("[Disabled] task: %s", spec.name[:])
94 return
95
96 match spec.name:
97 case TaskName_p() as x if x in self.specs:
98 if self.specs[x].spec is not spec:
99 raise ValueError("Tried to overwrite a spec", spec.name)
100 return
101 case TaskName_p() as x if TaskName.Marks.partial in x:
102 raise ValueError("By this point a partial spec should have been reified", x)
103
104 case TaskName_p() as x if (x.is_head() or x.is_cleanup()):
105 logging.info("[+.generated] : %s", spec.name)
106 if (gen_base:=x.de_uniq()) in self.specs:
107 # an explicitly registered abstract head/cleanup
108 self.specs[gen_base].related.add(spec.name)
109 if x.uuid() and (originator:=x.pop_generated()) in self.specs:
110 self.specs[originator].related.add(spec.name)
111 self.specs[spec.name] = API.SpecMeta_d(spec=spec)
112 self._register_spec_artifacts(spec)
113 self._register_blocking_relations(spec)
114 self._register_delayed_blockers(spec)
115 self._register_implicit_tasks(spec)
116 case TaskName_p() if x.uuid():
117 logging.info("[+.Concrete] : %s", spec.name)
118 self.concrete.add(spec.name.de_uniq())
119 self.specs[spec.name] = API.SpecMeta_d(spec=spec)
120 self._register_spec_artifacts(spec)
121 self._register_blocking_relations(spec)
122 self._register_delayed_blockers(spec)
123 self._register_implicit_tasks(spec)
124 self.specs[spec.name.de_uniq()].related.add(spec.name)
125 case TaskName_p():
126 logging.info("[+.Abstract] : %s", spec.name)
127 self.abstract.add(spec.name)
128 self.specs[spec.name] = API.SpecMeta_d(spec=spec)
129 self._register_spec_artifacts(spec)
130 self._register_blocking_relations(spec)
131 self._register_delayed_blockers(spec)
132 self._register_implicit_tasks(spec)
133 case x:
134 raise TypeError(type(x))
135
136
[docs]
137 def _register_artifact(self, art:Artifact_i, *tasks:TaskName_p, relation:Maybe[S_API.RelationMeta_e]=None) -> None:
138 logging.info("[+] Artifact: %s, %s", art, tasks)
139 obj : API.ArtifactMeta_d
140
141 match self.artifacts.get(art, None):
142 case API.ArtifactMeta_d() as obj:
143 pass
144 case None:
145 obj = API.ArtifactMeta_d(artifact=art)
146 self.artifacts[art] = obj
147
148 # Add it to the relevant abstract/concrete set
149 match art.is_concrete():
150 case True:
151 self.concrete.add(art)
152 case False:
153 self.abstract.add(art)
154
155 match relation:
156 case None:
157 pass
158 case S_API.RelationMeta_e.needs:
159 obj.consumers.update(tasks)
160 case S_API.RelationMeta_e.blocks:
161 obj.builders.update(tasks)
162
[docs]
163 def _register_spec_artifacts(self, spec:TaskSpec_i) -> None:
164 """ Register the artifacts a spec produces """
165 assert(hasattr(self._tracker, "_factory"))
166 for rel in self._tracker._factory.action_group_elements(spec):
167 match rel:
168 case RelationSpec_i(target=Artifact_i() as art, relation=reltype):
169 self._register_artifact(art, spec.name, relation=reltype)
170 case _:
171 pass
172
[docs]
173 def _register_blocking_relations(self, spec:TaskSpec_i) -> None:
174 """ a Task[required_for=[x,y,z] blocks x,y,z,
175 but if you just look at x,y,z, you can't know that.
176 This is the reverse mapping to allow for that
177
178 """
179 # assert(not spec.name.uuid())
180 assert(hasattr(self._tracker, "_factory"))
181 # Register Indirect dependencies:
182 # So if spec blocks target,
183 # record that target needs spec
184 for rel in self._tracker._factory.action_group_elements(spec):
185 match rel:
186 case RelationSpec_i(target=TaskName_p() as target, relation=RelationMeta_e.blocks) if target in self.specs: # type: ignore[attr-defined]
187 logging.info("[Requirement]: %s : %s", target, spec.name)
188 self.specs[target].blocked_by.add(spec.name)
189 case RelationSpec_i(target=Artifact_i() as target, relation=RelationMeta_e.blocks) if target in self.artifacts: # type: ignore[attr-defined]
190 logging.info("[Requirement]: %s : %s", target, spec.name)
191 self.artifacts[target].blocked_by.add(spec.name)
192 case RelationSpec_i(target=target, relation=RelationMeta_e.blocks):
193 logging.info("[Delayed.Requirement]: %s : %s", target, spec.name)
194 self._delayed_blockers[target].append(spec.name)
195 case _: # Ignore action specs and non blockers
196 pass
197 else:
198 return
199
[docs]
200 def _register_delayed_blockers(self, spec:TaskSpec_i) -> None:
201 simple = spec.name.de_uniq()
202 updates = set()
203 updates.update(self._delayed_blockers[spec.name])
204 updates.update(self._delayed_blockers[simple])
205 if not bool(updates):
206 return
207 logging.info("[Applying.Delayed.Requirements]: %s", spec.name)
208 self.specs[spec.name].blocked_by.update(updates)
209 if spec.name in self._delayed_blockers:
210 del self._delayed_blockers[spec.name]
211 if simple in self._delayed_blockers:
212 del self._delayed_blockers[simple]
213
[docs]
214 def _register_late_injection(self, task:TaskName_p, inject:InjectSpec_i, parent:TaskName_p) -> None:
215 """ Register an injection to run on task initialisation,
216 using the state injection's from its parent
217 """
218 logging.info("[Injection] Registering: %s <- %s", task, parent)
219 assert(parent in self.specs)
220 assert(task in self.specs)
221 assert(parent.uuid())
222 assert(task.uuid())
223 self.specs[task].injection_source = (parent, inject)
224
[docs]
225 def _register_implicit_tasks(self, spec:TaskSpec_i) -> None:
226 for data in self._tracker._subfactory.generate_specs(spec): # type: ignore[attr-defined]
227 logging.debug("[Implicit]: %s -> %s", spec.name, data["name"])
228 implicit = self._tracker._factory.build(data) # type: ignore[attr-defined]
229 if implicit.name not in self.specs:
230 self._tracker.register(implicit)
231
232
[docs]
233class _Instantiation_m(API.Registry_d):
234
[docs]
235 def instantiate_spec(self, name:Abstract[TaskName_p], *, force:Maybe[int|bool]=None, extra:Maybe[dict|ChainGuard]=None) -> Maybe[Concrete[TaskName_p]]:
236 """ Convert an Asbtract Spec into a Concrete Spec,
237 Reuses a existing concrete spec if possible.
238
239 If force=True, forces a new instance to be made
240 if force=False, blocks new instances from being made
241 """
242 meta : API.SpecMeta_d
243 spec : TaskSpec_i
244 instance : TaskSpec_i
245 ##--|
246 assert(hasattr(self._tracker, "_factory"))
247 match force:
248 case None|False|0 if name.uuid() and name in self.specs: # Re-use existing instance
249 if bool(extra):
250 raise ValueError("tried to instance a spec, while disallowing new specs, but providing extra values")
251 self._instantiate_implicit_tasks(name)
252 return name
253 case _:
254 pass
255
256 assert(not name.uuid()), name
257 meta = self.specs[name]
258 match list(meta.related), force:
259 case _, True: # disallow reuse
260 pass
261 case _, None if extra: # extra data provided
262 pass
263 case [x, *xs], None|False: # reuse
264 logging.info("[Instance.Concrete] : %s", x)
265 self._instantiate_implicit_tasks(x)
266 return x
267
268 spec = meta.spec
269 instance = self._tracker._factory.instantiate(spec, suffix=force, extra=extra)
270 assert(instance is not None)
271 assert(instance.name.uuid())
272 logging.debug("[Instance.new] %s into %s", name, instance.name)
273 # register the actual concrete spec
274 self._tracker.register(instance) # type: ignore[attr-defined]
275 assert(instance.name in self.specs)
276 assert(instance.name in meta.related)
277 self._instantiate_implicit_tasks(instance.name)
278 return instance.name
279
[docs]
280 def instantiate_relation(self, rel:RelationSpec_i, *, control:Concrete[TaskName_p]) -> Concrete[TaskName_p]: # noqa: PLR0912, PLR0915
281 """ find a matching relation according to constraints,
282 or create a new instance if theres no constraints/no match
283
284 returns the concrete TaskName_p of the instanced target of the relation
285 """
286 x : Any
287 control_meta : API.SpecMeta_d
288 control_obj : Task_p | TaskSpec_i
289 target : TaskName_p
290 instance : Maybe[TaskName_p]
291 existing : TaskName_p
292 potentials : list[TaskName_p]
293 ##--|
294 logging.info("[Instance.Relation] : %s -> %s -> %s", control, rel.relation.name, rel.target)
295 ##--| guards
296 if control not in self.specs:
297 raise doot.errors.TrackingError("Unknown control used in relation", control, rel)
298 match rel.target:
299 case TaskName_p() as targ if targ.uuid() and targ in self.specs:
300 logging.debug("[Instance.Relation.Exists] : %s", rel.target)
301 return rel.target
302 case TaskName_p() as targ if targ in self.specs:
303 target = targ
304 case TaskName_p() as targ if targ.uuid() and targ.de_uniq() in self.specs:
305 target = targ.de_uniq()
306 case TaskName_p() as targ if targ.pop(top=False) in self.specs:
307 target = cast("TaskName_p", targ.pop())
308 case TaskName_p() as target:
309 raise doot.errors.TrackingError("Unknown target declared in Constrained Relation", control, target)
310
311 assert(isinstance(target, TaskName_p))
312 ##--|
313 match self.specs[control]:
314 case API.SpecMeta_d(task=Task_i() as _task) as control_meta:
315 control_obj = _task
316 case API.SpecMeta_d(spec=_spec) as control_meta:
317 control_obj = _spec
318 ##--| reuse
319 potentials = list(self.specs[target].related)
320 for existing in potentials:
321 match self.specs[existing]:
322 case API.SpecMeta_d(task=Task_i() as _task) if not rel.accepts(control_obj, _task):
323 continue
324 case API.SpecMeta_d(spec=_spec) if not rel.accepts(control_obj, _spec):
325 continue
326 case _:
327 logging.debug("[Instance.Relation.Match] : %s", existing)
328 return existing
329 else:
330 # make a new rel.target instance
331 match rel.inject:
332 case InjectSpec_i() as inj:
333 pass
334 case _:
335 instance = self._tracker._instantiate(target, force=True)
336 assert(instance is not None)
337 logging.debug("[Instance.Relation.Basic] : %s", instance)
338 return instance
339
340 # Early injections applied here, so constrained relations can use them
341 match inj.apply_from_spec(control_obj):
342 case dict() as x if not bool(x):
343 instance = self._tracker._instantiate(target, force=True)
344 case x:
345 instance = self._tracker._instantiate(target, extra=x)
346
347 assert(instance is not None)
348 assert(instance not in potentials), instance
349 if instance and not inj.validate(control_obj, self.specs[instance].spec, only_spec=True):
350 raise doot.errors.TrackingError("Injection did not succeed", inj.validate_details(control_obj, self.specs[instance].spec, only_spec=True))
351
352 self._register_late_injection(instance, inj, control) # type: ignore[attr-defined]
353 logging.debug("[Instance.Relation.Inject] : %s", instance)
354 return instance
355
[docs]
356 def make_task(self, name:Concrete[TaskName_p], *, task_obj:Maybe[Task_i]=None) -> Concrete[TaskName_p]:
357 """ Build a Concrete Spec's Task object, then register it
358 if a task_obj is provided, store that instead
359
360 return the name of the task
361 """
362 assert(hasattr(self._tracker, "_factory"))
363 assert(isinstance(name, TaskName_p))
364 task : Task_p
365 meta : API.SpecMeta_d
366 ##--| guards
367 match self.specs[name], task_obj:
368 case _, _ if not name.uuid():
369 raise doot.errors.TrackingError("Tried to build a task using a non-concrete spec", name)
370 case None, _:
371 raise doot.errors.TrackingError("Tried to make a task from a non-existent spec name", name)
372 case API.SpecMeta_d(task=Task_p()), Task_p() as obj:
373 raise doot.errors.TrackingError("Tried to provide a task object for already existing task", name)
374 case API.SpecMeta_d(task=TaskStatus_e.DEFINED), Task_p() as obj:
375 self.specs[name].task = obj
376 return name
377 case API.SpecMeta_d(task=Task_p()), None:
378 return name
379 case API.SpecMeta_d(task=TaskStatus_e()), None:
380 logging.debug("[Instance] Task Object: %s", name)
381 meta = self.specs[name]
382 task = self._tracker._factory.make(meta.spec, ensure=Task_i)
383 # Store it
384 meta.task = task
385 return name
386 case x:
387 raise TypeError(type(x))
388
[docs]
389 def _instantiate_implicit_tasks(self, name:TaskName_p) -> None:
390 spec = self.specs[name].spec
391 for data in self._tracker._subfactory.generate_specs(spec): # type: ignore[attr-defined]
392 implicit = self._tracker._factory.build(data) # type: ignore[attr-defined]
393 if implicit.name not in self.specs:
394 self._tracker.register(implicit)
395 self._tracker._instantiate(implicit.name)
396
[docs]
class _Verification_m(API.Registry_d):
    """ Mixin holding consistency checks over the registry's contents. """

    def verify(self, *, strict:bool=True) -> bool:
        """ Check the registry for inconsistencies.

        Currently one check: every entry of `self.concrete` must also appear
        in `self.abstract` or among the keys of `self.artifacts`.

        Returns True when no failures are found.
        On failure: raises ValueError when `strict`,
        otherwise logs a warning and returns False.
        """
        unmatched = self.concrete - self.abstract - self.artifacts.keys()
        failures  = [f"Abstact Spec {k} is missing" for k in unmatched]

        # TODO Add more verify heuristics
        if failures:
            if strict:
                raise ValueError("Registry Failed Validation", failures)
            logging.warning("Registry Failed Validation: %s", failures)
            return False
        return True
412
413##--|
414
[docs]
415@Proto(API.Registry_p)
416@Mixin(_Registration_m, _Instantiation_m, _Verification_m)
417class TrackRegistry(API.Registry_d):
418 """ Stores and manipulates specs, tasks, and artifacts """
419
420 def __init__(self, *args, **kwargs):
421 super().__init__(*args, **kwargs)
422 self._delayed_blockers = defaultdict(list)
423
424
[docs]
425 def get_status(self, target:Concrete[TaskName_p|Artifact_i]) -> tuple[TaskStatus_e|ArtifactStatus_e, int]:
426 """ Get the status of a target or artifact """
427 assert(hasattr(self._tracker, "_declare_priority"))
428 assert(hasattr(self._tracker, "_root_node"))
429 if isinstance(target, Artifact_i):
430 return target.get_status(), target.priority
431
432 assert(isinstance(target, TaskName_p))
433 match self.specs.get(target, None):
434 case None if target == self._tracker._root_node:
435 return TaskStatus_e.NAMED, self._tracker._declare_priority
436 case None if target.uuid() and target.de_uniq() in self.specs:
437 return TaskStatus_e.DECLARED, self._tracker._declare_priority
438 case API.SpecMeta_d(task=TaskStatus_e() as status):
439 return status, self._tracker._declare_priority
440 case API.SpecMeta_d(task=Task_p() as _target):
441 return _target.status, _target.priority
442 case _:
443 return TaskStatus_e.NAMED, self._tracker._declare_priority
444
[docs]
445 def set_status(self, target:Concrete[TaskName_p|Artifact_i], status:TaskStatus_e|ArtifactStatus_e) -> bool:
446 """ update the state of a task in the dependency graph
447 Returns True on status update,
448 False on no task or artifact to update.
449 """
450 x : Any
451 instance : TaskName_p
452 ##--|
453 logging.debug("[Status.=] : %s : %s", target, status)
454 match target:
455 case Artifact_i() as x:
456 return False
457 case TaskName_p() as x:
458 instance = x
459 case x:
460 raise TypeError(type(x))
461
462 assert(isinstance(status, TaskStatus_e))
463 match self.specs.get(instance, None):
464 case None:
465 return False
466 case API.SpecMeta_d(task=TaskStatus_e()) as _meta:
467 _meta.task = status
468 return False
469 case API.SpecMeta_d(task=Task_p() as _task):
470 _task.status = status
471 return True
472 case x:
473 raise TypeError(type(x))