"""
Graph module contains the basic RDFGraph object in atomrdf. This object takes a structure
as input and annotates it with the CMSO ontology (PLDO and PODO too, as needed). The annotated
object is stored as RDF triples.
NOTES
-----
- To ensure domain and range checking works as expected, always add type before adding further properties!
Classes
-------
- KnowledgeGraph: Represents a knowledge graph that stores and annotates structure objects.
Attributes
----------
- defstyledict: A dictionary containing default styles for visualizing the graph.
"""
from rdflib import Graph, XSD, RDF, RDFS, BNode, URIRef
import os
import numpy as np
import inspect
from ase.io import write
import copy
import pandas as pd
import yaml
import uuid
import json
import shutil
import tarfile
import logging
import warnings
import re
import pickle
# from pyscal3.core import System
from pyscal3.atoms import Atoms
from atomrdf.visualize import visualize_graph, visualize_provenance
from atomrdf.network.network import OntologyNetwork
from atomrdf.network.ontology import read_ontology
from atomrdf.structure import System
import atomrdf.properties as prp
from atomrdf.stores import create_store, purge
import atomrdf.json_io as json_io
from atomrdf.workflow.workflow import Workflow
from atomrdf.sample import Sample
import atomrdf.mp as amp
from atomrdf.namespace import Namespace, CMSO, PLDO, PODO, ASMO, PROV, MATH, UNSAFECMSO, UNSAFEASMO, Literal
# read element data file shipped with the package (data/element.yml).
# NOTE: the original computed a split/join of __file__ into file_location
# and then immediately overwrote it on the next line; that dead code has
# been removed. The variable name `element_indetifiers` (sic) is kept
# because other modules may reference it.
file_location = os.path.join(os.path.dirname(__file__), "data/element.yml")
with open(file_location, "r") as fin:
    element_indetifiers = yaml.safe_load(fin)
# Default visual styles used when rendering the graph, keyed by the kind
# of rdflib term (BNode / URIRef / Literal); consumed by visualise().
defstyledict = {
    # color applied to graph edges
    "edgecolor": "#263238",
    "BNode": {
        "color": "#D9D9D9",
        "shape": "box",
        "style": "filled",
        "fontsize": "8",
        "fontname": "Helvetica",
    },
    "URIRef": {
        "color": "#C9DAF8",
        "shape": "box",
        "style": "filled",
        "fontsize": "8",
        "fontname": "Helvetica",
    },
    "Literal": {
        "color": "#E6B8AF",
        "shape": "parallelogram",
        "style": "filled",
        "fontsize": "8",
        "fontname": "Helvetica",
    },
}
def _clean_string(input_string):
input_string = re.sub(r'\W', '_', input_string)
if input_string[0].isdigit():
input_string = "s" + input_string
return input_string
def _replace_keys(refdict, indict):
for key, val in indict.items():
if key in refdict.keys():
if isinstance(val, dict):
_replace_keys(refdict[key], indict[key])
else:
refdict[key] = val
return refdict
def _dummy_log(str):
pass
def _name(term):
try:
return str(term.toPython())
except:
return str(term)
def _prepare_log(file):
logger = logging.getLogger(__name__)
handler = logging.FileHandler(file)
formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.propagate = False
return logger
[docs]
class KnowledgeGraph:
"""
Represents a knowledge graph.
Parameters
----------
graph_file : str, optional
The path to the graph file to be parsed. Default is None.
store : str, optional
The type of store to use. Default is "Memory".
store_file : str, optional
The path to the store file. Default is None.
identifier : str, optional
The identifier for the graph. Default is "http://default_graph".
ontology : Ontology, optional
The ontology object to be used. Default is None.
structure_store : StructureStore, optional
The structure store object to be used. Default is None.
enable_log : bool, optional
Whether to enable logging. Default is False.
If true, a log file named atomrdf.log will be created in the current working directory.
Attributes
----------
graph : rdflib.Graph
The RDF graph.
sgraph : rdflib.Graph
The structure graph for a single chosen sample
ontology : Ontology
The ontology object.
terms : dict
The dictionary of ontology terms.
store : str
The type of store used.
Methods
-------
add_structure(structure)
Add a structure to the knowledge graph.
add(triple, validate=True)
Add a triple to the knowledge graph.
triples(triple)
Return the triples in the knowledge graph that match the given triple pattern.
"""
def __init__(
    self,
    graph_file=None,
    store="Memory",
    store_file=None,
    identifier="http://default_graph",
    ontology=None,
    structure_store=None,
    enable_log=False,
):
    """
    Initialize the KnowledgeGraph object.

    Parameters
    ----------
    graph_file : str, optional
        The path to the graph file to be parsed. Default is None.
    store : str, optional
        The type of store to use. Default is "Memory".
    store_file : str, optional
        The path to the store file. Default is None.
    identifier : str, optional
        The identifier for the graph. Default is "http://default_graph".
    ontology : Ontology, optional
        The ontology object to be used. Default is None.
    structure_store : StructureStore, optional
        The structure store object to be used. Default is None.
    enable_log : bool, optional
        Whether to enable logging. Default is False.
        If true, a log file named atomrdf.log is created in the current
        working directory.
    """
    # create_store attaches the backing rdflib Graph (self.graph) and
    # related store attributes as a side effect
    create_store(
        self,
        store,
        identifier,
        store_file=store_file,
        structure_store=structure_store,
    )
    self._store = store
    self._identifier = identifier
    self._store_file = store_file
    # enable logging
    if enable_log:
        logger = _prepare_log(os.path.join(os.getcwd(), "atomrdf.log"))
        self.log = logger.info
    else:
        # no-op logger keeps all self.log(...) call sites unconditional
        self.log = _dummy_log
    # start binding
    self.graph.bind("cmso", CMSO)
    self.graph.bind("pldo", PLDO)
    # optionally preload an existing serialized graph
    if graph_file is not None:
        if os.path.exists(graph_file):
            self.graph.parse(graph_file)
    self.sgraph = None
    if ontology is None:
        ontology = read_ontology()
    self.ontology = ontology
    self.terms = self.ontology.terms
    self.store = store
    # running count of triples added through add()
    self._n_triples = 0
    self._initialize_graph()
    self.workflow = Workflow(self)
[docs]
def purge(self, force=False):
    """
    Remove all information from the KnowledgeGraph.

    Parameters
    ----------
    force : bool, optional
        Whether to proceed with purging the graph. Default is False.

    Returns
    -------
    None

    Notes
    -----
    Deletes every on-disk structure file referenced by the stored samples
    and replaces the backing graph with a freshly purged one. Unless
    ``force=True`` is passed, only a warning is issued and nothing is
    removed.
    """
    if not force:
        warnings.warn('This will remove all information from the KnowledgeGraph. Call with force=True to proceed.')
        return
    # remove the structure files associated with each sample
    for sample_file in self.sample_files:
        if os.path.exists(sample_file):
            os.remove(sample_file)
    # recreate an empty graph on the same store backend
    self.graph = purge(self._store, self._identifier, self._store_file)
    self._n_triples = 0
