Source code for doot.workflow.structs.inject_spec

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5# ruff: noqa: FBT001, FBT002, ERA001, TC002, TC003
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import types
 19import weakref
 20from uuid import UUID, uuid1
 21
 22# ##-- end stdlib imports
 23
 24# ##-- 3rd party imports
 25from jgdv.structs.chainguard import ChainGuard
 26from jgdv.structs.strang import StrangError
 27from jgdv.cli import ParamSpec
 28# ##-- end 3rd party imports
 29
 30# ##-- 1st party imports
 31import doot
 32import doot.errors
 33from doot.util.dkey import DKey
 34
 35# ##-- end 1st party imports
 36
 37from .._interface import TaskSpec_i, Task_i, Task_p, MUST_INJECT_K
 38
 39# ##-- types
 40# isort: off
 41import abc
 42import collections.abc
 43import typing
 44from typing import Generic, cast, assert_type, assert_never
 45# Protocols:
 46from typing import Protocol, runtime_checkable
 47# Typing Decorators:
 48from typing import TYPE_CHECKING, no_type_check, final, override, overload
 49# from dataclasses import InitVar, dataclass, field
 50from pydantic import BaseModel, Field, model_validator, field_validator, ValidationError
 51from jgdv import Maybe
 52from collections.abc import Mapping
 53
 54if TYPE_CHECKING:
 55   from typing import Final
 56   from typing import ClassVar, Tuple, Any, LiteralString
 57   from typing import Never, Self, Literal
 58   from typing import TypeGuard
 59   from collections.abc import Iterable, Iterator, Callable, Generator
 60   from collections.abc import Sequence, MutableMapping, Hashable
 61
 62   from .. import TaskSpec
 63   from jgdv._abstract.protocols.general import SpecStruct_p
 64   from .. import TaskName
 65   type ConstraintData = TaskSpec | dict | ChainGuard
 66
 67# isort: on
 68# ##-- end types
 69
 70##-- logging
 71logging = logmod.getLogger(__name__)
 72##-- end logging
 73
 74# Vars:
 75
 76# Body:
 77
