Source code for atomrdf.visualize

import graphviz
import logging
import os
import re
import xml.etree.ElementTree as ET
from rdflib import BNode, URIRef, Namespace, Literal, RDF
from rdflib import Literal as RDFLiteral
import uuid
import json

logger = logging.getLogger(__name__)


# ─── GEXF export helpers ──────────────────────────────────────────────────────

#: RGB colours for each semantic category used in the GEXF / Gephi export.
GEXF_CATEGORY_COLORS = {
    "Sample":      (224, 123,  57),   # warm orange
    "Material":    (155,  89, 182),   # purple
    "Structure":   ( 41, 128, 185),   # blue
    "Element":     ( 39, 174,  96),   # green
    "Calculation": (192,  57,  43),   # red
    "Potential":   (243, 156,  18),   # gold
    "Property":    ( 22, 160, 133),   # teal
    "Literal":     (189, 195, 199),   # light grey
    "Other":       (149, 165, 166),   # grey
}

# exact rdf:type local-name → semantic category
_TYPE_CATEGORY_EXACT = {
    # Sample
    "AtomicScaleSample":              "Sample",
    # Material
    "CrystallineMaterial":            "Material",
    "AmorphousStructure":             "Material",
    "FluidMaterial":                  "Material",
    "CrystallineFluid":               "Material",
    # Structure — crystal geometry
    "CrystalStructure":               "Structure",
    "UnitCell":                       "Structure",
    "CrystalUnitCell":                "Structure",
    "BravaisLattice":                 "Structure",
    "SimulationCell":                 "Structure",
    "SimulationCellAngle":            "Structure",
    "SimulationCellLength":           "Structure",
    "SimulationCellVector":           "Structure",
    "LatticeParameter":               "Structure",
    "LatticeAngle":                   "Structure",
    # Structure — defects
    "GrainBoundary":                  "Structure",
    "TiltGrainBoundary":              "Structure",
    "TwistGrainBoundary":             "Structure",
    "SymmetricalTiltGrainBoundary":   "Structure",
    "Vacancy":                        "Structure",
    "InterstitialImpurity":           "Structure",
    "SubstitutionalImpurity":         "Structure",
    "DefectComplex":                  "Structure",
    # Element
    "ChemicalElement":                "Element",
    "Species":                        "Element",
    "ChemicalSpecies":                "Element",
    # Calculation — simulation types
    "MolecularStatics":               "Calculation",
    "MolecularDynamics":              "Calculation",
    "DensityFunctionalTheory":        "Calculation",
    "Simulation":                     "Calculation",
    "EnergyCalculation":              "Calculation",
    "GeneralizedGradientApproximation": "Calculation",
    "LocalDensityApproximation":      "Calculation",
    "InputParameter":                 "Calculation",
    "SoftwareAgent":                  "Calculation",
    # Calculation — relaxation types & ensembles
    "AtomicPositionRelaxation":       "Calculation",
    "CellShapeRelaxation":            "Calculation",
    "CellVolumeRelaxation":           "Calculation",
    "MicrocanonicalEnsemble":         "Calculation",
    "IsothermalIsobaricEnsemble":     "Calculation",
    "CanonicalEnsemble":              "Calculation",
    "GrandCanonicalEnsemble":         "Calculation",
    # Calculation — operations
    "DeleteAtom":                     "Calculation",
    "SubstituteAtom":                 "Calculation",
    "AddAtom":                        "Calculation",
    "Multiplication":                 "Calculation",
    "Division":                       "Calculation",
    "Subtraction":                    "Calculation",
    "Addition":                       "Calculation",
    # Potential
    "InteratomicPotential":           "Potential",
    "EmbeddedAtomModel":              "Potential",
    "ModifiedEmbeddedAtomModel":      "Potential",
    "PairPotential":                  "Potential",
    "FinnisSinclairPotential":        "Potential",
    "LennardJonesPotential":          "Potential",
    "MachineLearningPotential":       "Potential",
    "MACE":                           "Potential",
    "ACE":                            "Potential",
    "GRACE":                          "Potential",
    "NNPotential":                    "Potential",
    "DeePMD":                         "Potential",
    # Property
    "CalculatedProperty":             "Property",
    "GrainBoundaryEnergy":            "Property",
    "FormationEnergy":                "Property",
    "SurfaceEnergy":                  "Property",
    "VacancyFormationEnergy":         "Property",
    "SegregationEnergy":              "Property",
    "WorkOfSeparation":               "Property",
    "Volume":                         "Property",
    "TotalEnergy":                    "Property",
    "Energy":                         "Property",
    "Temperature":                    "Property",
    "Pressure":                       "Property",
    "Stress":                         "Property",
    "ElasticConstant":                "Property",
    "BulkModulus":                    "Property",
    "AtomAttribute":                  "Property",
}

