Source code for doot.control.tracker._interface

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# ruff: noqa:
  6
  7# Imports:
  8from __future__ import annotations
  9
 10# ##-- stdlib imports
 11import atexit#  for @atexit.register
 12import collections
 13import contextlib
 14import datetime
 15import enum
 16import faulthandler
 17import functools as ftz
 18import hashlib
 19import itertools as itz
 20import logging as logmod
 21import pathlib as pl
 22import re
 23import time
 24import types
 25from copy import deepcopy
 26from uuid import UUID, uuid1
 27from weakref import ref
 28
 29# ##-- end stdlib imports
 30
 31# ##-- 1st party imports
 32from doot.workflow._interface import (ArtifactStatus_e, InjectSpec_i, ActionSpec_i,
 33                                      RelationSpec_i, Task_i, TaskSpec_i,
 34                                      TaskStatus_e)
 35
 36# ##-- end 1st party imports
 37
 38# ##-- types
 39# isort: off
 40import abc
 41import collections.abc
 42from typing import TYPE_CHECKING, cast, assert_type, assert_never
 43from typing import Generic, NewType, Never
 44# Protocols:
 45from typing import Protocol, runtime_checkable
 46# Typing Decorators:
 47from typing import no_type_check, final, override, overload
 48
 49if TYPE_CHECKING:
 50    from typing import Final
 51    from typing import ClassVar, Any, LiteralString
 52    from typing import Self, Literal
 53    from typing import TypeGuard
 54    from collections.abc import Iterable, Iterator, Callable, Generator
 55    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 56
 57    from jgdv import Maybe, Ident
 58    from jgdv.structs.chainguard import ChainGuard
 59    from doot.workflow._interface import Task_i, TaskName_p, Artifact_i, Task_p
 60    from doot.workflow._interface import TaskFactory_p, SubTaskFactory_p, DelayedSpec
 61
 62    type Abstract[T] = T
 63    type Concrete[T] = T
 64##--|
 65
 66# isort: on
 67# ##-- end types
 68
 69##-- logging
 70logging = logmod.getLogger(__name__)
 71##-- end logging
 72
