"""InstanceMapping - instance-based matching."""
from __future__ import annotations
import copy
from datetime import datetime, timezone
from typing import Any
from pydantic import Field
from rdfsolve.mapping_models.core import (
SKOS_NARROW_MATCH,
InstanceMatchResult,
Mapping,
)
[docs]
class InstanceMapping(Mapping):
    """Mapping produced by the instance-based matching strategy.

    SPARQL endpoints are probed for instances whose URIs match
    bioregistry URI patterns, in order to discover which classes in
    different datasets represent the same kind of entity.
    """

    # Discriminator for this mapping flavour.
    mapping_type: str = Field(default="instance_matcher")
    # Required: the bioregistry prefix the mapping was built for.
    resource_prefix: str = Field(
        ...,
        description="Bioregistry prefix, e.g. 'ensembl'",
    )
    # URI format prefixes that were probed.
    uri_formats: list[str] = Field(
        default_factory=list,
        description="URI format prefixes that were probed",
    )
    # Raw probe results, kept alongside the generated edges.
    match_results: list[InstanceMatchResult] = Field(
        default_factory=list,
        description="Raw probe results before edge generation",
    )

    def to_jsonld(self) -> dict[str, Any]:
        """Return the base JSON-LD document with instance-matcher provenance.

        The ``@about`` object produced by the parent class (or a fresh
        dict when it is absent) receives three entries: ``resource``,
        ``uri_formats_queried`` and ``strategy``.

        Returns:
            The JSON-LD document as a dict.
        """
        document = super().to_jsonld()
        provenance = document.get("@about", {})
        provenance.update(
            {
                "resource": self.resource_prefix,
                # NOTE: this is the model's own list object, not a copy.
                "uri_formats_queried": self.uri_formats,
                "strategy": "instance_matcher",
            }
        )
        document["@about"] = provenance
        return document

    @classmethod
    def from_bioregistry_resource(
        cls,
        prefix: str,
        datasources: Any,
        predicate: str = SKOS_NARROW_MATCH,
        dataset_names: list[str] | None = None,
        timeout: float = 60.0,
    ) -> InstanceMapping:
        """Probe all endpoints for a bioregistry resource.

        Thin wrapper that forwards every argument unchanged to
        ``rdfsolve.instance_matcher.probe_resource`` and returns its
        result; ``cls`` itself is not used.

        Args:
            prefix: Bioregistry prefix (e.g. ``"ensembl"``).
            datasources: DataFrame with columns
                ``[dataset_name, endpoint_url]``.
            predicate: Mapping predicate URI.
            dataset_names: Optional subset of datasets to query.
            timeout: SPARQL request timeout in seconds.

        Returns:
            :class:`InstanceMapping` ready for export.
        """
        # Imported at call time rather than module level
        # (presumably to avoid a circular import - TODO confirm).
        from rdfsolve.instance_matcher import probe_resource

        probe_kwargs: dict[str, Any] = {
            "prefix": prefix,
            "datasources": datasources,
            "predicate": predicate,
            "dataset_names": dataset_names,
            "timeout": timeout,
        }
        return probe_resource(**probe_kwargs)
# ── JSON-LD merge helper ──────────────────────────────────────────
# Node keys that are never treated as predicates: ``_merge_node`` skips
# them and ``merge_instance_jsonld`` excludes them from ``pattern_count``.
_STRUCTURAL_KEYS = frozenset(("@id", "void:inDataset", "dcterms:created"))
[docs]
def merge_instance_jsonld(
    existing: dict[str, Any],
    new: dict[str, Any],
) -> dict[str, Any]:
    """Merge *new* instance-mapping JSON-LD into *existing*.

    Merges:

    * ``@context`` - union of all prefix->namespace entries; entries
      already present in *existing* win.
    * ``@graph`` - nodes keyed by ``@id``; predicate targets are
      merged (duplicates skipped). Nodes without a usable ``@id``
      cannot be matched, so they are carried over verbatim and placed
      after the identified nodes.
    * ``@about`` - ``uri_formats_queried`` is unioned (order kept);
      ``pattern_count`` is recomputed as the number of non-structural
      keys summed over all graph nodes; ``generated_at`` is refreshed
      to the current UTC time.

    Args:
        existing: Target document; mutated in place.
        new: Document to merge in; its nodes are deep-copied before
            being added to *existing*.

    Returns:
        The mutated *existing* dict (also returned for convenience).
    """
    # ── context ──────────────────────────────────────────────
    context = existing.setdefault("@context", {})
    for prefix, namespace in new.get("@context", {}).items():
        context.setdefault(prefix, namespace)

    # ── graph - merge by @id ─────────────────────────────────
    nodes_by_id: dict[str, dict[str, Any]] = {}
    # Nodes lacking an @id used to be dropped (from *existing*) or
    # collapsed into a single node keyed by None (from *new*); keep
    # them as-is instead so no data is lost or conflated.
    anonymous_nodes: list[dict[str, Any]] = []
    for node in existing.get("@graph", []):
        nid = node.get("@id")
        if nid:
            nodes_by_id[nid] = node
        else:
            anonymous_nodes.append(node)
    for new_node in new.get("@graph", []):
        nid = new_node.get("@id")
        if not nid:
            anonymous_nodes.append(copy.deepcopy(new_node))
        elif nid in nodes_by_id:
            _merge_node(nodes_by_id[nid], new_node)
        else:
            nodes_by_id[nid] = copy.deepcopy(new_node)
    graph = [*nodes_by_id.values(), *anonymous_nodes]
    existing["@graph"] = graph

    # ── @about ───────────────────────────────────────────────
    about_ex = existing.setdefault("@about", {})
    about_new = new.get("@about", {})
    formats: list[str] = list(about_ex.get("uri_formats_queried", []))
    for fmt in about_new.get("uri_formats_queried", []):
        if fmt not in formats:
            formats.append(fmt)
    about_ex["uri_formats_queried"] = formats
    # Counts keys per node, not individual targets: a predicate with
    # several targets contributes 1.
    about_ex["pattern_count"] = sum(
        len(node) - len(_STRUCTURAL_KEYS & node.keys()) for node in graph
    )
    about_ex["generated_at"] = datetime.now(timezone.utc).isoformat()
    return existing
def _merge_node(
    ex_node: dict[str, Any],
    new_node: dict[str, Any],
) -> None:
    """Merge predicate targets from *new_node* into *ex_node*.

    Keys listed in ``_STRUCTURAL_KEYS`` are left untouched. A key
    missing from *ex_node* is copied over. For a key present on both
    sides, both values are treated as lists and targets not already
    present are appended; a result holding exactly one target is
    stored as a bare value rather than a one-element list.

    Everything taken from *new_node* is deep-copied so that *ex_node*
    never shares mutable objects with it.

    Args:
        ex_node: Node to update; mutated in place.
        new_node: Node whose predicate targets are merged in.
    """
    for key, value in new_node.items():
        if key in _STRUCTURAL_KEYS:
            continue
        if key not in ex_node:
            ex_node[key] = copy.deepcopy(value)
            continue
        # Normalise both sides to lists and merge unique targets.
        current = ex_node[key]
        merged = current if isinstance(current, list) else [current]
        incoming = value if isinstance(value, list) else [value]
        for target in incoming:
            if target not in merged:
                # Copy, matching the new-key branch above, so later
                # edits to either document cannot leak into the other.
                merged.append(copy.deepcopy(target))
        ex_node[key] = merged[0] if len(merged) == 1 else merged