doot.control.tracker.network

The network of task relations.

Uses an nx.Digraph internally. Is build ‘backwards’, as this preserves the meaning of graph.pred[x] = [y] as y.depends_on[x] and graph.succ[x] = [y] as y.required_for[x]

Type Aliases

Abstract

Classes

_Expansion_m

_Validation_m

TrackNetwork

The _graph of concrete tasks and their dependencies

Module Contents

doot.control.tracker.network.Abstract: TypeAlias = T
class doot.control.tracker.network._Expansion_m[source]
_tracker: doot.control.tracker._interface.WorkflowTracker_i
pred: collections.abc.Mapping
succ: collections.abc.Mapping
nodes: collections.abc.Mapping
edges: collections.abc.Mapping
_graph: Any
non_expanded: set
build_network(
*,
sources: jgdv.Maybe[Literal[True] | list[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i]] = None,
) None[source]

for each task queued (ie: connected to the root node) expand its dependencies and add into the _graph, until no more nodes to expand. then connect concrete _tracker._registry.artifacts to abstract _tracker._registry.artifacts.

passing sources=True forces build of any non_expanded nodes that have an edge

# TODO _graph could be built in total, or on demand

Parameters:

sources (jgdv.Maybe[Literal[True] | list[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i]])

Return type:

None

connect(
left: Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i,
right: jgdv.Maybe[Literal[False] | Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i] = None,
**kwargs,
) None[source]

Connect a task node to another. left -> right If given left, None, connect left -> API.ROOT if given left, False, just add the node

(This preserves graph.pred[x] as the nodes x is dependent on)

Parameters:
Return type:

None

_add_node(
name: Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i,
) None[source]

idempotent

Parameters:

name (Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i)

Return type:

None

_expand_task_node(
name: Concrete[doot.workflow._interface.TaskName_p],
) set[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i][source]

expand a task node, instantiating and connecting to its dependencies and dependents, without expanding those new nodes. returns a list of the new nodes tasknames

Parameters:

name (Concrete[doot.workflow._interface.TaskName_p])

Return type:

set[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i]

_generate_successor_nodes(
spec: Concrete[doot.workflow.TaskSpec],
) list[Concrete[doot.workflow._interface.TaskName_p]][source]

instantiate and connect a job’s head task

for a spec S, find the tasks T that have registered a relation of T < S. (S would not know about these blockers).

For these T, link instantiated nodes that match constraints and link them to S, or if no nodes exist, create and link them.

Parameters:

spec (Concrete[doot.workflow.TaskSpec])

Return type:

list[Concrete[doot.workflow._interface.TaskName_p]]

_generate_blockers(
name: doot.workflow._interface.TaskName_p,
) set[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i][source]
Parameters:

name (doot.workflow._interface.TaskName_p)

Return type:

set[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i]

_expand_artifact(
artifact: doot.workflow._interface.Artifact_i,
) set[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i][source]

expand _tracker._registry.artifacts, instantiating related tasks, and connecting the task to its abstract/concrete related _tracker._registry.artifacts

Parameters:

artifact (doot.workflow._interface.Artifact_i)

Return type:

set[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow._interface.Artifact_i]

class doot.control.tracker.network._Validation_m[source]
_tracker: doot.control.tracker._interface.WorkflowTracker_i
_graph: Any
nodes: collections.abc.Mapping
edges: collections.abc.Mapping
pred: collections.abc.Mapping
succ: collections.abc.Mapping
validate_network(*, strict: bool = True) bool[source]

Finalise and ensure consistence of the task _graph. run tests to check the dependency graph is acceptable

Parameters:

strict (bool)

Return type:

bool

concrete_edges(
name: Concrete[doot.workflow._interface.TaskName_p | doot.workflow.TaskArtifact],
) jgdv.structs.chainguard.ChainGuard[source]

get the concrete edges of a task. ie: the ones in the task _graph, not the abstract ones in the spec.

Parameters:

name (Concrete[doot.workflow._interface.TaskName_p | doot.workflow.TaskArtifact])

Return type:

jgdv.structs.chainguard.ChainGuard

report_tree() None[source]

Use networkx + plt’s graph drawing to inspect the constructed graph

Return type:

None

class doot.control.tracker.network.TrackNetwork(
*,
tracker: doot.control.tracker._interface.WorkflowTracker_p,
)[source]

The _graph of concrete tasks and their dependencies

Parameters:

tracker (doot.control.tracker._interface.WorkflowTracker_p)

_tracker: doot.control.tracker._interface.WorkflowTracker_p
_graph: networkx.DiGraph[Concrete[doot.workflow._interface.TaskName_p] | doot.workflow.TaskArtifact]
non_expanded: set[doot.workflow._interface.TaskName_p | doot.workflow._interface.Artifact_i]
property nodes: dict[source]
Return type:

dict

property edges: dict[source]
Return type:

dict

property pred: dict[source]
Return type:

dict

property adj: dict[source]
Return type:

dict

property succ: dict[source]
Return type:

dict