[docs]
def add_structure(self, structure):
    """
    Add a structure to the knowledge graph.

    Parameters
    ----------
    structure : Structure
        The structure object to be added.

    Returns
    -------
    None

    Notes
    -----
    Attaches this graph to the structure object and then delegates to the
    structure's own ``to_graph()`` method, which writes its triples here.
    """
    structure.graph = self
    structure.to_graph()
def _is_valid(self, input_list):
valid = False
flat_list = []
for x in input_list:
if isinstance(x, list):
flat_list.extend(x)
else:
flat_list.append(x)
for x in flat_list:
if x is not None:
valid = True
break
return valid
def _is_ontoterm(self, term):
return type(term).__name__ == "OntoTerm"
def _is_uriref(self, term):
return type(term).__name__ == "URIRef"
def _is_bnode(self, term):
return not term.toPython().startswith("http")
def _modify_triple(self, triple):
modified_triple = []
for term in triple:
if self._is_ontoterm(term):
modified_triple.append(term.namespace_object)
else:
modified_triple.append(term)
return tuple(modified_triple)
def _check_domain_if_uriref(self, triple):
    """
    Check the subject's stored rdf:type against the predicate's declared domain.

    Validation only happens when the subject already has a type in the
    graph — hence the module-level note to add type before other
    properties. Predicates from a different ontology namespace are
    skipped (logged, not rejected).

    Returns
    -------
    tuple of (bool, str or None)
        ``found`` is False only when a declared domain exists and the
        subject's type matches none of its entries; the second element
        is the subject's type (for error reporting), or None when the
        check was skipped.
    """
    found = True
    dm = self.value(triple[0], RDF.type)
    if dm is not None:
        # we need to check
        domain = triple[1].domain
        if len(domain) > 0:
            if "owl:Thing" not in domain:
                if triple[1].namespace_with_prefix not in dm:
                    # cross ontology term
                    self.log(
                        f"ignoring possible cross ontology connection between {triple[1].namespace} and {dm}"
                    )
                    return True, None
                found = False
                for d in domain:
                    # compare only the term name, stripping the "prefix:" part
                    if d.split(":")[-1] in dm:
                        found = True
                        break
    return found, dm
# def _check_domain_if_ontoterm(self, triple):
# found = True
# domain = triple[0].domain
# if len(domain) > 0:
# if 'owl:Thing' not in domain:
# if triple[1].namespace != triple[0].namespace:
# #cross ontology term
# self.log(f'ignoring possible cross ontology connection between {triple[1].namespace} and {triple[0].namespace}')
# return True, None
# found = False
# if triple[1].name in domain:
# found = True
# return found, triple[0].name
def _check_domain(self, triple):
    """
    Validate the subject of *triple* against the predicate's domain.

    Only runs when the predicate is an OntoTerm and the subject is a
    URIRef with a known type; raises ValueError when the subject's type
    falls outside the declared domain.
    """
    if self._is_ontoterm(triple[1]):
        # check if type was provided
        found = True
        dm = None
        if type(triple[0]).__name__ == "URIRef":
            found, dm = self._check_domain_if_uriref(triple)
        if not found:
            raise ValueError(f"{dm} not in domain of {triple[1].name}")
        self.log(f"checked {triple[1].name} against domain {dm}")
def _check_range_if_uriref(self, triple):
    """
    Check the object's stored rdf:type against the predicate's declared range.

    Mirrors :meth:`_check_domain_if_uriref` on the object side of the
    triple; cross-ontology predicates are skipped (logged, not rejected).

    Returns
    -------
    tuple of (bool, str or None)
        ``found`` is False only when a declared range exists and the
        object's type matches none of its entries.
    """
    found = True
    rn = self.value(triple[2], RDF.type)
    if rn is not None:
        # we need to check
        rang = triple[1].range
        if len(rang) > 0:
            if "owl:Thing" not in rang:
                if triple[1].namespace_with_prefix not in rn:
                    # cross ontology term
                    self.log(
                        f"ignoring possible cross ontology connection between {triple[1].namespace} and {rn}"
                    )
                    return True, None
                found = False
                for r in rang:
                    # compare only the term name, stripping the "prefix:" part
                    if r.split(":")[-1] in rn:
                        found = True
                        break
    return found, rn
# def _check_range_if_ontoterm(self, triple):
# found = True
# rang = triple[1].range
# if len(rang) > 0:
# if 'owl:Thing' not in rang:
# if triple[1].namespace != triple[2].namespace:
# #cross ontology term
# self.log(f'ignoring possible cross ontology connection between {triple[1].namespace} and {triple[2].namespace}')
# return True, None
#
# found = False
# if triple[2].name in rang:
# found = True
# return found, triple[2].name
def _check_range_if_literal(self, triple):
found = True
if triple[2].datatype is None:
self.log(
f"WARNING: {triple[1].name} has a range with unspecified datatype!"
)
warnings.warn(f"{triple[1].name} has a range with unspecified datatype!")
return True, None
destination_range = triple[2].datatype.toPython().split("#")[-1]
if destination_range == "string":
destination_range = "str"
elif destination_range == "integer":
destination_range = "int"
rang = triple[1].range
if len(rang) > 0:
found = False
if destination_range in rang:
found = True
return found, destination_range
def _check_range(self, triple):
    """
    Validate the object of *triple* against the predicate's range.

    Only runs when the predicate is an OntoTerm. URIRef objects are
    checked via their stored rdf:type; Literal objects via their
    datatype. Raises ValueError on a range mismatch.
    """
    if self._is_ontoterm(triple[1]):
        # check if type was provided
        found = True
        dm = None
        if type(triple[2]).__name__ == "URIRef":
            found, dm = self._check_range_if_uriref(triple)
        elif type(triple[2]).__name__ == "Literal":
            found, dm = self._check_range_if_literal(triple)
        if not found:
            raise ValueError(f"{dm} not in range of {triple[1].name}")
        self.log(f"checked {triple[1].name} against range {dm}")
[docs]
def add(self, triple, validate=True):
    """
    Add a triple to the knowledge graph.

    Parameters
    ----------
    triple : tuple
        The triple to be added in the form (subject, predicate, object).
        Entries may be OntoTerms; they are converted to rdflib terms.
    validate : bool, optional
        Whether to validate the triple against the predicate's domain and
        range. Default is True.

    Returns
    -------
    None

    Notes
    -----
    Triples whose object stringifies to "None" are silently rejected
    (logged, not raised). Successful adds increment the internal triple
    counter.
    """
    modified_triple = self._modify_triple(triple)
    self.log(f"attempting to add triple: {self._n_triples}")
    self.log(f"- {modified_triple[0].toPython()}")
    self.log(f"- {modified_triple[1].toPython()}")
    self.log(f"- {modified_triple[2].toPython()}")
    if validate:
        # validation needs the original (possibly OntoTerm) predicate,
        # so it runs on the unmodified triple
        self._check_domain(triple)
        self._check_range(triple)
    if str(modified_triple[2].toPython()) == "None":
        # drop triples with a missing object value
        self.log(f"rejecting None valued triple")
        return
    self.graph.add(modified_triple)
    self._n_triples += 1
    self.log("added")
