Source code for doot.workflow.structs.relation_spec

  1#!/usr/bin/env python3
  2"""
  3
  4"""
  5
  6# Imports:
  7from __future__ import annotations
  8
  9# ##-- stdlib imports
 10import datetime
 11import enum
 12import functools as ftz
 13import itertools as itz
 14import logging as logmod
 15import pathlib as pl
 16import re
 17import time
 18import weakref
 19from uuid import UUID, uuid1
 20
 21# ##-- end stdlib imports
 22
 23# ##-- 3rd party imports
 24from pydantic import (BaseModel, Field, field_validator,
 25                      model_validator)
 26from jgdv import Maybe, Proto
 27from jgdv.structs.chainguard import ChainGuard
 28from jgdv.structs.strang import CodeReference, Strang
 29from jgdv.structs.strang._interface import StrangMarkAbstract_e
 30from jgdv.structs.locator import Location
 31from jgdv._abstract.protocols.general import Buildable_p
 32from jgdv._abstract.protocols.pydantic import ProtocolModelMeta
 33# ##-- end 3rd party imports
 34
 35# ##-- 1st party imports
 36import doot
 37import doot.errors
 38
 39# ##-- end 1st party imports
 40
 41from .._interface import RelationMeta_e, Task_p, Task_i, RelationSpec_i, InjectSpec_i, TaskSpec_i, TaskName_p, Artifact_i
 42from .task_name import TaskName
 43from .artifact import  TaskArtifact
 44from .inject_spec import InjectSpec
 45
 46# ##-- types
 47# isort: off
 48# General
 49import abc
 50import collections.abc
 51import typing
 52import types
 53from typing import cast, assert_type, assert_never
 54from typing import Generic, NewType, Never
 55from typing import no_type_check, final, override, overload
 56# Protocols and Interfaces:
 57from typing import Protocol, runtime_checkable
 58if typing.TYPE_CHECKING:
 59    from typing import Final, ClassVar, Any, Self
 60    from typing import Literal, LiteralString
 61    from typing import TypeGuard
 62    from collections.abc import Iterable, Iterator, Callable, Generator
 63    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 64
 65    from jgdv import Maybe
 66    from .task_spec import TaskSpec
 67    from .._interface import RelationMark, RelationTarget
 68
 69# isort: on
 70# ##-- end types
 71
 72##-- logging
 73logging = logmod.getLogger(__name__)
 74##-- end logging
 75
