# Source code for rdfsolve.mapping_models.core

"""Core mapping models: MappingEdge, InstanceMatchResult, Mapping.

Base class and helpers for all mapping types.
"""

from __future__ import annotations

import json as _json
import logging
from collections import Counter
from collections.abc import Callable, Collection, Iterable
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field

from rdfsolve._uri import (
    _build_br_prefix_map,
    make_expander,
    uri_to_curie,
)
from rdfsolve.schema_models._constants import (
    _GRAPH_SKIP_KEYS,
)
from rdfsolve.schema_models.core import (
    AboutMetadata,
    _merge_into_list,
)

# Module logger; the helpers below use it for debug-level diagnostics when
# a mapping file cannot be read or a mapping edge fails validation.
_log = logging.getLogger(__name__)

# Default value of ``MappingEdge.predicate`` (skos:narrowMatch).
SKOS_NARROW_MATCH = "http://www.w3.org/2004/02/skos/core#narrowMatch"


# -------------------------------------------------------------------
# Data models
# -------------------------------------------------------------------


class MappingEdge(BaseModel):
    """A single mapping edge between two classes.

    The edge states ``source_class <predicate> target_class``; each class
    is tagged with the name of the dataset it lives in and, optionally,
    an endpoint URL for that dataset.
    """

    source_class: str = Field(
        ...,
        description="URI of the source class",
    )
    target_class: str = Field(
        ...,
        description="URI of the target class",
    )
    predicate: str = Field(
        SKOS_NARROW_MATCH,
        description=("Mapping predicate URI (default: skos:narrowMatch)"),
    )
    source_dataset: str = Field(
        ...,
        description=("Dataset name where source_class lives"),
    )
    target_dataset: str = Field(
        ...,
        description=("Dataset name where target_class lives"),
    )
    # Previously undocumented; descriptions added for consistency with the
    # other fields (they only affect the generated JSON schema).
    source_endpoint: str | None = Field(
        None,
        description="Optional endpoint URL associated with source_dataset",
    )
    target_endpoint: str | None = Field(
        None,
        description="Optional endpoint URL associated with target_dataset",
    )
    confidence: float | None = Field(
        None,
        ge=0,
        le=1,
        description="Optional match confidence score 0-1",
    )
class InstanceMatchResult(BaseModel):
    """Outcome of probing a single URI format against a single endpoint.

    The first three fields identify the probe (which dataset, which
    endpoint, which URI prefix).  ``matched_class`` holds the class URI the
    endpoint reported for that prefix, or stays ``None`` when nothing
    matched.
    """

    # Required fields: no default given, so pydantic demands a value.
    dataset_name: str = Field(description="Dataset name")
    endpoint_url: str = Field(description="SPARQL endpoint URL")
    uri_format: str = Field(description="URI prefix that was probed")

    # Optional result of the probe.
    matched_class: str | None = Field(
        default=None,
        description="Class URI returned by the endpoint for this pattern; None if no match",
    )