[docs]
def triples(self, triple):
    """
    Return the triples in the knowledge graph that match the given pattern.

    Parameters
    ----------
    triple : tuple
        The triple pattern to match in the form (subject, predicate,
        object); use None as a wildcard. OntoTerm entries are converted
        to rdflib terms before matching.

    Returns
    -------
    generator
        A generator that yields the matching triples.
    """
    modified_triple = self._modify_triple(triple)
    return self.graph.triples(modified_triple)
[docs]
def value(self, arg1, arg2):
    """
    Get the value (object) of a (subject, predicate) pair in the graph.

    Parameters
    ----------
    arg1 : object
        The subject of the triple; may be an OntoTerm.
    arg2 : object
        The predicate of the triple; may be an OntoTerm.

    Returns
    -------
    object or None
        The object of the matching triple if one exists, otherwise None
        (delegates to rdflib's ``Graph.value``).
    """
    modified_double = self._modify_triple((arg1, arg2))
    return self.graph.value(modified_double[0], modified_double[1])
[docs]
def remove(self, triple):
    """
    Remove all triples matching the given pattern from the knowledge graph.

    Parameters
    ----------
    triple : tuple
        The triple (or pattern with None wildcards) to be removed in the
        form (subject, predicate, object). OntoTerm entries are converted
        to rdflib terms first.

    Returns
    -------
    rdflib.Graph
        The underlying graph (rdflib's ``remove`` is chainable).
    """
    modified_triple = self._modify_triple(triple)
    return self.graph.remove(modified_triple)
[docs]
def create_node(self, namestring, classtype, label=None):
    """
    Create a new node in the graph.

    Parameters
    ----------
    namestring : str
        The name of the node.
    classtype : Object from a given ontology
        The class type of the node.
    label : str, optional
        If given, attached as an rdfs:label after sanitization through
        ``_clean_string``. Default is None.

    Returns
    -------
    URIRef
        The newly created node.
    """
    item = URIRef(namestring)
    # add the type first so subsequent domain/range validation can see it
    self.add((item, RDF.type, classtype))
    if label is not None:
        self.add((item, RDFS.label, Literal(_clean_string(label))))
    return item
def _initialize_graph(self):
    """
    Seed the graph with base class-hierarchy and unit triples.

    Declares the CMSO length/angle/vector subclass relations and attaches
    their QUDT units (ANGSTROM for lengths and vectors, DEG for angles).
    Called once from ``__init__``.

    Returns
    -------
    None
    """
    # extra triples
    self.add((CMSO.SimulationCellLength, RDFS.subClassOf, CMSO.Length))
    self.add((CMSO.LatticeParameter, RDFS.subClassOf, CMSO.Length))
    self.add(
        (CMSO.Length, CMSO.hasUnit, URIRef("http://qudt.org/vocab/unit/ANGSTROM"))
    )
    self.add((CMSO.SimulationCellAngle, RDFS.subClassOf, CMSO.Angle))
    self.add((CMSO.LatticeAngle, RDFS.subClassOf, CMSO.Angle))
    self.add((CMSO.Angle, CMSO.hasUnit, URIRef("http://qudt.org/vocab/unit/DEG")))
    self.add((CMSO.LatticeVector, RDFS.subClassOf, CMSO.Vector))
    self.add((CMSO.SimulationCellVector, RDFS.subClassOf, CMSO.Vector))
    self.add(
        (CMSO.Vector, CMSO.hasUnit, URIRef("http://qudt.org/vocab/unit/ANGSTROM"))
    )
[docs]
def add_calculated_quantity(self, sample, propertyname, value, unit=None):
    """
    Add a calculated quantity to a sample.

    Parameters
    ----------
    sample : URIRef
        The URIRef of the sample to which the calculated quantity is added.
    propertyname : str
        The name of the calculated property.
    value : str
        The value of the calculated property.
    unit : str, optional
        The unit of the calculated property. Default is None.
        The unit should be from QUDT. See http://qudt.org/vocab/unit/

    Returns
    -------
    None

    Notes
    -----
    Creates a CalculatedProperty node named "<sample>_<propertyname>",
    links it to the sample, and stores the label, value and (optionally)
    the QUDT unit on it.
    """
    prop = self.create_node(f"{sample}_{propertyname}", CMSO.CalculatedProperty)
    self.add((sample, CMSO.hasCalculatedProperty, prop))
    self.add((prop, RDFS.label, Literal(propertyname)))
    self.add((prop, ASMO.hasValue, Literal(value)))
    if unit is not None:
        self.add((prop, ASMO.hasUnit, URIRef(f"http://qudt.org/vocab/unit/{unit}")))
[docs]
def inspect_sample(self, sample):
    """
    Inspect a sample and summarize its atoms, material, defects,
    composition, crystal structure, space group, calculated properties,
    and units.

    Parameters
    ----------
    sample : URIRef
        The sample to inspect.

    Returns
    -------
    str
        A human-readable summary of the sample.
    """
    natoms = self.value(sample, CMSO.hasNumberOfAtoms).toPython()
    material = [k[2] for k in self.triples((sample, CMSO.hasMaterial, None))][0]
    defects = [k[2] for k in self.triples((material, CMSO.hasDefect, None))]
    composition = [
        k[2].toPython()
        for k in self.triples((material, CMSO.hasElementRatio, None))
    ]
    crystalstructure = self.value(material, CMSO.hasStructure)
    spacegroupsymbol = self.value(crystalstructure, CMSO.hasSpaceGroupSymbol).toPython()
    # NOTE: removed an unused `lattice` variable that re-queried
    # hasNumberOfAtoms (copy-paste leftover in the original)
    defect_types = [self.value(d, RDF.type).toPython() for d in defects]
    prop_nodes = [
        k[2] for k in self.triples((sample, CMSO.hasCalculatedProperty, None))
    ]
    props = [self.value(prop_node, RDFS.label) for prop_node in prop_nodes]
    propvals = [self.value(d, ASMO.hasValue).toPython() for d in prop_nodes]
    units = [self.value(d, ASMO.hasUnit).toPython() for d in prop_nodes]
    st = []
    st.append(f"Sample with {natoms} atoms.\n")
    st.append("Material:\n")
    st.append(" ".join(composition))
    st.append("\n")
    st.append(f"Space Group symbol: {spacegroupsymbol}\n")
    if len(defect_types) > 0:
        st.append("With defects:\n")
        for d in defect_types:
            st.append(f"{d}\n")
    if len(props) > 0:
        st.append("With calculated properties:\n")
        for x in range(len(props)):
            st.append(
                f"{props[x]} with value: {propvals[x]} and unit: {units[x]}\n"
            )
    return " ".join(st)
[docs]
def visualize(self, *args, **kwargs):
    """
    Visualize the graph using the specified arguments.

    This method is a wrapper around the `visualise` method and passes the
    same arguments to it.

    Parameters
    ----------
    *args : Variable length argument list.
    **kwargs : Arbitrary keyword arguments.

    Returns
    -------
    dot : The visualization of the RDF tree.
    """
    # bug fix: the original dropped the return value, so the documented
    # dot object was never returned to the caller
    return self.visualise(*args, **kwargs)
