"""Converter layer between rdfsolve mapping types and semra types.

This module is the **only** place where rdfsolve and semra types meet.
All other rdfsolve modules import from here; they never import semra
directly.

Key functions
-------------
rdfsolve_edges_to_semra
    Convert a list of :class:`~rdfsolve.mapping_models.core.MappingEdge`
    + provenance into ``list[semra.Mapping]``.
semra_to_rdfsolve_edges
    Convert ``list[semra.Mapping]`` back to a
    :class:`~rdfsolve.mapping_models.core.MappingEdge` list.
semra_evidence_to_jsonld_about
    Serialise a semra evidence chain into a JSON-LD ``@about`` fragment.
import_source
    Fetch mappings from a SeMRA source and write JSON-LD files.
"""
from __future__ import annotations
import functools
import json as _json
import logging
from typing import TYPE_CHECKING, Any
from bioregistry import (
get_homepage,
get_iri,
get_registry_map,
get_uri_prefix,
parse_iri,
)
from pyobo import Reference
from semra.sources.wikidata import get_wikidata_mappings_by_prefix
from rdfsolve._uri import expand_curie_bioregistry
from rdfsolve.mapping_models.core import MappingEdge
from rdfsolve.mapping_models.semra import SemraMapping
from rdfsolve.schema_models.core import AboutMetadata
if TYPE_CHECKING:
from semra.struct import Mapping as SemraMapping_
from semra.struct import ReasonedEvidence, SimpleEvidence
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Public names exported by ``from <this module> import *``.  The underscore
# helpers defined below are deliberately left out.
__all__ = [
"import_source",
"rdfsolve_edges_to_semra",
"semra_evidence_to_jsonld_about",
"semra_to_rdfsolve_edges",
]
# ---------------------------------------------------------------------------
# Predicate URI <-> semra Reference mapping
# ---------------------------------------------------------------------------
def _build_predicate_maps() -> tuple[dict[str, Any], dict[Any, str]]:
    """Create the lookup tables between predicate URIs and semra References.

    ``semra.vocabulary`` is imported inside the function body, so semra is
    only loaded once this function is actually called.

    Returns:
        A ``(forward, inverse)`` pair.  ``forward`` maps each known predicate
        URI to its semra vocabulary Reference.  ``inverse`` maps every
        Reference back to exactly one URI; when several URIs share a
        Reference, the URI listed first below is the one kept.
    """
    from semra.vocabulary import (
        BROAD_MATCH,
        CLOSE_MATCH,
        DB_XREF,
        EQUIVALENT_TO,
        EXACT_MATCH,
        NARROW_MATCH,
        REPLACED_BY,
        SUBCLASS,
    )

    # Order matters: it decides which URI represents a shared Reference in
    # the inverse table.
    pairs: list[tuple[str, Any]] = [
        ("http://www.w3.org/2004/02/skos/core#exactMatch", EXACT_MATCH),
        ("http://www.w3.org/2004/02/skos/core#narrowMatch", NARROW_MATCH),
        ("http://www.w3.org/2004/02/skos/core#broadMatch", BROAD_MATCH),
        ("http://www.w3.org/2004/02/skos/core#closeMatch", CLOSE_MATCH),
        ("http://www.w3.org/2004/02/skos/core#related", DB_XREF),
        ("http://www.w3.org/2002/07/owl#equivalentClass", EQUIVALENT_TO),
        ("http://www.w3.org/2002/07/owl#sameAs", EQUIVALENT_TO),
        ("http://www.w3.org/2000/01/rdf-schema#subClassOf", SUBCLASS),
        ("http://purl.obolibrary.org/obo/IAO_0100001", REPLACED_BY),
        ("http://www.geneontology.org/formats/oboInOwl#hasDbXref", DB_XREF),
    ]

    forward: dict[str, Any] = dict(pairs)
    inverse: dict[Any, str] = {}
    for predicate_uri, reference in pairs:
        # setdefault keeps the first URI seen for a Reference.
        inverse.setdefault(reference, predicate_uri)
    return forward, inverse
