1#!/usr/bin/env python3
2"""
3
4"""
5# ruff: noqa:
6
7# Imports:
8from __future__ import annotations
9
10# ##-- stdlib imports
11import atexit# for @atexit.register
12import collections
13import contextlib
14import datetime
15import enum
16import faulthandler
17import functools as ftz
18import hashlib
19import itertools as itz
20import logging as logmod
21import pathlib as pl
22import re
23import time
24import types
25from copy import deepcopy
26from uuid import UUID, uuid1
27from weakref import ref
28
29# ##-- end stdlib imports
30
31# ##-- 1st party imports
32from doot.workflow._interface import (ArtifactStatus_e, InjectSpec_i, ActionSpec_i,
33 RelationSpec_i, Task_i, TaskSpec_i,
34 TaskStatus_e)
35
36# ##-- end 1st party imports
37
38# ##-- types
39# isort: off
40import abc
41import collections.abc
42from typing import TYPE_CHECKING, cast, assert_type, assert_never
43from typing import Generic, NewType, Never
44# Protocols:
45from typing import Protocol, runtime_checkable
46# Typing Decorators:
47from typing import no_type_check, final, override, overload
48
49if TYPE_CHECKING:
50 from typing import Final
51 from typing import ClassVar, Any, LiteralString
52 from typing import Self, Literal
53 from typing import TypeGuard
54 from collections.abc import Iterable, Iterator, Callable, Generator
55 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
56
57 from jgdv import Maybe, Ident
58 from jgdv.structs.chainguard import ChainGuard
59 from doot.workflow._interface import Task_i, TaskName_p, Artifact_i, Task_p
60 from doot.workflow._interface import TaskFactory_p, SubTaskFactory_p, DelayedSpec
61
62 type Abstract[T] = T
63 type Concrete[T] = T
64##--|
65
66# isort: on
67# ##-- end types
68
69##-- logging
70logging = logmod.getLogger(__name__)
71##-- end logging
72
[docs]
73class EdgeType_e(enum.Enum):
74 """ Enum describing the possible edges of the task tracker's task network """
75
76 TASK = enum.auto() # task to task
77 ARTIFACT_UP = enum.auto() # abstract to concrete artifact
78 ARTIFACT_DOWN = enum.auto() # concrete to abstract artifact
79 TASK_CROSS = enum.auto() # Task to artifact
80 ARTIFACT_CROSS = enum.auto() # artifact to task
81
82 default = TASK
83
[docs]
84 @classmethod
85 def artifact_edge_set[T](cls:type) -> set[T]:
86 return {cls.ARTIFACT_UP, cls.ARTIFACT_DOWN, cls.TASK_CROSS} # type: ignore[attr-defined]
87
88# Vars:
89MAX_LOOP : Final[int] = 100
90
91ARTIFACT_EDGES : Final[set[EdgeType_e]] = EdgeType_e.artifact_edge_set()
92CLEANUP : Final[str] = "cleanup"
93DECLARE_PRIORITY : Final[int] = 10
94EXPANDED : Final[str] = "expanded" # Node attribute name
95INITIAL_SOURCE_CHAIN_COUNT : Final[int] = 10
96MIN_PRIORITY : Final[int] = -10
97REACTIVE_ADD : Final[str] = "reactive-add"
98ROOT : Final[str] = "root::_.$gen$" # Root node of dependency graph
99
100SUCCESS_STATUSES : Final[set[TaskStatus_e|ArtifactStatus_e]] = {
101 TaskStatus_e.SUCCESS,
102 TaskStatus_e.TEARDOWN,
103 TaskStatus_e.DEAD,
104 ArtifactStatus_e.EXISTS,
105}
106
[docs]
107class ExecutionPolicy_e(enum.Enum):
108 """ How the task execution will be ordered
109 PRIORITY : Priority Queue with retry, job expansion, dynamic walk of network.
110 DEPTH : No (priority,retry,jobs). basic DFS of the pre-run dependency network
111 BREADTH : No (priority,retry,jobs). basic BFS of the pre-run dependency-network
112
113 """
114 PRIORITY = enum.auto() # By Task Priority
115 DEPTH = enum.auto() # Depth First Search
116 BREADTH = enum.auto() # Breadth First Search
117
118 default = PRIORITY
119
120##--| Data
121
148
162
[docs]
163class Registry_d:
164
165 _tracker : WorkflowTracker_p
166 specs : dict[TaskName_p, SpecMeta_d]
167 artifacts : dict[Artifact_i, ArtifactMeta_d]
168
169 abstract : set[Abstract[TaskName_p] | Artifact_i]
170 concrete : set[Abstract[TaskName_p] | Artifact_i]
171
172 def __init__(self, *, tracker:WorkflowTracker_p) -> None:
173 self._tracker = tracker
174 self.specs = {}
175 self.artifacts = {}
176 self.abstract = set()
177 self.concrete = set()
178
179##--| components
180
[docs]
181@runtime_checkable
182class Registry_p(Protocol):
183
[docs]
184 def register_spec(self, *specs:TaskSpec_i) -> None: ...
185
[docs]
186 def instantiate_spec(self, name:Abstract[TaskName_p], *, force:Maybe[bool|int]=None, extra:Maybe[dict|ChainGuard|bool]=None) -> Maybe[Concrete[TaskName_p]]: ...
187
[docs]
188 def instantiate_relation(self, rel:RelationSpec_i, *, control:Concrete[TaskName_p]) -> Concrete[TaskName_p]: ...
189
[docs]
190 def make_task(self, name:Concrete[TaskName_p], *, task_obj:Maybe[Task_i]=None, parent:Maybe[Concrete[TaskName_p]]=None) -> Concrete[TaskName_p]: ...
191
[docs]
192 def verify(self, *, strict:bool=True) -> bool: ...
193
[docs]
194class Network_p(Protocol):
195 _graph : Any
196 _root_node : TaskName_p
197 succ : Mapping
198 pred : Mapping
199 non_expanded : set[TaskName_p|Artifact_i]
200
[docs]
201 def build_network(self, *, sources:Maybe[Literal[True]|list[Concrete[TaskName_p]|Artifact_i]]=None) -> None: ...
202
[docs]
203 def connect(self, left:Concrete[TaskName_p]|Artifact_i, right:Maybe[Literal[False]|Concrete[TaskName_p]|Artifact_i]=None, **kwargs:Any) -> None: ...
204
[docs]
205 def validate_network(self, *, strict:bool=True) -> bool: ...
206
[docs]
207class Queue_p(Protocol):
208 active_set : set[TaskName_p|Artifact_i]
209
[docs]
210 def queue_entry(self, target:str|TaskName_p|Artifact_i, *, from_user:int|bool=False) -> Maybe[Concrete[TaskName_p|Artifact_i]]: ...
211
[docs]
212 def deque_entry(self, *, peek:bool=False) -> Concrete[TaskName_p]|Artifact_i: ...
213
[docs]
214 def clear_queue(self) -> None: ...
215
216##--| Tracker
217
[docs]
218@runtime_checkable
219class WorkflowTracker_p(Protocol):
220 """
221 Track tasks that have run, need to run, are running,
222 and have failed.
223 Does not execute anything itself
224 """
225 ##--| properties
226
[docs]
227 @property
228 def active(self) -> set[TaskName_p]: ...
229
[docs]
230 @property
231 def specs(self) -> dict[TaskName_p, SpecMeta_d]: ...
232
[docs]
233 @property
234 def artifacts(self) -> dict[Artifact_i, ArtifactMeta_d]: ...
235
[docs]
236 @property
237 def concrete(self) -> set[TaskName_p|Artifact_i]: ...
238
[docs]
239 @property
240 def abstract(self) -> set[TaskName_p|Artifact_i]: ...
241
[docs]
242 @property
243 def network(self) -> Mapping: ...
244
[docs]
245 @property
246 def is_valid(self) -> bool: ...
247
248 ##--| public
249
[docs]
250 def register(self, *specs:TaskSpec_i|Artifact_i|DelayedSpec)-> None: ...
251
[docs]
252 def queue(self, name:str|Ident|Concrete[TaskSpec_i]|DelayedSpec, *, from_user:int|bool=False, status:Maybe[TaskStatus_e]=None) -> Maybe[Concrete[Ident]]: ...
253
[docs]
254 def build(self, *, sources:Maybe[Literal[True]|list[Concrete[TaskName_p]|Artifact_i]]=None) -> None: ...
255
[docs]
256 def plan(self, *, policy:Maybe[ExecutionPolicy_e]=None) -> list[TaskName_p|Artifact_i]: ...
257
[docs]
258 def next_for(self, target:Maybe[str|Concrete[Ident]]=None) -> Maybe[Task_p|Artifact_i]: ...
259
[docs]
260 def clear(self) -> None: ...
261 ##--| inspection. TODO to remove
262
263 ##--| internal
264
265 @overload
266 def _instantiate(self, name:Abstract[TaskName_p], *, extra:Maybe[dict|ChainGuard|bool]=None) -> Maybe[Concrete[TaskName_p]]: ...
267
268 @overload
269 def _instantiate(self, rel:RelationSpec_i, *, control:Concrete[TaskName_p]) -> Concrete[TaskName_p]: ...
270
[docs]
271 @overload
272 def _instantiate(self, name:Concrete[TaskName_p], **kwargs:Any) -> Concrete[TaskName_p]: ...
273
[docs]
274 def _connect(self, left:Concrete[TaskName_p]|Artifact_i, right:Maybe[Literal[False]|Concrete[TaskName_p]|Artifact_i]=None, **kwargs:Any) -> None: ...
275
[docs]
276 def _dependency_states_of(self, focus:TaskName_p) -> list[tuple]: ...
277
[docs]
278 def _successor_states_of(self, focus:TaskName_p) -> list[tuple]: ...
279
[docs]
280@runtime_checkable
281class WorkflowTracker_i(WorkflowTracker_p, Protocol):
282 _root_node : TaskName_p
283 _factory : TaskFactory_p
284 _subfactory : SubTaskFactory_p
285 _declare_priority : int
286 _min_priority : int