[docs]
def visualise(
    self,
    styledict=None,
    rankdir="BT",
    hide_types=False,
    workflow_view=False,
    sample_view=False,
    size=None,
    layout="neato",
):
    """
    Visualize the RDF tree of the Graph.

    Parameters
    ----------
    styledict : dict, optional
        If provided, allows customization of color and other properties;
        merged over the module-level ``defstyledict`` defaults.
    rankdir : str, optional
        The direction of the graph layout. Default is "BT" (bottom to top).
    hide_types : bool, optional
        Whether to hide the types in the visualization. Default is False.
    workflow_view : bool, optional
        Whether to enable the workflow view. Default is False.
    sample_view : bool, optional
        Whether to enable the sample view. Default is False.
    size : tuple, optional
        The size of the visualization. Default is None.
    layout : str, optional
        The name of the layout algorithm for the graph. Default is "neato".

    Returns
    -------
    graphviz.dot.Digraph
        The visualization of the RDF tree.

    Notes
    -----
    The `styledict` parameter allows customization of the visualization
    style. Each of the keys BNode / URIRef / Literal accepts:

    color : str
        The color of the boxes.
    shape : str
        The shape of the boxes.
    style : str
        The style of the boxes.
    """
    if size is not None:
        # graphviz expects a "width,height" string
        size = f"{size[0]},{size[1]}"
    sdict = defstyledict.copy()
    if styledict is not None:
        # user-provided keys override the defaults, recursively
        sdict = _replace_keys(sdict, styledict)
    return visualize_graph(
        self.graph,
        styledict=sdict,
        rankdir=rankdir,
        hide_types=hide_types,
        workflow_view=workflow_view,
        sample_view=sample_view,
        size=size,
        layout=layout,
    )
[docs]
def write(self, filename, format="json-ld"):
    """
    Write the serialised version of the graph to a file.

    Parameters
    ----------
    filename : string
        name of output file
    format : string, {'turtle', 'xml', 'json-ld', 'ntriples', 'n3'}
        output format to be written to

    Returns
    -------
    None
    """
    serialized = self.graph.serialize(format=format)
    with open(filename, "w") as fout:
        fout.write(serialized)
[docs]
def close(self, filename, format="json-ld"):
    """
    Close the graph and write to a file.

    Parameters
    ----------
    filename : string
        name of output file
    format : string, optional
        serialization format, passed through to :meth:`write`.
        Default is "json-ld".

    Returns
    -------
    None
    """
    self.write(filename, format=format)
[docs]
def archive(self, package_name, format="turtle", compress=True, add_simulations=False):
    """
    Publish a dataset from graph including per atom quantities.

    Parameters
    ----------
    package_name : str
        The name of the package to be created.
    format : str, optional
        The format in which the dataset should be written. Default is "turtle".
    compress : bool, optional
        Whether to compress the package into a tarball. Default is True.
    add_simulations : bool, optional
        Whether to copy simulation directories referenced by activities
        into the package. Default is False.

    Raises
    ------
    ValueError
        If the package_name already exists or if the tarball already exists.

    Notes
    -----
    Creates a folder named package_name with an ``rdf_structure_store``
    subfolder, copies each sample's structure file into it, rewrites the
    ``hasPath`` triples to the new relative locations, writes the triples
    to "<package_name>/triples", and (when compress is True) tars the
    folder and removes it.
    """
    # first step make a folder
    if os.path.exists(package_name):
        raise ValueError(f"{package_name} already exists")
    if compress:
        if os.path.exists(f"{package_name}.tar.gz"):
            raise ValueError(f"{package_name} tarball already exists")
    os.mkdir(package_name)
    structure_store = f"{package_name}/rdf_structure_store"
    os.mkdir(structure_store)
    # now go through each sample, and copy the file, at the same time fix the paths
    for sample in self.sample_ids:
        filepath = self.value(URIRef(f"{sample}_Position"), CMSO.hasPath).toPython()
        # filepath has to fixed with the correct prefix as needed
        filepath = os.path.join(self.structure_store, os.path.basename(filepath))
        shutil.copy(filepath, structure_store)
        # now we have to remove the old path, and fix new
        for val in ["Position", "Species"]:
            self.remove((URIRef(f"{sample}_{val}"), CMSO.hasPath, None))
            # assign corrected path
            new_relpath = "/".join(["rdf_structure_store", filepath.split("/")[-1]])
            self.add(
                (
                    URIRef(f"{sample}_{val}"),
                    CMSO.hasPath,
                    Literal(new_relpath, datatype=XSD.string),
                )
            )
    # copy simulation files if needed
    if add_simulations:
        sim_store = f"{package_name}/simulation_store"
        os.mkdir(sim_store)
        activities = self.activity_ids
        for activity in activities:
            path = self.value(activity, CMSO.hasPath)
            if path is not None:
                newpath = "/".join([sim_store, activity.toPython()])
                shutil.copytree(path, newpath)
                # remove old path
                self.remove((activity, CMSO.hasPath, None))
                self.add((activity, CMSO.hasPath, Literal(newpath, datatype=XSD.string)))
    triple_file = os.path.join(package_name, "triples")
    self.write(triple_file, format=format)
    if compress:
        with tarfile.open(f"{package_name}.tar.gz", "w:gz") as tar:
            tar.add(package_name, arcname=os.path.basename(package_name))
        shutil.rmtree(package_name)
[docs]
@classmethod
def unarchive(
    cls,
    package_name,
    compress=True,
    store="Memory",
    store_file=None,
    identifier="http://default_graph",
    ontology=None,
):
    """
    Unarchive a package and return an instance of the KnowledgeGraph class.

    Parameters
    ----------
    package_name : str
        The name of the package to unarchive.
    compress : bool, optional
        Whether the package is a ".tar.gz" tarball. Defaults to True.
    store : str, optional
        The type of store to use. Defaults to "Memory".
    store_file : str, optional
        The file to use for the store. Defaults to None.
    identifier : str, optional
        The identifier for the graph. Defaults to "http://default_graph".
    ontology : str, optional
        The ontology to use. Defaults to None.

    Returns
    -------
    KnowledgeGraph
        A new graph loaded from the package's "triples" file and
        structure store.

    Raises
    ------
    FileNotFoundError
        If the package file is not found.
    tarfile.TarError
        If there is an error while extracting the package.
    """
    if compress:
        # strip the ".tar.gz" double extension to get the folder name
        package_base_name = ".".join(package_name.split(".")[:-2])
        # NOTE(review): extracts into the current working directory;
        # contents of untrusted tarballs should be verified first
        with tarfile.open(package_name) as fin:
            fin.extractall(".")
    else:
        package_base_name = package_name
    return cls(
        store=store,
        store_file=store_file,
        identifier=identifier,
        graph_file=f"{package_base_name}/triples",
        structure_store=f"{package_base_name}/rdf_structure_store",
        ontology=ontology,
    )