@functools.lru_cache(maxsize=1)
def _get_maps() -> tuple[dict[str, Any], dict[Any, str]]:
    """Return the cached ``(forward, inverse)`` predicate tables.

    The tables come from :func:`_build_predicate_maps`; the ``lru_cache``
    decorator means that builder runs on the first call only and later calls
    reuse the stored result.
    """
    forward, inverse = _build_predicate_maps()
    return forward, inverse
# ---------------------------------------------------------------------------
# Helper: justification from strategy string
# ---------------------------------------------------------------------------
def _strategy_to_justification(strategy: str) -> Any:
    """Map an rdfsolve strategy identifier to a semra justification Reference.

    Args:
        strategy: Strategy name (e.g. ``"instance_matcher"``, ``"miner"``).

    Returns:
        The semra vocabulary Reference to use as evidence justification.
        Every listed strategy, and any unlisted one, currently resolves to
        ``semra.vocabulary.UNSPECIFIED_MAPPING``.
    """
    from semra.vocabulary import UNSPECIFIED_MAPPING

    # Named so it does not shadow the ``map`` builtin.  All entries are
    # identical today; the table is the single place to change when a
    # strategy gets a more specific justification.
    justification_by_strategy = {
        "instance_matcher": UNSPECIFIED_MAPPING,
        "semra_import": UNSPECIFIED_MAPPING,
        "inferenced": UNSPECIFIED_MAPPING,
        "miner": UNSPECIFIED_MAPPING,
    }
    return justification_by_strategy.get(strategy, UNSPECIFIED_MAPPING)
# ---------------------------------------------------------------------------
# Helper: URI <-> bioregistry Reference
# ---------------------------------------------------------------------------
def _uri_to_reference(uri: str) -> Reference | None:
    """Convert a full URI (or CURIE) to a semra/pyobo ``Reference``, or ``None``.

    Strategy:

    1. Pass the input through :func:`expand_curie_bioregistry`.
    2. Ask ``bioregistry.parse_iri`` for a ``(prefix, identifier)`` pair.
    3. Otherwise split on the last ``#`` or ``/``: the text after the
       separator becomes the identifier and the lower-cased last path
       component of the namespace becomes the prefix.

    The fallback prefix is only a short label derived from the namespace; it
    is not guaranteed to be a registered prefix, so the original URI cannot
    always be rebuilt from the resulting ``Reference``.

    Args:
        uri: URI or CURIE to convert.

    Returns:
        A ``Reference``, or ``None`` when the value has no usable ``#``/``/``
        separator or ``Reference`` construction fails.
    """
    uri = expand_curie_bioregistry(uri)

    try:
        parsed = parse_iri(uri)
        if parsed:
            prefix, identifier = parsed
            # A "no match" result may arrive as a truthy ``(None, None)``
            # pair (NOTE(review): depends on the bioregistry version).
            # Check the parts explicitly so that case goes straight to the
            # fallback instead of raising inside ``Reference`` and logging a
            # misleading warning.
            if prefix is not None and identifier is not None:
                return Reference(prefix=prefix, identifier=identifier)
    except Exception as e:
        logger.warning("URI to Reference error with URI %s: %s", uri, e)

    # Fallback: split at the last '#' or '/', whichever comes later.
    sep = max(uri.rfind("#"), uri.rfind("/"))
    if sep < 0 or sep >= len(uri) - 1:
        # No separator, or nothing after it: no local name to use.
        return None

    identifier = uri[sep + 1 :]
    namespace = uri[: sep + 1]
    # Last path component of the namespace, lower-cased, as a short prefix.
    prefix = namespace.rstrip("/#").rsplit("/", 1)[-1].lower() or "unknown"
    try:
        return Reference(prefix=prefix, identifier=identifier)
    except Exception as e:
        logger.warning("Direct split URI to Reference error with URI %s: %s", uri, e)
    return None
