# Source code for rdfsolve.mapping_models.instance

"""InstanceMapping - instance-based matching."""

from __future__ import annotations

import copy
from datetime import datetime, timezone
from typing import Any

from pydantic import Field

from rdfsolve.mapping_models.core import (
    SKOS_NARROW_MATCH,
    InstanceMatchResult,
    Mapping,
)


class InstanceMapping(Mapping):
    """Mapping produced by the instance-based matching strategy.

    SPARQL endpoints are probed for instances whose URIs match
    bioregistry URI patterns, in order to discover which classes in
    different datasets describe the same kind of entity.
    """

    # Discriminator value identifying this mapping flavour.
    mapping_type: str = Field(default="instance_matcher")
    # Required: the bioregistry prefix this mapping was built for.
    resource_prefix: str = Field(
        ...,
        description="Bioregistry prefix, e.g. 'ensembl'",
    )
    # URI prefixes that were tried against the endpoints.
    uri_formats: list[str] = Field(
        default_factory=list,
        description="URI format prefixes that were probed",
    )
    # Unprocessed probe output, kept alongside the derived mapping.
    match_results: list[InstanceMatchResult] = Field(
        default_factory=list,
        description=("Raw probe results before edge generation"),
    )

    def to_jsonld(self) -> dict[str, Any]:
        """Return the base JSON-LD document with matcher provenance added.

        The ``@about`` section of the parent's document (created when
        absent) gains three entries: ``resource``,
        ``uri_formats_queried`` and ``strategy``.

        Returns:
            The JSON-LD document as a plain dictionary.
        """
        document = super().to_jsonld()
        provenance = document.get("@about", {})
        provenance.update(
            {
                "resource": self.resource_prefix,
                "uri_formats_queried": self.uri_formats,
                "strategy": "instance_matcher",
            }
        )
        document["@about"] = provenance
        return document

    @classmethod
    def from_bioregistry_resource(
        cls,
        prefix: str,
        datasources: Any,
        predicate: str = SKOS_NARROW_MATCH,
        dataset_names: list[str] | None = None,
        timeout: float = 60.0,
    ) -> InstanceMapping:
        """Build a mapping by probing endpoints for one bioregistry resource.

        This is a thin wrapper: all arguments are forwarded unchanged to
        :func:`rdfsolve.instance_matcher.probe_resource`.

        Args:
            prefix: Bioregistry prefix (e.g. ``"ensembl"``).
            datasources: DataFrame with columns
                ``[dataset_name, endpoint_url]``.
            predicate: Mapping predicate URI.
            dataset_names: Optional subset of datasets to query.
            timeout: SPARQL request timeout in seconds.

        Returns:
            :class:`InstanceMapping` ready for export.
        """
        # Imported at call time rather than module level
        # (presumably to avoid an import cycle - confirm before hoisting).
        from rdfsolve.instance_matcher import probe_resource

        return probe_resource(
            prefix=prefix,
            datasources=datasources,
            predicate=predicate,
            dataset_names=dataset_names,
            timeout=timeout,
        )
# ── JSON-LD merge helper ──────────────────────────────────────────

# Node keys that are never treated as predicates: the merge helpers in
# this module skip them when combining nodes and leave them out of the
# recomputed ``pattern_count``.
_STRUCTURAL_KEYS = frozenset(("@id", "void:inDataset", "dcterms:created"))
def merge_instance_jsonld(
    existing: dict[str, Any],
    new: dict[str, Any],
) -> dict[str, Any]:
    """Merge *new* instance-mapping JSON-LD into *existing*.

    Merges:

    * ``@context`` - union of all prefix->namespace entries; entries
      already present in *existing* win.
    * ``@graph`` - nodes keyed by ``@id``; predicate targets are merged
      (duplicates skipped). Nodes without an ``@id`` cannot be keyed, so
      they are carried over verbatim after the identified nodes, with
      exact duplicates skipped.
    * ``@about`` - ``uri_formats_queried`` is unioned (order preserved);
      ``pattern_count`` is recomputed as the number of non-structural
      keys summed over all graph nodes; ``generated_at`` is refreshed to
      the current UTC time in ISO-8601 form.

    Args:
        existing: Target document; mutated in place.
        new: Document whose content is merged in. Nodes taken from it
            are deep-copied before being stored.

    Returns:
        The mutated *existing* dict (also returned for convenience).
    """
    # ── context ──────────────────────────────────────────────
    context = existing.setdefault("@context", {})
    for prefix, namespace in new.get("@context", {}).items():
        context.setdefault(prefix, namespace)

    # ── graph - merge by @id ─────────────────────────────────
    nodes_by_id: dict[str, dict[str, Any]] = {}
    # Nodes lacking an ``@id`` used to be dropped (existing side) or all
    # collapsed under a single ``None`` key (new side); keep them instead.
    anonymous_nodes: list[dict[str, Any]] = []

    for node in existing.get("@graph", []):
        nid = node.get("@id")
        if nid:
            nodes_by_id[nid] = node
        else:
            anonymous_nodes.append(node)

    for new_node in new.get("@graph", []):
        nid = new_node.get("@id")
        if not nid:
            # Equality check keeps repeated merges of the same document
            # from piling up identical anonymous nodes.
            if new_node not in anonymous_nodes:
                anonymous_nodes.append(copy.deepcopy(new_node))
        elif nid in nodes_by_id:
            _merge_node(nodes_by_id[nid], new_node)
        else:
            nodes_by_id[nid] = copy.deepcopy(new_node)

    existing["@graph"] = [*nodes_by_id.values(), *anonymous_nodes]

    # ── @about ───────────────────────────────────────────────
    about_ex = existing.setdefault("@about", {})
    about_new = new.get("@about", {})

    # A list (not a set) so the original ordering is preserved.
    seen: list[str] = list(about_ex.get("uri_formats_queried", []))
    for fmt in about_new.get("uri_formats_queried", []):
        if fmt not in seen:
            seen.append(fmt)
    about_ex["uri_formats_queried"] = seen

    # Every key that is not structural counts once, regardless of how
    # many targets it holds.
    about_ex["pattern_count"] = sum(
        len(node) - len(_STRUCTURAL_KEYS & node.keys())
        for node in existing["@graph"]
    )
    about_ex["generated_at"] = datetime.now(timezone.utc).isoformat()

    return existing
def _merge_node(
    ex_node: dict[str, Any],
    new_node: dict[str, Any],
) -> None:
    """Merge predicate targets from *new_node* into *ex_node*.

    Keys listed in ``_STRUCTURAL_KEYS`` are left untouched. A key that
    is missing from *ex_node* is copied over; for a key present on both
    sides the values are treated as lists and targets not yet present
    (compared with ``==``) are appended. A merged value holding exactly
    one target is stored as a scalar, otherwise as a list.

    Args:
        ex_node: Node to update; mutated in place.
        new_node: Node supplying additional targets; never mutated, and
            everything taken from it is deep-copied so the two nodes
            share no mutable state afterwards.
    """
    for key, value in new_node.items():
        if key in _STRUCTURAL_KEYS:
            continue

        if key not in ex_node:
            ex_node[key] = copy.deepcopy(value)
            continue

        # Normalise both sides to lists and merge unique
        merged = ex_node[key]
        if not isinstance(merged, list):
            merged = [merged]
        incoming = value if isinstance(value, list) else [value]

        for target in incoming:
            if target not in merged:
                # Copy so later edits to *new_node* cannot leak into
                # *ex_node* (matches the copy made for brand-new keys).
                merged.append(copy.deepcopy(target))

        ex_node[key] = merged[0] if len(merged) == 1 else merged