[docs]
def query(self, inquery, return_df=True):
    """
    Query the graph using SPARQL.

    Parameters
    ----------
    inquery : string
        SPARQL query to be executed
    return_df : bool, optional
        if True, returns the results as a pandas DataFrame. Default is True.

    Returns
    -------
    res : pandas DataFrame or rdflib query result
        pandas dataframe results (or the raw result when return_df is False)

    Raises
    ------
    ValueError
        If the SPARQL query returns None.
    """
    res = self.graph.query(inquery)
    if res is not None:
        if return_df:
            # NOTE(review): column labels are scraped from the
            # "SELECT DISTINCT" line of the query text; queries written
            # in any other shape will yield wrong labels — confirm all
            # generated queries follow this convention
            for line in inquery.split("\n"):
                if "SELECT DISTINCT" in line:
                    break
            # strip the leading '?' from each selected variable name
            labels = [x[1:] for x in line.split()[2:]]
            return pd.DataFrame(res, columns=labels)
        else:
            return res
    raise ValueError("SPARQL query returned None")
[docs]
def auto_query(
    self,
    source,
    destination,
    return_query=False,
    enforce_types=True,
    return_df=True,
):
    """
    Automatically generate and execute a query between two ontology terms.

    Parameters
    ----------
    source : OntoTerm
        The source of the query.
    destination : OntoTerm
        The destination of the query.
    return_query : bool, optional
        If True, returns the generated query instead of executing it.
        Defaults to False.
    enforce_types : bool, optional
        If True, enforce the types of source/destination in the generated
        query. Defaults to True.
    return_df : bool, optional
        if True, returns the results as a pandas DataFrame. Default is True.

    Returns
    -------
    pandas DataFrame or str
        The generated query string when `return_query` is True, otherwise
        the result of executing the query.
    """
    generated = self.ontology.create_query(
        source, destination, enforce_types=enforce_types
    )
    if return_query:
        return generated
    return self.query(generated, return_df=return_df)
#################################
# Methods to interact with sample
#################################
[docs]
def query_sample(
    self, destination, return_query=False, enforce_types=None
):
    """
    Query the knowledge graph for atomic scale samples.

    Convenience wrapper around :meth:`auto_query` with the source fixed
    to ``cmso.AtomicScaleSample``.

    Parameters
    ----------
    destination : OntoTerm
        The destination of the query.
    return_query : bool, optional
        If True, returns the generated query instead of executing it.
        Defaults to False.
    enforce_types : bool, optional
        If provided, enforces the specified type for the query.
        Defaults to None.

    Returns
    -------
    pandas DataFrame or str
        The result of the query execution, or the generated query string
        when `return_query` is True.
    """
    return self.auto_query(
        self.ontology.terms.cmso.AtomicScaleSample,
        destination,
        return_query=return_query,
        enforce_types=enforce_types,
    )
@property
def n_samples(self):
    """
    Number of samples in the Graph
    """
    # count AtomicScaleSample type-assertions without materialising a list
    return sum(1 for _ in self.triples((None, RDF.type, CMSO.AtomicScaleSample)))
@property
def sample_ids(self):
    """
    Returns a list of all Samples in the graph
    """
    # subjects of every AtomicScaleSample type-assertion
    return [
        subject
        for subject, _, _ in self.triples((None, RDF.type, CMSO.AtomicScaleSample))
    ]
@property
def sample_files(self):
    """
    List of on-disk structure file paths, one per sample.

    Reads the CMSO.hasPath value stored on each sample's ``_Position``
    node.
    """
    files = []
    for sample_id in self.sample_ids:
        filepath = self.value(
            URIRef(f"{sample_id}_Position"), CMSO.hasPath
        ).toPython()
        files.append(filepath)
    return files
@property
def sample_names(self):
    """
    Returns a list of all Sample names in the graph

    Falls back to the sample URI when no rdfs:label is stored.
    """
    names = []
    for sample in [
        x[0] for x in self.triples((None, RDF.type, CMSO.AtomicScaleSample))
    ]:
        label = self.value(sample, RDFS.label)
        names.append(label.toPython() if label is not None else sample.toPython())
    return names
@property
def samples(self):
    """Sample objects wrapping each (name, id) pair present in the graph."""
    return [
        Sample(name, sample_id, self)
        for sample_id, name in zip(self.sample_ids, self.sample_names)
    ]
@property
def activity_ids(self):
    """
    Returns a list of all PROV Activities in the graph
    """
    return [x[0] for x in self.triples((None, RDF.type, PROV.Activity))]
def _is_of_type(self, item, target_item):
    """
    Check if an item is of a specific type.

    item - direct from graph, comparison only makes sense if it is URIRef;
    if item is a node with an http URI, compare the URI directly,
    otherwise look up its rdf:type in the graph.
    target_item - URIRef or OntoTerm
    """
    if not self._is_uriref(item):
        return False
    if self._is_bnode(item):
        # blank-node-like identifier: resolve the stored rdf:type instead
        rdftype = self.value(item, RDF.type)
        if rdftype is not None:
            rdftype = rdftype.toPython()
    else:
        # full URI: the identifier itself carries the type
        rdftype = item.toPython()
    target_type = target_item.toPython()
    return rdftype == target_type
[docs]
def iterate_graph(self, item, create_new_graph=False, create_new_list=False,
                  stop_at_sample=False):
    """
    Iterate through the graph starting from the given item.

    Collects every reachable triple into ``self.slist`` by depth-first
    recursion over triple objects.

    Parameters
    ----------
    item : object
        The item to start the iteration from; non-URIRef items are
        ignored.
    create_new_graph : bool, optional
        Unused; kept for backward compatibility of the call signature.
    create_new_list : bool, optional
        If True, create a new list to store extracted triples; this is
        needed when calling this function iteratively.
    stop_at_sample : bool, optional
        If True, stops the iteration when a sample object is encountered.
        Default is False. Will only stop if `item` is a sample object.

    Returns
    -------
    None

    Notes
    -----
    NOTE(review): the recursion keeps no visited-set, so it assumes the
    traversed subgraph is acyclic — confirm for graphs with cycles.
    """
    if not type(item).__name__ == 'URIRef':
        return
    if create_new_list:
        self.slist = []
    # only stop early when the starting item itself is a sample
    stop_at_sample = stop_at_sample and self._is_of_type(item, CMSO.AtomicScaleSample)
    triples = list(self.triples((item, None, None)))
    for triple in triples:
        # when stopping at samples, skip provenance edges back to parents
        if not (stop_at_sample and self._is_of_type(triple[1], PROV.wasDerivedFrom)):
            self.slist.append(triple)
            self.iterate_graph(triple[2], stop_at_sample=stop_at_sample)
def iterate_and_create_graph(self, item, stop_at_sample=False):
    """
    Extract the subgraph reachable from *item* into a new KnowledgeGraph.

    Parameters
    ----------
    item : URIRef
        Starting node of the traversal.
    stop_at_sample : bool, optional
        Passed through to :meth:`iterate_graph`. Default is False.

    Returns
    -------
    KnowledgeGraph
        A fresh in-memory graph containing the collected triples.
    """
    self.iterate_graph(item, create_new_list=True, stop_at_sample=stop_at_sample)
    # deep-copy before resetting the shared scratch list
    triples = copy.deepcopy(self.slist)
    self.slist = []
    sgraph = KnowledgeGraph()
    for triple in triples:
        sgraph.add(triple)
    return sgraph