[docs] 78class InjectSpec(BaseModel): 79 """A ConstraintData representation of an injection. 80 81 With a Task C, the control, and Task T, the target, 82 C injects data into T at defined times: 83 - from_spec[K1, K2] : T.spec[K1] = C.spec[K2] 84 - from_state[K1, K2] : T._internal_state[K1] = C._internal_state[K2] 85 - from_target[K0, K2] : T._internal_state[K1_] = T.spec[C.spec[K2]] 86 - literal[K1, V] : T._internal_state[k1] = V 87 88 Injection data can be: 89 - list[DKey] : coerced to dict of {K : K}. Implicit keys 90 - dict[k1:str, k2:DKey] : k1 is implicit, k2 is explicit 91 92 """ 93 from_spec : dict = Field(default_factory=dict) 94 from_state : dict = Field(default_factory=dict) 95 from_target : dict = Field(default_factory=dict) 96 literal : dict = Field(default_factory=dict) 97 with_suffix : Maybe[str] = Field(default=None) 98 _mapping : dict 99
[docs] 100 @classmethod 101 def build(cls, data:dict) -> Maybe[Self]: 102 """ builds an InjectSpec from basic data """ 103 logging.info("Building Injection: %s", data) 104 match data: 105 case None | InjectSpec(): 106 return data 107 case dict() | ChainGuard() as x if not bool(x): 108 return None 109 case dict() | ChainGuard(): 110 pass 111 case _: 112 raise doot.errors.InjectionError("Unknown injection base type", data) 113 114 try: 115 return cls(**data) 116 except StrangError as err: 117 raise doot.errors.InjectionError(err.args) from None 118 except ValidationError as err: 119 logging.debug("Building Injection Failed: %s : %s", data, err) 120 raise
121
[docs] 122 @staticmethod 123 def _prep_keys(keys:Maybe[dict[str,str]|list[str]], literal:bool=False) -> dict[str, Maybe[DKey|str]]: 124 """ prepare keys for the expansions 125 literal = True : means rhs is not a key 126 """ 127 k : DKey 128 try: 129 match keys: 130 case None: 131 return {} 132 case [*xs] if literal: 133 return {DKey(k, implicit=True):None for k in keys} 134 case [*xs]: 135 return {(k:=DKey(x, implicit=True)):k for x in xs} 136 case dict() if literal: 137 return {DKey(k, implicit=True):v for k,v in keys.items()} 138 case dict(): 139 return {DKey(k, implicit=True):DKey(v, insist=True, fallback=v) for k,v in keys.items()} 140 case _: 141 raise doot.errors.InjectionError("unknown keys type", keys) 142 143 except ValueError: 144 raise doot.errors.InjectionError("Injection LHS Keys need to be implicit", keys) from None 145 except TypeError: 146 raise doot.errors.InjectionError("Injection RHS Keys need to be explicit", keys) from None
147 148 ##--| validators
[docs] 149 @model_validator(mode="after") 150 def _validate_injection(self) -> Self: 151 # Build the target <- source mapping 152 self._mapping = dict() 153 for x,y in itz.chain(self.from_spec.items(), 154 self.from_state.items(), 155 self.from_target.items(), 156 self.literal.items()): 157 self._mapping[str(x)] = str(y) 158 else: 159 return self
160
[docs] 161 @field_validator("from_spec", mode="before") 162 def _validate_from_spec(cls, val:Any) -> dict: 163 return cls._prep_keys(val)
164
[docs] 165 @field_validator("from_state", mode="before") 166 def _validate_from_state(cls, val:Any) -> dict: 167 return cls._prep_keys(val)
168
[docs] 169 @field_validator("from_target", mode="before") 170 def _validate_from_target(cls, val:Any) -> dict: 171 result = cls._prep_keys(val) 172 return {f"{x:d}":y for x,y in result.items()}
173
[docs] 174 @field_validator("literal", mode="before") 175 def _validate_literal(cls, val:Any) -> dict: 176 return cls._prep_keys(val, literal=True)
177 178 ##--| dunders 179 def __bool__(self) -> bool: 180 return (bool(self.from_spec) 181 | bool(self.from_state) 182 | bool(self.from_target) 183 | bool(self.literal) 184 | (self.with_suffix is not None)) 185 186 ##--| public
[docs] 187 def validate(self, control:Task_i|TaskSpec_i, target:Task_i|TaskSpec_i, *, only_spec:bool=False) -> bool: 188 """ Ensures this injection is usable with given sources, and given required injections 189 190 eg: 191 target(must_inject=['a']), 192 control('a'=5) 193 Injection(from_spec=['a']) 194 The Injection is valid. 195 196 eg: 197 target(must_inject=['a']), 198 control('d'=9) 199 Injection(from_spec=['a']) 200 The Injection is invalid, 'a' is missing from the source. 201 202 eg: 203 target() 204 control('a'=10) 205 Injection(from_spec=['a']) 206 The Injection is invalid, 'a' is surplus to the task. 207 208 """ 209 result = self.validate_details(control, target, only_spec=only_spec) 210 return not any(bool(x) for x in result.values())
211
[docs] 212 def validate_details(self, control:Task_i|TaskSpec_i, target:Task_i|TaskSpec_i, *, only_spec:bool=False) -> dict: 213 """ 214 validate specs or tasks 215 checks from_spec, 216 and if given tasks, from_state as well 217 """ 218 control_needs : set = set(self.from_spec.values()) 219 target_needs : set = set(self.from_spec.keys()) 220 state_failure : bool = False 221 if not only_spec: 222 state_failure = bool(self.from_state) and not all(isinstance(x, Task_p) for x in (control, target)) 223 match control: 224 case Task_i(): 225 control_vals = control._internal_state 226 control_needs |= set(self.from_state.values()) 227 case _: 228 control_vals = control.extra # type: ignore[attr-defined] 229 230 match target: 231 case Task_i(): 232 target_vals = target._internal_state 233 target_needs |= set(self.from_state.keys()) 234 case _: 235 target_vals = target.extra # type: ignore[attr-defined] 236 237 238 must_inject = target_vals.get(MUST_INJECT_K, []) 239 240 # Get keys not found in the control spec 241 missing = control_needs - control_vals.keys() 242 # Get keys not found in the target spec 243 surplus = target_needs - target_vals.keys() 244 # Get keys expected for redirection, that are missing 245 control_redirects = set(self.from_target.values()) - control_vals.keys() 246 # Get target key redirections that are missing from target 247 target_redirects = {y for x in self.from_target.values() if (y:=x(control, limit=1)) not in target_vals} 248 mismatches = set() 249 if not (bool(missing) or bool(surplus)): 250 # if nothing is missing, test equality 251 mismatches |= {(x,y) for x,y in self.from_spec.items() if x(target_vals) != y(control_vals)} 252 253 literals = set() 254 if self.literal: 255 literals.update([x for x,y in self.literal.items() if target_vals.get(x, None) != y]) 256 257 return { 258 "rhs_missing" : missing, 259 "lhs_surplus" : surplus, 260 "rhs_redirect" : control_redirects, 261 "lhs_redirect" : target_redirects, 262 "mismatches" : mismatches, 263 "_internal_state" : state_failure, 264 "literal" : literals, 265 }
266
[docs] 267 def apply_from_spec(self, control:dict|TaskSpec_i|Task_p) -> dict: 268 """ Apply values from the control's spec values. 269 270 Fully expands keys in 'from_spec', 271 Only partially expands (L1) from 'from_target' 272 """ 273 control_data : Mapping|SpecStruct_p 274 # logging.info("Applying from_spec injection: %s", control.name) 275 match control: 276 case Task_i(): 277 control_data = cast("SpecStruct_p", control.spec) 278 case TaskSpec_i(): 279 control_data = cast("SpecStruct_p", control) 280 case Mapping(): 281 control_data = control 282 case x: 283 raise TypeError(type(x)) 284 285 data = {} 286 for x,y in self.from_spec.items(): 287 data[str(x)] = y(control_data) 288 for x,y in self.from_target.items(): 289 data[str(x)] = y(control_data, insist=True, fallback=y, limit=1) 290 else: 291 data.update(self.apply_literal(None)) 292 return data
293
[docs] 294 def apply_from_state(self, control:dict|Task_p) -> dict: 295 """ Expand a key using the control _internal_state """ 296 # logging.info("Applying from_state injection: %s", control.name) 297 pdata : dict 298 match control: 299 case Task_i(): 300 pdata = control._internal_state 301 case dict(): 302 pdata = control 303 case x: 304 raise TypeError(type(x)) 305 306 data = {} 307 for x,y in self.from_state.items(): 308 data[str(x)] = y(pdata) 309 else: 310 return data
311
[docs] 312 def apply_literal(self, val:Any) -> dict: 313 """ Takes a value and sets it for any keys in self.literal 314 315 Used for job's to insert literal values into a key. 316 eg: when mapping filenames to generated tasks 317 """ 318 if not bool(self.literal): 319 return {} 320 logging.debug("Applying literal injection") 321 data = {} 322 for x,_y in self.literal.items(): 323 match val, _y: 324 case None, None: 325 pass 326 case None, v: 327 data[str(x)] = v 328 case v, _: 329 data[str(x)] = v 330 else: 331 return data