Source code for rdfsolve.sssom_importer

"""SSSOM (Simple Standard for Sharing Ontology Mappings) importer.

Downloads SSSOM bundles listed in ``data/sssom_sources.yaml``, extracts
every ``.sssom.tsv`` file, converts each one to a
:class:`~rdfsolve.mapping_models.sssom.SsomMapping` JSON-LD file, and
writes the results
to a configurable output directory (default: ``docker/mappings/sssom/``).

Typical usage
-------------
From Python::

    from rdfsolve.sssom_importer import import_sssom_source, seed_sssom_mappings

    # Import a single source defined in data/sssom_sources.yaml
    result = import_sssom_source(
        entry={
            "name": "ols_mappings",
            "provider": "EMBL-EBI (UK)",
            "url": "https://ftp.ebi.ac.uk/pub/databases/spot/ols/latest/mappings_sssom.tgz",
        },
        output_dir="docker/mappings/sssom/",
    )

    # Seed all sources in the YAML
    results = seed_sssom_mappings(
        sssom_sources_yaml="data/sssom_sources.yaml",
        output_dir="docker/mappings/sssom/",
    )

From the CLI::

    python scripts/seed_sssom_mappings.py
    python scripts/seed_sssom_mappings.py --name ols_mappings
    python scripts/seed_sssom_mappings.py --output-dir /tmp/sssom/

SSSOM TSV format
----------------
Each ``.sssom.tsv`` file has a YAML front-matter block (lines starting with
``#``) followed by a TSV header row and data rows.  Mandatory mapping columns
(SSSOM v0.15 and later):

    subject_id  predicate_id  object_id  mapping_justification

Optional columns used when present:

    subject_label  object_label  confidence  license  mapping_set_id
    mapping_set_title  subject_source  object_source

The front-matter may also carry ``mapping_set_id``, ``mapping_set_title``,
``license``, and a ``curie_map`` (prefix -> namespace) block.
"""

from __future__ import annotations

import csv
import io
import json
import logging
import os
import tarfile
import tempfile
import urllib.request
import zipfile
from collections.abc import Iterator
from pathlib import Path
from typing import Any

import yaml

from rdfsolve._uri import expand_curie

logger = logging.getLogger(__name__)

# ──────────────────────────────────────────────────────────────────────────────
# Predicate normalisation
# ──────────────────────────────────────────────────────────────────────────────

# Map common SSSOM / OWL predicates to their full URIs.
# The importer resolves short-forms via the file's own curie_map first; this
# table serves as a fallback for the most common cases.
_PREDICATE_FALLBACK: dict[str, str] = {
    # SKOS
    "skos:exactMatch": "http://www.w3.org/2004/02/skos/core#exactMatch",
    "skos:closeMatch": "http://www.w3.org/2004/02/skos/core#closeMatch",
    "skos:broadMatch": "http://www.w3.org/2004/02/skos/core#broadMatch",
    "skos:narrowMatch": "http://www.w3.org/2004/02/skos/core#narrowMatch",
    "skos:relatedMatch": "http://www.w3.org/2004/02/skos/core#relatedMatch",
    # OWL
    "owl:equivalentClass": "http://www.w3.org/2002/07/owl#equivalentClass",
    "owl:sameAs": "http://www.w3.org/2002/07/owl#sameAs",
    # SSSOM
    "sssom:NoTermFound": "https://w3id.org/sssom/NoTermFound",
}

# Default predicate when none is provided
_DEFAULT_PREDICATE = "http://www.w3.org/2004/02/skos/core#exactMatch"


# ──────────────────────────────────────────────────────────────────────────────
# SSSOM TSV parsing helpers
# ──────────────────────────────────────────────────────────────────────────────


def _parse_sssom_header(lines: list[str]) -> tuple[dict[str, Any], dict[str, str]]:
    """Parse the YAML front-matter from a list of comment lines.

    Returns ``(metadata_dict, curie_map)`` where *metadata_dict* contains
    the scalar key->value pairs from the SSSOM header and *curie_map* is the
    ``curie_map`` sub-dictionary (or an empty dict if absent).
    """
    # Strip leading '#' and collect only lines before the TSV header
    yaml_lines = [line[1:].lstrip(" ") for line in lines if line.startswith("#")]
    if not yaml_lines:
        return {}, {}

    raw = yaml.safe_load("\n".join(yaml_lines)) or {}
    if not isinstance(raw, dict):
        return {}, {}

    curie_map: dict[str, str] = raw.pop("curie_map", {}) or {}
    if not isinstance(curie_map, dict):
        curie_map = {}

    return raw, curie_map


def _parse_sssom_tsv(content: str) -> tuple[dict[str, Any], dict[str, str], list[dict[str, str]]]:
    """Parse a complete SSSOM TSV string.

    Returns ``(header_meta, curie_map, rows)`` where *rows* is a list of
    dicts (one per data row) keyed by column name.
    """
    lines = content.splitlines()
    comment_lines: list[str] = []
    data_lines: list[str] = []

    for line in lines:
        if line.startswith("#"):
            comment_lines.append(line)
        else:
            data_lines.append(line)

    header_meta, curie_map = _parse_sssom_header(comment_lines)

    # Parse the TSV section
    rows: list[dict[str, str]] = []
    if data_lines:
        reader = csv.DictReader(io.StringIO("\n".join(data_lines)), delimiter="\t")
        rows = [row for row in reader if any(v.strip() for v in row.values())]

    return header_meta, curie_map, rows