def _create_a_new_name(self, uristring):
"""
take a given uriref string name and create one in similar fashion
"""
raw = uristring.split(':')
if len(raw) > 1:
prologue = raw[0]+':'
else:
prologue = ''
raw = uristring.split('_')
if len(raw) > 1:
epilogue = '_'+"_".join(raw[1:])
else:
epilogue = ''
return f"{prologue}{uuid.uuid4()}{epilogue}"
def iterate_and_rename_triples(self, item):
    """
    Extract the subgraph reachable from *item* and rename its
    blank-node-like URIRefs with fresh UUID-based names.

    Parameters
    ----------
    item : URIRef
        Starting node; it must appear as a subject in the extracted
        triples so a renamed identifier exists for it.

    Returns
    -------
    tuple
        (renamed URIRef for *item*, list of renamed triples).
    """
    self.iterate_graph(item, create_new_list=True)
    triples = copy.deepcopy(self.slist)
    # map every URIRef subject either to a fresh UUID-based name
    # (blank-node-like identifiers) or to itself (proper http URIs)
    uri_dict = {}
    for triple in triples:
        if isinstance(triple[0], URIRef):
            key = triple[0].toPython()
            if key not in uri_dict:
                if self._is_bnode(triple[0]):
                    uri_dict[key] = self._create_a_new_name(key)
                else:
                    uri_dict[key] = key
    new_triples = []
    # `obj` used instead of the original `object`, which shadowed the builtin
    for subject, predicate, obj in triples:
        if subject.toPython() in uri_dict:
            subject = URIRef(uri_dict[subject.toPython()])
        if obj.toPython() in uri_dict:
            obj = URIRef(uri_dict[obj.toPython()])
        new_triples.append((subject, predicate, obj))
    return URIRef(uri_dict[item.toPython()]), new_triples
[docs]
def copy_defects(self, sample, parent_sample):
    """
    Copy defects from one sample to another.

    Defect nodes attached to the parent sample's material are duplicated
    under fresh names and linked to `sample`'s material; sample-level
    concentration properties are copied verbatim.
    """
    parent_material = [t[2] for t in self.triples((parent_sample, CMSO.hasMaterial, None))][0]
    parent_defects = [t[2] for t in self.triples((parent_material, CMSO.hasDefect, None))]
    material = [t[2] for t in self.triples((sample, CMSO.hasMaterial, None))][0]
    for parent_defect in parent_defects:
        new_defect, defect_triples = self.iterate_and_rename_triples(parent_defect)
        # attach the renamed defect to the new material
        self.add((material, CMSO.hasDefect, new_defect))
        for t in defect_triples:
            self.add(t)
    # vacancy/impurity concentrations hang off the sample itself, copy directly
    for predicate in (PODO.hasVacancyConcentration, PODO.hasImpurityConcentration):
        for t in self.triples((parent_sample, predicate, None)):
            self.add((sample, t[1], t[2]))
[docs]
def get_sample(self, sample, no_atoms=False, stop_at_sample=True):
    """
    Get the Sample as a KnowledgeGraph

    Parameters
    ----------
    sample: string
        sample id
    no_atoms: bool, optional
        if True, also return the number of atoms in the sample
    stop_at_sample: bool, optional
        if True, stops the iteration when a sample object is encountered.
        Default is True.

    Returns
    -------
    sgraph: :py:class:`RDFGraph`
        the RDFGraph of the queried sample
    na: int, only returned if no_atoms is True
    """
    sample_ref = URIRef(sample) if isinstance(sample, str) else sample
    sgraph = self.iterate_and_create_graph(sample_ref, stop_at_sample=stop_at_sample)
    if not no_atoms:
        return sgraph
    natoms = sgraph.value(sample_ref, CMSO.hasNumberOfAtoms).toPython()
    return sgraph, natoms
def get_label(self, item):
    """Return the RDFS label of `item` as a Python value, or None if absent."""
    label = self.graph.value(item, RDFS.label)
    return None if label is None else label.toPython()
def get_sample_label(self, sample):
    """Return the RDFS label attached to a sample (None when not set)."""
    return self.get_label(sample)
def change_label(self, sample, label):
    """Replace any existing RDFS label of `sample` with `label`, stored as an xsd:string literal."""
    self.graph.remove((sample, RDFS.label, None))
    self.graph.add((sample, RDFS.label, Literal(label, datatype=XSD.string)))
[docs]
def get_system_from_sample(self, sample):
    """
    Get a pyscal :py:class:`atomrdf.structure.System` from the selected sample

    Parameters
    ----------
    sample: string
        sample id

    Returns
    -------
    system: :py:class:`atomrdf.structure.System`
        corresponding system
    """
    simcell = self.value(sample, CMSO.hasSimulationCell)
    # row i collects the i-th component of every cell vector
    cell_vectors = [[], [], []]
    for _, _, vector in self.triples((simcell, CMSO.hasVector, None)):
        cell_vectors[0].append(self.value(vector, CMSO.hasComponent_x).toPython())
        cell_vectors[1].append(self.value(vector, CMSO.hasComponent_y).toPython())
        cell_vectors[2].append(self.value(vector, CMSO.hasComponent_z).toPython())
    position_node = URIRef(f"{sample}_Position")
    species_node = URIRef(f"{sample}_Species")
    filepath = self.value(position_node, CMSO.hasPath).toPython()
    position_identifier = self.value(position_node, CMSO.hasIdentifier).toPython()
    species_identifier = self.value(species_node, CMSO.hasIdentifier).toPython()
    # positions and species live in an external JSON datastore file
    with open(filepath, "r") as fin:
        data = json.load(fin)
    at = Atoms()
    at.from_dict({
        "positions": data[position_identifier]["value"],
        "species": data[species_identifier]["value"],
    })
    sys = System()
    sys.box = cell_vectors
    sys.atoms = at
    sys.sample = sample
    sys.graph = self
    sys._name = sample.toPython().split('sample:')[-1]
    return sys
[docs]
def to_file(self, sample, filename=None, format="poscar",
    add_sample_id=True,
    copy_from=None,
    pseudo_files=None):
    """
    Save a given sample to a file

    Parameters
    ----------
    sample
        ID of the sample
    filename: string
        name of output file; defaults to "out" in the current directory
    format: string, {"lammps-dump","lammps-data", "poscar", 'cif', 'quantum-espresso'}
        or any format supported by ase
    copy_from : str, optional
        If provided, input options for quantum-espresso format will be copied from
        the given file. Structure specific information will be replaced.
        Note that the validity of input file is not checked.
    pseudo_files : list, optional
        if provided, add the pseudopotential filenames to file.
        Should be in alphabetical order of chemical species symbols.

    Returns
    -------
    None
    """
    outpath = filename if filename is not None else os.path.join(os.getcwd(), "out")
    system = self.get_system_from_sample(sample)
    system.to_file(filename=outpath,
                   format=format,
                   add_sample_id=add_sample_id,
                   copy_from=copy_from,
                   pseudo_files=pseudo_files)