def _reference_to_uri(ref: Any) -> str | None:
    """Turn a semra/pyobo ``Reference`` into a URI string.

    Candidates are tried in this order:

    1. ``bioregistry.get_iri(prefix, identifier)``.
    2. ``bioregistry.get_uri_prefix(prefix)`` with the identifier appended.
    3. The plain CURIE ``prefix:identifier``.

    The third form is not a URI; callers are expected to detect and report
    it.  Any exception raised by the bioregistry lookups is logged at DEBUG
    level and the CURIE form is returned.
    """
    try:
        canonical = get_iri(ref.prefix, ref.identifier)
        if canonical:
            return str(canonical)
        namespace = get_uri_prefix(ref.prefix)
        if namespace:
            return "".join((str(namespace), str(ref.identifier)))
    except Exception:
        logger.debug("Could not convert reference %s to uri", ref.identifier)

    # Neither lookup produced a URI: fall back to the CURIE spelling.
    return f"{ref.prefix}:{ref.identifier}"
def _bioregistry_iri(prefix: str) -> str | None:
    """Look up the homepage bioregistry records for *prefix*.

    Returns the homepage as a string, or ``None`` when bioregistry has no
    homepage for the prefix or the lookup raises (logged at DEBUG level).
    """
    try:
        homepage = get_homepage(prefix)
        if homepage:
            return str(homepage)
    except Exception:
        logger.debug("Could not find homepage for %s to uri", prefix)
    return None
# ---------------------------------------------------------------------------
# Public conversion functions
# ---------------------------------------------------------------------------
[docs]
def rdfsolve_edges_to_semra(
    edges: list[MappingEdge],
    about: AboutMetadata | None = None,
) -> list[SemraMapping_]:
    """Translate rdfsolve edges into semra ``Mapping`` objects.

    Every convertible :class:`~rdfsolve.mapping_models.core.MappingEdge`
    yields one ``semra.Mapping`` holding a single ``SimpleEvidence`` with:

    * ``justification`` looked up from ``about.strategy`` (the strategy
      ``"unknown"`` is used when *about* is not given);
    * ``mapping_set`` named ``"<source_dataset>_<target_dataset>"``, with the
      edge's source endpoint as ``purl`` (empty string when absent) and empty
      ``version`` / ``license``;
    * ``confidence`` left as ``None``.

    Predicates found in the curated map use its semra ``Reference``; any
    other predicate URI is converted with the same URI-to-Reference helper
    used for subjects and objects.  An edge is skipped, with a DEBUG log
    entry, when its predicate, source class or target class cannot be
    converted.

    Args:
        edges: Edges to convert.
        about: Optional provenance metadata; only ``strategy`` is read.

    Returns:
        The converted mappings, in input order, without the skipped edges.
    """
    from semra.struct import Mapping, MappingSet, SimpleEvidence

    curated_predicates, _ = _get_maps()
    justification = _strategy_to_justification(about.strategy if about else "unknown")

    converted: list[SemraMapping_] = []
    for edge in edges:
        predicate = curated_predicates.get(edge.predicate)
        if predicate is None:
            # Unknown to the curated table: derive a Reference from the URI.
            predicate = _uri_to_reference(edge.predicate)
            if predicate is None:
                logger.debug(
                    "rdfsolve_edges_to_semra: cannot parse predicate URI %r - skipping",
                    edge.predicate,
                )
                continue
            logger.debug(
                "rdfsolve_edges_to_semra: predicate %r not in curated map; using raw Reference %r",
                edge.predicate,
                predicate,
            )

        subj = _uri_to_reference(edge.source_class)
        obj = _uri_to_reference(edge.target_class)
        if not (subj is not None and obj is not None):
            logger.debug(
                "rdfsolve_edges_to_semra: cannot parse URIs %r / %r - skipping",
                edge.source_class,
                edge.target_class,
            )
            continue

        provenance = SimpleEvidence(
            justification=justification,
            mapping_set=MappingSet(
                name=f"{edge.source_dataset}_{edge.target_dataset}",
                purl=edge.source_endpoint or "",
                version="",
                license="",
            ),
            confidence=None,  # TODO
        )
        converted.append(
            Mapping(
                subject=subj,
                predicate=predicate,
                object=obj,
                evidence=[provenance],
            )
        )
    return converted
