Source code for doot.control.tracker._base

  1#!/usr/bin/env python3
  2"""
  3Abstract Specs: A[n]
  4Concrete Specs: C[n]
  5Task:           T[n]
  6
  7  Expansion: ∀x ∈ C[n].depends_on => A[x] -> C[x]
  8  Head: C[1].depends_on[A[n].$head$] => A[n] -> C[n], A[n].head -> C[n].head, connect
  9
 10"""
 11# Imports:
 12from __future__ import annotations
 13
 14# ##-- stdlib imports
 15import datetime
 16import enum
 17import functools as ftz
 18import itertools as itz
 19import logging as logmod
 20import pathlib as pl
 21import re
 22import time
 23import types
 24import weakref
 25from collections import defaultdict
 26from itertools import chain, cycle
 27from uuid import UUID, uuid1
 28
 29# ##-- end stdlib imports
 30
 31# ##-- 3rd party imports
 32from jgdv import Proto
 33
 34# ##-- end 3rd party imports
 35
 36# ##-- 1st party imports
 37import doot
 38import doot.errors
 39from doot.workflow.factory import SubTaskFactory, TaskFactory
 40from doot.workflow import (ActionSpec, DootTask, InjectSpec, RelationSpec,
 41                           TaskArtifact, TaskName, TaskSpec)
 42from doot.workflow._interface import (CLI_K, Artifact_i, ArtifactStatus_e,
 43                                      InjectSpec_i, RelationSpec_i, Task_i,
 44                                      TaskName_p, TaskSpec_i, TaskStatus_e,
 45                                      DelayedSpec,
 46                                      MUST_INJECT_K)
 47
 48# ##-- end 1st party imports
 49
 50# ##-| Local
 51from . import _interface as API # noqa: N812
 52from .network import TrackNetwork
 53from .queue import TrackQueue
 54from .registry import TrackRegistry
 55
 56# # End of Imports.
 57
 58# ##-- types
 59# isort: off
 60import abc
 61import collections.abc
 62from typing import TYPE_CHECKING, cast, assert_type, assert_never
 63from typing import Generic, NewType
 64# Protocols:
 65from typing import Protocol, runtime_checkable
 66# Typing Decorators:
 67from typing import no_type_check, final, override, overload
 68
 69if TYPE_CHECKING:
 70    from jgdv import Maybe
 71    from typing import Final
 72    from typing import ClassVar, Any, LiteralString
 73    from typing import Never, Self, Literal
 74    from typing import TypeGuard
 75    from collections.abc import Iterable, Iterator, Callable, Generator
 76    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 77    from networkx import DiGraph
 78
 79    from doot.workflow._interface import TaskFactory_p, SubTaskFactory_p
 80    type Abstract[T] = T
 81    type Concrete[T] = T
 82
 83##--|
 84from doot.workflow._interface import Task_p
 85from ._interface import WorkflowTracker_p
 86# isort: on
 87# ##-- end types
 88
 89##-- logging
 90logging    = logmod.getLogger(__name__)
 91##-- end logging
 92
 93##--|
 94
