"""SeMRA-powered inference pipeline for rdfsolve mappings.
Takes one or more mapping JSON-LD files, converts their edges to
``semra.Mapping`` objects, applies the requested inference operations
(inversion, transitivity/chain, generalisation), deduplicates via
``semra.api.assemble_evidences``, and writes the result as an
:class:`~rdfsolve.mapping_models.inference.InferencedMapping` JSON-LD file.
Main entry-point
----------------
:func:`infer_mappings` - full pipeline.
:func:`seed_inferenced_mappings` - convenience wrapper for CLI/scripts.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
from rdfsolve._uri import expand_curie
from rdfsolve.mapping_models.core import MappingEdge
from rdfsolve.mapping_models.inference import InferencedMapping
from rdfsolve.schema_models.core import AboutMetadata
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
__all__ = ["infer_mappings", "seed_inferenced_mappings"]
def _load_edges_from_jsonld(
path: Path,
) -> list[MappingEdge]:
"""Load MappingEdge objects from a single JSON-LD mapping file."""
data = json.loads(path.read_text(encoding="utf-8"))
context = data.get("@context", {})
graph = data.get("@graph", [])
edges: list[MappingEdge] = []
for node in graph:
source_id = node.get("@id", "")
src_ds_node = node.get("void:inDataset", {})
src_ds = src_ds_node.get("dcterms:title", "")
src_ep = (src_ds_node.get("void:sparqlEndpoint") or {}).get("@id")
for key, val in node.items():
if key.startswith("@") or key in (
"void:inDataset",
"dcterms:created",
):
continue
targets = val if isinstance(val, list) else [val]
for tgt in targets:
if not isinstance(tgt, dict):
continue
tgt_id = tgt.get("@id", "")
tgt_ds_node = tgt.get("void:inDataset", {})
tgt_ds = tgt_ds_node.get("dcterms:title", "")
tgt_ep = (tgt_ds_node.get("void:sparqlEndpoint") or {}).get("@id")
pred_uri = expand_curie(key, context)
src_uri = expand_curie(source_id, context)
tgt_uri = expand_curie(tgt_id, context)
edges.append(
MappingEdge(
source_class=src_uri,
target_class=tgt_uri,
predicate=pred_uri,
source_dataset=src_ds,
target_dataset=tgt_ds,
source_endpoint=src_ep,
target_endpoint=tgt_ep,
)
)
return edges
[docs]
def infer_mappings(
input_paths: list[str],
output_path: str,
*,
inversion: bool = True,
transitivity: bool = True,
generalisation: bool = False,
chain_cutoff: int = 3,
dataset_name: str | None = None,
) -> dict[str, Any]:
"""Run the inference pipeline over a set of mapping JSON-LD files.
Loads all mapping edges from *input_paths*, converts them to semra
Mappings, applies the chosen inference operations, deduplicates via
``semra.api.assemble_evidences``, converts back to rdfsolve edges,
and writes an :class:`~rdfsolve.mapping_models.inference.InferencedMapping`
JSON-LD to *output_path*.
Args:
input_paths: Paths to input mapping JSON-LD files.
output_path: Path to write the inferenced mapping JSON-LD.
inversion: Apply symmetric inversion of every mapping.
transitivity: Apply transitive chain inference.
generalisation: Apply generalisation (broader/narrower).
chain_cutoff: Max chain length for transitivity inference.
dataset_name: Override for the ``@about.dataset_name`` field.
Returns:
Summary dict with keys ``"input_edges"``, ``"output_edges"``,
``"inference_types"``, ``"output_path"``.
"""
from semra.api import assemble_evidences
from semra.inference import (
infer_chains,
infer_generalizations,
infer_reversible,
)
from rdfsolve.semra_converter import (
rdfsolve_edges_to_semra,
semra_evidence_to_jsonld_about,
semra_to_rdfsolve_edges,
)
# ── Load all edges ────────────────────────────────────────────────
all_edges: list[MappingEdge] = []
for p in input_paths:
pth = Path(p)
try:
edges = _load_edges_from_jsonld(pth)
all_edges.extend(edges)
logger.info(
"Loaded %d edges from %s",
len(edges),
pth.name,
)
except Exception as exc:
logger.warning("Skipping %s: %s", pth.name, exc)
logger.info("Total input edges: %d", len(all_edges))
# ── Convert to semra ──────────────────────────────────────────────
semra_mappings = rdfsolve_edges_to_semra(all_edges)
logger.info("Converted to %d semra Mappings", len(semra_mappings))
# ── Apply inference operations ────────────────────────────────────
applied: list[str] = []
if inversion:
semra_mappings = infer_reversible(semra_mappings)
applied.append("inversion")
logger.info(
"After inversion: %d mappings",
len(semra_mappings),
)
if transitivity:
semra_mappings = infer_chains(
semra_mappings,
cutoff=chain_cutoff,
)
applied.append("transitivity")
logger.info(
"After transitivity (cutoff=%d): %d mappings",
chain_cutoff,
len(semra_mappings),
)
if generalisation:
semra_mappings = infer_generalizations(semra_mappings)
applied.append("generalisation")
logger.info(
"After generalisation: %d mappings",
len(semra_mappings),
)
# ── Deduplicate ───────────────────────────────────────────────────
semra_mappings = assemble_evidences(semra_mappings)
logger.info(
"After assemble_evidences: %d unique mappings",
len(semra_mappings),
)
# ── Convert back to rdfsolve ──────────────────────────────────────
result_edges = semra_to_rdfsolve_edges(semra_mappings)
evidence_chain: list[dict[str, Any]] = []
for m in semra_mappings:
evidence_chain.extend(semra_evidence_to_jsonld_about(m.evidence))
# ── Build InferencedMapping ───────────────────────────────────────
stem = Path(output_path).stem
name = dataset_name or f"{stem}_mapping"
about = AboutMetadata.build(
dataset_name=name,
pattern_count=len(result_edges),
strategy="inferenced",
)
mapping = InferencedMapping(
edges=result_edges,
about=about,
inference_types=applied,
source_mapping_files=[str(p) for p in input_paths],
evidence_chain=evidence_chain,
stats={
"input_edges": len(all_edges),
"output_edges": len(result_edges),
"inference_types": applied,
},
)
# ── Write output ──────────────────────────────────────────────────
out = Path(output_path)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(
json.dumps(mapping.to_jsonld(), indent=2, ensure_ascii=False),
encoding="utf-8",
)
logger.info(
"Written %d inferenced edges to %s",
len(result_edges),
out,
)
return {
"input_edges": len(all_edges),
"output_edges": len(result_edges),
"inference_types": applied,
"output_path": str(out),
}
[docs]
def seed_inferenced_mappings(
input_dir: str = "docker/mappings",
output_dir: str = "docker/mappings/inferenced",
output_name: str = "inferenced_mappings",
inversion: bool = True,
transitivity: bool = True,
generalisation: bool = False,
chain_cutoff: int = 3,
) -> dict[str, Any]:
"""Infer over all mappings in *input_dir* and write to *output_dir*.
Collects all ``*.jsonld`` files under *input_dir*
(``instance_matching/``, ``semra/``, and ``sssom/`` subdirs), runs
:func:`infer_mappings`, and writes
``{output_dir}/{output_name}.jsonld``.
This is the convenience entry-point for the CLI and seed scripts.
Args:
input_dir: Directory that contains mapping subdirs.
output_dir: Directory to write inferenced output.
output_name: Stem for the output file (without ``.jsonld``).
inversion: Apply inversion inference.
transitivity: Apply transitivity inference.
generalisation: Apply generalisation inference.
chain_cutoff: Max chain length for transitivity.
Returns:
Summary from :func:`infer_mappings`.
"""
root = Path(input_dir)
input_paths: list[str] = []
for subdir_name in ("instance_matching", "semra", "sssom"):
subdir = root / subdir_name
if subdir.exists():
for f in sorted(subdir.glob("*.jsonld")):
input_paths.append(str(f))
if not input_paths:
logger.warning(
"No mapping files found under %s; nothing to infer.",
root,
)
return {
"input_edges": 0,
"output_edges": 0,
"inference_types": [],
"output_path": "",
}
output_path = str(Path(output_dir) / f"{output_name}.jsonld")
return infer_mappings(
input_paths=input_paths,
output_path=output_path,
inversion=inversion,
transitivity=transitivity,
generalisation=generalisation,
chain_cutoff=chain_cutoff,
dataset_name=output_name,
)