Source code for doot.control.runner.util

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# mypy: disable-error-code="attr-defined"
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19import weakref
 20from collections import defaultdict
 21from uuid import UUID, uuid1
 22
 23# ##-- end stdlib imports
 24
 25# ##-- 3rd party imports
 26from jgdv.debugging import SignalHandler
 27# ##-- end 3rd party imports
 28
 29# ##-- 1st party imports
 30import doot
 31import doot.errors
 32from doot.workflow._interface import ActionResponse_e as ActRE
 33from doot.workflow._interface import TaskStatus_e
 34from doot.workflow import ActionSpec, TaskSpec, TaskArtifact
 35from doot.workflow._interface import Task_p, Artifact_i
 36
 37# ##-- end 1st party imports
 38
 39# ##-- types
 40# isort: off
 41import abc
 42import collections.abc
 43from typing import TYPE_CHECKING, cast, assert_type, assert_never
 44from typing import Generic, NewType
 45# Protocols:
 46from typing import Protocol, runtime_checkable
 47# Typing Decorators:
 48from typing import no_type_check, final, override, overload
 49
 50if TYPE_CHECKING:
 51    from jgdv import Maybe, Traceback
 52    from typing import Final
 53    from typing import ClassVar, Any, LiteralString
 54    from typing import Never, Self, Literal
 55    from typing import TypeGuard
 56    from collections.abc import Iterable, Iterator, Callable, Generator
 57    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 58
 59    from ._interface import WorkflowRunner_p
 60
 61
 62##--|
 63# isort: on
 64# ##-- end types
 65
 66##-- logging
 67logging    = logmod.getLogger(__name__)
 68##-- end logging
 69
 70dry_run              : Final[bool]            = doot.args.on_fail(False).cmd.args.dry_run()  # noqa: FBT003
 71max_steps            : Final[int]             = doot.config.on_fail(100_000).commands.run.max_steps()
 72fail_prefix          : Final[str]             = doot.constants.printer.fail_prefix
 73loop_entry_msg       : Final[str]             = doot.constants.printer.loop_entry
 74loop_exit_msg        : Final[str]             = doot.constants.printer.loop_exit
 75
 76DEFAULT_SLEEP_LENGTH : Final[int|float]       = doot.config.on_fail(0.2, int|float).commands.run.sleep.task()
 77##--|
 78
[docs] 79class _RunnerCtx_m: 80 """ Mixin for a runner that adds ctx manager functionality """ 81 82 _signal_failure : Maybe[doot.errors.DootError] 83 84 def __init__(self, *args, **kwargs) -> None: # noqa: ANN002, ANN003 85 super().__init__(*args, **kwargs) # type: ignore[safe-super] 86 self._enter_msg = loop_entry_msg 87 self._exit_msg = loop_exit_msg 88 self._signal_failure = None 89 90 def __enter__(self:WorkflowRunner_p) -> WorkflowRunner_p: 91 logging.info("Entering Runner Control") 92 doot.report.gen.trace("Building Task Network...") 93 doot.report.gen.gap() 94 self.tracker.build() 95 doot.report.gen.trace("Task Network Built.") 96 doot.report.gen.detail("Network Composition: %s Nodes, %s Edges, %s Edges from Root.", 97 len(self.tracker.network.nodes), 98 len(self.tracker.network.edges), 99 len(self.tracker.network.pred[self.tracker._root_node])) 100 doot.report.gen.trace("Validating Task Network...") 101 doot.report.gen.gap() 102 self.tracker.validate() 103 doot.report.gen.trace("Validation Complete.") 104 doot.report.gen.line(self._enter_msg) 105 doot.report.wf.root() 106 return self 107 108 def __exit__(self:WorkflowRunner_p, exc_type:type[Exception], exc_value:Exception, exc_traceback:Traceback) -> Literal[False]: 109 logging.info("Exiting Runner Control") 110 # TODO handle exc_types? 111 self._finish() 112 return False 113
[docs] 114 def _finish(self:WorkflowRunner_p) -> None: 115 """finish running tasks, summarizing results using the reporter 116 separate from __exit__ to allow it to be overridden 117 """ 118 logging.info("Running Completed") 119 if self.large_step >= max_steps: 120 doot.report.gen.warn("Runner Hit the Step Limit: %s", max_steps) 121 122 doot.report.wf.finished().gap().line(self._exit_msg) 123 match self._signal_failure: 124 case None: 125 return 126 case doot.errors.DootError(): 127 raise self._signal_failure
128
[docs] 129class _RunnerHandlers_m: 130 """ Mixin for runners with default handlers """ 131 132 _signal_failure : Maybe[doot.errors.DootError] 133
[docs] 134 def handle_success[T:Maybe[Task_p|TaskArtifact]](self:WorkflowRunner_p, task:T) -> T: 135 """ The basic success handler. just informs the tracker of the success """ 136 match task: 137 case None: 138 pass 139 case TaskArtifact() as art: 140 doot.report.wf.result([str(art.path)], info="Success") 141 case Task_p(): 142 doot.report.wf.result([task.name[:]], info="Success") 143 self.tracker.set_status(task.name, TaskStatus_e.SUCCESS) 144 return task
145
[docs] 146 def handle_failure(self:WorkflowRunner_p, failure:Exception) -> None: 147 """ The basic failure handler. 148 Triggers a breakpoint on Interrupt, 149 otherwise informs the tracker of the failure. 150 151 Halts any failed or errored tasks, which propagates to any successors 152 Fails any DootErrors, TrackingErrors, and non-doot errors 153 154 the tracker handle's clearing itself and shutting down 155 """ 156 match failure: 157 case doot.errors.Interrupt(): 158 breakpoint() 159 pass 160 case doot.errors.TaskFailed() as err: 161 self._signal_failure = err 162 doot.report.gen.warn("%s Halting: %s", fail_prefix, err) 163 self.tracker.set_status(err.task, TaskStatus_e.HALTED) 164 case doot.errors.TaskError() as err: 165 doot.report.wf.fail() 166 self._signal_failure = err 167 self.tracker.set_status(err.task, TaskStatus_e.FAILED) 168 raise err 169 case doot.errors.TrackingError() as err: 170 doot.report.wf.fail() 171 self._signal_failure = err 172 raise err 173 case doot.errors.DootError() as err: 174 doot.report.wf.fail() 175 self._signal_failure = err 176 raise err 177 case err: 178 doot.report.wf.fail() 179 self._signal_failure = doot.errors.DootError("Unknown Failure") 180 doot.report.gen.error("%s Unknown failure occurred: %s", fail_prefix, failure) 181 raise err
182
[docs] 183 def notify_artifact(self:WorkflowRunner_p, art:TaskArtifact) -> None: 184 """ A No-op for when the tracker gives an artifact """ 185 doot.report.wf.result(["Artifact: %s", art]) 186 raise doot.errors.StateError("Artifact resolutely does not exist", art)
187
[docs] 188 def sleep_after(self:WorkflowRunner_p, task:Maybe[Task_p|Artifact_i]) -> None: 189 """ 190 The runner's sleep method, which spaces out tasks 191 """ 192 match task: 193 case None: 194 return 195 case TaskArtifact(): 196 return 197 198 assert(isinstance(task, Task_p)) 199 sleep_len = task.spec.extra.on_fail(DEFAULT_SLEEP_LENGTH, int|float).sleep() 200 doot.report.gen.detail("[Sleeping (%s)...]", sleep_len, extra={"colour":"white"}) 201 time.sleep(sleep_len)