[docs] 95class Tracker_abs: 96 """ A public base implementation of most of a tracker 97 Has three components: 98 _registry : db for specs and tasks 99 _network : the links between specs in the registry 100 _queue : the logic for determining what task to run next 101 """ 102 _factory : TaskFactory_p 103 _subfactory : SubTaskFactory_p 104 _registry : API.Registry_p 105 _network : API.Network_p 106 _queue : API.Queue_p 107 108 _declare_priority : int 109 _min_priority : int 110 111 def __init__(self, **kwargs:Any) -> None: 112 factory = kwargs.pop("factory", TaskFactory) 113 subfactory = kwargs.pop("subfactory", SubTaskFactory) 114 registry = kwargs.pop("registry", TrackRegistry) 115 network = kwargs.pop("network", TrackNetwork) 116 queue = kwargs.pop("queue", TrackQueue) 117 self._declare_priority = API.DECLARE_PRIORITY 118 self._min_priority = API.MIN_PRIORITY 119 self._root_node = TaskName(API.ROOT) 120 self._factory = factory() 121 self._subfactory = subfactory() 122 self._registry = registry(tracker=self) 123 self._network = network(tracker=self) 124 self._queue = queue(tracker=self) 125 126 ##--| properties 127
[docs] 128 @property 129 def specs(self) -> dict[TaskName_p, API.SpecMeta_d]: 130 return self._registry.specs # type: ignore[attr-defined]
131
[docs] 132 @property 133 def artifacts(self) -> dict[Artifact_i, API.ArtifactMeta_d]: 134 return self._registry.artifacts # type: ignore[attr-defined]
135
[docs] 136 @property 137 def concrete(self) -> set: 138 return self._registry.concrete # type: ignore[attr-defined]
139
[docs] 140 @property 141 def abstract(self) -> set: 142 assert(hasattr(self._registry, "abstract")) 143 return self._registry.abstract
144
[docs] 145 @property 146 def network(self) -> Mapping: 147 return self._network._graph # type: ignore[attr-defined]
148
[docs] 149 @property 150 def active(self) -> set: 151 return self._queue.active_set
152
[docs] 153 @property 154 def is_valid(self) -> bool: 155 return not bool(self._network.non_expanded)
156 157 ##--| dunders 158 159 def __bool__(self) -> bool: 160 return bool(self._queue) 161 162 ##--| public 163
[docs] 164 def register(self, *specs:TaskSpec_i|Artifact_i|DelayedSpec)-> None: 165 actual : TaskSpec_i 166 queue : list = [*specs] 167 while bool(queue): 168 x = queue.pop() 169 match x: 170 case DelayedSpec(): 171 actual = self._upgrade_delayed_to_actual(x) 172 self._registry.register_spec(actual) 173 case TaskSpec_i() if TaskName.Marks.partial in x.name: 174 actual = self._reify_partial_spec(x) 175 self._registry.register_spec(actual) 176 case TaskSpec_i(): 177 self._registry.register_spec(x) 178 case Artifact_i(): 179 self._registry._register_artifact(x) # type: ignore[attr-defined] 180 case x: 181 raise TypeError(type(x))
182
[docs] 183 def queue(self, name:str|TaskName_p|TaskSpec_i|Artifact_i|DelayedSpec, *, from_user:int|bool=False, status:Maybe[TaskStatus_e]=None, **kwargs:Any) -> Maybe[Concrete[TaskName_p|Artifact_i]]: # noqa: ARG002 184 queued : TaskName_p|Artifact_i 185 ##--| 186 match name: 187 case str() | TaskName_p() | Artifact_i(): 188 pass 189 case DelayedSpec() as dspec: 190 self.register(dspec) 191 name = name.target 192 case TaskSpec_i(): 193 self.register(name) 194 name = name.name 195 case x: 196 raise TypeError(type(x)) 197 match self._queue.queue_entry(name, from_user=from_user), status: 198 case None, _: 199 return None 200 case TaskName_p()|Artifact_i() as queued, None: 201 pass 202 case TaskName_p()|Artifact_i() as queued, TaskStatus_e() as _status: 203 assert(hasattr(self, "set_status")) 204 self.set_status(queued, _status) 205 ##--| 206 assert(hasattr(self, "get_status")) 207 status, priority = self.get_status(target=queued) 208 logging.debug("[Tracker.Queue] : %s (S:%s, P:%s)", queued[:,:], status.name, priority) 209 return queued
210
[docs] 211 def build(self, *, sources:Maybe[Literal[True]|list[Concrete[TaskName_p]|Artifact_i]]=None) -> None: 212 self._network.build_network(sources=sources)
213
[docs] 214 def validate(self) -> None: 215 self._network.validate_network()
216
[docs] 217 def plan(self, *args:Any) -> list: 218 raise NotImplementedError()
219
[docs] 220 def clear(self) -> None: 221 self._queue.clear_queue()
222
[docs] 223 def report(self, target:TaskName_p) -> dict: 224 result : dict 225 ##--| 226 result = {} 227 abstract = target.de_uniq() if target.uuid() else target 228 related : list[TaskName_p] = [abstract] 229 while bool(related): 230 curr = related.pop() 231 related += self.specs[curr].related 232 result[str(curr)] = {str(x) for x in self.specs[curr].related} 233 else: 234 assert(str(target) in result) 235 return result
236 ##--| internal 237
[docs] 238 def _instantiate(self, target:TaskName_p|RelationSpec_i, *args:Any, task:bool=False, **kwargs:Any) -> Maybe[TaskName_p]: 239 match target: 240 case TaskName_p() as x if task: 241 return self._registry.make_task(x, *args, **kwargs) # type: ignore[return-value] 242 case TaskName_p() as x: 243 return self._registry.instantiate_spec(x, *args, **kwargs) 244 case RelationSpec_i() as x: 245 return self._registry.instantiate_relation(target, *args, **kwargs) 246 case x: 247 raise TypeError(type(x))
248
[docs] 249 def _connect(self, left:Concrete[TaskName_p]|Artifact_i, right:Maybe[Literal[False]|Concrete[TaskName_p]|Artifact_i]=None, **kwargs:Any) -> None: 250 self._network.connect(left, right, **kwargs)
251
[docs] 252 def _upgrade_delayed_to_actual(self, spec:DelayedSpec) -> TaskSpec_i: 253 """ 254 can't be in taskfactory, as it requires the registered specs 255 """ 256 x : Any 257 result : TaskSpec_i 258 base : TaskSpec_i 259 data : dict = {} 260 match spec: 261 case DelayedSpec(base=TaskName_p() as base_name, 262 target=TaskName_p() as target_name, 263 applied=dict() as applied, 264 inject=list() as injections, 265 overrides=dict() as overrides, 266 ): 267 pass 268 case x: 269 raise TypeError(type(x)) 270 271 match self.specs.get(base_name, None): 272 case API.SpecMeta_d(spec=TaskSpec_i() as base): 273 pass 274 case _: 275 raise ValueError("The Base for a delayed spec was not found", spec.base) 276 277 data |= applied 278 for inj in injections: 279 assert(isinstance(inj, InjectSpec_i)) 280 # apply_from_spec 281 data |= inj.apply_from_spec(base) 282 else: 283 data |= overrides 284 data['name'] = target_name 285 result = self._factory.merge(bot=base, top=data, suffix=False) 286 assert(result.name == target_name) 287 assert(not result.name.uuid()) 288 return result
289
[docs] 290 def _reify_partial_spec(self, spec:TaskSpec_i) -> TaskSpec_i: 291 """ 292 converts spec(name=group::task.a.b..$partial$, sources[*_, base], data) 293 into spec(name=group::a.b, data) 294 using base 295 can't be in the taskfactory, as it requires registered specs 296 297 """ 298 x : Any 299 result : TaskSpec_i 300 base : TaskSpec_i 301 target : TaskName_p 302 ##--| 303 assert(TaskName.Marks.partial in spec.name) 304 match spec.sources[-1]: 305 case TaskName_p() as x if x not in self.specs: 306 raise ValueError("Could not find a partial spec's source", x) 307 case TaskName_p() as x: 308 base = self.specs[x].spec 309 case x: 310 raise TypeError(type(x)) 311 312 match spec.name.pop(top=False): 313 case TaskName_p() as adjusted if adjusted in self.specs: 314 raise doot.errors.TrackingError("Tried to reify a partial spec into one that already is registered", spec.name, adjusted) 315 case TaskName_p() as x: 316 target = x 317 case x: 318 raise TypeError(type(x)) 319 320 result = self._factory.merge(bot=base, top=spec, suffix=False) 321 result.name = target 322 return result
323
[docs] 324 def _generate_implicit_tasks(self, spec:TaskSpec_i) -> list[TaskSpec_i]: 325 """ Generate implicit subtasks for a concrete spec """ 326 assert(spec.name.uuid()) 327 return [self._factory.build(x) for x in self._subfactory.generate_specs(spec)]