Module: dag

Expand source code
# Copyright (C) 2023-present The Project Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import List
import networkx as nx
from cl.runtime import RecordMixin
from cl.runtime.records.dataclasses_extensions import missing
from cl.runtime.view.dag.dag_edge import DagEdge
from cl.runtime.view.dag.dag_key import DagKey
from cl.runtime.view.dag.dag_layout_enum import DagLayoutEnum
from cl.runtime.view.dag.dag_node_position import DagNodePosition
from cl.runtime.view.dag.nodes.dag_node import DagNode


@dataclass(slots=True, kw_only=True)
class Dag(DagKey, RecordMixin[DagKey]):
    """Structure and visual representation of a directed acyclic graph (DAG)."""

    nodes: List[DagNode] = missing()
    """List of DAG nodes."""

    edges: List[DagEdge] = missing()
    """List of DAG edges."""

    def get_key(self) -> DagKey:
        """Return primary key of this instance in semicolon-delimited string format."""
        return DagKey(name=self.name)

    @staticmethod
    def auto_layout_dag(
        dag: "Dag",
        layout_mode: DagLayoutEnum = DagLayoutEnum.SPRING,
        offset_x: int = 600,
        base_scale: int = 100,
    ) -> "Dag":
        """
        Set positions automatically for the passed DAG.

        Parameters
        ----------
            dag : Dag
                Dag to create layout for.
            layout_mode : DagLayoutEnum
                Graph layout to use.
            offset_x : int
                Offset on x-axis to use between multiple Dags. Won't affect the resulting autolayout of a single Dag.
            base_scale : int
                Base scale to use while calculating the final Dag scale.
                The final scale is calculated for each subgraph separately based on the number of nodes in
                it using the following formula: base_scale * number_of_nodes^0.5.

        Returns
        -------
            Dag
                A modified Dag object with adjusted positions of nodes.
        """

        subgraphs = dag._build_disconnected_graphs()
        positions = {}
        base_offset_x = 0.0

        for subgraph in subgraphs:
            subgraph_scale = base_scale * len(subgraph.nodes) ** 0.5

            if layout_mode == DagLayoutEnum.CIRCULAR:
                layout = nx.circular_layout(subgraph, scale=subgraph_scale)
            elif layout_mode == DagLayoutEnum.PLANAR:
                layout = nx.planar_layout(subgraph, scale=subgraph_scale)
            elif layout_mode == DagLayoutEnum.SPRING:
                layout = nx.spring_layout(
                    subgraph,
                    scale=subgraph_scale,
                    # Number of iterations is for more accurate layout
                    iterations=100,
                    # Seed needed for consistent layout
                    seed=1,
                    # k - optimal distance between nodes, which should depend on the number of nodes
                    k=1 / (len(subgraph.nodes) ** 0.2),
                )
            else:
                raise Exception("Unsupported layout mode. Accepted layout modes: circular, planar, spring")
            positions.update({node: (x + base_offset_x, y) for node, (x, y) in layout.items()})
            base_offset_x += offset_x

        for node in dag.nodes:
            x, y = positions[node.id_]
            node.position = DagNodePosition(x=float(x), y=float(y))

        return dag

    @staticmethod
    def build_edge_between_nodes(source: DagNode, target: DagNode, label: str | None = None) -> DagEdge:
        """Create a connection between two DagNode instances."""
        return DagEdge(
            id_=f"e-{source.id_}-{target.id_}",
            label=label,
            source=source.id_,
            target=target.id_,
        )

    def _build_graph(self) -> nx.DiGraph:
        """Build networkx graph representation."""

        graph = nx.DiGraph(name=self.name)
        for edge in self.edges:
            graph.add_edge(**edge.to_networkx())
        for node in self.nodes:
            graph.add_node(**node.to_networkx())
        self._validate_graph(graph)
        return graph

    @staticmethod
    def _validate_graph(graph: nx.DiGraph):
        """Validate that graph has no cycles."""

        if not nx.is_directed_acyclic_graph(graph):
            raise RuntimeError("Graph is not acyclic!")

    def _build_disconnected_graphs(self) -> List[nx.DiGraph]:
        """Build list of disconnected (separated) networkx graphs."""

        graph = self._build_graph()
        return [graph.subgraph(subgraph) for subgraph in nx.weakly_connected_components(graph)]

Classes

class Dag (*, name: str = None, nodes: List[DagNode] = None, edges: List[DagEdge] = None)

Structure and visual representation of a directed acyclic graph (DAG).

