Source code for doot.control.tracker.queue

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19from uuid import UUID, uuid1
 20import weakref
 21
 22# ##-- end stdlib imports
 23
 24# ##-- 3rd party imports
 25import boltons.queueutils
 26# ##-- end 3rd party imports
 27
 28# ##-- 1st party imports
 29import doot
 30import doot.errors
 31from doot.workflow._interface import (TaskMeta_e, TaskStatus_e, ArtifactStatus_e, QueueMeta_e, TaskName_p, TaskSpec_i, ActionSpec_i, Artifact_i, Task_p)
 32from doot.workflow import (DootTask, TaskArtifact, TaskName)
 33
 34# ##-- end 1st party imports
 35
 36from . import _interface as API # noqa: N812
 37
 38# ##-- types
 39# isort: off
 40import abc
 41import collections.abc
 42from typing import TYPE_CHECKING, cast, assert_type, assert_never
 43from typing import Generic, NewType
 44# Protocols:
 45from typing import Protocol, runtime_checkable
 46# Typing Decorators:
 47from typing import no_type_check, final, override, overload
 48
 49if TYPE_CHECKING:
 50    from jgdv import Maybe
 51    from typing import Final
 52    from typing import ClassVar, Any, LiteralString
 53    from typing import Never, Self, Literal
 54    from typing import TypeGuard
 55    from collections.abc import Iterable, Iterator, Callable, Generator
 56    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 57    from doot.workflow import RelationSpec
 58    from .registry import TrackRegistry
 59    from .network import TrackNetwork
 60
 61    type Abstract[T]  = T
 62    type Concrete[T]  = T
 63    type ActionElem   = ActionSpec_i|RelationSpec
 64    type ActionGroup  = list[ActionElem]
 65    type Status  = ArtifactStatus_e|TaskStatus_e
 66
 67##--|
 68from doot.workflow._interface import Task_i
 69# isort: on
 70# ##-- end types
 71
 72##-- logging
 73logging          = logmod.getLogger(__name__)
 74logging.disabled = False
 75##-- end logging
 76
 77##--|
 78
[docs] 79class TrackQueue: 80 """ The queue of active tasks. """ 81 82 active_set : set[Concrete[TaskName_p]|Artifact_i] 83 execution_trace : list[Concrete[TaskName_p|Artifact_i]] 84 # TODO use this instead of _tracker._registry and _tracker._network 85 _tracker : API.WorkflowTracker_i 86 _queue : boltons.queueutils.HeapPriorityQueue 87 88 def __init__(self, *, tracker:API.WorkflowTracker_p) -> None: 89 match tracker: 90 case API.WorkflowTracker_i(): 91 self._tracker = tracker 92 case x: 93 raise TypeError(type(x)) 94 self.active_set = set() 95 self.execution_trace = [] 96 self._queue = boltons.queueutils.HeapPriorityQueue() 97 98 ##--| dunders 99 def __bool__(self) -> bool: 100 return self._queue.peek(default=None) is not None 101 102 ##--| public
[docs] 103 def queue_entry(self, target:str|TaskName_p|Artifact_i, *, from_user:int|bool=False) -> Maybe[Concrete[TaskName_p|Artifact_i]]: 104 """ 105 Queue a task by name|spec|Task_i. 106 registers and instantiates the relevant spec, inserts it into the _tracker._network 107 Does *not* rebuild the _tracker._network 108 109 returns a task name if the _tracker._network has changed, else None. 110 111 kwarg 'from_user' signifies the enty is a starting target, adding cli args if necessary and linking to the root. 112 """ 113 x : Any 114 ##--| 115 match target: 116 case Artifact_i() as art: 117 return self._queue_artifact(art, from_user=from_user) 118 case TaskName_p() | str() as name: 119 return self._queue_task(name, from_user=from_user) 120 case x: 121 raise TypeError(type(x))
122 123
[docs] 124 def deque_entry(self, *, peek:bool=False) -> Concrete[TaskName_p]|Artifact_i: 125 """ remove (or peek) the top task from the _queue. """ 126 assert(hasattr(self._tracker, "set_status")) 127 if peek: 128 return self._queue.peek() 129 130 return self._queue.pop()
131
[docs] 132 def clear_queue(self) -> None: 133 """ Remove everything from the task queue, 134 135 """ 136 # TODO _queue the task's failure/cleanup tasks 137 self.active_set = set() 138 self.task_queue = boltons.queueutils.HeapPriorityQueue()
139 140 ##--| private
[docs] 141 def _queue_task(self, name:str|TaskName_p, *, from_user:int|bool=False) -> Maybe[TaskName_p]: 142 x : Any 143 assert(hasattr(self._tracker, "get_status")) 144 ##--| ensure the name is unique 145 match self._queue_prep_name(name): 146 case None: 147 return None 148 case TaskName_p() | str() as x if x not in self._tracker.specs: 149 raise doot.errors.TrackingError("Unrecognized task name, it may not be registered", x) 150 case TaskName_p() as x if not x.uuid(): 151 inst_name = cast("TaskName_p", self._tracker._instantiate(x)) # type: ignore[attr-defined] 152 case TaskName_p() as x: 153 inst_name = x 154 case x: 155 raise TypeError(type(x)) 156 ##--| connect in the network 157 if inst_name not in self._tracker.network: 158 self._tracker._connect(inst_name, None if bool(from_user) else False) # type: ignore[attr-defined] 159 ##--| update the queue 160 self.active_set.add(inst_name) 161 match self._tracker.specs[inst_name]: 162 case API.SpecMeta_d(task=Task_p() as task): 163 _, priority = self._tracker.get_status(target=inst_name) 164 self._queue.add(inst_name, priority) 165 case API.SpecMeta_d(task=TaskStatus_e()): 166 self._queue.add(inst_name, self._tracker._declare_priority) 167 case x: 168 raise TypeError(type(x)) 169 logging.debug("[Queue] %s", inst_name[:]) 170 return inst_name
171 172
[docs] 173 def _queue_artifact(self, art:Artifact_i, *, from_user:int|bool=False) -> Maybe[Artifact_i]: 174 assert(art in self._tracker.artifacts) 175 target_priority : int = self._tracker._declare_priority 176 self._tracker._connect(art, None if bool(from_user) else False) # type: ignore[arg-type] 177 self.active_set.add(art) # type: ignore[arg-type] 178 self._queue.add(art, priority=target_priority) 179 logging.debug("[Queue.+] : %s", art) 180 return cast("Artifact_i", art)
181
[docs] 182 def _queue_prep_name(self, name:str|TaskName_p) -> Maybe[TaskName_p]: 183 """ Heuristics for queueing task names 184 185 """ 186 match name: 187 case TaskName_p() if name == self._tracker._root_node: 188 return None 189 case TaskName_p() if name in self._tracker.active: 190 return name 191 case TaskName_p() if name in self._tracker.network: 192 return name 193 case TaskName_p() if name in self._tracker.specs: 194 return name 195 case TaskName_p(): 196 raise doot.errors.TrackingError("Unrecognized queue argument provided, it may not be registered", name) 197 case str(): 198 return self._queue_prep_name(TaskName(name)) 199 case x: 200 raise TypeError(type(x))