# ──────────────────────────────────────────────────────────────────────────────
# MappingEdge conversion
# ──────────────────────────────────────────────────────────────────────────────


def _rows_to_edges(
    rows: list[dict[str, str]],
    curie_map: dict[str, str],
    source_name: str,
) -> list[dict[str, Any]]:
    """Convert TSV rows to a list of MappingEdge-compatible dicts.

    Each dict has the same keys as
    :class:`~rdfsolve.mapping_models.core.MappingEdge`.
    We defer importing the Pydantic model to avoid circular import issues at
    module load time; callers that need actual ``MappingEdge`` objects can
    do ``MappingEdge(**d)`` on each dict.
    """
    edges: list[dict[str, Any]] = []
    skipped = 0

    for row in rows:
        subject_id = row.get("subject_id", "").strip()
        object_id = row.get("object_id", "").strip()
        predicate_id = row.get("predicate_id", "").strip()

        if not subject_id or not object_id:
            skipped += 1
            continue

        # Resolve CURIEs -> full URIs
        src_uri = expand_curie(subject_id, curie_map)
        tgt_uri = expand_curie(object_id, curie_map)

        if predicate_id:
            pred_uri = _PREDICATE_FALLBACK.get(predicate_id) or expand_curie(
                predicate_id, curie_map
            )
        else:
            pred_uri = _DEFAULT_PREDICATE

        # Derive a dataset label from the CURIE prefix when available
        def _dataset_label(curie: str, uri: str) -> str:
            if ":" in curie and not curie.startswith("http"):
                return curie.split(":")[0]
            # fall back to the source bundle name
            return source_name

        src_dataset = _dataset_label(subject_id, src_uri)
        tgt_dataset = _dataset_label(object_id, tgt_uri)

        # Optional confidence
        conf_str = row.get("confidence", "").strip()
        confidence: float | None = None
        if conf_str:
            try:
                confidence = float(conf_str)
            except ValueError:
                pass

        edge: dict[str, Any] = {
            "source_class": src_uri,
            "target_class": tgt_uri,
            "predicate": pred_uri,
            "source_dataset": src_dataset,
            "target_dataset": tgt_dataset,
        }
        if confidence is not None:
            edge["confidence"] = confidence

        edges.append(edge)

    if skipped:
        logger.debug("Skipped %d rows with missing subject_id or object_id", skipped)

    return edges


# ──────────────────────────────────────────────────────────────────────────────
# Archive download + extraction
# ──────────────────────────────────────────────────────────────────────────────


def _download_to_tempfile(url: str) -> Path:
    """Download *url* to a temporary file and return its path.

    Tries ``requests`` first (handles TLS + redirects well); falls back to
    ``urllib`` if requests is not installed.
    """
    suffix = Path(url.split("?")[0]).suffix or ".tmp"
    fd, tmp_path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    logger.info("Downloading %s …", url)

    try:
        import requests  # optional dependency

        with requests.get(url, stream=True, timeout=120) as resp:
            resp.raise_for_status()
            with open(tmp_path, "wb") as fh:
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    fh.write(chunk)
    except ImportError:
        # Fall back to urllib
        req = urllib.request.Request(  # noqa: S310
            url,
        )
        with urllib.request.urlopen(req, timeout=120) as resp:  # noqa: S310
            with open(tmp_path, "wb") as fh:
                while True:
                    chunk = resp.read(1 << 20)
                    if not chunk:
                        break
                    fh.write(chunk)

    logger.info("Download complete -> %s", tmp_path)
    return Path(tmp_path)


