import networkx as nx
import graphviz
import matplotlib.pyplot as plt
import numpy as np
import os
import warnings
from pyscal3.atoms import AttrSetter
import copy
from atomrdf.network.parser import OntoParser
from atomrdf.network.term import OntoTerm, strip_name
from functools import partial
# Default ontology file (CMSO), located relative to this module's directory.
# Used by OntologyNetwork when no input file is given.
owlfile = os.path.join(
    os.path.dirname(__file__),
    "../data/cmso.owl",
)
def _replace_name(name):
    """
    Return ``name`` with every ``:`` replaced by ``.``.

    Used when handing node names to graphviz (presumably because graphviz
    gives ``:`` a special meaning in node identifiers -- confirm).
    """
    return name.replace(":", ".")
[docs]
class OntologyNetwork:
    """
    Network representation of an ontology.

    The ontology file is read with ``OntoParser``. Its classes, object
    properties and data properties are added as nodes of a directed
    ``networkx`` graph (``self.g``); domains and ranges become edges.
    The graph is used to find shortest paths between terms and to turn
    those paths into SPARQL query strings.
    """
def __init__(self, infile=None, delimiter="/"):
    """
    Parse an ontology file and build its graph representation.

    Parameters
    ----------
    infile : str, optional
        Ontology file to read. When None, the module-level default
        ``owlfile`` is used.
    delimiter : str, optional
        Forwarded unchanged to ``OntoParser``.
    """
    ontology_file = owlfile if infile is None else infile
    self.g = nx.DiGraph()
    self.onto = OntoParser(ontology_file, delimiter=delimiter)
    # names of the literal ("value") nodes created for data properties
    self.onto.attributes["data_node"] = []
    # suffix appended to a data property name to name its literal node
    self.data_prefix = "value"
    self.terms = AttrSetter()
    self._parse_all()
def _assign_attributes(self):
    """
    Rebuild the ``self.terms`` accessor from the parsed ontology.

    Builds a two-level mapping ``{namespace: {short_name: term}}`` covering
    every class, object property and data property, and hands it to
    ``self.terms._add_attribute``. Every known namespace gets an entry,
    even if no term belongs to it.
    """
    # first level: one (initially empty) bucket per namespace
    term_map = {prefix: {} for prefix in self.namespaces}
    # second level: file every term under its namespace
    for kind in ("class", "object_property", "data_property"):
        for term in self.onto.attributes[kind].values():
            term_map[term.namespace][term.name_without_prefix] = term
    self.terms._add_attribute(term_map)
def _parse_all(self):
    """
    Populate the graph and the ``terms`` accessor from ``self.onto``.

    Runs, in order: class nodes, object properties, data properties, and
    finally the rebuild of the ``terms`` accessor.
    """
    build_steps = (
        self._add_class_nodes,
        self._add_object_properties,
        self._add_data_properties,
        self._assign_attributes,
    )
    for step in build_steps:
        step()
def __add__(self, ontonetwork):
    """
    Merge the ontology of ``ontonetwork`` into this network.

    The merge happens in place: ``self.onto`` is replaced by the sum of
    both parsers, the graph and terms are re-parsed, and ``self`` (not a
    new object) is returned.
    """
    merged_onto = self.onto + ontonetwork.onto
    self.onto = merged_onto
    # re-run the parsing steps so the graph reflects the merged ontology
    self._parse_all()
    return self
def strip_name(self, name):
    """
    Return the part of ``name`` after its last ``:``.

    A name without any ``:`` is returned unchanged.
    """
    _, _, local_part = name.rpartition(":")
    return local_part
@property
def attributes(self):
    """
    The parser's ``attributes`` mapping (``self.onto.attributes``).

    Within this class it is indexed by "class", "object_property",
    "data_property" and "data_node".
    """
    return self.onto.attributes
@property
def namespaces(self):
    """
    The parser's namespace mapping (``self.onto.namespaces``).

    Maps a namespace name to its IRI; each entry is emitted as a
    ``PREFIX`` line when a query is created.
    """
    return self.onto.namespaces
@property
def extra_namespaces(self):
    """
    The parser's additional namespaces (``self.onto.extra_namespaces``).

    Like ``namespaces``, each entry is emitted as a ``PREFIX`` line when a
    query is created.
    """
    return self.onto.extra_namespaces
