"""
Graph module contains the basic RDFGraph object in atomrdf. This object gets a structure
as an input and annotates it with the CMSO ontology (PLDO and PODO too as needed). The annotated
object is stored in triplets.
NOTES
-----
- Always add type triples before adding further properties.
Classes
-------
- KnowledgeGraph: Represents a knowledge graph that stores and annotates structure objects.
Attributes
----------
- defstyledict: A dictionary containing default styles for visualizing the graph.
"""
from rdflib import Graph, XSD, RDF, RDFS, BNode, URIRef
import os
import numpy as np
import inspect
from ase.io import write
import copy
import pandas as pd
import yaml
import uuid
import json
import shutil
import tarfile
import logging
import warnings
import re
import pickle
from pyscal3.atoms import Atoms
from atomrdf.visualize import visualize_graph, visualize_provenance
from atomrdf.ontology import read_ontology
import atomrdf.properties as prp
from atomrdf.stores import create_store, purge
import atomrdf.json_io as json_io
import atomrdf.mp as amp
from atomrdf.namespace import (
CMSO,
PLDO,
PODO,
ASMO,
PROV,
CDCO,
Literal,
)
# Load the element data shipped with the package: data/element.yml, resolved
# relative to the directory containing this module.
file_location = os.path.join(os.path.dirname(__file__), "data/element.yml")
with open(file_location, "r") as fin:
    element_indetifiers = yaml.safe_load(fin)
# Default styles for visualizing the graph: one edge colour plus one style
# entry per RDF node kind. Only the fill colour and the shape differ between
# node kinds; the remaining attributes are shared.
defstyledict = {
    "edgecolor": "#263238",
    **{
        node_kind: {
            "color": fill,
            "shape": outline,
            "style": "filled",
            "fontsize": "8",
            "fontname": "Helvetica",
        }
        for node_kind, fill, outline in (
            ("BNode", "#D9D9D9", "box"),
            ("URIRef", "#C9DAF8", "box"),
            ("Literal", "#E6B8AF", "parallelogram"),
        )
    },
}
def _clean_string(input_string):
input_string = re.sub(r"\W", "_", input_string)
if input_string[0].isdigit():
input_string = "s" + input_string
return input_string
def _replace_keys(refdict, indict):
for key, val in indict.items():
if key in refdict.keys():
if isinstance(val, dict):
_replace_keys(refdict[key], indict[key])
else:
refdict[key] = val
return refdict
def _dummy_log(str):
pass
def _name(term):
try:
return str(term.toPython())
except:
return str(term)
def _prepare_log(file):
logger = logging.getLogger(__name__)
handler = logging.FileHandler(file)
formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.propagate = False
return logger
[docs]
class KnowledgeGraph:
"""
Represents a knowledge graph that stores and annotates structure objects.
Parameters
----------
graph_file : str, optional
The path to the graph file to be parsed. Default is None.
The file is only parsed if it exists.
store : str, optional
The type of store to use. Default is "Memory".
store_file : str, optional
The path to the store file. Default is None.
identifier : str, optional
The identifier for the graph. Default is "http://default_graph".
ontology : Ontology, optional
The ontology object to be used. Default is None, in which case
the ontology is read with `read_ontology`.
structure_store : StructureStore, optional
The structure store object to be used. Default is None.
enable_log : bool, optional
Whether to enable logging. Default is False.
If true, a log file named atomrdf.log will be created in the current working directory.
Attributes
----------
graph : rdflib.Graph
The RDF graph.
sgraph : rdflib.Graph
The structure graph for a single chosen sample. Initialised to None.
ontology : Ontology
The ontology object, or None if it could not be read.
terms : dict
The dictionary of ontology terms, or None if no ontology is available.
store : str
The type of store used.
Methods
-------
add(triple)
Add a triple to the knowledge graph.
triples(triple)
Return the triples in the knowledge graph that match the given triple pattern.
query(source, destinations=None, return_df=True, num_paths=1, limit=None)
Execute a SPARQL query on the knowledge graph, either from a raw query
string or from ontology terms using tools4RDF.
get_sample_as_structure(sample_id)
Retrieve a sample from the graph as an AtomicScaleSample object.
"""
def __init__(
    self,
    graph_file=None,
    store="Memory",
    store_file=None,
    identifier="http://default_graph",
    ontology=None,
    structure_store=None,
    enable_log=False,
):
    """
    Set up the store, logging, namespace bindings and ontology.

    Parameters
    ----------
    graph_file : str, optional
        Path of a graph file to parse into the graph. It is parsed only if
        the path exists. Default is None.
    store : str, optional
        The type of store to use. Default is "Memory".
    store_file : str, optional
        The path to the store file. Default is None.
    identifier : str, optional
        The identifier for the graph. Default is "http://default_graph".
    ontology : Ontology, optional
        The ontology object to be used. If None, `read_ontology` is called;
        if that fails, the ontology stays None.
    structure_store : StructureStore, optional
        The structure store object to be used. Default is None.
    enable_log : bool, optional
        If True, messages are logged to ``atomrdf.log`` in the current
        working directory. Default is False.
    """
    # create_store receives this instance; presumably it attaches self.graph
    # (and the structure store), since self.graph is used right below.
    create_store(
        self,
        store,
        identifier,
        store_file=store_file,
        structure_store=structure_store,
    )
    # remembered so the store can be recreated by purge()
    self._store, self._identifier, self._store_file = store, identifier, store_file

    if not enable_log:
        self.log = _dummy_log
    else:
        log_path = os.path.join(os.getcwd(), "atomrdf.log")
        self.log = _prepare_log(log_path).info

    for prefix, namespace in (("cmso", CMSO), ("pldo", PLDO)):
        self.graph.bind(prefix, namespace)

    if graph_file is not None and os.path.exists(graph_file):
        self.graph.parse(graph_file)

    self.sgraph = None

    if ontology is None:
        # best effort: the graph stays usable without an ontology
        try:
            ontology = read_ontology()
        except Exception:
            ontology = None
    self.ontology = ontology
    self.terms = None if self.ontology is None else self.ontology.terms

    self.store = store
    self._n_triples = 0
    self.persistent_members = dict.fromkeys(
        ("MolecularStatics", "MolecularDynamics", "DensityFunctionalTheory")
    )