[docs]
def semra_to_rdfsolve_edges(
    mappings: list[SemraMapping_],
    dataset_hint: str = "semra",
    endpoint_hint: str = "",
) -> list[MappingEdge]:
    """Convert a semra Mapping list to an rdfsolve MappingEdge list.

    Confidence is omitted (left as ``None``) intentionally - see the
    integration plan for discussion of confidence aggregation.

    For each mapping:

    * subject and object are turned into URIs with ``_reference_to_uri``;
    * the predicate comes from the curated inverse map, falling back to
      ``_reference_to_uri``; a WARNING is logged when that fallback yields a
      bare CURIE rather than an ``http(s)://`` or ``urn:`` URI;
    * the source dataset and endpoint come from the first evidence that has
      a ``mapping_set`` (its ``name`` and ``purl``); without a ``purl`` the
      bioregistry homepage for the dataset name is tried, then
      *endpoint_hint*;
    * the target dataset is the object's prefix (or the source dataset), and
      the target endpoint is that prefix's bioregistry homepage (or the
      source endpoint).

    Args:
        mappings: semra ``Mapping`` objects to convert.
        dataset_hint: Fallback dataset name when evidence doesn't carry one.
        endpoint_hint: Fallback endpoint URL.

    Returns:
        List of :class:`~rdfsolve.mapping_models.core.MappingEdge`, one per
        input mapping and in the same order.
    """
    _, inv = _get_maps()
    edges: list[MappingEdge] = []
    for mapping in mappings:
        source_uri = _reference_to_uri(mapping.subject)
        target_uri = _reference_to_uri(mapping.object)

        predicate_uri = inv.get(mapping.predicate)
        if predicate_uri is None:
            # Not in the curated inverse map - rebuild the URI from the
            # Reference with the same resolution order as subjects/objects.
            predicate_uri = _reference_to_uri(mapping.predicate)
            if (
                predicate_uri is not None
                and ":" in predicate_uri
                and not predicate_uri.startswith(("http://", "https://", "urn:"))
            ):
                # _reference_to_uri fell back to a bare CURIE - log it.
                logger.warning(
                    "semra_to_rdfsolve_edges: could not resolve predicate "
                    "Reference(%r, %r) to a full URI; stored as CURIE %r",
                    mapping.predicate.prefix,
                    mapping.predicate.identifier,
                    predicate_uri,
                )

        # Dataset/endpoint come from the first evidence with a mapping set;
        # evidence without one is skipped.
        source_dataset = dataset_hint
        source_endpoint = endpoint_hint
        for ev in mapping.evidence:
            ms = getattr(ev, "mapping_set", None)
            if ms is None:
                continue
            source_dataset = getattr(ms, "name", dataset_hint) or dataset_hint
            purl = getattr(ms, "purl", None)
            if purl:
                source_endpoint = purl
            else:
                # No purl recorded: fall back to the bioregistry homepage.
                source_endpoint = _bioregistry_iri(source_dataset) or endpoint_hint
            break

        # Resolve the target dataset from the object's prefix (guard against
        # a missing object or prefix).
        obj_prefix = getattr(mapping.object, "prefix", None) if mapping.object else None
        target_dataset = obj_prefix or source_dataset
        target_endpoint = _bioregistry_iri(target_dataset) or source_endpoint

        edges.append(
            MappingEdge(
                source_class=source_uri,
                target_class=target_uri,
                predicate=predicate_uri,
                source_dataset=source_dataset,
                target_dataset=target_dataset,
                # Empty strings are stored as None.
                source_endpoint=source_endpoint or None,
                target_endpoint=target_endpoint or None,
                confidence=None,  # deliberately omitted
            )
        )
    return edges
