Module: successor_dag_node
Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import List
from typing import Optional
from cl.runtime import Context
from cl.runtime import RecordMixin
from cl.runtime.log.exceptions.user_error import UserError
from cl.runtime.primitive.case_util import CaseUtil
from cl.runtime.records.dataclasses_extensions import missing
from cl.runtime.view.dag.dag import Dag
from cl.runtime.view.dag.dag_edge import DagEdge
from cl.runtime.view.dag.dag_layout_enum import DagLayoutEnum
from cl.runtime.view.dag.dag_node_data import DagNodeData
from cl.runtime.view.dag.nodes.dag_node import DagNode
from cl.runtime.views.dag.successor_dag_key import SuccessorDagKey
from cl.runtime.views.dag.successor_dag_node_key import SuccessorDagNodeKey
@dataclass(slots=True, kw_only=True)
class SuccessorDagNode(SuccessorDagNodeKey, RecordMixin[SuccessorDagNodeKey]):
"""Single node of SuccessorDag, defines its successors."""
dag: SuccessorDagKey = missing()
"""The DAG this node belongs to (included in node_id)."""
dag_node_id: str = missing()
"""Unique node identifier within the dag (included in node_id)."""
node_yaml: str = missing()
"""Node details in YAML format."""
successor_nodes: List[SuccessorDagNodeKey] | None = None
"""List of successor nodes (must belong to the same DAG)."""
successor_edges: List[str] | None = None
"""List of successor edge names in the same order as successor_nodes (must have the same size if not None)."""
def init(self) -> None:
# Generate from dag_id and node_name fields
self.node_id = f"{self.dag.dag_id}: {self.dag_node_id}"
# TODO: Make this a standard feature of CSV reader, then remove this code
if self.successor_nodes is not None:
self.successor_nodes = [
SuccessorDagNodeKey(**x) if isinstance(x, dict) else x for x in self.successor_nodes
]
# Verify that each successor belongs to the same DAG
if self.successor_nodes:
disallowed_list = [s for s in self.successor_nodes if s.node_id.split(":")[0] != self.dag.dag_id]
if disallowed_list and any(disallowed_list):
disallowed_list_str = " - ".join(f" - Node: {x.node_id}n" for x in disallowed_list)
raise UserError(
f"One or more successors do not belong to DAG {self.dag.dag_id}:n" f"{disallowed_list_str}"
)
# Verify that edges is None or has the same size
edge_count = len(self.successor_edges) if self.successor_edges is not None else 0
successor_count = len(self.successor_nodes) if self.successor_nodes is not None else 0
if False and edge_count != successor_count:
raise UserError(
f"In DAG node {self.node_id}, the number of edges is {edge_count} "
f"which does not match number of successors {successor_count}."
)
def get_key(self) -> SuccessorDagNodeKey:
return SuccessorDagNodeKey(node_id=self.node_id)
def view_dag(self) -> Dag:
"""DAG view for the node."""
return self.build_dag(node=self)
@staticmethod
def build_dag(
node: "SuccessorDagNode",
layout_mode: DagLayoutEnum = DagLayoutEnum.PLANAR,
ignore_fields: Optional[list[str]] = None,
) -> Dag:
"""Build the DAG for the given node.
Args:
node (SuccessorDagNode): The root node to start the DAG from.
layout_mode (DagLayoutEnum): Layout mode for arranging the DAG. Defaults to DagLayoutEnum.PLANAR.
ignore_fields (Optional[list[str]]): Fields to ignore during traversal. Defaults to an empty list.
Returns:
Dag: The constructed directed acyclic graph (DAG).
"""
ignore_fields = ignore_fields or []
nodes, edges = [node.to_dag_node()], []
def traverse_graph_from_node(node_record: SuccessorDagNode, source_node: DagNode):
"""Recursively traverse the graph starting from the given node.
Args:
node_record (SuccessorDagNode): The node to start traversal from.
source_node (DagNode): The corresponding DAG node.
"""
slots = getattr(node_record, "__slots__", None)
if not slots:
return
node_fields = {
field_name: field_value
for field_name in slots
if (
isinstance((field_value := getattr(node_record, field_name)), SuccessorDagNodeKey)
# TODO (Yauheni): Use declarations instead of isinstance
# TODO: (Yauheni): Current filtration filters out the empty lists, which should be processed
or (
field_value
and hasattr(field_value, "__iter__")
and isinstance(field_value[0], SuccessorDagNodeKey)
)
and field_name not in ignore_fields
)
}
for field_name, field_value in node_fields.items():
if isinstance(field_value, SuccessorDagNodeKey):
loaded_node = Context.current().load_one(SuccessorDagNodeKey, field_value)
if loaded_node is None:
SuccessorDagNode.__append_empty_node(
source_node=source_node,
node_id=field_value.node_id,
edge_label=CaseUtil.snake_to_title_case(field_name),
nodes=nodes,
edges=edges,
)
continue
tree_node = loaded_node.to_dag_node()
edges.append(
Dag.build_edge_between_nodes(
source=source_node,
target=tree_node,
label=CaseUtil.snake_to_title_case(field_name),
)
)
if tree_node not in nodes:
nodes.append(tree_node)
traverse_graph_from_node(node_record=loaded_node, source_node=tree_node)
else:
if not field_value:
continue
for index, node_key in enumerate(field_value, start=1):
edge_label = f"{CaseUtil.snake_to_title_case(field_name)}[{index}]"
if field_name.endswith("nodes"):
edges_names_field_name = field_name.rstrip("nodes") + "edges"
edges_names = getattr(node_record, edges_names_field_name, None)
if edges_names and len(edges_names) == len(field_value):
edge_label = edges_names[index - 1]
loaded_node = Context.current().load_one(SuccessorDagNodeKey, node_key)
if loaded_node is None:
SuccessorDagNode.__append_empty_node(
source_node=source_node,
node_id=node_key.node_id,
edge_label=edge_label,
nodes=nodes,
edges=edges,
)
continue
tree_node = loaded_node.to_dag_node()
edges.append(
Dag.build_edge_between_nodes(
source=source_node,
target=tree_node,
label=edge_label,
)
)
if tree_node not in nodes:
nodes.append(tree_node)
traverse_graph_from_node(node_record=loaded_node, source_node=tree_node)
traverse_graph_from_node(node, nodes[0])
dag = Dag(name=f"DAG from `{node.node_id}` node", nodes=nodes, edges=edges)
return Dag.auto_layout_dag(dag, layout_mode)
def to_dag_node(self) -> DagNode:
"""Transform tree node to the DAG node."""
node_data = DagNodeData(label=self.node_id)
node_data.node_data = {"title": self.node_id, "data": self.node_yaml}
return DagNode(id_=self.node_id, data=node_data)
@staticmethod
def __append_empty_node(
source_node: DagNode,
node_id: str,
edge_label: str,
nodes: list[DagNode],
edges: list[DagEdge],
) -> None:
"""Append empty node to the list of nodes and edge to the list of edges."""
# TODO (Yauheni): Add color information to the node with entry, which doesn't exist
empty_node = DagNode(id_=node_id, data=DagNodeData(label=node_id))
if empty_node not in nodes:
nodes.append(empty_node)
edges.append(
Dag.build_edge_between_nodes(source=source_node, target=empty_node, label=edge_label),
)
Classes
class SuccessorDagNode (*, node_id: str = None, dag: SuccessorDagKey = None, dag_node_id: str = None, node_yaml: str = None, successor_nodes: Optional[List[SuccessorDagNodeKey]] = None, successor_edges: Optional[List[str]] = None)
-
Single node of SuccessorDag, defines its successors.
Expand source code
@dataclass(slots=True, kw_only=True) class SuccessorDagNode(SuccessorDagNodeKey, RecordMixin[SuccessorDagNodeKey]): """Single node of SuccessorDag, defines its successors.""" dag: SuccessorDagKey = missing() """The DAG this node belongs to (included in node_id).""" dag_node_id: str = missing() """Unique node identifier within the dag (included in node_id).""" node_yaml: str = missing() """Node details in YAML format.""" successor_nodes: List[SuccessorDagNodeKey] | None = None """List of successor nodes (must belong to the same DAG).""" successor_edges: List[str] | None = None """List of successor edge names in the same order as successor_nodes (must have the same size if not None).""" def init(self) -> None: # Generate from dag_id and node_name fields self.node_id = f"{self.dag.dag_id}: {self.dag_node_id}" # TODO: Make this a standard feature of CSV reader, then remove this code if self.successor_nodes is not None: self.successor_nodes = [ SuccessorDagNodeKey(**x) if isinstance(x, dict) else x for x in self.successor_nodes ] # Verify that each successor belongs to the same DAG if self.successor_nodes: disallowed_list = [s for s in self.successor_nodes if s.node_id.split(":")[0] != self.dag.dag_id] if disallowed_list and any(disallowed_list): disallowed_list_str = " - ".join(f" - Node: {x.node_id}n" for x in disallowed_list) raise UserError( f"One or more successors do not belong to DAG {self.dag.dag_id}:n" f"{disallowed_list_str}" ) # Verify that edges is None or has the same size edge_count = len(self.successor_edges) if self.successor_edges is not None else 0 successor_count = len(self.successor_nodes) if self.successor_nodes is not None else 0 if False and edge_count != successor_count: raise UserError( f"In DAG node {self.node_id}, the number of edges is {edge_count} " f"which does not match number of successors {successor_count}." ) def get_key(self) -> SuccessorDagNodeKey: return SuccessorDagNodeKey(node_id=self.node_id) def view_dag(self) -> Dag: """DAG view for the node.""" return self.build_dag(node=self) @staticmethod def build_dag( node: "SuccessorDagNode", layout_mode: DagLayoutEnum = DagLayoutEnum.PLANAR, ignore_fields: Optional[list[str]] = None, ) -> Dag: """Build the DAG for the given node. Args: node (SuccessorDagNode): The root node to start the DAG from. layout_mode (DagLayoutEnum): Layout mode for arranging the DAG. Defaults to DagLayoutEnum.PLANAR. ignore_fields (Optional[list[str]]): Fields to ignore during traversal. Defaults to an empty list. Returns: Dag: The constructed directed acyclic graph (DAG). """ ignore_fields = ignore_fields or [] nodes, edges = [node.to_dag_node()], [] def traverse_graph_from_node(node_record: SuccessorDagNode, source_node: DagNode): """Recursively traverse the graph starting from the given node. Args: node_record (SuccessorDagNode): The node to start traversal from. source_node (DagNode): The corresponding DAG node. """ slots = getattr(node_record, "__slots__", None) if not slots: return node_fields = { field_name: field_value for field_name in slots if ( isinstance((field_value := getattr(node_record, field_name)), SuccessorDagNodeKey) # TODO (Yauheni): Use declarations instead of isinstance # TODO: (Yauheni): Current filtration filters out the empty lists, which should be processed or ( field_value and hasattr(field_value, "__iter__") and isinstance(field_value[0], SuccessorDagNodeKey) ) and field_name not in ignore_fields ) } for field_name, field_value in node_fields.items(): if isinstance(field_value, SuccessorDagNodeKey): loaded_node = Context.current().load_one(SuccessorDagNodeKey, field_value) if loaded_node is None: SuccessorDagNode.__append_empty_node( source_node=source_node, node_id=field_value.node_id, edge_label=CaseUtil.snake_to_title_case(field_name), nodes=nodes, edges=edges, ) continue tree_node = loaded_node.to_dag_node() edges.append( Dag.build_edge_between_nodes( source=source_node, target=tree_node, label=CaseUtil.snake_to_title_case(field_name), ) ) if tree_node not in nodes: nodes.append(tree_node) traverse_graph_from_node(node_record=loaded_node, source_node=tree_node) else: if not field_value: continue for index, node_key in enumerate(field_value, start=1): edge_label = f"{CaseUtil.snake_to_title_case(field_name)}[{index}]" if field_name.endswith("nodes"): edges_names_field_name = field_name.rstrip("nodes") + "edges" edges_names = getattr(node_record, edges_names_field_name, None) if edges_names and len(edges_names) == len(field_value): edge_label = edges_names[index - 1] loaded_node = Context.current().load_one(SuccessorDagNodeKey, node_key) if loaded_node is None: SuccessorDagNode.__append_empty_node( source_node=source_node, node_id=node_key.node_id, edge_label=edge_label, nodes=nodes, edges=edges, ) continue tree_node = loaded_node.to_dag_node() edges.append( Dag.build_edge_between_nodes( source=source_node, target=tree_node, label=edge_label, ) ) if tree_node not in nodes: nodes.append(tree_node) traverse_graph_from_node(node_record=loaded_node, source_node=tree_node) traverse_graph_from_node(node, nodes[0]) dag = Dag(name=f"DAG from `{node.node_id}` node", nodes=nodes, edges=edges) return Dag.auto_layout_dag(dag, layout_mode) def to_dag_node(self) -> DagNode: """Transform tree node to the DAG node.""" node_data = DagNodeData(label=self.node_id) node_data.node_data = {"title": self.node_id, "data": self.node_yaml} return DagNode(id_=self.node_id, data=node_data) @staticmethod def __append_empty_node( source_node: DagNode, node_id: str, edge_label: str, nodes: list[DagNode], edges: list[DagEdge], ) -> None: """Append empty node to the list of nodes and edge to the list of edges.""" # TODO (Yauheni): Add color information to the node with entry, which doesn't exist empty_node = DagNode(id_=node_id, data=DagNodeData(label=node_id)) if empty_node not in nodes: nodes.append(empty_node) edges.append( Dag.build_edge_between_nodes(source=source_node, target=empty_node, label=edge_label), )
Ancestors
- SuccessorDagNodeKey
- KeyMixin
- abc.ABC
- RecordMixin
- typing.Generic
Static methods
def build_dag(node: SuccessorDagNode, layout_mode: DagLayoutEnum = DagLayoutEnum.PLANAR, ignore_fields: Optional[list[str]] = None) -> Dag
-
Build the DAG for the given node.
Args
node
:SuccessorDagNode
- The root node to start the DAG from.
layout_mode
:DagLayoutEnum
- Layout mode for arranging the DAG. Defaults to DagLayoutEnum.PLANAR.
ignore_fields
:Optional[list[str]]
- Fields to ignore during traversal. Defaults to an empty list.
Returns
Dag
- The constructed directed acyclic graph (DAG).
def get_key_type() -> Type
-
Inherited from:
SuccessorDagNodeKey
.get_key_type
Return key type even when called from a record.
Fields
var dag -> SuccessorDagKey
-
The DAG this node belongs to (included in node_id).
var dag_node_id -> str
-
Unique node identifier within the dag (included in node_id).
var node_id -> str
-
Inherited from:
SuccessorDagNodeKey
.node_id
Unique node identifier across all DAGs is generated by init in ‘{dag_id}: {dag_node_id}’ format.
var node_yaml -> str
-
Node details in YAML format.
var successor_edges -> Optional[List[str]]
-
List of successor edge names in the same order as successor_nodes (must have the same size if not None).
var successor_nodes -> Optional[List[SuccessorDagNodeKey]]
-
List of successor nodes (must belong to the same DAG).
Methods
def get_key(self) -> SuccessorDagNodeKey
-
Inherited from:
RecordMixin
.get_key
Return a new key object whose fields populated from self, do not return self.
def init(self) -> None
def init_all(self) -> None
-
Inherited from:
RecordMixin
.init_all
Invoke ‘init’ for each class in the order from base to derived, then validate against schema.
def to_dag_node(self) -> DagNode
-
Transform tree node to the DAG node.
def view_dag(self) -> Dag
-
DAG view for the node.