[docs]
def purge(self, force=False):
"""
Remove all information from the KnowledgeGraph.
Parameters
----------
force : bool, optional
Whether to proceed with purging the graph. Default is False.
Returns
-------
None
Notes
-----
This method removes all information from the KnowledgeGraph. If the `force` parameter is set to False, a warning is issued before proceeding with the purging.
"""
if not force:
warnings.warn(
"This will remove all information from the KnowledgeGraph. Call with force=True to proceed."
)
return
else:
# Clean up structure store files referenced by this graph
# Query for all files referenced via CMSO.hasPath
file_paths = set()
for triple in self.triples((None, CMSO.hasPath, None)):
filepath = triple[2].toPython()
if filepath:
full_path = os.path.join(
self.structure_store, os.path.basename(filepath)
)
if os.path.exists(full_path):
file_paths.add(full_path)
# Remove the files
for filepath in file_paths:
os.remove(filepath)
# Close the current store before destroying and recreating it
# (required for file-backed stores like Oxigraph that use a lock file)
try:
self.graph.close()
except Exception:
pass
graph = purge(self._store, self._identifier, self._store_file)
self.graph = graph
self._n_triples = 0
def _is_valid(self, input_list):
valid = False
flat_list = []
for x in input_list:
if isinstance(x, list):
flat_list.extend(x)
else:
flat_list.append(x)
for x in flat_list:
if x is not None:
valid = True
break
return valid
[docs]
def add(self, triple):
"""
Add a triple to the knowledge graph.
Parameters
----------
triple : tuple
The triple to be added in the form (subject, predicate, object).
"""
self.log(f"attempting to add triple: {self._n_triples}")
self.log(f"- {triple[0].toPython()}")
self.log(f"- {triple[1].toPython()}")
self.log(f"- {triple[2].toPython()}")
if str(triple[2].toPython()) == "None":
self.log(f"rejecting None valued triple")
return
self.graph.add(triple)
self._n_triples += 1
self.log("added")
[docs]
def triples(self, triple):
"""
Return the triples in the knowledge graph that match the given triple pattern.
Parameters
----------
triple : tuple
The triple pattern to match in the form (subject, predicate, object).
Returns
-------
generator
A generator that yields the matching triples.
"""
return self.graph.triples(triple)
[docs]
def value(self, arg1, arg2):
"""
Get the value of a triple in the knowledge graph.
Parameters
----------
arg1 : object
The subject of the triple.
arg2 : object
The predicate of the triple.
Returns
-------
object or None
The value of the triple if it exists, otherwise None.
Notes
-----
This method retrieves the value of a triple in the knowledge graph. The triple is specified by providing the subject and predicate as arguments.
If the triple exists in the graph, the corresponding value is returned. If the triple does not exist, None is returned.
Examples
--------
>>> graph = KnowledgeGraph()
>>> graph.add(("Alice", "likes", "Bob"))
>>> value = graph.value("Alice", "likes")
>>> print(value)
Bob
"""
return self.graph.value(arg1, arg2)
def objects(self, arg1, arg2):
return self.graph.objects(arg1, arg2)
[docs]
def query(self, source, destinations=None, return_df=True, num_paths=1, limit=None):
"""
Execute a SPARQL query on the knowledge graph.
This method supports two query modes:
1. Raw SPARQL query strings (passed as source parameter)
2. Ontology-based queries using tools4RDF (source as OntoTerm)
Parameters
----------
source : str or OntoTerm
If str: Raw SPARQL query string to execute directly.
If OntoTerm: The source ontology term from which paths are to be queried.
Access terms via self.ontology.terms (e.g., self.ontology.terms.cmso.AtomicScaleSample).
destinations : list of OntoTerm or OntoTerm, optional
One or more destination ontology terms to which paths are to be queried.
Can be a single term or a list of terms. If None, all properties of the source are returned.
Only used when source is an OntoTerm.
return_df : bool, default=True
If True, returns results as a pandas DataFrame. Otherwise, returns raw query results.
num_paths : int, default=1
The number of paths to retrieve for each query when multiple paths exist.
Only used when source is an OntoTerm.
limit : int, optional
The maximum number of results to return. If None, no limit is applied.
Only used when source is an OntoTerm.
Returns
-------
pandas.DataFrame or list or None
If return_df is True, returns a pandas DataFrame with query results.
If return_df is False, returns a list of query results.
Returns None if no results are found.
Examples
--------
Query with raw SPARQL string:
>>> query = '''
... PREFIX cmso: <http://purls.helmholtz-metadaten.de/cmso/>
... SELECT DISTINCT ?symbol
... WHERE {
... ?sample cmso:hasNumberOfAtoms ?number .
... ?sample cmso:hasMaterial ?material .
... ?material cmso:hasStructure ?structure .
... ?structure cmso:hasSpaceGroupSymbol ?symbol .
... FILTER (?number="4"^^xsd:integer)
... }'''
>>> df = kg.query(query)
Query for all AtomicScaleSamples with their space group symbols:
>>> kg = KnowledgeGraph()
>>> df = kg.query(
... kg.ontology.terms.cmso.AtomicScaleSample,
... [kg.ontology.terms.cmso.hasSpaceGroupSymbol]
... )
Query with filters (using == operator on terms):
>>> df = kg.query(
... kg.ontology.terms.cmso.AtomicScaleSample,
... [kg.ontology.terms.cmso.hasNumberOfAtoms == 4]
... )
Notes
-----
When using ontology terms, this method uses tools4RDF to automatically generate
SPARQL queries based on the ontology structure. It handles namespace management,
path finding between ontology terms, and result formatting automatically.
"""
# Check if source is a raw SPARQL query string
if isinstance(source, str):
res = self.graph.query(source)
if res is not None and return_df:
# Extract column names from SELECT clause
import re
select_match = re.search(
r"SELECT\s+(?:DISTINCT\s+)?(.+?)\s+WHERE",
source,
re.IGNORECASE | re.DOTALL,
)
if select_match:
# Extract variable names (anything starting with ?)
variables = re.findall(r"\?(\w+)", select_match.group(1))
return pd.DataFrame(res, columns=variables)
else:
# Fallback: return as DataFrame without column names
return pd.DataFrame(res)
return res
else:
# Use tools4RDF for ontology-based queries
if self.ontology is None:
raise ValueError(
"Ontology not loaded (network unavailable). "
"Use a SPARQL string query instead."
)
return self.ontology.query(
self.graph,
source,
destinations=destinations,
return_df=return_df,
num_paths=num_paths,
limit=limit,
)
[docs]
def remove(self, triple):
"""
Remove a triple from the knowledge graph.
Parameters
----------
triple : tuple
The triple to be removed in the form (subject, predicate, object).
Returns
-------
None
Notes
-----
This method removes a triple from the knowledge graph. The triple should be provided as a tuple in the form (subject, predicate, object).
Examples
--------
>>> graph = KnowledgeGraph()
>>> graph.add(("Alice", "likes", "Bob"))
>>> graph.remove(("Alice", "likes", "Bob"))
"""
return self.graph.remove(triple)
[docs]
def create_node(self, namestring, classtype, label=None):
    """
    Create a new typed node in the graph.

    Parameters
    ----------
    namestring : str
        The name of the node; it is wrapped in a URIRef.
    classtype : Object from a given ontology
        The class type of the node, stored as its RDF type.
    label : str, optional
        If given, it is cleaned with `_clean_string` and stored as the
        RDFS label of the node. Default is None.

    Returns
    -------
    URIRef
        The newly created node.
    """
    node = URIRef(namestring)
    # the type triple goes in before any further property of the node
    type_triple = (node, RDF.type, classtype)
    self.add(type_triple)
    if label is None:
        return node
    self.add((node, RDFS.label, Literal(_clean_string(label))))
    return node