[docs]
def semra_evidence_to_jsonld_about(
    evidence_list: list[SimpleEvidence | ReasonedEvidence],
) -> list[dict[str, Any]]:
    """Turn semra evidence objects into plain dicts for ``@about.evidence``.

    An item is treated as *simple* evidence when its ``evidence_type`` is
    ``"simple"`` or when it has a ``mapping_set`` attribute; everything else
    is treated as *reasoned* evidence.

    Simple evidence becomes::

        {
            "type": "simple",
            "justification": "<prefix>:<identifier>",
            "mapping_set": "<name>",   # only when a mapping set is present
            "purl": "<purl>",          # only when the purl is non-empty
        }

    Reasoned evidence becomes::

        {
            "type": "reasoned",
            "justification": "<prefix>:<identifier>",
            "source_mapping_hexdigests": ["<hex1>", ...],
            "confidence_factor": <float>,   # only when not None
        }

    ``justification`` is ``"unknown"`` when the evidence has none.  Source
    mappings without a ``hexdigest`` method are represented by ``str()``.
    """
    serialised: list[dict[str, Any]] = []
    for evidence in evidence_list:
        kind = getattr(evidence, "evidence_type", None)
        just = getattr(evidence, "justification", None)
        if just is None:
            label = "unknown"
        else:
            label = f"{just.prefix}:{just.identifier}"

        if kind != "simple" and not hasattr(evidence, "mapping_set"):
            # Reasoned evidence: record the digests of the source mappings.
            digests = [
                src.hexdigest() if hasattr(src, "hexdigest") else str(src)
                for src in getattr(evidence, "mappings", [])
            ]
            record: dict[str, Any] = {
                "type": "reasoned",
                "justification": label,
                "source_mapping_hexdigests": digests,
            }
            factor = getattr(evidence, "confidence_factor", None)
            if factor is not None:
                record["confidence_factor"] = factor
            serialised.append(record)
            continue

        # Simple evidence: record the mapping set it came from, if any.
        record = {"type": "simple", "justification": label}
        mapping_set = getattr(evidence, "mapping_set", None)
        if mapping_set is not None:
            record["mapping_set"] = getattr(mapping_set, "name", "")
            location = getattr(mapping_set, "purl", "")
            if location:
                record["purl"] = location
        serialised.append(record)
    return serialised
# -------------------------------------------------------------------
# High-level import orchestrator
# -------------------------------------------------------------------
def _build_semra_mapping(
    group: list[SemraMapping_],
    source: str,
    prefix: str,
) -> dict[str, Any]:
    """Assemble the JSON-LD document for one group of semra Mappings.

    The group is converted to rdfsolve edges (with *source* as the dataset
    hint), the evidence of every mapping is flattened into one chain, and
    both are wrapped in a ``SemraMapping`` whose ``to_jsonld()`` output is
    returned.

    Args:
        group: Mappings that belong in one output document.
        source: Source name; also used as the dataset hint.
        prefix: Prefix the group was collected for.

    Returns:
        The result of ``SemraMapping.to_jsonld()``.
    """
    edges = semra_to_rdfsolve_edges(group, dataset_hint=source)

    # One flat list holding the serialised evidence of all mappings, in order.
    evidence_chain: list[dict[str, Any]] = [
        item
        for semra_mapping in group
        for item in semra_evidence_to_jsonld_about(semra_mapping.evidence)
    ]

    document = SemraMapping(
        edges=edges,
        about=AboutMetadata.build(
            dataset_name=f"{source}_{prefix}_mapping",
            pattern_count=len(edges),
            strategy="semra_import",
        ),
        source_name=source,
        source_prefix=prefix,
        evidence_chain=evidence_chain,
    )
    return document.to_jsonld()
