1#!/usr/bin/env python3
2"""
3Abstract Specs: A[n]
4Concrete Specs: C[n]
5Task: T[n]
6
7 Expansion: ∀x ∈ C[n].depends_on => A[x] -> C[x]
8 Head: C[1].depends_on[A[n].$head$] => A[n] -> C[n], A[n].head -> C[n].head, connect
9
10"""
11# Imports:
12from __future__ import annotations
13
14# ##-- stdlib imports
15import datetime
16import enum
17import functools as ftz
18import itertools as itz
19import logging as logmod
20import pathlib as pl
21import re
22import time
23import types
24import weakref
25from collections import defaultdict
26from itertools import chain, cycle
27from uuid import UUID, uuid1
28
29# ##-- end stdlib imports
30
31# ##-- 3rd party imports
32from jgdv import Proto
33
34# ##-- end 3rd party imports
35
36# ##-- 1st party imports
37import doot
38import doot.errors
39from doot.workflow.factory import SubTaskFactory, TaskFactory
40from doot.workflow import (ActionSpec, DootTask, InjectSpec, RelationSpec,
41 TaskArtifact, TaskName, TaskSpec)
42from doot.workflow._interface import (CLI_K, Artifact_i, ArtifactStatus_e,
43 InjectSpec_i, RelationSpec_i, Task_i,
44 TaskName_p, TaskSpec_i, TaskStatus_e,
45 DelayedSpec,
46 MUST_INJECT_K)
47
48# ##-- end 1st party imports
49
50# ##-| Local
51from . import _interface as API # noqa: N812
52from .network import TrackNetwork
53from .queue import TrackQueue
54from .registry import TrackRegistry
55
56# # End of Imports.
57
58# ##-- types
59# isort: off
60import abc
61import collections.abc
62from typing import TYPE_CHECKING, cast, assert_type, assert_never
63from typing import Generic, NewType
64# Protocols:
65from typing import Protocol, runtime_checkable
66# Typing Decorators:
67from typing import no_type_check, final, override, overload
68
69if TYPE_CHECKING:
70 from jgdv import Maybe
71 from typing import Final
72 from typing import ClassVar, Any, LiteralString
73 from typing import Never, Self, Literal
74 from typing import TypeGuard
75 from collections.abc import Iterable, Iterator, Callable, Generator
76 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
77 from networkx import DiGraph
78
79 from doot.workflow._interface import TaskFactory_p, SubTaskFactory_p
80 type Abstract[T] = T
81 type Concrete[T] = T
82
83##--|
84from doot.workflow._interface import Task_p
85from ._interface import WorkflowTracker_p
86# isort: on
87# ##-- end types
88
89##-- logging
90logging = logmod.getLogger(__name__)
91##-- end logging
92
93##--|
94
[docs]
95class Tracker_abs:
96 """ A public base implementation of most of a tracker
97 Has three components:
98 _registry : db for specs and tasks
99 _network : the links between specs in the registry
100 _queue : the logic for determining what task to run next
101 """
102 _factory : TaskFactory_p
103 _subfactory : SubTaskFactory_p
104 _registry : API.Registry_p
105 _network : API.Network_p
106 _queue : API.Queue_p
107
108 _declare_priority : int
109 _min_priority : int
110
111 def __init__(self, **kwargs:Any) -> None:
112 factory = kwargs.pop("factory", TaskFactory)
113 subfactory = kwargs.pop("subfactory", SubTaskFactory)
114 registry = kwargs.pop("registry", TrackRegistry)
115 network = kwargs.pop("network", TrackNetwork)
116 queue = kwargs.pop("queue", TrackQueue)
117 self._declare_priority = API.DECLARE_PRIORITY
118 self._min_priority = API.MIN_PRIORITY
119 self._root_node = TaskName(API.ROOT)
120 self._factory = factory()
121 self._subfactory = subfactory()
122 self._registry = registry(tracker=self)
123 self._network = network(tracker=self)
124 self._queue = queue(tracker=self)
125
126 ##--| properties
127
[docs]
128 @property
129 def specs(self) -> dict[TaskName_p, API.SpecMeta_d]:
130 return self._registry.specs # type: ignore[attr-defined]
131
[docs]
132 @property
133 def artifacts(self) -> dict[Artifact_i, API.ArtifactMeta_d]:
134 return self._registry.artifacts # type: ignore[attr-defined]
135
[docs]
136 @property
137 def concrete(self) -> set:
138 return self._registry.concrete # type: ignore[attr-defined]
139
[docs]
140 @property
141 def abstract(self) -> set:
142 assert(hasattr(self._registry, "abstract"))
143 return self._registry.abstract
144
[docs]
145 @property
146 def network(self) -> Mapping:
147 return self._network._graph # type: ignore[attr-defined]
148
[docs]
149 @property
150 def active(self) -> set:
151 return self._queue.active_set
152
[docs]
153 @property
154 def is_valid(self) -> bool:
155 return not bool(self._network.non_expanded)
156
157 ##--| dunders
158
159 def __bool__(self) -> bool:
160 return bool(self._queue)
161
162 ##--| public
163
[docs]
164 def register(self, *specs:TaskSpec_i|Artifact_i|DelayedSpec)-> None:
165 actual : TaskSpec_i
166 queue : list = [*specs]
167 while bool(queue):
168 x = queue.pop()
169 match x:
170 case DelayedSpec():
171 actual = self._upgrade_delayed_to_actual(x)
172 self._registry.register_spec(actual)
173 case TaskSpec_i() if TaskName.Marks.partial in x.name:
174 actual = self._reify_partial_spec(x)
175 self._registry.register_spec(actual)
176 case TaskSpec_i():
177 self._registry.register_spec(x)
178 case Artifact_i():
179 self._registry._register_artifact(x) # type: ignore[attr-defined]
180 case x:
181 raise TypeError(type(x))
182
[docs]
183 def queue(self, name:str|TaskName_p|TaskSpec_i|Artifact_i|DelayedSpec, *, from_user:int|bool=False, status:Maybe[TaskStatus_e]=None, **kwargs:Any) -> Maybe[Concrete[TaskName_p|Artifact_i]]: # noqa: ARG002
184 queued : TaskName_p|Artifact_i
185 ##--|
186 match name:
187 case str() | TaskName_p() | Artifact_i():
188 pass
189 case DelayedSpec() as dspec:
190 self.register(dspec)
191 name = name.target
192 case TaskSpec_i():
193 self.register(name)
194 name = name.name
195 case x:
196 raise TypeError(type(x))
197 match self._queue.queue_entry(name, from_user=from_user), status:
198 case None, _:
199 return None
200 case TaskName_p()|Artifact_i() as queued, None:
201 pass
202 case TaskName_p()|Artifact_i() as queued, TaskStatus_e() as _status:
203 assert(hasattr(self, "set_status"))
204 self.set_status(queued, _status)
205 ##--|
206 assert(hasattr(self, "get_status"))
207 status, priority = self.get_status(target=queued)
208 logging.debug("[Tracker.Queue] : %s (S:%s, P:%s)", queued[:,:], status.name, priority)
209 return queued
210
[docs]
211 def build(self, *, sources:Maybe[Literal[True]|list[Concrete[TaskName_p]|Artifact_i]]=None) -> None:
212 self._network.build_network(sources=sources)
213
[docs]
214 def validate(self) -> None:
215 self._network.validate_network()
216
[docs]
217 def plan(self, *args:Any) -> list:
218 raise NotImplementedError()
219
[docs]
220 def clear(self) -> None:
221 self._queue.clear_queue()
222
[docs]
223 def report(self, target:TaskName_p) -> dict:
224 result : dict
225 ##--|
226 result = {}
227 abstract = target.de_uniq() if target.uuid() else target
228 related : list[TaskName_p] = [abstract]
229 while bool(related):
230 curr = related.pop()
231 related += self.specs[curr].related
232 result[str(curr)] = {str(x) for x in self.specs[curr].related}
233 else:
234 assert(str(target) in result)
235 return result
236 ##--| internal
237
[docs]
238 def _instantiate(self, target:TaskName_p|RelationSpec_i, *args:Any, task:bool=False, **kwargs:Any) -> Maybe[TaskName_p]:
239 match target:
240 case TaskName_p() as x if task:
241 return self._registry.make_task(x, *args, **kwargs) # type: ignore[return-value]
242 case TaskName_p() as x:
243 return self._registry.instantiate_spec(x, *args, **kwargs)
244 case RelationSpec_i() as x:
245 return self._registry.instantiate_relation(target, *args, **kwargs)
246 case x:
247 raise TypeError(type(x))
248
[docs]
249 def _connect(self, left:Concrete[TaskName_p]|Artifact_i, right:Maybe[Literal[False]|Concrete[TaskName_p]|Artifact_i]=None, **kwargs:Any) -> None:
250 self._network.connect(left, right, **kwargs)
251
[docs]
252 def _upgrade_delayed_to_actual(self, spec:DelayedSpec) -> TaskSpec_i:
253 """
254 can't be in taskfactory, as it requires the registered specs
255 """
256 x : Any
257 result : TaskSpec_i
258 base : TaskSpec_i
259 data : dict = {}
260 match spec:
261 case DelayedSpec(base=TaskName_p() as base_name,
262 target=TaskName_p() as target_name,
263 applied=dict() as applied,
264 inject=list() as injections,
265 overrides=dict() as overrides,
266 ):
267 pass
268 case x:
269 raise TypeError(type(x))
270
271 match self.specs.get(base_name, None):
272 case API.SpecMeta_d(spec=TaskSpec_i() as base):
273 pass
274 case _:
275 raise ValueError("The Base for a delayed spec was not found", spec.base)
276
277 data |= applied
278 for inj in injections:
279 assert(isinstance(inj, InjectSpec_i))
280 # apply_from_spec
281 data |= inj.apply_from_spec(base)
282 else:
283 data |= overrides
284 data['name'] = target_name
285 result = self._factory.merge(bot=base, top=data, suffix=False)
286 assert(result.name == target_name)
287 assert(not result.name.uuid())
288 return result
289
[docs]
290 def _reify_partial_spec(self, spec:TaskSpec_i) -> TaskSpec_i:
291 """
292 converts spec(name=group::task.a.b..$partial$, sources[*_, base], data)
293 into spec(name=group::a.b, data)
294 using base
295 can't be in the taskfactory, as it requires registered specs
296
297 """
298 x : Any
299 result : TaskSpec_i
300 base : TaskSpec_i
301 target : TaskName_p
302 ##--|
303 assert(TaskName.Marks.partial in spec.name)
304 match spec.sources[-1]:
305 case TaskName_p() as x if x not in self.specs:
306 raise ValueError("Could not find a partial spec's source", x)
307 case TaskName_p() as x:
308 base = self.specs[x].spec
309 case x:
310 raise TypeError(type(x))
311
312 match spec.name.pop(top=False):
313 case TaskName_p() as adjusted if adjusted in self.specs:
314 raise doot.errors.TrackingError("Tried to reify a partial spec into one that already is registered", spec.name, adjusted)
315 case TaskName_p() as x:
316 target = x
317 case x:
318 raise TypeError(type(x))
319
320 result = self._factory.merge(bot=base, top=spec, suffix=False)
321 result.name = target
322 return result
323
[docs]
324 def _generate_implicit_tasks(self, spec:TaskSpec_i) -> list[TaskSpec_i]:
325 """ Generate implicit subtasks for a concrete spec """
326 assert(spec.name.uuid())
327 return [self._factory.build(x) for x in self._subfactory.generate_specs(spec)]