def enable_workflow(self, workflow_object, workflow_environment=None, workflow_module=None):
    """Register a workflow manager object with this graph's workflow handler."""
    self.workflow.inform_graph(
        workflow_object,
        workflow_environment=workflow_environment,
        workflow_module=workflow_module,
    )
def add_workflow(self, job, workflow_environment=None, workflow_module=None, job_dicts=None,
    add_intermediate_jobs=False):
    """Annotate a job (and optionally its intermediate jobs) into the graph via the workflow handler."""
    self.workflow.to_graph(
        job,
        workflow_environment=workflow_environment,
        workflow_module=workflow_module,
        job_dicts=job_dicts,
        add_intermediate_jobs=add_intermediate_jobs,
    )
def find_property(self, label):
    """
    Return the first node in the graph carrying the given RDFS label.

    Raises
    ------
    RuntimeError
        If no node with that label exists.
    """
    matches = list(self.graph.triples((None, RDFS.label, label)))
    if not matches:
        raise RuntimeError(f'Property {label} not found in the graph')
    return matches[0][0]
def get_string_label(self, item):
    """
    Return a human-readable label for `item`.

    Falls back to the string form of the node when no RDFS label is present.
    When the resulting label contains "activity", the name of the attached
    computational method type (if any) is used instead.
    """
    label = self.get_label(item)
    if label is None:
        try:
            label = str(item.toPython())
        except Exception:
            # not an rdflib term (no toPython); fall back to plain str()
            label = str(item)
    if "activity" in label:
        method = self.value(item, ASMO.hasComputationalMethod)
        if method is not None:
            method_name = self.value(method, RDF.type)
            if method_name is not None:
                label = method_name.toPython().split("/")[-1]
    return label
def _add_to_dict(self, prop, indict):
    """Ensure `prop` has an entry in `indict`, initialised as not-yet-processed."""
    name = _name(prop)
    if name not in indict:
        indict[name] = {
            'found': False,
            'label': self.get_string_label(prop),
        }
def _get_ancestor(self, prop, prov):
    """
    Record the immediate provenance of `prop` into the `prov` dictionary.

    The property is traced one step backwards, either through
    ASMO.wasCalculatedBy (an activity output), or through whichever single
    predicate points at it (input parameter, calculated property, or a MATH
    operation such as sum/difference/product/quotient). Newly discovered
    parents and samples are added to `prov` for later expansion.

    Note: only one operation and parent are considered per property.

    Parameters
    ----------
    prop : str or URIRef
        The property node whose ancestry should be resolved.
    prov : dict
        Provenance dictionary being built up; mutated and returned.

    Returns
    -------
    dict
        The updated provenance dictionary; `prov[propname]['found']` is set.
    """
    if isinstance(prop, str):
        prop = URIRef(prop)
    propname = _name(prop)
    operation = [x[1] for x in self.triples((prop, ASMO.wasCalculatedBy, None))]
    if len(operation) > 0:
        parent = [x[2] for x in self.triples((prop, ASMO.wasCalculatedBy, None))]
        operation = operation[0]
        parent = parent[0]
        prov[propname]['operation'] = 'output_parameter'
        prov[propname]['inputs'] = {}
        prov[propname]['inputs']['0'] = _name(parent)
        self._add_to_dict(parent, prov)
        prov[_name(parent)]['inputs'] = {}
        associated_samples = [x[0] for x in self.triples((None, PROV.wasGeneratedBy, parent))]
        for count, sample in enumerate(associated_samples):
            prov[_name(parent)]['inputs'][str(count)] = _name(sample)
            self._add_to_dict(sample, prov)
        prov[_name(parent)]['found'] = True
        prov[_name(parent)]['operation'] = 'sample_for_activity'
    else:
        # find whichever predicate points at this property; take the first
        operation = [x[1] for x in self.triples((None, None, prop))]
        parent = [list(self.triples((None, op, prop)))[0][0] for op in operation]
        if len(operation) == 0:
            # no ancestry at all; mark as resolved and stop
            prov[propname]['found'] = True
            return prov
        operation = operation[0]
        parent = parent[0]
        if operation.toPython() == "http://purls.helmholtz-metadaten.de/asmo/hasInputParameter":
            prov[propname]['operation'] = 'input_parameter'
            prov[propname]['inputs'] = {}
            prov[propname]['inputs']['0'] = _name(parent)
            self._add_to_dict(parent, prov)
            prov[_name(parent)]['inputs'] = {}
            associated_samples = [x[0] for x in self.triples((None, PROV.wasGeneratedBy, parent))]
            for count, sample in enumerate(associated_samples):
                prov[_name(parent)]['inputs'][str(count)] = _name(sample)
                self._add_to_dict(sample, prov)
            prov[_name(parent)]['found'] = True
            prov[_name(parent)]['operation'] = 'sample_for_activity'
        elif operation.toPython() == "http://purls.helmholtz-metadaten.de/cmso/hasCalculatedProperty":
            prov[propname]['operation'] = 'output_parameter'
            prov[propname]['inputs'] = {}
            prov[propname]['inputs']['0'] = _name(parent)
            self._add_to_dict(parent, prov)
            prov[_name(parent)]['found'] = True
            prov[_name(parent)]['operation'] = 'sample_output'
        elif operation == MATH.hasSum:
            addends = list(x[2] for x in self.triples((parent, MATH.hasAddend, None)))
            prov[propname]['operation'] = 'addition'
            prov[propname]['inputs'] = {}
            for count, term in enumerate(addends):
                prov[propname]['inputs'][f'{count}'] = _name(term)
                self._add_to_dict(term, prov)
        elif operation == MATH.hasDifference:
            minuend = self.value(parent, MATH.hasMinuend)
            subtrahend = self.value(parent, MATH.hasSubtrahend)
            prov[propname]['operation'] = 'subtraction'
            prov[propname]['inputs'] = {}
            prov[propname]['inputs']['0'] = _name(minuend)
            prov[propname]['inputs']['1'] = _name(subtrahend)
            self._add_to_dict(minuend, prov)
            self._add_to_dict(subtrahend, prov)
        elif operation == MATH.hasProduct:
            factors = list(x[2] for x in self.triples((parent, MATH.hasFactor, None)))
            prov[propname]['operation'] = 'multiplication'
            prov[propname]['inputs'] = {}
            for count, term in enumerate(factors):
                prov[propname]['inputs'][f'{count}'] = _name(term)
                self._add_to_dict(term, prov)
        elif operation == MATH.hasQuotient:
            divisor = self.value(parent, MATH.hasDivisor)
            dividend = self.value(parent, MATH.hasDividend)
            prov[propname]['operation'] = 'division'
            prov[propname]['inputs'] = {}
            prov[propname]['inputs']['0'] = _name(divisor)
            prov[propname]['inputs']['1'] = _name(dividend)
            self._add_to_dict(divisor, prov)
            self._add_to_dict(dividend, prov)
        # NOTE(review): an unrecognised operation leaves prov[propname]
        # without 'operation'/'inputs'; it is still marked found below.
        # (A leftover debug print of the operation was removed here.)
    prov[propname]['found'] = True
    return prov
