1#!/usr/bin/env python3
2"""
3
4"""
5
6# Imports:
7from __future__ import annotations
8
9# ##-- stdlib imports
10import datetime
11import enum
12import functools as ftz
13import itertools as itz
14import logging as logmod
15import pathlib as pl
16import re
17import time
18import types
19from uuid import UUID, uuid1
20import weakref
21
22# ##-- end stdlib imports
23
24# ##-- 3rd party imports
25import boltons.queueutils
26# ##-- end 3rd party imports
27
28# ##-- 1st party imports
29import doot
30import doot.errors
31from doot.workflow._interface import (TaskMeta_e, TaskStatus_e, ArtifactStatus_e, QueueMeta_e, TaskName_p, TaskSpec_i, ActionSpec_i, Artifact_i, Task_p)
32from doot.workflow import (DootTask, TaskArtifact, TaskName)
33
34# ##-- end 1st party imports
35
36from . import _interface as API # noqa: N812
37
38# ##-- types
39# isort: off
40import abc
41import collections.abc
42from typing import TYPE_CHECKING, cast, assert_type, assert_never
43from typing import Generic, NewType
44# Protocols:
45from typing import Protocol, runtime_checkable
46# Typing Decorators:
47from typing import no_type_check, final, override, overload
48
49if TYPE_CHECKING:
50 from jgdv import Maybe
51 from typing import Final
52 from typing import ClassVar, Any, LiteralString
53 from typing import Never, Self, Literal
54 from typing import TypeGuard
55 from collections.abc import Iterable, Iterator, Callable, Generator
56 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
57 from doot.workflow import RelationSpec
58 from .registry import TrackRegistry
59 from .network import TrackNetwork
60
61 type Abstract[T] = T
62 type Concrete[T] = T
63 type ActionElem = ActionSpec_i|RelationSpec
64 type ActionGroup = list[ActionElem]
65 type Status = ArtifactStatus_e|TaskStatus_e
66
67##--|
68from doot.workflow._interface import Task_i
69# isort: on
70# ##-- end types
71
72##-- logging
73logging = logmod.getLogger(__name__)
74logging.disabled = False
75##-- end logging
76
77##--|
78
[docs]
79class TrackQueue:
80 """ The queue of active tasks. """
81
82 active_set : set[Concrete[TaskName_p]|Artifact_i]
83 execution_trace : list[Concrete[TaskName_p|Artifact_i]]
84 # TODO use this instead of _tracker._registry and _tracker._network
85 _tracker : API.WorkflowTracker_i
86 _queue : boltons.queueutils.HeapPriorityQueue
87
88 def __init__(self, *, tracker:API.WorkflowTracker_p) -> None:
89 match tracker:
90 case API.WorkflowTracker_i():
91 self._tracker = tracker
92 case x:
93 raise TypeError(type(x))
94 self.active_set = set()
95 self.execution_trace = []
96 self._queue = boltons.queueutils.HeapPriorityQueue()
97
98 ##--| dunders
99 def __bool__(self) -> bool:
100 return self._queue.peek(default=None) is not None
101
102 ##--| public
[docs]
103 def queue_entry(self, target:str|TaskName_p|Artifact_i, *, from_user:int|bool=False) -> Maybe[Concrete[TaskName_p|Artifact_i]]:
104 """
105 Queue a task by name|spec|Task_i.
106 registers and instantiates the relevant spec, inserts it into the _tracker._network
107 Does *not* rebuild the _tracker._network
108
109 returns a task name if the _tracker._network has changed, else None.
110
111 kwarg 'from_user' signifies the enty is a starting target, adding cli args if necessary and linking to the root.
112 """
113 x : Any
114 ##--|
115 match target:
116 case Artifact_i() as art:
117 return self._queue_artifact(art, from_user=from_user)
118 case TaskName_p() | str() as name:
119 return self._queue_task(name, from_user=from_user)
120 case x:
121 raise TypeError(type(x))
122
123
[docs]
124 def deque_entry(self, *, peek:bool=False) -> Concrete[TaskName_p]|Artifact_i:
125 """ remove (or peek) the top task from the _queue. """
126 assert(hasattr(self._tracker, "set_status"))
127 if peek:
128 return self._queue.peek()
129
130 return self._queue.pop()
131
[docs]
132 def clear_queue(self) -> None:
133 """ Remove everything from the task queue,
134
135 """
136 # TODO _queue the task's failure/cleanup tasks
137 self.active_set = set()
138 self.task_queue = boltons.queueutils.HeapPriorityQueue()
139
140 ##--| private
[docs]
141 def _queue_task(self, name:str|TaskName_p, *, from_user:int|bool=False) -> Maybe[TaskName_p]:
142 x : Any
143 assert(hasattr(self._tracker, "get_status"))
144 ##--| ensure the name is unique
145 match self._queue_prep_name(name):
146 case None:
147 return None
148 case TaskName_p() | str() as x if x not in self._tracker.specs:
149 raise doot.errors.TrackingError("Unrecognized task name, it may not be registered", x)
150 case TaskName_p() as x if not x.uuid():
151 inst_name = cast("TaskName_p", self._tracker._instantiate(x)) # type: ignore[attr-defined]
152 case TaskName_p() as x:
153 inst_name = x
154 case x:
155 raise TypeError(type(x))
156 ##--| connect in the network
157 if inst_name not in self._tracker.network:
158 self._tracker._connect(inst_name, None if bool(from_user) else False) # type: ignore[attr-defined]
159 ##--| update the queue
160 self.active_set.add(inst_name)
161 match self._tracker.specs[inst_name]:
162 case API.SpecMeta_d(task=Task_p() as task):
163 _, priority = self._tracker.get_status(target=inst_name)
164 self._queue.add(inst_name, priority)
165 case API.SpecMeta_d(task=TaskStatus_e()):
166 self._queue.add(inst_name, self._tracker._declare_priority)
167 case x:
168 raise TypeError(type(x))
169 logging.debug("[Queue] %s", inst_name[:])
170 return inst_name
171
172
[docs]
173 def _queue_artifact(self, art:Artifact_i, *, from_user:int|bool=False) -> Maybe[Artifact_i]:
174 assert(art in self._tracker.artifacts)
175 target_priority : int = self._tracker._declare_priority
176 self._tracker._connect(art, None if bool(from_user) else False) # type: ignore[arg-type]
177 self.active_set.add(art) # type: ignore[arg-type]
178 self._queue.add(art, priority=target_priority)
179 logging.debug("[Queue.+] : %s", art)
180 return cast("Artifact_i", art)
181
[docs]
182 def _queue_prep_name(self, name:str|TaskName_p) -> Maybe[TaskName_p]:
183 """ Heuristics for queueing task names
184
185 """
186 match name:
187 case TaskName_p() if name == self._tracker._root_node:
188 return None
189 case TaskName_p() if name in self._tracker.active:
190 return name
191 case TaskName_p() if name in self._tracker.network:
192 return name
193 case TaskName_p() if name in self._tracker.specs:
194 return name
195 case TaskName_p():
196 raise doot.errors.TrackingError("Unrecognized queue argument provided, it may not be registered", name)
197 case str():
198 return self._queue_prep_name(TaskName(name))
199 case x:
200 raise TypeError(type(x))