[docs]
def visualize(self, *args, **kwargs):
"""
Visualizes the graph using the specified arguments.
This method is a wrapper around the `visualise` method and passes the same arguments to it.
Parameters
----------
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns
-------
dot: The visualization of the RDF tree.
"""
self.visualise(*args, **kwargs)
[docs]
def visualise(
    self,
    styledict=None,
    rankdir="BT",
    hide_types=False,
    workflow_view=False,
    sample_view=False,
    size=None,
    layout="neato",
):
    """
    Visualize the RDF tree of the Graph.

    Parameters
    ----------
    styledict : dict, optional
        If provided, allows customization of color and other properties.
    rankdir : str, optional
        The direction of the graph layout. Default is "BT" (bottom to top).
    hide_types : bool, optional
        Whether to hide the types in the visualization. Default is False.
    workflow_view : bool, optional
        Whether to enable the workflow view. Default is False.
    sample_view : bool, optional
        Whether to enable the sample view. Default is False.
    size : tuple, optional
        The size of the visualization. Its first two entries are joined
        into a "width,height" string. Default is None.
    layout : str, optional
        The name of the layout algorithm for the graph. Default is "neato".

    Returns
    -------
    graphviz.dot.Digraph
        The visualization of the RDF tree, as returned by `visualize_graph`.

    Notes
    -----
    ``styledict`` overrides entries of the default style dictionary. Only
    keys that exist in the defaults are used. The defaults have the
    top-level keys ``edgecolor``, ``BNode``, ``URIRef`` and ``Literal``;
    each of the three node entries has the keys:

    color : str
        The color of the boxes.
    shape : str
        The shape of the boxes.
    style : str
        The style of the boxes.
    fontsize : str
        The font size of the box text.
    fontname : str
        The font of the box text.

    The defaults are deep-copied for every call, so the overrides of one
    call never leak into the module-level defaults or into later calls.
    """
    if size is not None:
        size = f"{size[0]},{size[1]}"
    # A shallow copy would share the nested per-node dictionaries with the
    # module-level defaults, and _replace_keys modifies them in place.
    sdict = copy.deepcopy(defstyledict)
    if styledict is not None:
        sdict = _replace_keys(sdict, styledict)
    return visualize_graph(
        self.graph,
        styledict=sdict,
        rankdir=rankdir,
        hide_types=hide_types,
        workflow_view=workflow_view,
        sample_view=sample_view,
        size=size,
        layout=layout,
    )