def _iter_sssom_tsv_from_archive(archive_path: Path) -> Iterator[tuple[str, str]]:
    """Yield ``(filename, content)`` for every ``.sssom.tsv`` in an archive.

    Supports ``.tgz``, ``.tar.gz``, ``.tar.bz2``, ``.zip``.  For plain
    ``.tsv`` / ``.sssom.tsv`` inputs (not archives) just reads the file.
    """
    name = archive_path.name.lower()

    if name.endswith(".tgz") or name.endswith(".tar.gz") or name.endswith(".tar.bz2"):
        with tarfile.open(archive_path) as tar:
            for member in tar.getmembers():
                if member.name.endswith(".sssom.tsv") and member.isfile():
                    f = tar.extractfile(member)
                    if f:
                        content = f.read().decode("utf-8", errors="replace")
                        yield Path(member.name).name, content

    elif name.endswith(".zip"):
        with zipfile.ZipFile(archive_path) as zf:
            for info in zf.infolist():
                if info.filename.endswith(".sssom.tsv") and not info.is_dir():
                    content = zf.read(info).decode("utf-8", errors="replace")
                    yield Path(info.filename).name, content

    elif name.endswith(".tsv") or name.endswith(".sssom.tsv"):
        yield archive_path.name, archive_path.read_text(encoding="utf-8", errors="replace")

    else:
        raise ValueError(
            f"Unsupported archive format: {archive_path.name!r}. "
            "Expected .tgz, .tar.gz, .tar.bz2, .zip, or .sssom.tsv"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────────────────────


[docs] def import_sssom_source( entry: dict[str, Any], output_dir: str = "docker/mappings/sssom", ) -> dict[str, Any]: """Download and convert one SSSOM source entry to JSON-LD files. For each ``.sssom.tsv`` file found in the archive at ``entry["url"]``, one JSON-LD file is written to *output_dir*:: {source_name}__{sssom_filename_stem}.jsonld Args: entry: A dict with at least ``name`` and ``url`` keys, as found in ``data/sssom_sources.yaml``. output_dir: Directory to write output JSON-LD files. Returns: Summary dict:: { "succeeded": ["ols_mappings__hp.sssom.tsv", ...], "failed": [{"file": "...", "error": "..."}], "skipped": [], } """ from rdfsolve.mapping_models.core import MappingEdge from rdfsolve.mapping_models.sssom import SsomMapping from rdfsolve.schema_models.core import AboutMetadata source_name: str = entry["name"] url: str = entry["url"] out = Path(output_dir) out.mkdir(parents=True, exist_ok=True) succeeded: list[str] = [] failed: list[dict[str, str]] = [] # Download the archive try: archive_path = _download_to_tempfile(url) except Exception as exc: logger.error("Failed to download %s: %s", url, exc) return { "succeeded": [], "failed": [{"source": source_name, "error": str(exc)}], "skipped": [], } try: for sssom_filename, content in _iter_sssom_tsv_from_archive(archive_path): label = f"{source_name}__{sssom_filename}" try: header_meta, curie_map, rows = _parse_sssom_tsv(content) edge_dicts = _rows_to_edges(rows, curie_map, source_name) edges = [MappingEdge(**d) for d in edge_dicts] stem = Path(sssom_filename).stem # strip .sssom.tsv -> stem dataset_name = f"{source_name}__{stem}" about = AboutMetadata.build( dataset_name=dataset_name, pattern_count=len(edges), strategy="sssom_import", ) mapping = SsomMapping( edges=edges, about=about, source_name=source_name, sssom_file=sssom_filename, mapping_set_id=header_meta.get("mapping_set_id"), mapping_set_title=header_meta.get("mapping_set_title"), license=header_meta.get("license") or entry.get("license"), curie_map=curie_map, ) out_path = out / f"{dataset_name}.jsonld" out_path.write_text( json.dumps(mapping.to_jsonld(), indent=2, ensure_ascii=False), encoding="utf-8", ) logger.info("Wrote %s (%d edges)", out_path.name, len(edges)) succeeded.append(label) except Exception as exc: logger.warning("Failed to convert %s: %s", sssom_filename, exc) failed.append({"file": label, "error": str(exc)}) finally: try: archive_path.unlink() except OSError: pass return {"succeeded": succeeded, "failed": failed, "skipped": []}
[docs] def seed_sssom_mappings( sssom_sources_yaml: str = "data/sssom_sources.yaml", output_dir: str = "docker/mappings/sssom", names: list[str] | None = None, ) -> dict[str, Any]: """Seed SSSOM mapping files for all (or selected) sources. Reads *sssom_sources_yaml*, optionally filters to *names*, and calls :func:`import_sssom_source` for each entry. Args: sssom_sources_yaml: Path to the SSSOM sources YAML file. output_dir: Directory for output JSON-LD files. names: Optional list of source names to restrict processing; if ``None`` (default), all entries are processed. Returns: Aggregated summary with keys ``"succeeded"``, ``"failed"``, ``"skipped"``. """ yaml_path = Path(sssom_sources_yaml) if not yaml_path.exists(): raise FileNotFoundError( f"SSSOM sources YAML not found: {yaml_path}. Create data/sssom_sources.yaml first." ) entries: list[dict[str, Any]] = yaml.safe_load(yaml_path.read_text(encoding="utf-8")) or [] if names: name_set = set(names) unknown = name_set - {e["name"] for e in entries} if unknown: logger.warning("Unknown SSSOM source name(s): %s", ", ".join(sorted(unknown))) entries = [e for e in entries if e["name"] in name_set] if not entries: logger.warning("No SSSOM sources to process.") return {"succeeded": [], "failed": [], "skipped": []} all_succeeded: list[str] = [] all_failed: list[dict[str, str]] = [] all_skipped: list[str] = [] for entry in entries: logger.info("Processing SSSOM source: %s", entry["name"]) result = import_sssom_source(entry, output_dir=output_dir) all_succeeded.extend(result.get("succeeded", [])) all_failed.extend(result.get("failed", [])) all_skipped.extend(result.get("skipped", [])) return { "succeeded": all_succeeded, "failed": all_failed, "skipped": all_skipped, }