Expand source code
@dataclass(slots=True, kw_only=True)
class Dag(DagKey, RecordMixin[DagKey]):
    """Structure and visual representation of a directed acyclic graph (DAG)."""

    nodes: List[DagNode] = missing()
    """List of DAG nodes."""

    edges: List[DagEdge] = missing()
    """List of DAG edges."""

    def get_key(self) -> DagKey:
        """Return primary key of this instance in semicolon-delimited string format."""
        return DagKey(name=self.name)

    @staticmethod
    def auto_layout_dag(
        dag: "Dag",
        layout_mode: DagLayoutEnum = DagLayoutEnum.SPRING,
        offset_x: int = 600,
        base_scale: int = 100,
    ) -> "Dag":
        """
        Set positions automatically for the passed DAG.

        Parameters
        ----------
            dag : Dag
                Dag to create layout for.
            layout_mode : DagLayoutEnum
                Graph layout to use.
            offset_x : int
                Offset on x-axis to use between multiple Dags. Won't affect the resulting autolayout of a single Dag.
            base_scale : int
                Base scale to use while calculating the final Dag scale.
                The final scale is calculated for each subgraph separately based on the number of nodes in
                it using the following formula: base_scale * number_of_nodes^0.5.

        Returns
        -------
            Dag
                A modified Dag object with adjusted positions of nodes.
        """

        subgraphs = dag._build_disconnected_graphs()
        positions = {}
        base_offset_x = 0.0

        for subgraph in subgraphs:
            subgraph_scale = base_scale * len(subgraph.nodes) ** 0.5

            if layout_mode == DagLayoutEnum.CIRCULAR:
                layout = nx.circular_layout(subgraph, scale=subgraph_scale)
            elif layout_mode == DagLayoutEnum.PLANAR:
                layout = nx.planar_layout(subgraph, scale=subgraph_scale)
            elif layout_mode == DagLayoutEnum.SPRING:
                layout = nx.spring_layout(
                    subgraph,
                    scale=subgraph_scale,
                    # Number of iterations is for more accurate layout
                    iterations=100,
                    # Seed needed for consistent layout
                    seed=1,
                    # k - optimal distance between nodes, which should depend on the number of nodes
                    k=1 / (len(subgraph.nodes) ** 0.2),
                )
            else:
                raise Exception("Unsupported layout mode. Accepted layout modes: circular, planar, spring")
            positions.update({node: (x + base_offset_x, y) for node, (x, y) in layout.items()})
            base_offset_x += offset_x

        for node in dag.nodes:
            x, y = positions[node.id_]
            node.position = DagNodePosition(x=float(x), y=float(y))

        return dag

    @staticmethod
    def build_edge_between_nodes(source: DagNode, target: DagNode, label: str | None = None) -> DagEdge:
        """Create a connection between two DagNode instances."""
        return DagEdge(
            id_=f"e-{source.id_}-{target.id_}",
            label=label,
            source=source.id_,
            target=target.id_,
        )

    def _build_graph(self) -> nx.DiGraph:
        """Build networkx graph representation."""

        graph = nx.DiGraph(name=self.name)
        for edge in self.edges:
            graph.add_edge(**edge.to_networkx())
        for node in self.nodes:
            graph.add_node(**node.to_networkx())
        self._validate_graph(graph)
        return graph

    @staticmethod
    def _validate_graph(graph: nx.DiGraph):
        """Validate that graph has no cycles."""

        if not nx.is_directed_acyclic_graph(graph):
            raise RuntimeError("Graph is not acyclic!")

    def _build_disconnected_graphs(self) -> List[nx.DiGraph]:
        """Build list of disconnected (separated) networkx graphs."""

        graph = self._build_graph()
        return [graph.subgraph(subgraph) for subgraph in nx.weakly_connected_components(graph)]

Ancestors

Static methods

def auto_layout_dag(dag: Dag, layout_mode: DagLayoutEnum = DagLayoutEnum.SPRING, offset_x: int = 600, base_scale: int = 100) -> Dag

Set positions automatically for the passed DAG.

Parameters

dag : Dag
    Dag to create layout for.
layout_mode : DagLayoutEnum
    Graph layout to use.
offset_x : int
    Offset on x-axis to use between multiple Dags. Won't affect the resulting autolayout of a single Dag.
base_scale : int
    Base scale to use while calculating the final Dag scale.
    The final scale is calculated for each subgraph separately based on the number of nodes in
    it using the following formula: base_scale * number_of_nodes^0.5.

Returns

Dag
    A modified Dag object with adjusted positions of nodes.
def build_edge_between_nodes(source: DagNode, target: DagNode, label: str | None = None) -> DagEdge

Create a connection between two DagNode instances.

def get_key_type() -> Type

Inherited from: DagKey.get_key_type

Return key type even when called from a record.

Fields

var edges -> List[DagEdge]

List of DAG edges.

var name -> str

Inherited from: DagKey.name

Unique DAG identifier.

var nodes -> List[DagNode]

List of DAG nodes.

Methods

def get_key(self) -> DagKey

Return primary key of this instance in semicolon-delimited string format.

def init_all(self) -> None

Inherited from: RecordMixin.init_all

Invoke ‘init’ for each class in the order from base to derived, then validate against schema.