[docs] 73class EdgeType_e(enum.Enum): 74 """ Enum describing the possible edges of the task tracker's task network """ 75 76 TASK = enum.auto() # task to task 77 ARTIFACT_UP = enum.auto() # abstract to concrete artifact 78 ARTIFACT_DOWN = enum.auto() # concrete to abstract artifact 79 TASK_CROSS = enum.auto() # Task to artifact 80 ARTIFACT_CROSS = enum.auto() # artifact to task 81 82 default = TASK 83
[docs] 84 @classmethod 85 def artifact_edge_set[T](cls:type) -> set[T]: 86 return {cls.ARTIFACT_UP, cls.ARTIFACT_DOWN, cls.TASK_CROSS} # type: ignore[attr-defined]
87 88# Vars: 89MAX_LOOP : Final[int] = 100 90 91ARTIFACT_EDGES : Final[set[EdgeType_e]] = EdgeType_e.artifact_edge_set() 92CLEANUP : Final[str] = "cleanup" 93DECLARE_PRIORITY : Final[int] = 10 94EXPANDED : Final[str] = "expanded" # Node attribute name 95INITIAL_SOURCE_CHAIN_COUNT : Final[int] = 10 96MIN_PRIORITY : Final[int] = -10 97REACTIVE_ADD : Final[str] = "reactive-add" 98ROOT : Final[str] = "root::_.$gen$" # Root node of dependency graph 99 100SUCCESS_STATUSES : Final[set[TaskStatus_e|ArtifactStatus_e]] = { 101 TaskStatus_e.SUCCESS, 102 TaskStatus_e.TEARDOWN, 103 TaskStatus_e.DEAD, 104 ArtifactStatus_e.EXISTS, 105} 106
[docs] 107class ExecutionPolicy_e(enum.Enum): 108 """ How the task execution will be ordered 109 PRIORITY : Priority Queue with retry, job expansion, dynamic walk of network. 110 DEPTH : No (priority,retry,jobs). basic DFS of the pre-run dependency network 111 BREADTH : No (priority,retry,jobs). basic BFS of the pre-run dependency-network 112 113 """ 114 PRIORITY = enum.auto() # By Task Priority 115 DEPTH = enum.auto() # Depth First Search 116 BREADTH = enum.auto() # Breadth First Search 117 118 default = PRIORITY
119 120##--| Data 121
[docs] 122class SpecMeta_d: 123 """ 124 Registry data for a spec. 125 When spec is abstract, related are the concrete instantiations 126 when spec is concrete, related are the implicit subtasks 127 128 blocked_by are the dependencies not mentioned in the spec 129 injection_source is the injection to run just before executing the task 130 injection_targets are tasks that block this task cleaning up 131 """ 132 __slots__ = ("blocked_by", "injection_source", "injection_targets", "related", "spec", "task") 133 134 spec : TaskSpec_i 135 task : Task_p|TaskStatus_e 136 related : set[TaskName_p] 137 blocked_by : set[TaskName_p|Artifact_i] 138 injection_source : Maybe[tuple[TaskName_p, InjectSpec_i]] 139 injection_targets : set[TaskName_p] 140 141 def __init__(self, *, spec:TaskSpec_i) -> None: 142 self.spec = spec 143 self.task = TaskStatus_e.DECLARED 144 self.related = set() 145 self.blocked_by = set() 146 self.injection_source = None 147 self.injection_targets = set()
148
[docs] 149class ArtifactMeta_d: 150 __slots__ = ("artifact", "blocked_by", "builders", "consumers") 151 152 artifact : Artifact_i 153 blocked_by : set[TaskName_p|Artifact_i] 154 builders : set[TaskName_p] 155 consumers : set[TaskName_p] 156 157 def __init__(self, *, artifact:Artifact_i) -> None: 158 self.artifact = artifact 159 self.blocked_by = set() 160 self.builders = set() 161 self.consumers = set()
162
[docs] 163class Registry_d: 164 165 _tracker : WorkflowTracker_p 166 specs : dict[TaskName_p, SpecMeta_d] 167 artifacts : dict[Artifact_i, ArtifactMeta_d] 168 169 abstract : set[Abstract[TaskName_p] | Artifact_i] 170 concrete : set[Abstract[TaskName_p] | Artifact_i] 171 172 def __init__(self, *, tracker:WorkflowTracker_p) -> None: 173 self._tracker = tracker 174 self.specs = {} 175 self.artifacts = {} 176 self.abstract = set() 177 self.concrete = set()
178 179##--| components 180
[docs] 181@runtime_checkable 182class Registry_p(Protocol): 183
[docs] 184 def register_spec(self, *specs:TaskSpec_i) -> None: ...
185
[docs] 186 def instantiate_spec(self, name:Abstract[TaskName_p], *, force:Maybe[bool|int]=None, extra:Maybe[dict|ChainGuard|bool]=None) -> Maybe[Concrete[TaskName_p]]: ...
187
[docs] 188 def instantiate_relation(self, rel:RelationSpec_i, *, control:Concrete[TaskName_p]) -> Concrete[TaskName_p]: ...
189
[docs] 190 def make_task(self, name:Concrete[TaskName_p], *, task_obj:Maybe[Task_i]=None, parent:Maybe[Concrete[TaskName_p]]=None) -> Concrete[TaskName_p]: ...
191
[docs] 192 def verify(self, *, strict:bool=True) -> bool: ...
193
[docs] 194class Network_p(Protocol): 195 _graph : Any 196 _root_node : TaskName_p 197 succ : Mapping 198 pred : Mapping 199 non_expanded : set[TaskName_p|Artifact_i] 200
[docs] 201 def build_network(self, *, sources:Maybe[Literal[True]|list[Concrete[TaskName_p]|Artifact_i]]=None) -> None: ...
202
[docs] 203 def connect(self, left:Concrete[TaskName_p]|Artifact_i, right:Maybe[Literal[False]|Concrete[TaskName_p]|Artifact_i]=None, **kwargs:Any) -> None: ...
204
[docs] 205 def validate_network(self, *, strict:bool=True) -> bool: ...
206
[docs] 207class Queue_p(Protocol): 208 active_set : set[TaskName_p|Artifact_i] 209
[docs] 210 def queue_entry(self, target:str|TaskName_p|Artifact_i, *, from_user:int|bool=False) -> Maybe[Concrete[TaskName_p|Artifact_i]]: ...
211
[docs] 212 def deque_entry(self, *, peek:bool=False) -> Concrete[TaskName_p]|Artifact_i: ...
213
[docs] 214 def clear_queue(self) -> None: ...
215 216##--| Tracker 217
[docs] 218@runtime_checkable 219class WorkflowTracker_p(Protocol): 220 """ 221 Track tasks that have run, need to run, are running, 222 and have failed. 223 Does not execute anything itself 224 """ 225 ##--| properties 226
[docs] 227 @property 228 def active(self) -> set[TaskName_p]: ...
229
[docs] 230 @property 231 def specs(self) -> dict[TaskName_p, SpecMeta_d]: ...
232
[docs] 233 @property 234 def artifacts(self) -> dict[Artifact_i, ArtifactMeta_d]: ...
235
[docs] 236 @property 237 def concrete(self) -> set[TaskName_p|Artifact_i]: ...
238
[docs] 239 @property 240 def abstract(self) -> set[TaskName_p|Artifact_i]: ...
241
[docs] 242 @property 243 def network(self) -> Mapping: ...
244
[docs] 245 @property 246 def is_valid(self) -> bool: ...
247 248 ##--| public 249
[docs] 250 def register(self, *specs:TaskSpec_i|Artifact_i|DelayedSpec)-> None: ...
251
[docs] 252 def queue(self, name:str|Ident|Concrete[TaskSpec_i]|DelayedSpec, *, from_user:int|bool=False, status:Maybe[TaskStatus_e]=None) -> Maybe[Concrete[Ident]]: ...
253
[docs] 254 def build(self, *, sources:Maybe[Literal[True]|list[Concrete[TaskName_p]|Artifact_i]]=None) -> None: ...
255
[docs] 256 def plan(self, *, policy:Maybe[ExecutionPolicy_e]=None) -> list[TaskName_p|Artifact_i]: ...
257
[docs] 258 def next_for(self, target:Maybe[str|Concrete[Ident]]=None) -> Maybe[Task_p|Artifact_i]: ...
259
[docs] 260 def clear(self) -> None: ...
261 ##--| inspection. TODO to remove 262 263 ##--| internal 264 265 @overload 266 def _instantiate(self, name:Abstract[TaskName_p], *, extra:Maybe[dict|ChainGuard|bool]=None) -> Maybe[Concrete[TaskName_p]]: ... 267 268 @overload 269 def _instantiate(self, rel:RelationSpec_i, *, control:Concrete[TaskName_p]) -> Concrete[TaskName_p]: ... 270
[docs] 271 @overload 272 def _instantiate(self, name:Concrete[TaskName_p], **kwargs:Any) -> Concrete[TaskName_p]: ...
273
[docs] 274 def _connect(self, left:Concrete[TaskName_p]|Artifact_i, right:Maybe[Literal[False]|Concrete[TaskName_p]|Artifact_i]=None, **kwargs:Any) -> None: ...
275
[docs] 276 def _dependency_states_of(self, focus:TaskName_p) -> list[tuple]: ...
277
[docs] 278 def _successor_states_of(self, focus:TaskName_p) -> list[tuple]: ...
279
[docs] 280@runtime_checkable 281class WorkflowTracker_i(WorkflowTracker_p, Protocol): 282 _root_node : TaskName_p 283 _factory : TaskFactory_p 284 _subfactory : SubTaskFactory_p 285 _declare_priority : int 286 _min_priority : int