[docs]
def write(self, filename, format="json-ld"):
"""
Write the serialised version of the graph to a file
Parameters
----------
filename: string
name of output file
format: string, {'turtle', 'xml', 'json-ld', 'ntriples', 'n3'}
output format to be written to
Returns
-------
None
"""
with open(filename, "w") as fout:
fout.write(self.graph.serialize(format=format))
[docs]
def close(self, filename, format="json-ld"):
"""
Close the graph and write to a file
Parameters
----------
filename: string
name of output file
Returns
-------
None
"""
self.write(filename, format=format)
[docs]
def close_store(self):
"""
Release the underlying store (close file handles and locks).
This is a no-op for the in-memory store. For file-backed stores
(Oxigraph, SQLAlchemy) it releases the file lock so the same store
directory can be reopened in the same process or by another process.
Returns
-------
None
"""
self.graph.close()
def __del__(self):
try:
self.graph.close()
except Exception:
pass
[docs]
def archive(
self, package_name, format="turtle", compress=True, add_simulations=False
):
"""
Publish a dataset from graph including per atom quantities.
Parameters
----------
package_name : str
The name of the package to be created.
format : str, optional
The format in which the dataset should be written. Default is "turtle".
compress : bool, optional
Whether to compress the package into a tarball. Default is True.
add_simulations : bool, optional
If True, a folder named simulation_store is created inside the package and,
for every activity in self.activity_ids that has a CMSO.hasPath value, the
directory at that path is copied into it and the path triple is updated.
Default is False.
Raises
------
ValueError
If the package_name already exists or if the tarball already exists.
Notes
-----
This method creates a package containing a dataset from the graph, including per atom quantities.
The package consists of a folder named package_name, which contains the dataset and related files.
If compress is True, the package is compressed into a tarball.
The CMSO.hasPath triples of this graph are rewritten in place while archiving,
so the graph itself is modified by this call.
The method performs the following steps:
1. Checks if the package_name already exists. If it does, raises a ValueError.
2. If compress is True, checks if the tarball already exists. If it does, raises a ValueError.
3. Creates a folder named package_name.
4. Creates a subfolder named rdf_structure_store within the package folder.
5. Copies the files associated with each sample to the rdf_structure_store folder, while fixing the paths.
6. Copies any other file referenced by CMSO.hasPath that exists in the structure store
and updates the paths in the graph to point to the copied files.
7. If add_simulations is True, copies the simulation folders into simulation_store.
8. Writes the dataset to a file named "triples" within the package folder.
9. If compress is True, compresses the package folder into a tarball.
10. Removes the package folder (presumably only after compressing).
"""
# NOTE(review): the indentation of this block is not visible in this copy of the
# file, so the exact nesting of the statements below should be confirmed against
# the original source before changing any logic.
# first step make a folder
if os.path.exists(package_name):
raise ValueError(f"{package_name} already exists")
if compress:
if os.path.exists(f"{package_name}.tar.gz"):
raise ValueError(f"{package_name} tarball already exists")
os.mkdir(package_name)
structure_store = f"{package_name}/rdf_structure_store"
os.mkdir(structure_store)
# now go through each sample, and copy the file, at the same time fix the paths
# basenames already copied into the package, to avoid copying a file twice
copied_basenames = set()
# First, attempt to copy the canonical sample files as before (best-effort)
for sample in self.sample_ids:
filepath = self.value(URIRef(f"{sample}_Position"), CMSO.hasPath)
if filepath is None:
continue
filepath = filepath.toPython()
# filepath has to be fixed with the correct prefix as needed
# only the basename is kept; the file is looked up in self.structure_store
srcpath = os.path.join(self.structure_store, os.path.basename(filepath))
if os.path.exists(srcpath):
if os.path.basename(filepath) not in copied_basenames:
shutil.copy(srcpath, structure_store)
copied_basenames.add(os.path.basename(filepath))
# now we have to remove the old path, and fix new for Position/Species nodes
for val in ["Position", "Species"]:
self.remove((URIRef(f"{sample}_{val}"), CMSO.hasPath, None))
# assign corrected path
new_relpath = "/".join(
["rdf_structure_store", os.path.basename(filepath)]
)
self.add(
(
URIRef(f"{sample}_{val}"),
CMSO.hasPath,
Literal(new_relpath, datatype=XSD.string),
)
)
# Additionally, copy any other files referenced by CMSO.hasPath (e.g. calculated-property files)
# list(...) takes a snapshot, since triples are removed and added inside the loop
for subj, pred, obj in list(self.triples((None, CMSO.hasPath, None))):
try:
path = obj.toPython()
except Exception:
continue
basename = os.path.basename(path)
src = os.path.join(self.structure_store, basename)
if not os.path.exists(src):
# nothing to copy for this path
continue
if basename in copied_basenames:
# already copied
# still ensure triple points to rdf_structure_store
self.remove((subj, CMSO.hasPath, None))
new_relpath = "/".join(["rdf_structure_store", basename])
self.add(
(subj, CMSO.hasPath, Literal(new_relpath, datatype=XSD.string))
)
continue
shutil.copy(src, structure_store)
copied_basenames.add(basename)
# replace path triple with corrected relative path inside package
self.remove((subj, CMSO.hasPath, None))
new_relpath = "/".join(["rdf_structure_store", basename])
self.add((subj, CMSO.hasPath, Literal(new_relpath, datatype=XSD.string)))
# copy simulation files if needed
if add_simulations:
sim_store = f"{package_name}/simulation_store"
os.mkdir(sim_store)
activities = self.activity_ids
for activity in activities:
path = self.value(activity, CMSO.hasPath)
if path is not None:
# unlike the structure files, the new path includes the package_name prefix
newpath = "/".join([sim_store, activity.toPython()])
shutil.copytree(path, newpath)
# remove old path
self.remove((activity, CMSO.hasPath, None))
self.add(
(activity, CMSO.hasPath, Literal(newpath, datatype=XSD.string))
)
triple_file = os.path.join(package_name, "triples")
self.write(triple_file, format=format)
if compress:
with tarfile.open(f"{package_name}.tar.gz", "w:gz") as tar:
# arcname makes the package folder the single top-level entry of the tarball
tar.add(package_name, arcname=os.path.basename(package_name))
shutil.rmtree(package_name)
[docs]
@classmethod
def unarchive(
cls,
package_name,
compress=True,
store="Memory",
store_file=None,
identifier="http://default_graph",
ontology=None,
):
"""
Unarchives a package and returns an instance of the Graph class.
Parameters
----------
package_name : str
The name of the package to unarchive.
compress : bool, optional
Whether to compress the package. Defaults to True.
store : str, optional
The type of store to use. Defaults to "Memory".
store_file : str, optional
The file to use for the store. Defaults to None.
identifier : str, optional
The identifier for the graph. Defaults to "http://default_graph".
ontology : str, optional
The ontology to use. Defaults to None.
Returns
-------
Graph
An instance of the Graph class.
Raises
------
FileNotFoundError
If the package file is not found.
tarfile.TarError
If there is an error while extracting the package.
"""
if compress:
with tarfile.open(package_name) as fin:
# Derive the top-level directory from the archive members
# instead of string-splitting the path (which breaks with
# absolute paths like /full/path/dataset.tar.gz).
top_dirs = {m.name.split("/")[0] for m in fin.getmembers()}
if len(top_dirs) == 1:
package_base_name = top_dirs.pop()
else:
# Fallback: strip extensions from the basename
package_base_name = os.path.basename(package_name)
for ext in (".tar.gz", ".tar.bz2", ".tar"):
if package_base_name.endswith(ext):
package_base_name = package_base_name[: -len(ext)]
break
fin.extractall(".")
# os.remove(package_name)
# copy things out
else:
package_base_name = package_name
return cls(
store=store,
store_file=store_file,
identifier=identifier,
graph_file=f"{package_base_name}/triples",
structure_store=f"{package_base_name}/rdf_structure_store",
ontology=ontology,
)
[docs]
def merge_archive(self, package_name, compress=True, format="turtle"):
    """
    Merge an archived dataset into this KnowledgeGraph.

    Unlike ``unarchive`` (which creates a new graph), this method loads
    the triples and structure-store files from an existing archive into
    the *current* graph so that multiple datasets can be combined
    incrementally::

        kg = KnowledgeGraph()
        kg.merge_archive("dataset_1_GB.tar.gz")
        kg.merge_archive("dataset_2_GB.tar.gz")
        # kg now contains both datasets

    Parameters
    ----------
    package_name : str
        Path to the archive. When *compress* is True (default) this
        should be a ``.tar.gz`` file; otherwise the name of an already-
        extracted directory.
    compress : bool, optional
        Whether *package_name* is a compressed tarball. Default True.
    format : str, optional
        RDF serialisation format of the ``triples`` file inside the
        archive. Default ``"turtle"``.

    Returns
    -------
    None

    Notes
    -----
    * Files from the archive's ``rdf_structure_store`` folder are copied
      into ``self.structure_store``. If a file of the same name already
      exists there, a warning is emitted and the file is skipped (the
      existing copy wins).
    * After parsing, every ``CMSO.hasPath`` triple in the graph whose
      basename exists in ``self.structure_store`` is rewritten to the path
      of that file relative to the current working directory.
    * A tarball is extracted into the current working directory. The
      extracted folder is removed again when the method finishes, also if
      an error occurs while merging.
    * Where the running Python supports extraction filters, the ``"data"``
      filter is used so that members cannot be written outside the
      current working directory.
    """
    # --- 1. Extract if compressed ----------------------------------------
    if compress:
        with tarfile.open(package_name) as fin:
            # The archive's top-level directory name is used as package_base.
            # extractall(".") puts it relative to cwd, so package_base is
            # derived from the member names rather than the (possibly
            # absolute) package_name path. Empty and "." components are
            # skipped: otherwise an archive with "./pkg/..." members would
            # give "." as package_base, and the cleanup below would remove
            # the current working directory.
            top_dirs = set()
            for member in fin.getmembers():
                parts = [
                    part for part in member.name.split("/") if part not in ("", ".")
                ]
                if parts:
                    top_dirs.add(parts[0])
            if hasattr(tarfile, "data_filter"):
                # refuse members that would escape the target directory
                fin.extractall(".", filter="data")
            else:
                fin.extractall(".")
        if len(top_dirs) == 1:
            package_base = top_dirs.pop()
        else:
            # fallback: strip .tar.gz from basename
            package_base = os.path.basename(package_name)
            for suffix in (".tar.gz", ".tgz"):
                if package_base.endswith(suffix):
                    package_base = package_base[: -len(suffix)]
                    break
    else:
        package_base = package_name

    try:
        triple_file = os.path.join(package_base, "triples")
        archive_ss = os.path.join(package_base, "rdf_structure_store")

        # --- 2. Copy structure-store files -------------------------------
        if os.path.isdir(archive_ss):
            for fname in os.listdir(archive_ss):
                src = os.path.join(archive_ss, fname)
                dst = os.path.join(self.structure_store, fname)
                if os.path.exists(dst):
                    warnings.warn(
                        f"merge_archive: '{fname}' already exists in "
                        f"structure store — skipping (existing copy kept)",
                        UserWarning,
                        stacklevel=2,
                    )
                    continue
                shutil.copy(src, dst)

        # --- 3. Parse triples --------------------------------------------
        if os.path.exists(triple_file):
            self.graph.parse(triple_file, format=format)

        # --- 4. Rewrite CMSO.hasPath to point at self.structure_store ----
        # list(...) takes a snapshot, since triples are replaced in the loop
        for subj, pred, obj in list(self.triples((None, CMSO.hasPath, None))):
            try:
                path = obj.toPython()
            except Exception:
                continue
            if not isinstance(path, str):
                continue
            basename = os.path.basename(path)
            # Rewrite any path whose basename exists in self.structure_store
            resolved = os.path.join(self.structure_store, basename)
            if not os.path.exists(resolved):
                continue
            new_relpath = os.path.relpath(resolved, os.getcwd())
            if path != new_relpath:
                self.remove((subj, CMSO.hasPath, obj))
                self.add(
                    (
                        subj,
                        CMSO.hasPath,
                        Literal(new_relpath, datatype=XSD.string),
                    )
                )
    finally:
        # --- 5. Clean up extracted folder --------------------------------
        if compress and os.path.isdir(package_base):
            shutil.rmtree(package_base)