def __radd__(self, ontonetwork):
    """
    Reflected addition, so that ``sum([net_a, net_b, ...])`` works.

    ``sum()`` starts from the integer 0, which makes Python evaluate
    ``0 + network`` through this method. That start value is treated as
    the additive identity and ``self`` is returned unchanged. Any other
    left operand is forwarded to ``__add__``.
    """
    if isinstance(ontonetwork, int) and ontonetwork == 0:
        return self
    return self.__add__(ontonetwork)
def _add_class_nodes(self):
    """Add one graph node with ``node_type="class"`` per ontology class."""
    class_terms = self.onto.attributes["class"]
    for term in class_terms.values():
        self.g.add_node(term.name, node_type="class")
def _add_object_properties(self):
    """
    Add every object property to the graph.

    Each property becomes a node with ``node_type="object_property"``,
    receives an incoming edge from each entry of its domain and sends an
    outgoing edge to each entry of its range.
    """
    graph = self.g
    for prop in self.onto.attributes["object_property"].values():
        graph.add_node(prop.name, node_type="object_property")
        # domain -> property
        for domain_entry in prop.domain:
            graph.add_edge(domain_entry, prop.name)
        # property -> range
        for range_entry in prop.range:
            graph.add_edge(prop.name, range_entry)
def _add_data_properties(self):
    """
    Add every data property, and its literal node, to the graph.

    Each property becomes a node with ``node_type="data_property"`` and
    receives an incoming edge from each entry of its domain. For each
    range entry a literal node named ``<property name><data_prefix>`` is
    added (``node_type="literal"``, ``data_type`` set to the range entry)
    and linked from the property.

    The literal node name is recorded in ``attributes["data_node"]`` only
    once, even when a property has several range entries or when parsing
    is repeated (for example after ``__add__``).
    """
    graph = self.g
    # create the list if the current parser does not carry one yet
    # (presumably possible after the parser was replaced -- confirm)
    data_nodes = self.onto.attributes.setdefault("data_node", [])
    for prop in self.onto.attributes["data_property"].values():
        graph.add_node(prop.name, node_type="data_property")
        for domain_entry in prop.domain:
            graph.add_edge(domain_entry, prop.name)
        # the literal node name does not depend on the range entry
        data_node = f"{prop.name}{self.data_prefix}"
        for range_entry in prop.range:
            if data_node not in data_nodes:
                data_nodes.append(data_node)
            # with several range entries the same node is re-added, so the
            # last entry determines its data_type
            graph.add_node(data_node, node_type="literal", data_type=range_entry)
            graph.add_edge(prop.name, data_node)
[docs]
def add_namespace(self, namespace_name, namespace_iri):
    """
    Register a new namespace with the ontology.

    Parameters
    ----------
    namespace_name : str
        The name of the namespace to add.
    namespace_iri : str
        The IRI of the namespace.

    Raises
    ------
    KeyError
        If a namespace with this name is already registered.
    """
    registry = self.onto.namespaces
    if namespace_name in registry:
        raise KeyError("namespace is already there!")
    registry[namespace_name] = namespace_iri
[docs]
def add_term(
    self,
    uri,
    node_type,
    namespace=None,
    dm=(),
    rn=(),
    data_type=None,
    node_id=None,
    delimiter="/",
):
    """
    Create a new term and register it with the ontology.

    The term is stored in ``self.onto.attributes[node_type]`` under its
    name and the ``terms`` accessor is rebuilt. This method does not touch
    the graph itself.

    Parameters
    ----------
    uri : str
        The URI of the term.
    node_type : str
        The type of the term; used as the key into the attributes mapping
        (e.g. "class", "object_property", "data_property").
    namespace : str, optional
        The namespace of the term.
    dm : list, optional
        Domain of the term, forwarded to ``OntoTerm``.
    rn : list, optional
        Range of the term, forwarded to ``OntoTerm``.
    data_type : str, optional
        The data type of the term.
    node_id : str, optional
        The ID of the term.
    delimiter : str, optional
        The delimiter used for parsing the URI.

    Raises
    ------
    ValueError
        If the namespace of the term has not been registered.
    """
    term_options = {
        "namespace": namespace,
        "node_type": node_type,
        "dm": dm,
        "rn": rn,
        "data_type": data_type,
        "node_id": node_id,
        "delimiter": delimiter,
    }
    new_term = OntoTerm(uri, **term_options)
    if new_term.namespace not in self.onto.namespaces:
        raise ValueError("Namespace not found, first add namespace")
    self.onto.attributes[node_type][new_term.name] = new_term
    # make the new term reachable through self.terms
    self._assign_attributes()
