Source code for doot.control.runner.runner

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# ruff: noqa: N812
  6# : disable-error-code="attr-defined"
  7# Imports:
  8from __future__ import annotations
  9
 10# ##-- stdlib imports
 11import datetime
 12import enum
 13import functools as ftz
 14import itertools as itz
 15import logging as logmod
 16import pathlib as pl
 17import re
 18import time
 19import types
 20from collections import defaultdict
 21from contextlib import nullcontext
 22from uuid import UUID, uuid1
 23
 24# ##-- end stdlib imports
 25
 26# ##-- 3rd party imports
 27import networkx as nx
 28from jgdv import Mixin, Proto
 29from jgdv.debugging import NullHandler, SignalHandler
 30
 31# ##-- end 3rd party imports
 32
 33# ##-- 1st party imports
 34import doot
 35import doot.errors
 36from doot.control.runner._interface import WorkflowRunner_p
 37from doot.workflow import (ActionSpec, RelationSpec, TaskArtifact, TaskName, TaskSpec)
 38from doot.workflow._interface import ActionResponse_e as ActRE
 39from doot.workflow._interface import Job_p, Task_p, TaskName_p, TaskSpec_i, ActionSpec_i, RelationSpec_i
 40
 41# ##-- end 1st party imports
 42
 43# ##-| Local
 44from . import _interface as API # noqa: N812
 45from . import util as RU
 46
 47# # End of Imports.
 48
 49# ##-- types
 50# isort: off
 51import abc
 52import collections.abc
 53from typing import TYPE_CHECKING, cast, assert_type, assert_never
 54from typing import Generic, NewType
 55# Protocols:
 56from typing import Protocol, runtime_checkable
 57# Typing Decorators:
 58from typing import no_type_check, final, override, overload
 59
 60if TYPE_CHECKING:
 61    from doot.control.tracker._interface import WorkflowTracker_p
 62    from doot.workflow._interface import Artifact_i, DelayedSpec
 63    from jgdv import Maybe
 64    from typing import Final
 65    from typing import ClassVar, Any, LiteralString
 66    from typing import Never, Self, Literal
 67    from typing import TypeGuard
 68    from collections.abc import Iterable, Iterator, Callable, Generator
 69    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 70
 71##--|
 72from typing import ContextManager
 73# isort: on
 74# ##-- end types
 75
 76##-- logging
# Module level logger. Note that in this file the name `logging` is the Logger
# instance, while the stdlib logging module itself is imported as `logmod`.
logging           = logmod.getLogger(__name__)
##-- end logging

##--| Vars
# Message reported by ActionExecutor.execute_action_group when an action answers ActRE.SKIP.
skip_msg            : Final[str]   = doot.constants.printer.skip_by_condition_msg
# Upper bound on DootRunner.large_step for the run loop in DootRunner.__call__.
# NOTE(review): on_fail(100_000) presumably supplies the fallback used when the config key is absent - confirm.
max_steps           : Final[int]   = doot.config.on_fail(100_000).commands.run.max_steps()
# Read from the `commands.run` config section; not referenced in the visible part of this module.
hide_empty_cleanup  : Final[bool]  = doot.config.on_fail(False).commands.run.hide_empty_cleanup()  # noqa: FBT003

# Names of the action groups requested from a task through `task.get_action_group(group)`.
SETUP_GROUP         : Final[str]   = "setup"
ACTION_GROUP        : Final[str]   = "actions"
FAIL_GROUP          : Final[str]   = "on_fail"
DEPENDS_GROUP       : Final[str]   = "depends_on"
 89
 90##--|
 91
