1#!/usr/bin/env python3
2"""
3
4"""
5# mypy: disable-error-code="attr-defined"
6# ruff: noqa: N812
7# Imports:
8from __future__ import annotations
9
10# ##-- stdlib imports
11import datetime
12import functools as ftz
13import importlib
14import itertools as itz
15import logging as logmod
16import pathlib as pl
17import re
18import time
19import types
20import typing
21import weakref
22from importlib.metadata import EntryPoint
23from uuid import UUID, uuid1
24
25# ##-- end stdlib imports
26
27# ##-- 3rd party imports
28import jgdv.structs.strang.errors as StrangErrs
29from jgdv import Mixin, Proto
30from jgdv._abstract.protocols.general import Buildable_p, SpecStruct_p
31from jgdv._abstract.protocols.pydantic import ProtocolModelMeta
32from jgdv.cli import ParamSpec, ParamSpecMaker_m
33from jgdv.structs.chainguard import ChainGuard
34from jgdv.structs.dkey import DKey
35from jgdv.structs.locator import Location
36from jgdv.structs.strang import CodeReference
37
38# ##-- end 3rd party imports
39
40# ##-- 1st party imports
41import doot
42import doot.errors
43from doot.workflow import (ActionSpec, DootJob, DootTask, InjectSpec,
44 RelationSpec, TaskArtifact, TaskName, TaskSpec)
45from doot.workflow import _interface as API
46from doot.workflow._interface import (ActionSpec_i, InjectSpec_i, Job_p,
47 RelationMeta_e, RelationSpec_i, Task_i,
48 Task_p, TaskMeta_e, TaskName_p,
49 TaskSpec_i, Artifact_i)
50
51# ##-- end 1st party imports
52
53# ##-| Local
54from ._interface import SubTaskFactory_p, TaskFactory_p, DelayedSpec
55# # End of Imports.
56
57# ##-- types
58# isort: off
59import abc
60import collections.abc
61from typing import TYPE_CHECKING, cast, assert_type, assert_never
62from typing import Generic, NewType, Any, Annotated, override
63# Protocols:
64from typing import Protocol, runtime_checkable
65# Typing Decorators:
66from typing import no_type_check, final, overload
67from dataclasses import _MISSING_TYPE, InitVar, dataclass, field, fields
68from pydantic import (BaseModel, BeforeValidator, Field, ValidationError,
69 ValidationInfo, ValidatorFunctionWrapHandler, ConfigDict,
70 WrapValidator, field_validator, model_validator)
71
72if TYPE_CHECKING:
73 from jgdv import Maybe
74 import enum
75 from typing import Final
76 from typing import ClassVar, LiteralString
77 from typing import Never, Self, Literal, _SpecialType
78 from typing import TypeGuard
79 from collections.abc import Iterable, Iterator, Callable, Generator
80 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
81
82 type SpecialType = _SpecialType
83
84# isort: on
85# ##-- end types
86
##-- logging
# Module logger. Note: bound to the name `logging`; the stdlib module itself is imported as `logmod`.
logging = logmod.getLogger(__name__)
##-- end logging

##--| Consts
# Default task-ctor alias, taken from doot's entrypoint constants.
# NOTE(review): not referenced by the code visible in this module — confirm it is used externally.
DEFAULT_ALIAS : Final[str] = doot.constants.entrypoints.DEFAULT_TASK_CTOR_ALIAS
# Spec fields named as "blocking" by default — presumably consumed by the registry/tracker; confirm.
# NOTE(review): not referenced by the code visible in this module.
DEFAULT_BLOCKING : Final[tuple[str, ...]] = ("required_for", "on_fail")
# Relation kind SubTaskFactory._raw_data_to_specs uses when the caller does not pass one.
# Evaluated once, at import time.
DEFAULT_RELATION : Final[RelationMeta_e] = RelationMeta_e.default()
##--| Utils