def generate_provenance(self, prop=None, label=None, visualize=False):
    """
    Build the provenance dictionary of a property, expanding ancestors until
    every entry has been resolved.

    Parameters
    ----------
    prop : URIRef, optional
        The property node to trace; either this or `label` must be given.
    label : optional
        An RDFS label used to look up the property when `prop` is None.
    visualize : bool, optional
        If True, return the rendered provenance visualization instead of
        the raw dictionary.
    """
    if prop is None and label is None:
        raise ValueError('Either prop or label must be provided')
    if prop is None:
        prop = self.find_property(label)
    prov = {}
    self._add_to_dict(prop, prov)
    # iterate to fixpoint: each pass may introduce new unresolved ancestors
    pending = True
    while pending:
        pending = False
        for key in list(prov.keys()):
            if not prov[key]['found']:
                prov = self._get_ancestor(key, prov)
                pending = True
    if visualize:
        return visualize_provenance(prov)
    return prov
def query_structure_from_mp(self, api_key, chemical_system=None, material_ids=None, is_stable=True,
    conventional=True,
    add_to_graph=True):
    """
    Query structures from the Materials Project and wrap them as Systems.

    Returns a single System when exactly one structure matches, otherwise a
    list of Systems. When `add_to_graph` is True each structure is annotated
    into this graph together with its symmetry and energy per atom.
    """
    docs = amp.query_mp(api_key,
                        chemical_system=chemical_system,
                        material_ids=material_ids,
                        is_stable=is_stable)
    structures = []
    for doc in docs:
        pmg_struct = doc['structure']
        # pick the requested cell setting before converting to ASE
        cell = pmg_struct.to_conventional() if conventional else pmg_struct.to_primitive()
        sys = System.read.ase(cell.to_ase_atoms())
        symmetry = doc['symmetry']
        if add_to_graph:
            sys.graph = self
            sys.to_graph()
            sys._add_crystal_structure(
                targets=[None, symmetry['symbol'], symmetry['number'], None, None, None]
            )
            # record the computed energy alongside the sample
            self.add_calculated_quantity(sys.sample,
                                         'EnergyPerAtom',
                                         doc['energy_per_atom'],
                                         unit='EV')
        structures.append(sys)
    return structures[0] if len(structures) == 1 else structures
[docs]
def update_sample(self, sample, struct):
    """
    Take a new system, and update the given sample with it.
    Updated properties would be cell, atom positions, species

    Parameters
    ----------
    sample: string
        sample id

    struct: :py:class:`atomrdf.structure.System`
        system to be updated
    """
    if isinstance(sample, str):
        sample = URIRef(sample)
    sample_id = sample.toPython()
    chemical_species = self.value(sample, CMSO.hasSpecies)
    # start by cleanly removing elements
    for s in self.triples((chemical_species, CMSO.hasElement, None)):
        self.remove((s[2], None, None))
    self.remove((chemical_species, None, None))
    self.remove((sample, CMSO.hasSpecies, None))

    # now recalculate the composition and add it again
    composition = struct.schema.material.element_ratio()
    valid = any(e in element_indetifiers for e in composition)
    if valid:
        chemical_species = self.create_node(
            f"{sample_id}_ChemicalSpecies", CMSO.ChemicalSpecies
        )
        self.add((sample, CMSO.hasSpecies, chemical_species))
        for e, r in composition.items():
            if e in element_indetifiers:
                element = self.create_node(
                    element_indetifiers[e], CMSO.ChemicalElement
                )
                self.add((chemical_species, CMSO.hasElement, element))
                self.add(
                    (element, CMSO.hasChemicalSymbol, Literal(e, datatype=XSD.string))
                )
                self.add(
                    (element, CMSO.hasElementRatio, Literal(r, datatype=XSD.float))
                )

    # rewrite positions/species in the external JSON datastore file
    position_identifier = self.value(
        URIRef(f"{sample_id}_Position"), CMSO.hasIdentifier
    ).toPython()
    species_identifier = self.value(
        URIRef(f"{sample_id}_Species"), CMSO.hasIdentifier
    ).toPython()
    datadict = {
        position_identifier: {
            "value": struct.schema.atom_attribute.position(),
            "label": "position",
        },
        species_identifier: {
            "value": struct.schema.atom_attribute.species(),
            "label": "species",
        },
    }
    outfile = os.path.join(
        self.structure_store, str(sample_id).split(":")[-1]
    )
    json_io.write_file(outfile, datadict)

    # now the only thing that needs to be updated is the cell
    simulation_cell = self.value(sample, CMSO.hasSimulationCell)
    # re-add volume
    self.remove((simulation_cell, CMSO.hasVolume, None))
    self.add(
        (
            simulation_cell,
            CMSO.hasVolume,
            Literal(
                np.round(struct.schema.simulation_cell.volume(), decimals=2),
                datatype=XSD.float,
            ),
        )
    )
    # re-add number of atoms
    self.remove((sample, CMSO.hasNumberOfAtoms, None))
    self.add(
        (
            sample,
            CMSO.hasNumberOfAtoms,
            Literal(struct.schema.simulation_cell.number_of_atoms(), datatype=XSD.integer),
        )
    )
    # update simulation cell lengths
    simulation_cell_length = self.value(simulation_cell, CMSO.hasLength)
    self.remove((simulation_cell_length, None, None))
    data = struct.schema.simulation_cell.length()
    length_predicates = (CMSO.hasLength_x, CMSO.hasLength_y, CMSO.hasLength_z)
    for predicate, value in zip(length_predicates, data):
        self.add((simulation_cell_length, predicate, Literal(value, datatype=XSD.float)))
    # simulation cell vectors: drop old component triples, then re-add
    # NOTE(review): assumes the hasVector triples enumerate in the same
    # order as struct.schema.simulation_cell.vector() rows, as the
    # original code did — confirm upstream ordering guarantees.
    simvecs = [x[2] for x in self.triples((simulation_cell, CMSO.hasVector, None))]
    for simvec in simvecs:
        self.remove((simvec, None, None))
    data = struct.schema.simulation_cell.vector()
    component_predicates = (CMSO.hasComponent_x, CMSO.hasComponent_y, CMSO.hasComponent_z)
    for simvec, row in zip(simvecs, data):
        for predicate, value in zip(component_predicates, row):
            self.add((simvec, predicate, Literal(value, datatype=XSD.float)))
    # angles
    simangle = self.value(simulation_cell, CMSO.hasAngle)
    self.remove((simangle, None, None))
    data = struct.schema.simulation_cell.angle()
    angle_predicates = (CMSO.hasAngle_alpha, CMSO.hasAngle_beta, CMSO.hasAngle_gamma)
    for predicate, value in zip(angle_predicates, data):
        self.add((simangle, predicate, Literal(value, datatype=XSD.float)))