[docs]
def add_path(self, triple):
    """
    Add a triple as a path in the graph.

    All members of the triple must already exist in the ontology
    attributes. The ontology itself is not modified; only its graph
    representation is. The expected use is to bridge between two (or
    more) different ontologies.

    The whole triple is validated before the graph is touched, so a
    failing call leaves the graph unchanged.

    Parameters
    ----------
    triple : tuple
        Subject, predicate and object, in that order. The subject must be
        a known class. The predicate must be a known object property (the
        object must then be a known class) or a known data property (the
        object is then stored as the ``data_type`` of the literal node).

    Raises
    ------
    ValueError
        If the subject is not a known class, the predicate is neither a
        known object property nor a known data property, or the object of
        an object property is not a known class.
    """
    sub = triple[0]
    pred = triple[1]
    obj = triple[2]
    attributes = self.onto.attributes

    # validate everything first; nothing is added if any check fails
    if sub not in attributes["class"]:
        raise ValueError(f"{sub} not found in self.attributes")
    is_object_property = pred in attributes["object_property"]
    if is_object_property:
        if obj not in attributes["class"]:
            raise ValueError(f"{obj} not found in self.attributes")
    elif pred not in attributes["data_property"]:
        raise ValueError(f"{pred} not found in self.attributes")

    # the subject and everything returned as its subclasses lead to the predicate
    for subclass in self.onto._get_subclasses(sub):
        self.g.add_edge(subclass, pred)

    if is_object_property:
        # the predicate leads to the object and everything returned as its subclasses
        for subclass in self.onto._get_subclasses(obj):
            self.g.add_edge(pred, subclass)
    else:
        # data property: the predicate leads to its literal node
        data_node = f"{pred}{self.data_prefix}"
        self.g.add_node(data_node, node_type="literal", data_type=obj)
        self.g.add_edge(pred, data_node)
[docs]
def draw(self, styledict=None):
    """
    Draw the network graph using graphviz.

    Parameters
    ----------
    styledict : dict, optional
        Styles per node type. Keys are node types ("class",
        "object_property", "data_property", "literal"); values are
        dictionaries with a "shape" entry. Entries given here override
        the built-in defaults; node types that are left out keep their
        default shape. Defaults to None, i.e. the built-in styles only.

    Returns
    -------
    graphviz.Digraph
        The graph object representing the network graph.

    Example
    -------
    styledict = {
        "class": {"shape": "box"},
        "object_property": {"shape": "ellipse"},
        "data_property": {"shape": "ellipse"},
        "literal": {"shape": "parallelogram"},
    }
    network.draw(styledict)
    """
    # built fresh on every call so no mutable state is shared between calls
    styles = {
        "class": {"shape": "box"},
        "object_property": {"shape": "ellipse"},
        "data_property": {"shape": "ellipse"},
        "literal": {"shape": "parallelogram"},
    }
    if styledict is not None:
        styles.update(styledict)

    dot = graphviz.Digraph()
    for node_name, node_type in self.g.nodes(data="node_type"):
        # nodes without a node_type get no explicit style
        if node_type is not None:
            dot.node(
                _replace_name(node_name),
                shape=styles[node_type]["shape"],
                fontsize="6",
            )
    for start, end in self.g.edges:
        dot.edge(_replace_name(start), _replace_name(end))
    return dot
def _get_shortest_path(self, source, target):
    """
    Return the shortest graph path between two terms.

    The path is searched between the terms' ``query_name`` nodes. In the
    returned list the first and last entries are replaced by the terms'
    ``variable_name``; intermediate entries are graph node names.
    """
    hops = nx.shortest_path(
        self.g,
        source=source.query_name,
        target=target.query_name,
    )
    # report the two ends by their variable names
    hops[0], hops[-1] = source.variable_name, target.variable_name
    return hops