##--|
98
[docs]
99@Proto(TaskFactory_p)
100class TaskFactory:
101 """
102 Factory to create task specs, instantiate them, and make tasks
103
104 build : data -> spec
105 delay : data -> delayed -> spec
106 instantiate : spec -> spec(name=name[uuid])
107 reify : spec,partial -> spec
108 over : orig,plus -> spec(plus<orig, name..<+>[uuid])
109 under : orig,plus -> spec(orig<plus, name..<+>[uuid])
110 make : spec -> task
111
112 """
113 spec_ctor : type[TaskSpec_i]
114 task_ctor : type[Task_p]
115 job_ctor : type[Job_p]
116
117 def __init__(self, *, spec_ctor:Maybe[type]=None, task_ctor:Maybe[type]=None, job_ctor:Maybe[type]=None):
118 x : type[Any]
119 match spec_ctor:
120 case None:
121 match CodeReference(doot.aliases.task.spec)():
122 case type() as ref:
123 self.spec_ctor = ref
124 case Exception() as err:
125 raise err
126 case type() as x:
127 self.spec_ctor = x
128
129 match task_ctor:
130 case None:
131 match CodeReference(doot.aliases.task.task)():
132 case type() as ref:
133 self.task_ctor = ref
134 case Exception() as err:
135 raise err
136 case type() as x:
137 self.task_ctor = x
138
139 match job_ctor:
140 case None:
141 match CodeReference(doot.aliases.task.job)():
142 case type() as ref:
143 self.job_ctor = ref
144 case Exception() as err:
145 raise err
146 case type() as x:
147 self.job_ctor = x
148
149 ##--| Spec manipulation
150
[docs]
151 def build(self, data:ChainGuard|dict|TaskName_p|str) -> TaskSpec_i:
152 result : TaskSpec_i
153 match data:
154 case TaskSpec_i():
155 result = data
156 case ChainGuard() | dict() if "source" in data:
157 raise ValueError("source is deprecated, use 'sources'", data)
158 case ChainGuard() | dict():
159 result = self.spec_ctor(**data)
160 case TaskName():
161 result = self.spec_ctor(name=data) # type: ignore[call-arg]
162 case str():
163 result = self.spec_ctor(name=TaskName(data)) # type: ignore[call-arg]
164 case x:
165 raise TypeError(type(x))
166
167 return result
168
[docs]
169 def delay(self, *, base:TaskName_p, target:TaskName_p, inject:Maybe[InjectSpec_i]=None, applied:Maybe[dict]=None, overrides:dict) -> DelayedSpec:
170 """
171 Build data structure that the registry will process into a full spec
172 """
173 result : DelayedSpec = DelayedSpec(base=TaskName(base),
174 target=TaskName(target),
175 inject=inject,
176 applied=applied,
177 overrides=overrides,
178 )
179
180 return result
181
[docs]
182 def instantiate(self, obj:TaskSpec_i, *, suffix:Maybe[bool|str]=None, extra:Maybe[Mapping]=None) -> TaskSpec_i:
183 """
184 Return this spec, copied with a uniq name
185 """
186 result : TaskSpec_i
187 instance : TaskSpec_i
188 # TODO use model_copy(update={...})
189 instance = obj.model_copy()
190 instance.generated_names.clear()
191 match extra:
192 case None:
193 result = instance
194 case dict():
195 result = self.merge(bot=instance, top=extra)
196 case x:
197 raise TypeError(type(x))
198
199 result.name = self._prep_name(obj.name, suffix=suffix or False).to_uniq()
200 assert(result.name.uuid())
201 return result
202
[docs]
203 def merge(self, top:dict|TaskSpec_i, bot:dict|TaskSpec_i, *, suffix:Maybe[str|Literal[False]]=None, name:Maybe[TaskName_p]=None) -> TaskSpec_i:
204 """ bot + top -> TaskSpec """
205 result : dict
206 base_name : TaskName_p
207 top_data : dict
208 bot_data : dict
209 ##--|
210 if bot is top:
211 raise doot.errors.TrackingError("Tried to apply a spec over itself", top, bot)
212
213 ##--| prepare
214 match bot:
215 case dict():
216 bot_data = bot
217 case TaskSpec_i():
218 bot_data = dict(bot)
219 match top:
220 case dict():
221 top_data = top
222 case TaskSpec_i():
223 top_data = dict(top)
224 ##--|
225 result = self._specialize_merge(bot=bot_data, top=top_data)
226 match name:
227 case TaskName_p():
228 result['name'] = name
229 case _:
230 base_name = top_data.get('name', None) or bot_data['name']
231 result['name'] = self._prep_name(base_name, suffix=suffix)
232 ##--|
233 return self.build(result)
234
235 ##--| Task construction
236
[docs]
237 def make(self, obj:TaskSpec_i, ensure:Any=None) -> Task_p:
238 """ Create actual task instance
239
240 if no spec_ctor has been specified, uses the default spec_ctor for job/task
241 """
242 task : Task_p
243 ctor : type
244 match obj.ctor: # Get the ctor
245 case None if TaskMeta_e.JOB in obj.meta:
246 ctor = self.job_ctor
247 case None:
248 ctor = self.task_ctor
249 case CodeReference() as x:
250 match x(check=ensure):
251 case type() as val:
252 ctor = val
253 case Exception() as err:
254 raise err
255 case x:
256 raise TypeError(type(x))
257
258 assert(ctor is not None)
259 task = ctor(obj)
260
261 return task
262
263 ##--| utils
264
[docs]
265 def get_source_names(self, obj:TaskSpec_i) -> list[TaskName_p]:
266 """ Get from the spec's sources just its source tasks """
267 val = [x for x in obj.sources if isinstance(x, TaskName)]
268 return cast("list[TaskName_p]", val)
269
[docs]
270 def action_groups(self, obj:TaskSpec_i) -> Iterable[Iterable]:
271 return [obj.depends_on, obj.setup, obj.actions, obj.cleanup, obj.on_fail]
272
[docs]
273 def action_group_elements(self, obj:TaskSpec_i) -> Iterable[ActionSpec_i|RelationSpec_i]:
274 """ Get the elements of: depends_on, setup, actions, and require_for.
275 """
276 groups : Iterable[Iterable] = [obj.depends_on, obj.setup, obj.actions, obj.required_for]
277 for group in groups:
278 yield from group
279
[docs]
280 def _specialize_merge(self, *, bot:dict, top:dict) -> dict:
281 """
282 Apply top over the top of bot
283
284 Combines, rather than overrides, particular values.
285
286 """
287 x : Any
288 y : Any
289 specialized : dict
290 sources : set = set()
291 merge_keys : list = ["actions", "depends_on", "required_for", "cleanup", "on_fail", "setup"]
292
293 specialized = dict(bot)
294 specialized |= dict(top)
295 if 'name' in specialized:
296 del specialized['name']
297
298 # Extend sources
299 match bot.get('sources', []), top.get('sources', []):
300 case x, y if len(x) < len(y):
301 sources.update(y)
302 case x, _:
303 sources.update(x)
304
305 if 'name' in bot:
306 sources.add(bot['name'])
307 if 'name' in top:
308 sources.add(top['name'])
309 specialized['sources'] = list(sources)
310 # Merge action groups
311 for x in merge_keys:
312 specialized[x] = [*bot.get(x, []), *top.get(x, [])]
313
314 # Internal is only for initial specs, to control listing
315 specialized[API.META_K] = set()
316 specialized[API.META_K].update(bot.get('meta', set()))
317 specialized[API.META_K].update(top.get('meta', set()))
318 specialized[API.META_K].difference_update({TaskMeta_e.INTERNAL})
319
320 return specialized
321
[docs]
322 def _prep_name(self, base:TaskName_p, *, suffix:Maybe[int|str|Literal[False]]=None) -> TaskName_p:
323 result : TaskName_p
324 ##--|
325 match suffix:
326 case bool() | 0:
327 result = cast("TaskName_p", base)
328 case None:
329 result = base.push(TaskName.Marks.customised) # type: ignore[assignment]
330 case str():
331 result = base.push(suffix) # type: ignore[assignment]
332 case int() as x if x > 0:
333 result = cast("TaskName_p", base.push(str(x)))
334 case x:
335 raise TypeError(type(x))
336 ##--|
337 return result.de_uniq()
338
[docs]
339@Proto(SubTaskFactory_p)
340class SubTaskFactory:
341 """ Additional factory for generating related tasks of an instantiated spec """
342
[docs]
343 def generate_names(self, obj:TaskSpec_i) -> list[TaskName_p]:
344 result : list[Maybe[TaskName_p]] = [
345 self._job_head_p(obj),
346 self._cleanup_p(obj),
347 ]
348 return [x for x in result if x is not None]
349
[docs]
350 def generate_specs(self, obj:TaskSpec_i|Artifact_i|DelayedSpec) -> list[dict]:
351 result : list[dict] = []
352 if not isinstance(obj, TaskSpec_i):
353 return result
354
355 logging.debug("[Task.Factory.Generate] : %s (%s)", obj.name, len(obj.generated_names))
356 # Jobs generate their head
357 result += self._gen_job_head(obj)
358 result += self._gen_cleanup_task(obj)
359
360 obj.generated_names.update([x['name'] for x in result])
361 return result
362
[docs]
363 def _gen_job_head(self, obj:TaskSpec_i) -> list[dict]:
364 """
365 Generate a top spec for a job, taking the jobs cleanup actions
366 and using them as the head's main action.
367 Cleanup relations are turning into the head's dependencies
368 Depends on the job, and its reactively queued.
369
370 Equivalent to:
371 await job.depends_on()
372 await job.setup()
373 subtasks = job.actions()
374 await subtasks
375 job.head()
376 await job.cleanup()
377 """
378 job_head : TaskName_p
379 match self._job_head_p(obj):
380 case None:
381 return []
382 case TaskName_p() as job_head:
383 pass
384
385 tasks = []
386 head_section = self._raw_data_to_specs(obj.extra.on_fail([], list).head_actions(), relation=RelationMeta_e.needs)
387 head_dependencies = [x for x in head_section if isinstance(x, RelationSpec_i) and x.target != job_head]
388 head_actions = [x for x in head_section if not isinstance(x, RelationSpec_i)]
389 ctor = obj.extra.on_fail(None).sub_ctor()
390
391 # build $head$
392 head : dict = {
393 "name" : job_head,
394 "ctor" : ctor,
395 "sources" : [*obj.sources[:], obj.name, None],
396 "queue_behaviour" : API.QueueMeta_e.reactive,
397 "depends_on" : [obj.name, *head_dependencies],
398 "required_for" : obj.required_for[:],
399 "cleanup" : obj.cleanup[:],
400 "meta" : (obj.meta | {TaskMeta_e.JOB_HEAD}) - {TaskMeta_e.JOB},
401 "actions" : head_actions,
402 **obj.extra,
403 }
404 assert(TaskMeta_e.JOB not in head['meta'])
405 tasks.append(head)
406 return tasks
407
[docs]
408 def _gen_cleanup_task(self, obj:TaskSpec_i) -> list[dict]:
409 """ Generate a cleanup task, shifting the 'cleanup' actions and dependencies
410 to 'depends_on' and 'actions'
411 """
412 cleanup_name : TaskName_p
413 match self._cleanup_p(obj):
414 case None:
415 return []
416 case TaskName_p() as cleanup_name:
417 pass
418
419 base_deps = [obj.name] + [x for x in obj.cleanup if isinstance(x, RelationSpec_i) and x.target != cleanup_name]
420 actions = [x for x in obj.cleanup if isinstance(x, ActionSpec_i)]
421 sources = [obj.name]
422
423 cleanup : dict = {
424 "name" : cleanup_name,
425 "ctor" : obj.ctor,
426 "sources" : sources,
427 "queue_behaviour" : API.QueueMeta_e.reactive,
428 "depends_on" : base_deps,
429 "actions" : actions,
430 "cleanup" : [],
431 "meta" : (obj.meta | {TaskMeta_e.TASK}) - {TaskMeta_e.JOB},
432 }
433 assert(not bool(cleanup['cleanup']))
434 return [cleanup]
435
[docs]
436 def _raw_data_to_specs(self, deps:list[str|dict], *, relation:RelationMeta_e=DEFAULT_RELATION) -> list[ActionSpec_i|RelationSpec_i]:
437 """ Convert toml provided raw data (str's, dicts) of specs into ActionSpec and RelationSpec object"""
438 results : list[ActionSpec_i|RelationSpec_i] = []
439 for x in deps:
440 match x:
441 case ActionSpec_i() | RelationSpec_i():
442 results.append(x)
443 case { "do": action } as d:
444 assert(isinstance(d, dict))
445 results.append(ActionSpec.build(d))
446 case _:
447 results.append(RelationSpec.build(x, relation=relation))
448
449 return results
450
[docs]
451 def _job_head_p(self, obj:TaskSpec_i) -> Maybe[TaskName_p]:
452 # if not obj.name.uuid():
453 # return None
454 if TaskMeta_e.JOB not in obj.meta:
455 return None
456 if obj.name.is_head():
457 return None
458
459 return obj.name.with_head()
460
461
[docs]
462 def _cleanup_p(self, obj:TaskSpec_i) -> Maybe[TaskName_p]:
463 # if not obj.name.uuid():
464 # return None
465 if self._job_head_p(obj) or obj.name.is_cleanup():
466 return None
467
468 return obj.name.with_cleanup()