class Mapping(BaseModel):
    """Container for a set of mapping edges with provenance.

    Base class for all mapping types.  Supports JSON-LD round-tripping
    (:meth:`from_jsonld` / :meth:`to_jsonld`) and NetworkX export
    (:meth:`to_networkx`, :meth:`dataset_graph`).
    """

    edges: list[MappingEdge] = Field(default_factory=list)
    about: AboutMetadata = Field(...)
    mapping_type: str = Field(
        "unknown",
        description="Mapping strategy identifier",
    )

    # ---- JSON-LD import ------------------------------------

    @classmethod
    def from_jsonld(cls, path: str | Path) -> Mapping:
        """Reconstruct from a mapping JSON-LD file.

        Inverse of :meth:`to_jsonld`.  CURIEs are expanded using the file's
        own ``@context`` block, merged with a legacy ``@about.curie_map``
        when one is present (``@context`` entries win on conflicts).

        Args:
            path: Path of the JSON-LD file; it is read as UTF-8.

        Returns:
            A new instance whose ``mapping_type`` comes from
            ``@about.strategy`` (``"unknown"`` when absent).
        """
        br_map = _build_br_prefix_map()
        raw = _json.loads(
            Path(path).read_text(encoding="utf-8"),
        )
        # ``or`` (instead of a .get() default) also covers keys that are
        # present but explicitly null, which previously crashed below.
        context: dict[str, str] = raw.get("@context") or {}
        about_data: dict[str, Any] = raw.get("@about") or {}
        # Legacy SSSOM files store curie_map in @about
        curie_map: dict[str, str] = about_data.get("curie_map") or {}
        merged = {**curie_map, **context} if curie_map else context
        expand = make_expander(merged, br_map)
        edges = _parse_mapping_graph(
            raw.get("@graph") or [],
            expand,
        )
        about = AboutMetadata.model_validate(about_data)
        strategy = about_data.get("strategy", "unknown")
        return cls(
            edges=edges,
            about=about,
            mapping_type=strategy,
        )

    # ---- NetworkX export -----------------------------------

    def to_networkx(self) -> Any:
        """Export the mapping as an ``nx.MultiDiGraph``.

        Nodes are class URIs carrying a ``dataset`` attribute taken from the
        first edge that mentions them.  Each mapping edge becomes one
        directed edge with ``predicate``, ``source_dataset``,
        ``target_dataset``, ``strategy`` (``self.mapping_type``) and
        ``confidence`` attributes.

        Raises:
            ImportError: If networkx is not installed.
        """
        try:
            import networkx as _nx
        except ImportError as exc:
            raise ImportError(
                "networkx is required for to_networkx(); install it with: pip install networkx",
            ) from exc
        graph: Any = _nx.MultiDiGraph()
        for edge in self.edges:
            for uri, ds in (
                (edge.source_class, edge.source_dataset),
                (edge.target_class, edge.target_dataset),
            ):
                # First edge wins: later edges never overwrite ``dataset``.
                if uri not in graph:
                    graph.add_node(uri, dataset=ds)
            graph.add_edge(
                edge.source_class,
                edge.target_class,
                predicate=edge.predicate,
                source_dataset=edge.source_dataset,
                target_dataset=edge.target_dataset,
                strategy=self.mapping_type,
                confidence=edge.confidence,
            )
        return graph

    # ---- Dataset-level graph export ------------------------

    @classmethod
    def dataset_graph(
        cls,
        paths: Iterable[str | Path],
        class_to_datasets: dict[str, set[str]],
        *,
        base_graph: Any | None = None,
        strategies: Collection[str] | None = None,
    ) -> Any:
        """Stream mapping files into a weighted dataset-pair graph.

        Files are parsed one at a time without building ``Mapping`` objects.
        For every mapping edge whose source and target classes both appear
        in *class_to_datasets*, the weight of each (unordered) pair of
        distinct datasets the two classes belong to is incremented.

        Args:
            paths: Mapping JSON-LD files to read; unreadable files are
                skipped (logged at debug level).
            class_to_datasets: Maps a class URI to the names of the
                datasets it belongs to.
            base_graph: Optional graph to add weights into.  It is mutated
                in place and returned.  It is assumed to be a simple
                (non-multi) networkx graph.  Existing edges without a
                ``weight`` attribute are treated as weight 0.
            strategies: If given, only files whose ``@about.strategy`` is
                in this collection are used.

        Returns:
            The graph (*base_graph* or a new ``nx.Graph``) whose edges carry
            an integer ``weight``.

        Raises:
            ImportError: If networkx is not installed.
        """
        try:
            import networkx as _nx
        except ImportError as exc:
            raise ImportError(
                "networkx is required for dataset_graph(); install it with: pip install networkx",
            ) from exc

        # ujson is ~3-5x faster for large files
        try:
            import ujson as _fast_json
        except ImportError:
            _fast_json = None  # type: ignore[assignment]
        fast_json = _fast_json if _fast_json is not None else _json

        br_map = _build_br_prefix_map()
        # Node keys that never hold mapping targets.
        skip_keys = frozenset(
            {
                "void:inDataset",
                "dcterms:created",
            }
        )
        weights: Counter[tuple[str, str]] = Counter()
        for p in paths:
            _process_mapping_file(
                p,
                fast_json,
                br_map,
                skip_keys,
                class_to_datasets,
                strategies,
                weights,
            )

        graph: Any = base_graph if base_graph is not None else _nx.Graph()
        for (a, b), w in weights.items():
            if graph.has_edge(a, b):
                # A caller-supplied base_graph may contain edges without a
                # weight attribute; ``+=`` on the missing key raised KeyError.
                edge_data = graph[a][b]
                edge_data["weight"] = edge_data.get("weight", 0) + w
            else:
                graph.add_edge(a, b, weight=w)
        return graph

    # ---- JSON-LD export ------------------------------------

    def to_jsonld(self) -> dict[str, Any]:
        """Export as JSON-LD with ``@context``, ``@graph`` and ``@about``.

        Edges are grouped by ``source_class``.  Each ``@graph`` node is one
        source class carrying its dataset node and a ``dcterms:created``
        value taken from ``about.generated_at``.  Target objects are
        attached under the predicate CURIE via ``_merge_into_list``.
        Prefixes found while compacting URIs are added to ``@context``
        without overriding prefixes that are already bound.

        Returns:
            A JSON-serialisable dict; ``@about`` is ``self.about`` dumped
            with ``None`` values excluded.
        """
        context: dict[str, str] = {
            "skos": ("http://www.w3.org/2004/02/skos/core#"),
            "rdfsolve": "https://w3id.org/rdfsolve/",
            "void": "http://rdfs.org/ns/void#",
            "dcterms": "http://purl.org/dc/terms/",
            "foaf": "http://xmlns.com/foaf/0.1/",
            "sd": ("http://www.w3.org/ns/sparql-service-description#"),
        }
        grouped: dict[str, dict[str, Any]] = {}
        created_at = self.about.generated_at

        for edge in self.edges:
            sc, sc_pfx, sc_ns = uri_to_curie(edge.source_class)
            tc, tc_pfx, tc_ns = uri_to_curie(edge.target_class)
            pp, pp_pfx, pp_ns = uri_to_curie(edge.predicate)
            for pfx, ns in (
                (sc_pfx, sc_ns),
                (tc_pfx, tc_ns),
                (pp_pfx, pp_ns),
            ):
                if pfx and ns:
                    # setdefault: never rebind an already-declared prefix.
                    context.setdefault(pfx, ns)

            tgt_obj: dict[str, Any] = {
                "@id": tc,
                "void:inDataset": _dataset_node(
                    edge.target_dataset,
                    edge.target_endpoint,
                ),
            }
            if edge.confidence is not None:
                tgt_obj["rdfsolve:confidence"] = edge.confidence

            # The first edge seen for a source class defines its node.
            if sc not in grouped:
                grouped[sc] = {
                    "@id": sc,
                    "void:inDataset": _dataset_node(
                        edge.source_dataset,
                        edge.source_endpoint,
                    ),
                    "dcterms:created": created_at,
                }
            _merge_into_list(grouped, sc, pp, tgt_obj)

        return {
            "@context": context,
            "@graph": list(grouped.values()),
            "@about": self.about.model_dump(
                exclude_none=True,
            ),
        }
# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------


def _dataset_node(
    name: str,
    homepage: str | None = None,
) -> dict[str, Any]:
    """Build a ``void:inDataset`` node dict.

    Args:
        name: Dataset name, used in the node ``@id``
            (``rdfsolve:dataset/<name>``) and as ``dcterms:title``.
        homepage: Optional URL stored as ``foaf:homepage``.
            ``Mapping.to_jsonld`` passes the edge's endpoint here.

    Returns:
        The JSON-LD node dict.
    """
    node: dict[str, Any] = {
        "@id": f"rdfsolve:dataset/{name}",
        "dcterms:title": name,
    }
    if homepage:
        node["foaf:homepage"] = {"@id": homepage}
    return node


def _dataset_info(ds_node: Any) -> tuple[str, str | None]:
    """Extract ``(title, endpoint)`` from a ``void:inDataset`` value.

    The endpoint is read from ``void:sparqlEndpoint`` and, failing that,
    from ``foaf:homepage`` -- the key :func:`_dataset_node` writes -- so
    endpoints survive an export/import round trip.  Values that are not
    dicts (missing, a bare string, a list) yield ``("", None)``, the same
    as a missing node.
    """
    if not isinstance(ds_node, dict):
        return "", None
    title = ds_node.get("dcterms:title", "")
    for key in ("void:sparqlEndpoint", "foaf:homepage"):
        ref = ds_node.get(key)
        if isinstance(ref, dict) and ref.get("@id"):
            return title, ref["@id"]
    return title, None


def _parse_mapping_graph(
    graph_nodes: list[Any],
    expand: Callable[[str], str],
) -> list[MappingEdge]:
    """Parse ``@graph`` nodes from mapping JSON-LD into edges.

    Each node is a source class (``@id``) with its own dataset node.  Every
    other key that does not start with ``@`` and is not in
    ``_GRAPH_SKIP_KEYS`` is a predicate CURIE whose value is one target
    dict or a list of them.  Nodes and targets without an ``@id`` are
    skipped, as are targets that fail validation.
    """
    edges: list[MappingEdge] = []
    for node in graph_nodes:
        if not isinstance(node, dict):
            continue
        src_id = node.get("@id", "")
        if not src_id:
            continue
        src_uri = expand(src_id)
        src_ds, src_ep = _dataset_info(node.get("void:inDataset"))
        for key, val in node.items():
            if key.startswith("@") or key in _GRAPH_SKIP_KEYS:
                continue
            pred_uri = expand(key)
            targets = val if isinstance(val, list) else [val]
            for tgt in targets:
                if not isinstance(tgt, dict) or not tgt.get("@id"):
                    continue
                edge = _parse_mapping_target(
                    tgt,
                    expand,
                    pred_uri,
                    src_uri,
                    src_ds,
                    src_ep,
                )
                if edge is not None:
                    edges.append(edge)
    return edges