@property
def n_samples(self):
    """
    Number of samples in the Graph, i.e. the number of triples that
    declare a subject to be of type CMSO.AtomicScaleSample.
    """
    sample_pattern = (None, RDF.type, CMSO.AtomicScaleSample)
    return sum(1 for _ in self.triples(sample_pattern))
@property
def sample_ids(self):
    """
    Returns a list of all Samples in the graph: the subjects of every
    triple that declares the type CMSO.AtomicScaleSample.
    """
    found = []
    for match in self.triples((None, RDF.type, CMSO.AtomicScaleSample)):
        found.append(match[0])
    return found
@property
def sample_names(self):
    """
    Returns a list of all Sample names in the graph.

    For each sample the RDFS label is used as its name; a sample without
    a label is listed by the Python value of its identifier instead.
    """
    sample_pattern = (None, RDF.type, CMSO.AtomicScaleSample)
    samples = [match[0] for match in self.triples(sample_pattern)]

    def display_name(sample):
        # prefer the label, fall back to the identifier
        label = self.value(sample, RDFS.label)
        chosen = sample if label is None else label
        return chosen.toPython()

    return [display_name(sample) for sample in samples]
def _is_of_type(self, item, target_item):
"""
Check if an item is of a specific type
item - direct from graph, comparison only makes sense if it is URIRef
if item is node with https - direct comparison
if not - check the type of the item
target_item - URIRef or OntoTerm
"""
if not self._is_uriref(item):
return False
if self._is_bnode(item):
rdftype = self.value(item, RDF.type)
if rdftype is not None:
rdftype = rdftype.toPython()
else:
rdftype = item.toPython()
target_type = target_item.toPython()
return rdftype == target_type
[docs]
def get_sample_as_structure(self, sample_id):
    """
    Retrieve a sample from the graph as an AtomicScaleSample object.

    Parameters
    ----------
    sample_id : str or URIRef
        The ID of the sample to retrieve. A string that does not start
        with ``sample:`` gets that prefix added.

    Returns
    -------
    AtomicScaleSample
        The object built by ``AtomicScaleSample.from_graph`` for this graph
        and sample id.

    Examples
    --------
    >>> kg = KnowledgeGraph()
    >>> sample = kg.get_sample_as_structure('sample:123')
    >>> atoms = sample.to_structure()  # Convert to ASE Atoms
    >>> sample.to_file('output.lmp', format='lammps-dump')
    """
    # imported here rather than at module level
    from atomrdf.datamodels.structure import AtomicScaleSample

    if isinstance(sample_id, str) and not sample_id.startswith("sample:"):
        sample_id = f"sample:{sample_id}"
    return AtomicScaleSample.from_graph(self, sample_id)