# Regex that matches the UUID portion of atomRDF instance URIs such as
# ``sample:3f8a7b2c-1234-5678-abcd-ef0123456789_CrystalStructure``
_ATOMRDF_UUID_RE = re.compile(
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}_',
    re.IGNORECASE,
)


def _gexf_local_name(uri_str):
    """Return the local name of a URI (fragment after the last '#' or '/')."""
    for sep in ("#", "/"):
        if sep in uri_str:
            return uri_str.rsplit(sep, 1)[-1]
    return uri_str


def _gexf_term_key(term):
    """Return a hashable key that uniquely identifies an RDF term."""
    if isinstance(term, RDFLiteral):
        return f"lit\x00{term}\x00{term.datatype}"
    if isinstance(term, BNode):
        return f"bnode\x00{term}"
    return str(term)


def _gexf_label(term):
    """Return a short human-readable label for any RDF term."""
    if isinstance(term, RDFLiteral):
        s = str(term)
        return (s[:60] + "\u2026") if len(s) > 60 else s
    if isinstance(term, BNode):
        return str(term)[:24]
    return _gexf_local_name(str(term))


def _classify_by_local_name(local):
    """
    Return a semantic category for a URI local name, or ``None`` if unknown.

    Used for both rdf:type classification and URI-suffix extraction.
    """
    if local in _TYPE_CATEGORY_EXACT:
        return _TYPE_CATEGORY_EXACT[local]
    if "Sample" in local:
        return "Sample"
    if any(k in local for k in ("Potential", "ForceField")):
        return "Potential"
    if any(k in local for k in ("Property", "Energy", "Pressure",
                                "Stress", "Elasticity",
                                "Force", "Segregation", "Separation",
                                "Modulus", "Constant")):
        return "Property"
    if any(k in local for k in ("Statics", "Dynamics", "Functional",
                                "Simulation", "Calculation", "Activity",
                                "Theory", "Method", "Approximation")):
        return "Calculation"
    if any(k in local for k in ("Element", "Species")):
        return "Element"
    if any(k in local for k in ("Structure", "Lattice", "UnitCell", "Cell",
                                "GrainBoundary", "Grain", "Boundary",
                                "Defect", "Vacancy", "Impurity",
                                "Interstitial", "Substitutional")):
        return "Structure"
    if any(k in local for k in ("Material", "Crystalline", "Amorphous")):
        return "Material"
    return None


def _gexf_classify(term, type_map):
    """
    Return the semantic category string for *term*.

    Priority
    --------
    1. Literals → ``"Literal"``.
    2. ``rdf:type`` lookup — exact local-name match, then keyword match.
    3. URI suffix extraction — for ``sample:UUID_TypeSuffix`` URIs whose
       type is a Wikidata QID or is otherwise not in the type map.
    4. URI-scheme shortcuts (``simulation:``, ``property:``, …) for nodes
       that carry no explicit type.
    5. Fallback → ``"Other"``.
    """
    if isinstance(term, RDFLiteral):
        return "Literal"

    uri_str = str(term)
    key = _gexf_term_key(term)

    # ── 1. rdf:type classification (primary) ──────────────────────────────────
    for type_uri in type_map.get(key, ()):
        local = _gexf_local_name(str(type_uri))
        cat = _classify_by_local_name(local)
        if cat is not None:
            return cat

    # ── 2. URI suffix extraction ───────────────────────────────────────────────
    # atomRDF encodes the node's class in the URI suffix, e.g.
    # sample:3f8a..._CrystalStructure
    m = _ATOMRDF_UUID_RE.search(uri_str)
    if m:
        suffix = uri_str[m.end():]          # e.g. "CrystalStructure" or "SimulationCellVector_3"
        suffix = suffix.split("_")[0]       # drop any trailing index component
        cat = _classify_by_local_name(suffix)
        if cat is not None:
            return cat

    # ── 3. Direct local-name of the URI itself ────────────────────────────────
    # Catches pure ontology class URIs such as cmso:SimulationCellVector,
    # cmso:ChemicalElement, asmo:GrainBoundaryEnergy, etc., which appear as
    # the *objects* of rdf:type triples and have no type of their own.
    local = _gexf_local_name(uri_str)
    cat = _classify_by_local_name(local)
    if cat is not None:
        return cat

    # ── 4. URI-scheme shortcuts (fallback for untyped instance nodes) ─────────
    if "sample:" in uri_str:
        return "Sample"
    if any(tok in uri_str for tok in ("simulation:", "activity:", "operation:")):
        return "Calculation"
    if "property:" in uri_str:
        return "Property"
    if "potential:" in uri_str:
        return "Potential"

    return "Other"