[docs]
def get_shortest_path(self, source, target, triples=False):
    """
    Compute the shortest path between two terms in the graph.

    If ``target`` has entries in ``_parents``, the path is built in steps:
    source -> first parent -> ... -> last parent -> target, with the
    individual legs joined end to end.

    Parameters
    ----------
    source : OntoTerm
        The starting term of the path.
    target : OntoTerm
        The final term of the path.
    triples : bool, optional
        If True, the path is returned as a list of triples, each made of
        three consecutive path entries (consecutive triples share one
        entry). If False, the flat list of path entries is returned.

    Returns
    -------
    path : list
        The flat path, or the list of triples if ``triples`` is True.
    """
    # with no parents this is just [source, target], i.e. a single leg
    waypoints = [source, *target._parents, target]
    path = self._get_shortest_path(waypoints[0], waypoints[1])
    for leg_start, leg_end in zip(waypoints[1:], waypoints[2:]):
        leg = self._get_shortest_path(leg_start, leg_end)
        # the first entry of a leg repeats the last entry of the previous one
        path.extend(leg[1:])

    if not triples:
        return path
    triple_starts = range(0, 2 * (len(path) // 2), 2)
    return [path[start : start + 3] for start in triple_starts]
[docs]
def get_path_from_sample(self, target):
    """
    Get the shortest path from the sample term to ``target``.

    The starting point is always ``self.terms.cmso.AtomicScaleSample``.

    Parameters
    ----------
    target : OntoTerm
        The term to find the shortest path to.

    Returns
    -------
    list
        A list of triples representing the shortest path from the sample
        term to the target term.
    """
    sample_term = self.terms.cmso.AtomicScaleSample
    return self.get_shortest_path(source=sample_term, target=target, triples=True)
[docs]
def create_query(self, source, destinations, enforce_types=True):
    """
    Create a SPARQL query string from a source term and destination terms.

    The query selects the variables of the source and of every
    destination, and its WHERE clause contains the triples of the shortest
    path from the source to each destination. If a destination carries a
    condition, it is added as a FILTER clause.

    Parameters
    ----------
    source : OntoTerm
        The term from which the query starts.
    destinations : list, tuple or OntoTerm
        The term(s) the query should reach. A single term is treated as a
        one-element list. The passed sequence itself is never modified.
    enforce_types : bool, optional
        Whether to add ``rdf:type`` constraints for the source and the
        destinations that are classes. Defaults to True.

    Returns
    -------
    str
        The generated SPARQL query string.

    Raises
    ------
    ValueError
        If more than one of the given destinations carries a condition.
    """
    # Work on a private list: condition parents are appended below and the
    # caller's sequence must not grow as a side effect.
    if isinstance(destinations, (list, tuple)):
        destinations = list(destinations)
    else:
        destinations = [destinations]

    # only one destination may carry a condition
    no_of_conditions = sum(
        1 for destination in destinations if destination._condition is not None
    )
    if no_of_conditions > 1:
        raise ValueError("Only one condition is allowed")

    # Terms that take part in a condition must be part of the query too.
    # The list grows while it is walked, so parents of newly added terms
    # are visited as well.
    known_variables = {destination.variable_name for destination in destinations}
    for destination in destinations:
        for parent in destination._condition_parents:
            if parent.variable_name not in known_variables:
                known_variables.add(parent.variable_name)
                destinations.append(parent)

    # prefix section
    query = [f"PREFIX {key}: <{val}>" for key, val in self.namespaces.items()]
    query.extend(
        f"PREFIX {key}: <{val}>" for key, val in self.extra_namespaces.items()
    )

    # select section: the source variable followed by all destinations
    select_variables = ["?" + source.variable_name]
    select_variables.extend(
        "?" + destination.variable_name for destination in destinations
    )
    query.append(f'SELECT DISTINCT {" ".join(select_variables)}')
    query.append("WHERE {")

    # path triples; paths to different destinations may share triples, so
    # a line is only added if it is not part of the query yet
    for destination in destinations:
        for triple in self.get_shortest_path(source, destination, triples=True):
            line_text = " ?%s %s ?%s ." % (
                triple[0].replace(":", "_"),
                triple[1],
                triple[2].replace(":", "_"),
            )
            if line_text not in query:
                query.append(line_text)

    # enforce the types of the source and the destinations
    if enforce_types:
        if source.node_type == "class":
            query.append(
                " ?%s rdf:type %s ."
                % (self.strip_name(source.variable_name), source.query_name)
            )
        for destination in destinations:
            if destination.node_type == "class":
                query.append(
                    " ?%s rdf:type %s ."
                    % (destination.variable_name, destination.query_name)
                )

    # the filter is the first condition found among the destinations
    filter_text = next(
        (
            destination._condition
            for destination in destinations
            if destination._condition is not None
        ),
        "",
    )
    if filter_text != "":
        # the condition is written with query names; the query uses variable names
        for destination in destinations:
            filter_text = filter_text.replace(
                destination.query_name, destination.variable_name
            )
        query.append(f"FILTER {filter_text}")
    query.append("}")

    # finished; reset the state of the terms
    for destination in destinations:
        destination.refresh()
    return "\n".join(query)