[docs]
def to_file(
self,
sample,
filename,
format="lammps-data",
copy_from=None,
pseudo_files=None,
):
"""
Write a sample structure to a file.
Parameters
----------
sample : str or URIRef
Sample ID
filename : str
Name of the output file
format : str, optional
Format of the output file. Default is 'lammps-data'.
Any format supported by ASE can be used.
copy_from : str, optional
If provided, input options for quantum-espresso format will be copied from
the given file. Structure specific information will be replaced.
Note that the validity of input file is not checked.
pseudo_files : list, optional
If provided, add the pseudopotential filenames to file.
Should be in alphabetical order of chemical species symbols.
Returns
-------
None
Examples
--------
>>> kg = KnowledgeGraph()
>>> kg.to_file('sample:123', 'output.lmp', 'lammps-data')
>>> kg.to_file('sample:456', 'POSCAR', 'vasp')
"""
sample_obj = self.get_sample_as_structure(sample)
sample_obj.to_file(
outfile=filename,
format=format,
copy_from=copy_from,
pseudo_files=pseudo_files,
)
def get_label(self, item):
    """
    Return the RDFS label of an item.

    Parameters
    ----------
    item : object
        The subject whose label is looked up in the graph.

    Returns
    -------
    object or None
        The Python value of the label, or None if the item has no label.
    """
    label = self.graph.value(item, RDFS.label)
    return None if label is None else label.toPython()
def get_string_label(self, item):
"""
Return a label for an item that is always a string.
The label from `get_label` is used if there is one. Otherwise the string form of
item.toPython() is used, or str(item) if that call fails.
If the label contains "simulation" and the item has a computational method
(ASMO.hasComputationalMethod) with an RDF type, the label is replaced by the
last "/"-separated part of that type.
Parameters
----------
item : object
The term to get a label for.
Returns
-------
str
The label.
"""
# NOTE(review): the indentation of this block is not visible in this copy of the
# file; whether the "simulation" check below also applies to labels found by
# get_label, or only to the fallback label, should be confirmed against the
# original source.
label = self.get_label(item)
if label is None:
try:
label = str(item.toPython())
except:
label = str(item)
if "simulation" in label:
method = self.value(item, ASMO.hasComputationalMethod)
if method is not None:
method_name = self.value(method, RDF.type)
if method_name is not None:
# keep only the last path segment of the type
label = method_name.toPython().split("/")[-1]
return label