[docs] def to_gexf(g, output_file, include_literals=False, positions=None, sizes=None, top_n_labels=None, label_overrides=None, top_label_uris=None, injected_type_map=None): """ Export an RDF graph to GEXF format for visualisation in Gephi. Nodes are coloured by semantic category: =========== ====================== ========================================= Category Colour Covers =========== ====================== ========================================= Sample orange ``#E07B39`` ``cmso:AtomicScaleSample`` instances Material purple ``#9B59B6`` Material description nodes Structure blue ``#2980B9`` Crystal-structure / unit-cell nodes Element green ``#27AE60`` Chemical element / species nodes Calculation red ``#C0392B`` Simulation / activity nodes Potential gold ``#F39C12`` Interatomic potential nodes Property teal ``#16A085`` Calculated-property nodes Literal l.grey ``#BDC3C7`` RDF literal values Other grey ``#95A5A6`` Ontology terms & everything else =========== ====================== ========================================= The ``viz:color`` attribute written into the GEXF file is read natively by Gephi and drives the default node colour. The ``category`` attribute is also stored as a node attribute so it can be used in Gephi's *Partition* panel for colour/size adjustments after import. Parameters ---------- g : rdflib.Graph The graph to serialise (plain, named, or conjunctive graph). output_file : str Destination path for the ``.gexf`` file. include_literals : bool, optional Whether to create a node for every literal value. Default is ``False`` (drops literal nodes and their edges), which produces a cleaner resource-only graph that is easier to explore in Gephi. positions : dict, optional Mapping ``{uri_string: (x, y)}`` of pre-computed layout coordinates. Written as ``viz:position`` elements so Gephi uses them directly. When ``None`` no position attributes are written. sizes : dict, optional Mapping ``{uri_string: float}`` of pre-computed node sizes. Written as ``viz:size`` elements. When ``None`` Gephi uses its default. top_n_labels : int, optional When set, only the *top_n_labels* highest-degree nodes keep a visible label string; all other nodes are exported with an empty label. This is useful when opening in Gephi with "Show node labels" enabled — only the most connected nodes will display text. When ``None`` (the default) every node keeps its label. label_overrides : dict, optional Mapping ``{uri_string: display_label}`` of explicit label replacements. Applied after ``_gexf_label()`` so any URI can be given a clean short name (e.g. ``{"http://www.vasp.at": "VASP"}``). Returns ------- output_file : str The path of the file that was written. """ GEXF_NS = "http://gexf.net/1.3" VIZ_NS = "http://gexf.net/1.3/viz" ET.register_namespace("", GEXF_NS) ET.register_namespace("viz", VIZ_NS) # ── 1. Build rdf:type map (term_key → set of type URIs) ─────────────────── # If the caller pre-built the type_map from a richer graph (e.g. one that # still contains rdf:type triples even though those were stripped from g), # use it directly so node classification still works correctly. if injected_type_map is not None: type_map = injected_type_map else: type_map = {} for s, p, o in g: if p == RDF.type and isinstance(o, URIRef): k = _gexf_term_key(s) type_map.setdefault(k, set()).add(o) # ── 2. Collect unique nodes and directed edges ───────────────────────────── term_to_idx = {} # term_key → integer index node_rows = [] # [(label, category, uri_str), …] edge_rows = [] # [(src_idx, tgt_idx, predicate_local_name), …] def _register(term): k = _gexf_term_key(term) if k not in term_to_idx: idx = len(node_rows) term_to_idx[k] = idx cat = _gexf_classify(term, type_map) uri = str(term) lbl = (label_overrides.get(uri) or _gexf_label(term)) if label_overrides else _gexf_label(term) node_rows.append((lbl, cat, uri)) return term_to_idx[k] for s, p, o in g: if isinstance(o, RDFLiteral) and not include_literals: continue s_idx = _register(s) o_idx = _register(o) edge_rows.append((s_idx, o_idx, _gexf_local_name(str(p)))) # ── 2b. Blank labels for nodes not in the labelled set ──────────────────── # top_label_uris takes priority: an explicit set of URIs to keep labelled. # Fallback: top_n_labels uses internal degree to pick the top-N. if top_label_uris is not None: node_rows = [ (lbl if uri in top_label_uris else "", cat, uri) for lbl, cat, uri in node_rows ] elif top_n_labels is not None: degree = [0] * len(node_rows) for src, tgt, _ in edge_rows: degree[src] += 1 degree[tgt] += 1 threshold = sorted(degree, reverse=True)[min(top_n_labels, len(degree)) - 1] top_idx = set(i for i, d in enumerate(degree) if d >= threshold) if len(top_idx) > top_n_labels: ranked = sorted(top_idx, key=lambda i: degree[i], reverse=True) top_idx = set(ranked[:top_n_labels]) node_rows = [ (lbl if i in top_idx else "", cat, uri) for i, (lbl, cat, uri) in enumerate(node_rows) ] # ── 3. Build XML tree ────────────────────────────────────────────────────── root = ET.Element(f"{{{GEXF_NS}}}gexf", version="1.3") meta_el = ET.SubElement(root, f"{{{GEXF_NS}}}meta") ET.SubElement(meta_el, f"{{{GEXF_NS}}}creator").text = "atomRDF" ET.SubElement(meta_el, f"{{{GEXF_NS}}}description").text = ( "Knowledge graph exported from atomRDF" ) graph_el = ET.SubElement( root, f"{{{GEXF_NS}}}graph", mode="static", defaultedgetype="directed" ) # Node attribute declarations node_attrs_el = ET.SubElement( graph_el, f"{{{GEXF_NS}}}attributes", {"class": "node"} ) ET.SubElement(node_attrs_el, f"{{{GEXF_NS}}}attribute", id="cat", title="category", type="string") ET.SubElement(node_attrs_el, f"{{{GEXF_NS}}}attribute", id="uri", title="uri", type="string") # Edge attribute declarations edge_attrs_el = ET.SubElement( graph_el, f"{{{GEXF_NS}}}attributes", {"class": "edge"} ) ET.SubElement(edge_attrs_el, f"{{{GEXF_NS}}}attribute", id="pred", title="predicate", type="string") # Nodes nodes_el = ET.SubElement(graph_el, f"{{{GEXF_NS}}}nodes") for idx, (lbl, cat, uri) in enumerate(node_rows): r, gv, b = GEXF_CATEGORY_COLORS.get(cat, (149, 165, 166)) node_el = ET.SubElement( nodes_el, f"{{{GEXF_NS}}}node", id=str(idx), label=lbl ) attvals_el = ET.SubElement(node_el, f"{{{GEXF_NS}}}attvalues") ET.SubElement(attvals_el, f"{{{GEXF_NS}}}attvalue", {"for": "cat", "value": cat}) ET.SubElement(attvals_el, f"{{{GEXF_NS}}}attvalue", {"for": "uri", "value": uri}) ET.SubElement(node_el, f"{{{VIZ_NS}}}color", r=str(r), g=str(gv), b=str(b), a="1.0") if positions is not None and uri in positions: px, py = positions[uri] ET.SubElement(node_el, f"{{{VIZ_NS}}}position", x=f"{float(px):.4f}", y=f"{float(py):.4f}", z="0.0") if sizes is not None and uri in sizes: ET.SubElement(node_el, f"{{{VIZ_NS}}}size", value=f"{float(sizes[uri]):.4f}") # Edges edges_el = ET.SubElement(graph_el, f"{{{GEXF_NS}}}edges") for edge_idx, (src, tgt, pred) in enumerate(edge_rows): edge_el = ET.SubElement( edges_el, f"{{{GEXF_NS}}}edge", id=str(edge_idx), source=str(src), target=str(tgt), label=pred, ) attvals_el = ET.SubElement(edge_el, f"{{{GEXF_NS}}}attvalues") ET.SubElement(attvals_el, f"{{{GEXF_NS}}}attvalue", {"for": "pred", "value": pred}) # ── 4. Write file ────────────────────────────────────────────────────────── with open(output_file, "w", encoding="utf-8") as fh: fh.write('<?xml version="1.0" encoding="UTF-8"?>\n') fh.write(ET.tostring(root, encoding="unicode")) # ── 5. Summary ──────────────────────────────────────────────────────────── cat_counts = {} for _, cat, _ in node_rows: cat_counts[cat] = cat_counts.get(cat, 0) + 1 logger.info("Exported to '%s'", output_file) logger.info(" Nodes : %s", f"{len(node_rows):,}") logger.info(" Edges : %s", f"{len(edge_rows):,}") logger.info(" Node categories:") for cat in ("Sample", "Material", "Structure", "Element", "Calculation", "Potential", "Property", "Literal", "Other"): count = cat_counts.get(cat, 0) if count: rv, gv, bv = GEXF_CATEGORY_COLORS[cat] logger.info(" %-12s %6s #%02X%02X%02X", cat, f"{count:,}", rv, gv, bv) return output_file
def get_title_from_BNode(x): return x.toPython()
[docs] def get_string_from_URI(x): """ Extract a presentable string from URI. Parameters ---------- x : rdflib.term.URIRef The URI object to extract the string from. Returns ------- tuple A tuple containing the presentable string representation of the URI and its type. The string representation is the last part of the URI after splitting by '#' or '/'. The type can be either "URIRef" or "BNode". """ raw = x.toPython() # first try splitting by # rawsplit = raw.split("#") if len(rawsplit) > 1: return rawsplit[-1], "URIRef" # try splitting by = for chebi values if "CHEBI" in raw: rawsplit = raw.split("=") rawsplit = rawsplit[-1].split(":") if len(rawsplit) > 1: return ".".join(rawsplit[-2:]), "URIRef" if "sample:" in raw: rawsplit = raw.split(":") if len(rawsplit) > 1: return "_".join(rawsplit), "BNode" if "activity:" in raw: rawsplit = raw.split(":") if len(rawsplit) > 1: return "_".join(rawsplit), "BNode" if "simulation:" in raw: rawsplit = raw.split(":") if len(rawsplit) > 1: return "_".join(rawsplit), "BNode" if "operation:" in raw: rawsplit = raw.split(":") if len(rawsplit) > 1: return "_".join(rawsplit), "BNode" if "property:" in raw: rawsplit = raw.split(":") if len(rawsplit) > 1: return "_".join(rawsplit), "BNode" # just a normal url split now rawsplit = raw.split("/") if len(rawsplit) > 1: return ".".join(rawsplit[-2:]), "URIRef" # none of the conditions worked, which means it's a hex string return raw, "BNode"
[docs] def parse_object(x): """ Parse the given object and return its title and type. Parameters ---------- x : RDF term The RDF term to parse. Returns ------- tuple A tuple containing the title of the object and its type. """ if isinstance(x, BNode): return get_title_from_BNode(x), "BNode" elif isinstance(x, URIRef): return get_string_from_URI(x) elif isinstance(x, Literal): return str(x.title()), "Literal"
styledict = { "BNode": {"color": "#ffe6ff", "shape": "box", "style": "filled"}, "URIRef": {"color": "#ffffcc", "shape": "box", "style": "filled"}, "Literal": {"color": "#e6ffcc", "shape": "ellipse", "style": "filled"}, } def _switch_box(box): if box == "box": return "rectangle" # remember that only boxes will be used, circles no! def _fix_id(string1, istype1): if istype1 == "Literal": id1 = str(uuid.uuid4()) else: id1 = string1 return id1
[docs] def visualize_graph( g, styledict=styledict, rankdir="TB", hide_types=False, workflow_view=False, sample_view=False, size=None, layout="dot", ): """ Visualizes a graph using Graphviz. Parameters ---------- g : dict The graph to visualize. styledict : dict, optional A dictionary containing styles for different types of nodes and edges. Default is `styledict`. rankdir : str, optional The direction of the graph layout. Default is "TB" (top to bottom). hide_types : bool, optional Whether to hide nodes with the "type" attribute. Default is False. workflow_view : bool, optional Whether to enable the workflow view. Default is False. sample_view : bool, optional Whether to enable the sample view. Default is False. size : str, optional The size of the graph. Default is None. layout : str, optional The layout algorithm to use. Default is "dot". Returns ------- dot : graphviz.Digraph The graph visualization. """ dot = graphviz.Digraph() dot.attr( rankdir=rankdir, style="filled", size=size, layout=layout, overlap="false", ) for k in g: string1, istype1 = parse_object(k[0]) string2, istype2 = parse_object(k[2]) string3, istype = parse_object(k[1]) plot = True if workflow_view: # we collapse sample information # if cmso.connector is found, only use it is it is cmso.hasCalculated # all sub sample props, indicated by sample_x_jsjsj will be ignored. green_list = ["hasCalculatedProperty", "wasCalculatedBy", "hasValue"] ssplit = string3.split(".") if len(ssplit) == 2: if (ssplit[0] == "cmso") and (ssplit[1] not in green_list): plot = False if string3 == "subClassOf": plot = False ssplit = string2.split(".") if string3 == "type": if (ssplit[0] == "cmso") and (ssplit[1] not in ["CalculatedProperty"]): plot = False if (ssplit[0] == "cmso") and (ssplit[1] in ["AtomicScaleSample"]): dot.node( string1, label=string1, shape=styledict[istype1]["shape"], style=styledict[istype1]["style"], color=styledict[istype1]["color"], fontsize=styledict[istype1]["fontsize"], fontname=styledict[istype1]["fontname"], ) plot = False elif sample_view: green_list = ['wasDerivedFrom', 'wasGeneratedBy'] if string3 not in green_list: plot = False if hide_types and (string3 == "type"): plot = False if not plot: continue if istype1 == "Literal": id1 = str(uuid.uuid4()) else: id1 = string1 dot.node( id1, label=string1, shape=styledict[istype1]["shape"], style=styledict[istype1]["style"], color=styledict[istype1]["color"], fontsize=styledict[istype1]["fontsize"], fontname=styledict[istype1]["fontname"], ) if istype2 == "Literal": id2 = str(uuid.uuid4()) else: id2 = string2 dot.node( id2, label=string2, shape=styledict[istype2]["shape"], style=styledict[istype2]["style"], color=styledict[istype2]["color"], fontsize=styledict[istype2]["fontsize"], fontname=styledict[istype2]["fontname"], ) dot.edge( id1, id2, color=styledict["edgecolor"], label=string3, fontsize=styledict[istype2]["fontsize"], fontname=styledict[istype2]["fontname"], ) return dot
def _id(item): return str(item).replace(':', '_') def visualize_provenance( prov, rankdir="TB", size=None, layout="dot", ): dot = graphviz.Digraph() dot.attr( rankdir=rankdir, style="filled", size=size, layout=layout, overlap="false", ) #add all nodes for key in prov.keys(): nid = _id(key) #if "activity" in key: dot.node(nid, label=prov[key]['label'], shape='box', color="#C9DAF8", style="filled", fontname='Helvetica', fontsize='8') #else: # dot.node(nid, label=prov[key]['label'], # shape='parallelogram', # color="#C9DAF8", # style="filled", # fontname='Helvetica', # fontsize='8') #add all edges for key, val in prov.items(): if 'inputs' in val.keys(): if val['operation'] == 'input_parameter': for subkey, subval in val['inputs'].items(): dot.edge(_id(subval), _id(key), label='input_param', color="#263238", fontname='Helvetica', fontsize='8') if val['operation'] == 'output_parameter': for subkey, subval in val['inputs'].items(): dot.edge(_id(subval), _id(key), label='output_param', color="#263238", fontname='Helvetica', fontsize='8') elif val['operation'] == 'sample_for_activity': for subkey, subval in val['inputs'].items(): dot.edge(_id(subval), _id(key), label='input_sample', color="#263238", fontname='Helvetica', fontsize='8') elif val['operation'] == 'sample_output': for subkey, subval in val['inputs'].items(): dot.edge(_id(subval), _id(key), label='output_sample', color="#263238", fontname='Helvetica', fontsize='8') else: operation_id = str(uuid.uuid4()) operation = dot.node(operation_id, label=val['operation'], color="#E6B8AF", shape='box', style='filled', fontname='Helvetica', fontsize='8') for subkey, subval in val['inputs'].items(): dot.edge(_id(subval), operation_id, label='input', color="#263238", fontname='Helvetica', fontsize='8') dot.edge(operation_id, _id(key), label='output', color="#263238", fontname='Helvetica', fontsize='8') return dot