Source code for rdfsolve.parser

"""VoID (Vocabulary of Interlinked Datasets) Parser.

Parses an in-memory VoID RDF graph and converts the embedded schema
to various downstream formats (JSON-LD, LinkML, SHACL, RDF-config,
DataFrame).
"""

import logging
from typing import Any, cast

import pandas as pd
from linkml_runtime.linkml_model import SchemaDefinition
from rdflib import Graph, Literal, URIRef

# Create logger with NullHandler by default , no output unless user configures
logger = logging.getLogger(__name__)
if not logger.handlers:
    logger.addHandler(logging.NullHandler())


[docs] class VoidParser: """Parser for VoID (Vocabulary of Interlinked Datasets) files.""" def __init__( self, void_source: str | Graph | None = None, graph_uris: str | list[str] | None = None, exclude_graphs: bool = True, ): """Initialize the VoID parser. Args: void_source: File path (str) or RDF Graph object graph_uris: Graph URI(s) to analyze, or None for all non-system graphs exclude_graphs: Exclude Virtuoso system graphs """ self.void_file_path: str | None = None self.graph: Graph = Graph() self.schema_triples: list[Any] = [] self.classes: dict[str, Any] = {} self.properties: dict[str, Any] = {} self.graph_uris = self._normalize_graph_uris(graph_uris) self.exclude_graphs = exclude_graphs self.exclude_graph_patterns: list[str] | None = None # VoID namespace URIs self.void_class = URIRef("http://rdfs.org/ns/void#class") self.void_property = URIRef("http://rdfs.org/ns/void#property") self.void_propertyPartition = URIRef("http://rdfs.org/ns/void#propertyPartition") self.void_classPartition = URIRef("http://rdfs.org/ns/void#classPartition") self.void_datatypePartition = URIRef("http://ldf.fi/void-ext#datatypePartition") # Bind common namespace prefixes self.void_ns = "http://rdfs.org/ns/void#" self.void_ext_ns = "http://ldf.fi/void-ext#" # Extended VoID properties for schema self.void_subjectClass = URIRef("http://ldf.fi/void-ext#subjectClass") self.void_objectClass = URIRef("http://ldf.fi/void-ext#objectClass") if void_source: if isinstance(void_source, str): self.void_file_path = void_source self._load_graph() elif isinstance(void_source, Graph): self.graph = void_source def _normalize_graph_uris(self, graph_uris: str | list[str] | None) -> list[str] | None: """Normalize graph URIs input to a list.""" if graph_uris is None: return None elif isinstance(graph_uris, str): return [graph_uris] elif isinstance(graph_uris, list): return graph_uris else: raise ValueError("graph_uris must be str, list of str, or None") def _load_graph(self) -> None: """Load the VoID file into an RDF graph.""" self.graph.parse(self.void_file_path, format="turtle") def _extract_classes(self) -> None: """Extract class information from VoID description.""" self.classes = {} for s, _p, o in self.graph.triples((None, self.void_class, None)): self.classes[s] = o def _extract_properties(self) -> None: """Extract property information from VoID description.""" self.properties = {} for s, _p, o in self.graph.triples((None, self.void_property, None)): self.properties[s] = o def _extract_schema_triples(self) -> None: """Extract schema triples by analyzing property partitions.""" self.schema_triples = [] # Try new ty extraction first (with subjectClass/objectClass) triples = self._extract_schema() if triples: self.schema_triples = triples return def _extract_schema(self) -> list[Any]: """Extract schema from property partitions with type info.""" triples: list[Any] = [] # Find all property partitions with subject/object class info for partition, _, property_uri in self.graph.triples((None, self.void_property, None)): # Get subject class subject_classes = list(self.graph.triples((partition, self.void_subjectClass, None))) # Get object class object_classes = list(self.graph.triples((partition, self.void_objectClass, None))) if subject_classes and object_classes: for _, _, subject_class in subject_classes: for _, _, object_class in object_classes: triples.append((subject_class, property_uri, object_class)) elif subject_classes: # Check for datatype partitions (literal objects) datatype_partitions = list( self.graph.triples((partition, self.void_datatypePartition, None)) ) if datatype_partitions: for _, _, subject_class in subject_classes: triples.append((subject_class, property_uri, "Literal")) else: # No explicit datatype or object class - assume Resource for _, _, subject_class in subject_classes: triples.append((subject_class, property_uri, "Resource")) return triples def _filter_void_admin_nodes(self, df: pd.DataFrame) -> pd.DataFrame: """Filter out VoID-related triples.""" mask = ( ~df["subject_uri"].str.contains("void", case=False, na=False) & ~df["property_uri"].str.contains("void", case=False, na=False) & ~df["object_uri"].str.contains("void", case=False, na=False) & ~df["subject_uri"].str.contains("well-known", case=False, na=False) & ~df["property_uri"].str.contains("well-known", case=False, na=False) & ~df["object_uri"].str.contains("well-known", case=False, na=False) & ~df["subject_uri"].str.contains("openlink", case=False, na=False) & ~df["property_uri"].str.contains("openlink", case=False, na=False) & ~df["object_uri"].str.contains("openlink", case=False, na=False) ) return df[mask].copy() def _extract_about_metadata( self, endpoint_url: str | None = None, dataset_name: str | None = None, graph_uris: list[str] | None = None, ) -> dict[str, Any]: """Extract metadata from the VoID graph for the @about section. Pulls metadata from the VoID graph (endpoint, title, graph URIs) and merges with any explicitly provided values. Args: endpoint_url: SPARQL endpoint URL (overrides graph value) dataset_name: Dataset name (overrides graph value) graph_uris: Graph URIs (overrides graph value) Returns: Dictionary with metadata for the @about section """ from datetime import datetime, timezone from rdfsolve.version import VERSION about: dict[str, Any] = { "generatedBy": f"rdfsolve {VERSION}", "generatedAt": datetime.now(timezone.utc).isoformat(), } # Try to extract metadata from the VoID graph void_dataset_type = URIRef("http://rdfs.org/ns/void#Dataset") void_sparql_endpoint = URIRef("http://rdfs.org/ns/void#sparqlEndpoint") dcterms_title = URIRef("http://purl.org/dc/terms/title") graph_endpoint = None graph_title = None graph_graph_uris: list[str] = [] for s, p, o in self.graph: if ( p == URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type") and o == void_dataset_type ): # Found a void:Dataset - extract its properties for _, pred, obj in self.graph.triples((s, None, None)): if pred == void_sparql_endpoint: graph_endpoint = str(obj) elif pred == dcterms_title: graph_title = str(obj) # Collect graph URIs from the parser if self.graph_uris: graph_graph_uris = list(self.graph_uris) # Use explicit values, fall back to graph values if endpoint_url: about["endpoint"] = endpoint_url elif graph_endpoint: about["endpoint"] = graph_endpoint if dataset_name: about["datasetName"] = dataset_name elif graph_title: about["datasetName"] = graph_title effective_graph_uris = graph_uris if graph_uris else graph_graph_uris if effective_graph_uris: about["graphURIs"] = effective_graph_uris if self.void_file_path: about["voidFile"] = self.void_file_path about["tripleCount"] = len(self.schema_triples) if self.schema_triples else 0 return about
[docs] def to_jsonld( self, filter_void_admin_nodes: bool = True, endpoint_url: str | None = None, dataset_name: str | None = None, graph_uris: list[str] | None = None, ) -> dict[str, Any]: """ Parse VoID file and return simple JSON-LD with the schema triples. Args: filter_void_admin_nodes: Whether to filter out VoID-specific nodes endpoint_url: SPARQL endpoint URL for the @about section dataset_name: Dataset name for the @about section graph_uris: Graph URIs for the @about section Returns: Simple JSON-LD with @context, @graph, and @about sections """ # Extract schema triples self._extract_schema_triples() if not self.schema_triples: about = self._extract_about_metadata( endpoint_url=endpoint_url, dataset_name=dataset_name, graph_uris=graph_uris, ) return {"@context": {}, "@graph": [], "@about": about} # Create minimal context for the namespaces we find context: dict[str, str] = {} triples: list[dict[str, Any]] = [] for s, p, o in self.schema_triples: # Convert to CURIEs and collect namespaces s_curie, s_prefix, s_namespace = self._get_curie_and_namespace(str(s)) p_curie, p_prefix, p_namespace = self._get_curie_and_namespace(str(p)) # Add prefixes to context if s_prefix and s_namespace: context[s_prefix] = s_namespace if p_prefix and p_namespace: context[p_prefix] = p_namespace # Handle object o_value: str | dict[str, str] if isinstance(o, Literal): # It's a literal value if o.datatype: o_value = {"@value": str(o), "@type": str(o.datatype)} else: o_value = str(o) else: # It's a URI/Resource o_curie, o_prefix, o_namespace = self._get_curie_and_namespace(str(o)) if o_prefix and o_namespace: context[o_prefix] = o_namespace o_value = {"@id": o_curie if o_curie else str(o)} # Create simple triple as JSON-LD triple = { "@id": s_curie if s_curie else str(s), p_curie if p_curie else str(p): o_value, } triples.append(triple) # Group triples by subject grouped: dict[str, dict[str, Any]] = {} for triple in triples: subject_id: str = cast(str, triple["@id"]) if subject_id not in grouped: grouped[subject_id] = {"@id": subject_id} # Merge properties for key, value in triple.items(): if key != "@id": if key in grouped[subject_id]: # Convert to array if not already if not isinstance(grouped[subject_id][key], list): grouped[subject_id][key] = [grouped[subject_id][key]] # Add new value if not duplicate if value not in grouped[subject_id][key]: grouped[subject_id][key].append(value) else: grouped[subject_id][key] = value # Build @about metadata section about = self._extract_about_metadata( endpoint_url=endpoint_url, dataset_name=dataset_name, graph_uris=graph_uris, ) # Return simple JSON-LD return {"@context": context, "@graph": list(grouped.values()), "@about": about}
def _create_context(self) -> dict[str, str]: """Create JSON-LD @context.""" # Start with standard W3C vocabularies (should not be needed anymore) context = { # Core RDF vocabularies "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "owl": "http://www.w3.org/2002/07/owl#", "xsd": "http://www.w3.org/2001/XMLSchema#", # Metadata vocabularies "dcterms": "http://purl.org/dc/terms/", "dc": "http://purl.org/dc/elements/1.1/", "prov": "http://www.w3.org/ns/prov#", "foaf": "http://xmlns.com/foaf/0.1/", "skos": "http://www.w3.org/2004/02/skos/core#", "schema": "https://schema.org/", # VoID and SHACL for schema description "void": "http://rdfs.org/ns/void#", "sh": "http://www.w3.org/ns/shacl#", # Common biological/chemical ontologies (clean URIs) "go": "http://purl.obolibrary.org/obo/GO_", "chebi": "http://purl.obolibrary.org/obo/CHEBI_", "pato": "http://purl.obolibrary.org/obo/PATO_", "ncit": "http://ncicb.nci.nih.gov/xml/owl/EVS/Thesaurus.owl#", "cheminf": "http://semanticscience.org/resource/CHEMINF_", } # Add prefixes from VoID graph namespace manager if self.graph and hasattr(self.graph, "namespace_manager"): for prefix, namespace in self.graph.namespace_manager.namespaces(): if prefix and namespace and str(prefix) not in context: # Only add if it's a valid URI and not already present ns_str = str(namespace) if ns_str.startswith(("http://", "https://", "urn:")): context[str(prefix)] = ns_str return context def _extract_context(self) -> dict[str, str]: """Extract @context from VoID graph and common namespaces.""" return self._create_context() def _filter_jsonld_void_admin_nodes(self, jsonld: dict[str, Any]) -> dict[str, Any]: """Filter out VoID administrative nodes from JSON-LD structure.""" void_patterns = [ "void", "rdfs", "rdf", "owl", "skos", "foaf", "dcterms", "dc", "prov", "schema", ] # Handle @graph structure if "@graph" in jsonld: filtered_graph = [] for item in jsonld["@graph"]: # Keep dataset description (first item) if item.get("@type") == "void:Dataset": filtered_graph.append(item) continue # Keep schema pattern statements, S-P-O relationships if "void:SchemaPattern" in item.get("@type", []): filtered_graph.append(item) continue # Filter other items based on @id patterns item_id = item.get("@id", "").lower() if not any(void_pat in item_id for void_pat in void_patterns): filtered_graph.append(item) jsonld_filtered = jsonld.copy() jsonld_filtered["@graph"] = filtered_graph return jsonld_filtered # Return as-is if no recognized structure return jsonld def _get_curie_and_namespace(self, uri: str) -> tuple[str, str, str]: """Get CURIE representation and extract prefix/namespace info. Args: uri: The URI to convert Returns: Tuple of (curie, prefix, namespace_uri). """ import re curie = None prefix = None namespace_uri = None # First try bioregistry conversion if uri.startswith(("http://", "https://")): try: from bioregistry import curie_from_iri, parse_iri parsed = parse_iri(uri) if parsed: prefix, local_id = parsed if local_id in uri: idx = uri.rfind(local_id) namespace_uri = uri[:idx] elif "#" in uri: namespace_uri = uri.rsplit("#", 1)[0] + "#" else: namespace_uri = uri.rsplit("/", 1)[0] + "/" curie = curie_from_iri(uri) if not curie and prefix and local_id: curie = f"{prefix}:{local_id}" except Exception as e: logger.debug("Bioregistry failed for %s: %s", uri, e) # Fallback to string manipulation if not curie: if "#" in uri: namespace_part, local_part = uri.rsplit("#", 1) namespace_uri = namespace_part + "#" elif "/" in uri: namespace_part, local_part = uri.rsplit("/", 1) namespace_uri = namespace_part + "/" else: local_part = uri if not prefix and namespace_uri: clean_uri = namespace_uri.replace( "http://", "", ).replace("https://", "") clean_uri = ( clean_uri.replace( "www.", "", ) .strip("/") .strip("#") ) if "/" in clean_uri: parts = clean_uri.split("/") prefix = parts[-1] if parts[-1] else parts[-2] if len(parts) > 1 else "ns" else: prefix = clean_uri.split(".")[0] if "." in clean_uri else clean_uri prefix = re.sub(r"[^a-zA-Z0-9_]", "", prefix)[:10] curie = f"{prefix}:{local_part}" if prefix and local_part else uri return curie or uri, prefix or "", namespace_uri or "" def _extract_schema_patterns_from_triples(self) -> list[dict[str, str]]: """ Extract schema patterns from the internal schema triples. This creates the schema_patterns structure expected by other methods. Returns: List of schema pattern dictionaries """ if not hasattr(self, "schema_triples") or not self.schema_triples: return [] patterns = [] for subject_uri, property_uri, object_uri in self.schema_triples: # Convert URIs to CURIEs for display subject_curie, _, _ = self._get_curie_and_namespace(str(subject_uri)) property_curie, _, _ = self._get_curie_and_namespace(str(property_uri)) object_curie, _, _ = self._get_curie_and_namespace(str(object_uri)) patterns.append( { "subject_class": subject_curie, "subject_uri": str(subject_uri), "property": property_curie, "property_uri": str(property_uri), "object_class": object_curie, "object_uri": str(object_uri), } ) return patterns
[docs] def to_schema(self, filter_void_admin_nodes: bool = True) -> pd.DataFrame: """ Parse VoID file and return schema as pandas DataFrame. This method now uses the JSON-LD generation as the source of truth. Args: filter_void_admin_nodes: Whether to filter out VoID-specific nodes Returns: DataFrame with schema information including CURIEs """ # Ensure schema is extracted (populates self.schema_triples) self._extract_schema_triples() # Get schema patterns from the internal triples schema_patterns = self._extract_schema_patterns_from_triples() if not schema_patterns: return pd.DataFrame() # Convert to DataFrame df = pd.DataFrame(schema_patterns) # Apply filtering if requested if filter_void_admin_nodes: df = self._filter_void_admin_nodes(df) return df
[docs] def to_linkml( self, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, jsonld_override: dict[str, Any] | None = None, ) -> SchemaDefinition: """Generate LinkML schema from JSON-LD representation. See :func:`rdfsolve.schema_models.linkml.to_linkml` for full documentation. """ from rdfsolve.schema_models.linkml import ( to_linkml as _to_linkml, ) jsonld = ( jsonld_override if jsonld_override is not None else self.to_jsonld(filter_void_nodes) ) return _to_linkml( jsonld, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, )
[docs] def to_linkml_yaml( self, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, ) -> str: """Return LinkML schema as YAML string. See :func:`rdfsolve.schema_models.linkml.to_linkml_yaml`. """ from rdfsolve.schema_models.linkml import ( to_linkml_yaml as _to_linkml_yaml, ) jsonld = self.to_jsonld(filter_void_nodes) return _to_linkml_yaml( jsonld, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, )
[docs] def to_shacl( self, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, closed: bool = True, suffix: str | None = None, include_annotations: bool = False, ) -> str: """Generate SHACL shapes from VoID schema. See :func:`rdfsolve.schema_models.shacl.to_shacl`. """ from rdfsolve.schema_models.shacl import ( to_shacl as _to_shacl, ) jsonld = self.to_jsonld(filter_void_nodes) return _to_shacl( jsonld, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, closed=closed, suffix=suffix, include_annotations=include_annotations, )
[docs] def to_rdfconfig( self, filter_void_nodes: bool = True, endpoint_url: str | None = None, endpoint_name: str | None = None, graph_uri: str | None = None, ) -> dict[str, str]: """Generate RDF-config YAML files. See :func:`rdfsolve.schema_models.rdfconfig.to_rdfconfig`. """ from rdfsolve.schema_models.rdfconfig import ( to_rdfconfig as _to_rdfconfig, ) jsonld = self.to_jsonld(filter_void_nodes) return _to_rdfconfig( jsonld, endpoint_url=endpoint_url, endpoint_name=endpoint_name, graph_uri=graph_uri, )