[docs]
def import_source(
    source: str,
    keep_prefixes: list[str] | None = None,
    output_dir: str = "docker/mappings/semra",
) -> dict[str, Any]:
    """Load one SeMRA source and write a JSON-LD file per subject prefix.

    The fetched mappings are grouped by the prefix of their subject
    (``"unknown"`` when there is none) and each group is written to
    ``{output_dir}/{source}_{prefix}.jsonld``.  The output directory is
    created if needed.

    The sources ``"wikidata"`` and ``"getwikidatamappings"`` (compared
    case-insensitively) are handed to ``_import_wikidata``, which fetches
    per prefix.

    Args:
        source: SeMRA source key (e.g. ``"biomappings"``).
        keep_prefixes: Optional prefix filter applied to the fetched mappings.
        output_dir: Directory for output files.

    Returns:
        Summary dict ``{"succeeded": [...], "failed": [...], "skipped": [...]}``.
        A failure while loading the source is reported as a single ``failed``
        entry; a failure on one prefix is reported and the others continue.
    """
    import json as _json
    from collections import defaultdict
    from pathlib import Path

    from semra.api import keep_prefixes as _keep_prefixes
    from semra.sources import SOURCE_RESOLVER

    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    done: list[str] = []
    errors: list[dict[str, str]] = []

    try:
        logger.info("Fetching semra source: %s", source)
        # Wikidata is fetched prefix by prefix by a dedicated helper.
        if source.lower() in {"wikidata", "getwikidatamappings"}:
            return _import_wikidata(keep_prefixes, target_dir, done, errors)
        loader = SOURCE_RESOLVER.lookup(source)
        fetched = loader()
    except Exception as exc:
        logger.error("Failed to load semra source %r: %s", source, exc)
        return {
            "succeeded": [],
            "failed": [{"source": source, "error": str(exc)}],
            "skipped": [],
        }

    if keep_prefixes:
        fetched = _keep_prefixes(fetched, keep_prefixes)

    # Bucket the mappings by subject prefix.
    grouped: dict[str, list[SemraMapping_]] = defaultdict(list)
    for item in fetched:
        key = getattr(item.subject, "prefix", None) or "unknown"
        grouped[key].append(item)

    logger.info(
        "Source %r: %d mappings across %d prefixes",
        source,
        len(fetched),
        len(grouped),
    )

    for prefix in sorted(grouped):
        members = grouped[prefix]
        destination = target_dir / f"{source}_{prefix}.jsonld"
        try:
            payload = _json.dumps(
                _build_semra_mapping(members, source, prefix),
                indent=2,
                ensure_ascii=False,
            )
            destination.write_text(payload, encoding="utf-8")
            logger.info("Written: %s (%d edges)", destination, len(members))
            done.append(f"{source}_{prefix}")
        except Exception as exc:
            # One bad prefix must not stop the remaining ones.
            logger.error("Failed %s/%s: %s", source, prefix, exc)
            errors.append({"source": source, "prefix": prefix, "error": str(exc)})

    return {"succeeded": done, "failed": errors, "skipped": []}
def _import_wikidata(
    keep_prefixes: list[str] | None,
    out: Any,
    succeeded: list[str],
    failed: list[dict[str, str]],
) -> dict[str, Any]:
    """Fetch Wikidata mappings prefix by prefix and write one file for each.

    Candidate prefixes are the keys of ``get_registry_map("wikidata")``.
    With *keep_prefixes*, only the requested prefixes present in that map are
    processed, in the requested order; without it, every candidate is
    processed in sorted order.

    Args:
        keep_prefixes: Optional list of prefixes to restrict the import to.
        out: Output directory; joined with the file name via ``/``.
        succeeded: List extended in place with ``"wikidata_<prefix>"`` names.
        failed: List extended in place with one error dict per failed prefix.

    Returns:
        Summary dict with ``succeeded``, ``failed`` and ``skipped`` keys.
        When no prefix is left to process, ``skipped`` is ``["wikidata"]``
        and the other two lists are empty.
    """
    known = set(get_registry_map("wikidata"))
    if keep_prefixes:
        targets = [candidate for candidate in keep_prefixes if candidate in known]
    else:
        targets = sorted(known)

    if not targets:
        logger.warning(
            "wikidata: none of the requested prefixes have a "
            "Wikidata property mapping. Available: %s",
            sorted(known)[:20],
        )
        return {"succeeded": [], "failed": [], "skipped": ["wikidata"]}

    for wd_prefix in targets:
        destination = out / f"wikidata_{wd_prefix}.jsonld"
        try:
            logger.info("wikidata: fetching prefix %r", wd_prefix)
            fetched = get_wikidata_mappings_by_prefix(wd_prefix)
            payload = _json.dumps(
                _build_semra_mapping(fetched, "wikidata", wd_prefix),
                indent=2,
                ensure_ascii=False,
            )
            destination.write_text(payload, encoding="utf-8")
            logger.info("Written: %s (%d edges)", destination, len(fetched))
            succeeded.append(f"wikidata_{wd_prefix}")
        except Exception as exc:
            # Record the failure and carry on with the next prefix.
            logger.error("Failed wikidata/%s: %s", wd_prefix, exc)
            failed.append(
                {"source": "wikidata", "prefix": wd_prefix, "error": str(exc)}
            )

    return {"succeeded": succeeded, "failed": failed, "skipped": []}