[docs] 76class RelationSpec(BaseModel, Buildable_p, arbitrary_types_allowed=True, metaclass=ProtocolModelMeta): 77 """ {object} is {relation} to {target} 78 79 Object is optional, to allow multiple different objects to have the same relationship to the target. 80 Encodes a relation between an object , (who owns this relationspec) 81 and the subject of the relation (who is contained within the relation) 82 83 eg: (baking <needs> mixing) 84 (baking <blocks> cake) 85 86 May carry additional information: 87 - constraints : dict|list. Maps obj[x] == targ[y] requirements 88 - inject : InjectSpec. Maps targ[x] = obj[y] values to pass to target. 89 - object : Maybe[TaskName]. the owning base object of the relationship 90 91 """ 92 93 # What the Relation end point is: 94 Marks : ClassVar[type[RelationMark]] = RelationMeta_e 95 ##--| 96 target : TaskName_p|Artifact_i 97 relation : RelationMeta_e = RelationMeta_e.needs 98 object : Maybe[TaskName|TaskArtifact] = None 99 constraints : dict[str, str] = {} 100 inject : Maybe[InjectSpec] = None 101 _meta : dict = {} # Misc metadata 102
[docs] 103 @override 104 @classmethod 105 def build(cls, data:RelationSpec_i|ChainGuard|dict|TaskName_p|str, *, relation:Maybe[RelationMark]=None) -> RelationSpec_i: # type: ignore[override] 106 """ Create a new relation, defaulting to a requirement. 107 108 """ 109 d_data : dict 110 result : Any 111 target : Any 112 relation = relation or cls.Marks.needs 113 result = None 114 match data: 115 case RelationSpec(): # Do Nothing 116 result = data 117 case pl.Path(): # Rely on a file 118 result = cls(target=TaskArtifact(data), relation=relation) 119 case TaskName() | TaskArtifact(): 120 result = cls(target=data, relation=relation) 121 case str() as x if TaskArtifact.section(0).end in x: # type: ignore[operator] 122 target = TaskArtifact(x) 123 result = cls(target=target, relation=relation) 124 case str() as x if Location.section(0).end in x: # type: ignore[operator] 125 target = Location(x) 126 return cls(target=target, relation=relation) 127 case str() as x if TaskName.section(0).end in x: # type: ignore[operator] 128 target = TaskName(x) 129 result = cls(target=target, relation=relation) 130 case {"path": path} if "task" not in data: 131 return cls(target=TaskArtifact(path), relation=relation) 132 case {"task": taskname}: 133 assert(isinstance(data, dict)) 134 constraints = data.get("constraints", None) or data.get("constraints_", []) 135 inject = data.get("inject", None) or data.get("inject_", None) 136 result = cls(target=TaskName(taskname), 137 constraints=constraints, 138 inject=inject, 139 relation=relation) 140 case dict() as d_data if "target" in d_data: 141 result = cls(**d_data) 142 case _: 143 raise ValueError("Bad data used for relation spec", type(data), data) 144 145 return result
146
[docs] 147 @field_validator("target", mode="before") 148 def _validate_target(cls, val:Any) -> RelationTarget: 149 match val: 150 case TaskName() | TaskArtifact(): 151 return cast("TaskName_p", val) 152 case pl.Path(): 153 return cast("Artifact_i", TaskArtifact(val)) 154 case str() if TaskName.section(0).end in val: # type: ignore[operator] 155 return cast("TaskName_p", TaskName(val)) 156 case _: 157 raise ValueError("Unparsable target str")
158
[docs] 159 @field_validator("constraints", mode="before") 160 def _validate_constraints(cls, val:Any) -> dict: 161 match val: 162 case list(): 163 return {x:x for x in val} 164 case dict(): 165 return val 166 case _: 167 raise TypeError("Unknown constraints type", val)
168
[docs] 169 @field_validator("inject", mode="before") 170 def _validate_inject(cls, val:Any) -> Maybe[str|InjectSpec_i]: 171 match val: 172 case None: 173 return None 174 case str(): 175 return val 176 case dict() | ChainGuard(): 177 return InjectSpec.build(val) 178 case _: 179 raise TypeError("Unknown injection type", val)
180 181 @override 182 def __str__(self): 183 return f"<? {self.relation.name} {self.target}>" 184 185 @override 186 def __repr__(self): 187 return f"<RelationSpec: ? {self.relation.name} {self.target}>" 188 189 def __contains__(self, query:str|StrangMarkAbstract_e|TaskName_p|Artifact_i) -> bool: 190 match self.target, query: 191 case TaskName(), TaskName(): 192 return query <= self.target 193 case TaskArtifact(), TaskArtifact(): 194 return query in self.target 195 case TaskArtifact(), StrangMarkAbstract_e(): 196 return query in self.target 197 case _: 198 raise NotImplementedError(self.target, query) 199
[docs] 200 def to_ordered_pair(self, obj:RelationTarget, *, target:Maybe[TaskName_p]=None) -> tuple[Maybe[RelationTarget], Maybe[RelationTarget]]: 201 """ a helper to make an edge for the tracker. 202 uses the current (abstract) target, unless an instance is provided 203 """ 204 match self.relation: 205 case RelationMeta_e.needs: 206 # target is a predecessor 207 return (target or self.target, obj) # type: ignore[return-value] 208 case RelationMeta_e.blocks: 209 # target is a succcessor 210 return (obj, target or self.target) # type: ignore[return-value]
211
[docs] 212 def instantiate(self, *, obj:Maybe[RelationTarget]=None, target:Maybe[RelationTarget]=None) -> RelationSpec_i: 213 """ 214 Duplicate this relation, but with a suitable concrete task or artifact as the object or subject 215 """ 216 match self.target, target: 217 case _, None: 218 pass 219 case TaskName_p(), Artifact_i(): 220 raise doot.errors.TrackingError("tried to instantiate a relation with the wrong target", self.target, target) 221 case Artifact_i(), TaskName_p(): 222 raise doot.errors.TrackingError("tried to instantiate a relation with the wrong target", self.target, target) 223 case TaskName_p(), TaskName_p() if not target.uuid(): # type: ignore[union-attr] 224 raise doot.errors.TrackingError("tried to instantiate a relation with the wrong target status", self.target, target) 225 case Artifact_i() as abs_targ, Artifact_i() as in_targ if (match:=abs_targ.reify(in_targ)) is not None: # type: ignore[operator, arg-type] 226 target = match 227 case _, _: 228 pass 229 230 if target is None: 231 target = self.target # type: ignore[assignment] 232 if obj is None: 233 obj = self.object # type: ignore[assignment] 234 235 return RelationSpec(target=target, 236 object=obj or self.object, 237 relation=self.relation, 238 constraints=self.constraints)
239
[docs] 240 def forward_dir_p(self) -> bool: 241 """ is this relation's direction obj -> target? """ 242 return self.relation is RelationMeta_e.blocks
243
[docs] 244 def accepts(self, control:Task_i|TaskSpec_i, target:Task_i|TaskSpec_i) -> bool: 245 """ Test if this pair of Tasks satisfies the relation """ 246 uuids = [target.name.uuid(), control.name.uuid()] 247 obj_target = isinstance(self.target, TaskName_p) 248 name_match = obj_target and self.target <= target.name 249 if (None in uuids or not name_match): 250 return False 251 252 control_vals = control.internal_state if isinstance(control, Task_p) else control.extra # type: ignore[union-attr] 253 target_vals = target.internal_state if isinstance(target, Task_p) else target.extra # type: ignore[union-attr] 254 255 # Check constraints match 256 for targ_k,source_k in self.constraints.items(): 257 if source_k not in control_vals: 258 continue 259 if targ_k not in target_vals: 260 return False 261 262 if (targ_v:=target_vals.get(targ_k, None)) != (source_v:=control_vals[source_k]): 263 logging.debug("[Relation] Constraint does not match: %s(%s) : %s(%s)", targ_k, targ_v, source_k, source_v) 264 return False 265 else: 266 pass 267 268 if self.inject is None: 269 return True 270 271 return self.inject.validate(control, target)