[docs] 92class ActionExecutor: 93 """ An internal object handling the logic of running action(groups) of a task """ 94
[docs] 95 def execute_action_group(self, task:Task_p, *, group:str, large_step:int) -> Maybe[tuple[int, ActRE, list]]: 96 """ Execute a group of actions, possibly queue any task specs they produced, 97 and return a count of the actions run + the result 98 """ 99 to_queue : list[TaskName_p|TaskSpec_i|DelayedSpec] 100 group_result : ActRE 101 actions : Iterable[ActionSpec_i] 102 executed_count : int 103 ##--| 104 actions = task.get_action_group(group) 105 106 if not bool(actions): 107 return None 108 109 group_result = ActRE.SUCCESS 110 to_queue = [] 111 executed_count = 0 112 113 for action in self.skip_relation_specs(actions): 114 match self.execute_action(large_step, executed_count, action, task, group=group): 115 case True | None: 116 continue 117 case list() as result: 118 to_queue += result 119 case False: 120 group_result = ActRE.FAIL 121 break 122 case ActRE.SKIP: 123 doot.report.wf.act("skip", skip_msg) 124 group_result = ActRE.SKIP 125 break 126 127 executed_count += 1 128 129 else: # no break. 130 pass 131 132 return executed_count, group_result, to_queue
133
[docs] 134 def skip_relation_specs(self, actions:Iterable) -> Iterator: 135 """ return of True signals the action is a relationspec, so is to be ignored """ 136 for action in actions: 137 match action: 138 case RelationSpec(): 139 pass 140 case ActionSpec() as act: 141 yield act 142 case _: 143 raise doot.errors.TaskError("Task Failed: Bad Action: %s", repr(action))
144
[docs] 145 def execute_action(self, large_step:int, count:int, action:ActionSpec_i, task:Task_p, group:Maybe[str]=None) -> ActRE|list: 146 """ Run the given action of a specific task. 147 148 returns either a list of specs to (potentially) queue, 149 or an ActRE describing the action result. 150 151 """ 152 result : ActRE|list 153 ##--| 154 task.internal_state['_action_step'] = count 155 match group: 156 case str(): 157 doot.report.wf.act(f"{large_step}.{group}.{count}", str(action.do)) 158 case None: 159 doot.report.wf.act(f"{large_step}._.{count}", str(action.do)) 160 161 logging.debug("Action Executing for Task: %s", task.name) 162 logging.debug("Action State: %s.%s: args=%s kwargs=%s. state(size)=%s", large_step, count, action.args, dict(action.kwargs), len(task.internal_state.keys())) 163 response = action(task.internal_state) 164 match response: 165 case None | True: 166 result = ActRE.SUCCESS 167 case False | ActRE.FAIL: 168 raise doot.errors.TaskFailed("Task %s: Action Failed: %s", task.name, action.do, task=task.spec) 169 case ActRE.SKIP as result: 170 # result will be returned, and expand_job/execute_task will handle it 171 pass 172 case dict() as data: # update the task's state 173 task.internal_state.update({str(k):v for k,v in data.items()}) 174 result = ActRE.SUCCESS 175 case list() as data if isinstance(task, Job_p): 176 result = data 177 case x: 178 raise doot.errors.TaskError("Task %s: Action %s Failed: Returned an unplanned for value: %s", task.name, action.do, x, task=task.spec) 179 180 return result
181
[docs] 182 def test_conditions(self, task:Task_p, *, large_step:int) -> bool: 183 """ run a task's depends_on group, coercing to a bool 184 returns False if the runner should skip the rest of the task 185 """ 186 match self.execute_action_group(task, group=DEPENDS_GROUP, large_step=large_step): 187 case None: 188 return True 189 case _, ActRE.SKIP | ActRE.FAIL, _: 190 return False 191 case _: 192 return True
193##--| 194
[docs] 195@Proto(WorkflowRunner_p, check=False) 196@Mixin(RU._RunnerCtx_m, RU._RunnerHandlers_m, None) 197class DootRunner: 198 """ The simplest single threaded task runner """ 199 200 large_step : int 201 tracker : WorkflowTracker_p 202 teardown_list : list 203 executor : ActionExecutor 204 205 def __init__(self:Self, *, tracker:WorkflowTracker_p, executor:Maybe[ActionExecutor]=None): 206 super().__init__() 207 self.large_step = 0 208 self.tracker = tracker 209 self.executor = executor or ActionExecutor() 210 self.teardown_list = [] # list of tasks to teardown 211 212 def __call__(self, *tasks:str, handler:Maybe[API.Handler]=None): #noqa: ARG002 213 """ tasks are initial targets to run. 214 so loop on the tracker, getting the next task, 215 running its actions, 216 and repeating, 217 until done 218 219 if task is a job, it is expanded and added into the tracker 220 """ 221 match handler: 222 case True: 223 handler = SignalHandler() 224 case False: 225 handler = NullHandler() 226 case type() as x: 227 handler = x() 228 case x if hasattr(x, "__enter__"): 229 handler = x 230 case _: 231 handler = nullcontext() 232 233 assert(isinstance(handler, ContextManager)) 234 with handler: 235 while bool(self.tracker) and self.large_step < max_steps: 236 self.run_next_task() 237 else: 238 pass 239
[docs] 240 def run_next_task(self) -> None: 241 """ 242 Get the next task from the tracker, expand/run it, 243 and handle the result/failure 244 """ 245 task : Maybe[Task_p|Artifact_i] = None 246 try: 247 match (task:=self.tracker.next_for()): 248 case None: 249 pass 250 case TaskArtifact(): 251 self.notify_artifact(task) 252 case Job_p(): 253 self.expand_job(task) 254 case Task_p(): 255 self.execute_task(task) 256 case x: 257 doot.report.gen.error("Unknown Value provided to runner: %s", x) 258 except doot.errors.TaskError as err: 259 err.task = task 260 self.handle_failure(err) 261 except doot.errors.DootError as err: 262 self.handle_failure(err) 263 except Exception as err: 264 doot.report.wf.fail(info="Exception", msg=str(err)) 265 self.tracker.clear() 266 raise 267 else: 268 self.handle_success(task) 269 self.sleep_after(task) 270 self.large_step += 1
271
[docs] 272 def expand_job(self, job:Job_p) -> None: 273 """ turn a job into all of its tasks, including teardowns """ 274 logmod.debug("-- Expanding Job %s: %s", self.large_step, job.name) 275 assert(isinstance(job, Job_p)) 276 try: 277 doot.report.wf.branch(job.spec.name, info=f"Job {self.large_step}") 278 if not self.executor.test_conditions(job, large_step=self.large_step): 279 return 280 281 self.executor.execute_action_group(job, group=SETUP_GROUP, large_step=self.large_step) 282 match self.executor.execute_action_group(job, group=ACTION_GROUP, large_step=self.large_step): 283 case None: 284 pass 285 case int(), ActRE(), [*xs]: 286 self._queue_more_tasks(job.name, xs) 287 except doot.errors.DootError as err: 288 self.executor.execute_action_group(job, group=FAIL_GROUP, large_step=self.large_step) 289 raise
290
[docs] 291 def execute_task(self, task:Task_p) -> None: 292 """ execute a single task's actions """ 293 logmod.debug("-- Expanding Task %s: %s", self.large_step, task.name) 294 assert(not isinstance(task, Job_p)) 295 try: 296 doot.report.wf.branch(task.spec.name, info=f"Task {self.large_step}") 297 if not self.executor.test_conditions(task, large_step=self.large_step): 298 return 299 300 self.executor.execute_action_group(task, group=SETUP_GROUP, large_step=self.large_step) 301 self.executor.execute_action_group(task, group=ACTION_GROUP, large_step=self.large_step) 302 except doot.errors.DootError as err: 303 self.executor.execute_action_group(task, group=FAIL_GROUP, large_step=self.large_step) 304 raise
305
[docs] 306 def _queue_more_tasks(self, source:TaskName_p, new_tasks:list) -> None: 307 """ When 'allowed', an action group can queue more tasks in the tracker, 308 can return a new ActRE to describe the result status of this group 309 """ 310 new_nodes : list[TaskName_p] = [] 311 failures = [] 312 for spec in new_tasks: 313 match self.tracker.queue(spec): 314 case None: 315 failures.append(spec.name) 316 case TaskName_p() as x: 317 new_nodes.append(x) 318 319 if bool(failures): 320 raise doot.errors.JobExpansionError("Queuing generated specs failed", source, failures) 321 322 if bool(new_nodes): 323 self.tracker.build(sources=new_nodes) # type: ignore[arg-type]
324 325 ##--| handlers 326
[docs] 327 def handle_success[T:Task_p|Artifact_i](self, task:Maybe[T]) -> Maybe[T]: 328 raise NotImplementedError()
329
[docs] 330 def handle_failure(self, failure:Exception) -> None: 331 raise NotImplementedError()
332
[docs] 333 def sleep_after[T:Task_p|Artifact_i](self, task:Maybe[T]) -> None: 334 raise NotImplementedError()