def _parse_mapping_target(
    tgt: dict[str, Any],
    expand: Callable[[str], str],
    pred_uri: str,
    src_uri: str,
    src_ds: str,
    src_ep: str | None,
) -> MappingEdge | None:
    """Parse one target dict into a MappingEdge, or None if invalid.

    A target without its own dataset title inherits *src_ds*.  Validation
    failures are logged at debug level and yield ``None``.
    """
    tgt_uri = expand(tgt["@id"])
    tgt_title, tgt_ep = _dataset_info(tgt.get("void:inDataset"))
    tgt_ds = tgt_title or src_ds
    confidence = tgt.get("rdfsolve:confidence")
    try:
        return MappingEdge(
            source_class=src_uri,
            target_class=tgt_uri,
            predicate=pred_uri,
            source_dataset=src_ds,
            target_dataset=tgt_ds,
            source_endpoint=src_ep,
            target_endpoint=tgt_ep,
            confidence=(float(confidence) if confidence is not None else None),
        )
    except Exception:
        # Deliberate best effort: one malformed edge must not abort import.
        _log.debug(
            "Skipping invalid mapping edge",
            exc_info=True,
        )
        return None


def _process_mapping_file(
    path: str | Path,
    json_mod: Any,
    br_map: dict[str, str],
    skip_keys: frozenset[str],
    class_to_datasets: dict[str, set[str]],
    strategies: Collection[str] | None,
    weights: Counter[tuple[str, str]],
) -> None:
    """Process one mapping file, accumulating dataset-pair weights.

    Args:
        path: Mapping JSON-LD file; unreadable or non-object files are
            skipped with a debug log.
        json_mod: Module providing ``loads`` (``json`` or ``ujson``).
        br_map: Extra prefix map passed to ``make_expander``.
        skip_keys: Node keys that never hold mapping targets.
        class_to_datasets: Class URI -> dataset names.
        strategies: If not None, only files whose ``@about.strategy`` is
            in this collection are processed.
        weights: Counter updated in place.
    """
    try:
        raw = json_mod.loads(Path(path).read_bytes())
    except Exception:
        _log.debug(
            "Could not read %s",
            path,
            exc_info=True,
        )
        return
    if not isinstance(raw, dict):
        _log.debug("Skipping %s: top-level JSON is not an object", path)
        return

    # ``or {}`` also covers keys that are present but null.
    about: dict[str, Any] = raw.get("@about") or {}
    strategy: str = about.get("strategy", "unknown")
    if strategies is not None and strategy not in strategies:
        return

    context: dict[str, str] = raw.get("@context") or {}
    curie_map: dict[str, str] = about.get("curie_map") or {}
    merged = {**curie_map, **context} if curie_map else context
    expand = make_expander(merged, br_map)

    for node in raw.get("@graph") or ():
        if not isinstance(node, dict):
            continue
        src_id: str = node.get("@id", "")
        if not src_id:
            continue
        src_datasets = class_to_datasets.get(expand(src_id))
        if not src_datasets:
            continue
        _accumulate_node_weights(
            node,
            expand,
            skip_keys,
            src_datasets,
            class_to_datasets,
            weights,
        )


def _accumulate_node_weights(
    node: dict[str, Any],
    expand: Callable[[str], str],
    skip_keys: frozenset[str],
    src_datasets: set[str],
    class_to_datasets: dict[str, set[str]],
    weights: Counter[tuple[str, str]],
) -> None:
    """Accumulate dataset-pair weights from one ``@graph`` node.

    For every target whose expanded ``@id`` is in *class_to_datasets*, each
    pair (source dataset, target dataset) of distinct datasets adds 1 to
    *weights*.  Pairs are stored in sorted order so (a, b) and (b, a)
    share one counter.
    """
    for key, val in node.items():
        # startswith() instead of key[0]: an empty-string key would raise
        # IndexError and abort the whole dataset_graph() run.
        if key.startswith("@") or key in skip_keys:
            continue
        targets = val if isinstance(val, list) else (val,)
        for tgt in targets:
            if not isinstance(tgt, dict):
                continue
            tgt_id: str = tgt.get("@id", "")
            if not tgt_id:
                continue
            tgt_datasets = class_to_datasets.get(expand(tgt_id))
            if not tgt_datasets:
                continue
            for sd in src_datasets:
                for td in tgt_datasets:
                    if sd != td:
                        pair = (sd, td) if sd < td else (td, sd)
                        weights[pair] += 1