Source code for rdfsolve.api

"""Main RDFSolve functionalities for VoID extraction and conversion."""

import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any

import pandas as pd
from rdflib import Graph

from .miner import _mine_one_source
from .parser import VoidParser

logger = logging.getLogger(__name__)

__all__ = [
    "compose_query_from_paths",
    "execute_sparql",
    "graph_to_jsonld",
    "graph_to_linkml",
    "graph_to_schema",
    "graph_to_shacl",
    "import_semra_source",
    "import_sssom_source",
    "infer_mappings",
    "load_mapping_jsonld",
    "load_parser_from_file",
    "load_parser_from_graph",
    "load_parser_from_jsonld",
    "mine_all_sources",
    "mine_schema",
    "probe_instance_mapping",
    "resolve_iris",
    "seed_inferenced_mappings",
    "seed_instance_mappings",
    "seed_semra_mappings",
    "seed_sssom_mappings",
    "to_jsonld_from_file",
    "to_linkml_from_file",
    "to_rdfconfig_from_file",
    "to_shacl_from_file",
    "to_void_from_file",
]


[docs] def load_parser_from_file( void_file_path: str, graph_uris: str | list[str] | None = None, exclude_graphs: bool = True, ) -> VoidParser: """Load a VoID file and return a parser for schema extraction. Args: void_file_path: Path to VoID Turtle file graph_uris: Graph URIs to filter queries exclude_graphs: Exclude system graphs Returns: VoidParser instance """ return VoidParser( void_source=void_file_path, graph_uris=graph_uris, exclude_graphs=exclude_graphs )
[docs] def load_parser_from_graph( graph: Graph, graph_uris: str | list[str] | None = None, exclude_graphs: bool = True, ) -> VoidParser: """Load a VoID graph and return a parser for schema extraction. Args: graph: RDFLib Graph with VoID data graph_uris: Graph URIs to filter queries exclude_graphs: Exclude system graphs Returns: VoidParser instance """ return VoidParser(void_source=graph, graph_uris=graph_uris, exclude_graphs=exclude_graphs)
[docs] def load_parser_from_jsonld( jsonld_path: str, graph_uris: str | list[str] | None = None, exclude_graphs: bool = True, ) -> VoidParser: """Load a mined-schema JSON-LD file and return a VoidParser. Reads the JSON-LD produced by ``rdfsolve mine``, reconstructs a :class:`~rdfsolve.schema_models.core.MinedSchema` via :meth:`MinedSchema.from_jsonld`, converts it to an in-memory VoID RDF graph, and wraps it in a :class:`~rdfsolve.parser.VoidParser` ready for export to CSV / LinkML / SHACL / RDF-config. Args: jsonld_path: Path to a ``*_schema.jsonld`` file produced by ``rdfsolve mine``. graph_uris: Graph URIs to filter (passed through to VoidParser). exclude_graphs: Exclude system graphs. Returns: VoidParser instance backed by the converted VoID graph. """ from .models import MinedSchema schema = MinedSchema.from_jsonld(jsonld_path) return VoidParser( void_source=schema.to_void_graph(), graph_uris=graph_uris, exclude_graphs=exclude_graphs, )
[docs] def to_linkml_from_file( void_file_path: str, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, ) -> str: """Convert a VoID file to LinkML YAML schema. Args: void_file_path: Path to VoID file filter_void_nodes: Remove VoID-specific nodes schema_name: Name for the schema schema_description: Description for the schema schema_base_uri: Base URI for the schema Returns: LinkML YAML schema string """ parser = load_parser_from_file(void_file_path) return parser.to_linkml_yaml( filter_void_nodes=filter_void_nodes, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, )
[docs] def to_shacl_from_file( void_file_path: str, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, closed: bool = True, suffix: str | None = None, include_annotations: bool = False, ) -> str: """Convert a VoID file to SHACL shapes. Generates SHACL (Shapes Constraint Language) shapes from a VoID description file. SHACL shapes define constraints on RDF data and can be used for validation. Args: void_file_path: Path to VoID file filter_void_nodes: Remove VoID-specific nodes schema_name: Name for the schema schema_description: Description for the schema schema_base_uri: Base URI for the schema closed: Generate closed shapes (only allow defined properties) suffix: Optional suffix for shape names (e.g., "Shape") include_annotations: Include class/slot annotations in shapes Returns: SHACL shapes as Turtle/RDF string Example: >>> from rdfsolve.api import to_shacl_from_file >>> shacl_ttl = to_shacl_from_file( ... "dataset_void.ttl", schema_name="my_dataset", closed=True ... ) >>> with open("schema.shacl.ttl", "w") as f: ... f.write(shacl_ttl) """ parser = load_parser_from_file(void_file_path) return parser.to_shacl( filter_void_nodes=filter_void_nodes, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, closed=closed, suffix=suffix, include_annotations=include_annotations, )
[docs] def to_rdfconfig_from_file( void_file_path: str, filter_void_nodes: bool = True, endpoint_url: str | None = None, endpoint_name: str | None = None, graph_uri: str | None = None, ) -> dict[str, str]: """Convert a VoID file to RDF-config YAML files. RDF-config is a schema standard that describes RDF data models using YAML configuration files. This function generates three files: - model.yml: Class and property structure - prefix.yml: Namespace prefix definitions - endpoint.yml: SPARQL endpoint configuration Note: The rdf-config tool requires these files to be named exactly model.yml, prefix.yml, and endpoint.yml, and placed in a directory named {dataset}_config. The CLI automatically creates this structure. Args: void_file_path: Path to VoID file filter_void_nodes: Remove VoID-specific nodes endpoint_url: SPARQL endpoint URL (optional) endpoint_name: Name for endpoint (default: "endpoint") graph_uri: Named graph URI (optional) Returns: Dictionary with 'model', 'prefix', 'endpoint' keys containing YAML strings Example: >>> from rdfsolve.api import to_rdfconfig_from_file >>> rdfconfig = to_rdfconfig_from_file( ... "dataset_void.ttl", ... endpoint_url="https://example.org/sparql", ... graph_uri="http://example.org/graph", ... ) >>> # Save files >>> with open("model.yml", "w") as f: ... f.write(rdfconfig["model"]) >>> with open("prefix.yml", "w") as f: ... f.write(rdfconfig["prefix"]) >>> with open("endpoint.yml", "w") as f: ... f.write(rdfconfig["endpoint"]) """ parser = load_parser_from_file(void_file_path) return parser.to_rdfconfig( filter_void_nodes=filter_void_nodes, endpoint_url=endpoint_url, endpoint_name=endpoint_name, graph_uri=graph_uri, )
[docs] def to_void_from_file( jsonld_path: str, ) -> Graph: """Convert a mined-schema JSON-LD file to a VoID RDF graph. Reads the JSON-LD, reconstructs a :class:`~rdfsolve.schema_models.core.MinedSchema`, and returns the equivalent VoID graph (rdflib ``Graph``). Args: jsonld_path: Path to a ``*_schema.jsonld`` file. Returns: rdflib ``Graph`` containing the VoID description. """ from .models import MinedSchema schema = MinedSchema.from_jsonld(jsonld_path) return schema.to_void_graph()
[docs] def to_jsonld_from_file( void_file_path: str, filter_void_admin_nodes: bool = True, endpoint_url: str | None = None, dataset_name: str | None = None, graph_uris: str | list[str] | None = None, ) -> dict[str, Any]: """Convert a VoID file to JSON-LD format. Args: void_file_path: Path to VoID file filter_void_admin_nodes: Remove VoID and administrative nodes endpoint_url: SPARQL endpoint URL for the @about section dataset_name: Dataset name for the @about section graph_uris: Graph URIs for the @about section Returns: JSON-LD with @context, @graph, and @about """ parser = load_parser_from_file(void_file_path) graph_uris_list = [graph_uris] if isinstance(graph_uris, str) else graph_uris return parser.to_jsonld( filter_void_admin_nodes=filter_void_admin_nodes, endpoint_url=endpoint_url, dataset_name=dataset_name, graph_uris=graph_uris_list, )
[docs] def graph_to_jsonld( graph: Graph, graph_uris: str | list[str] | None = None, filter_void_admin_nodes: bool = True, endpoint_url: str | None = None, dataset_name: str | None = None, ) -> dict[str, Any]: """Convert a VoID graph to JSON-LD format. Args: graph: RDFLib Graph with VoID data graph_uris: Graph URIs to filter extraction filter_void_admin_nodes: Remove VoID and administrative nodes endpoint_url: SPARQL endpoint URL for the @about section dataset_name: Dataset name for the @about section Returns: JSON-LD with @context, @graph, and @about """ parser = load_parser_from_graph(graph, graph_uris=graph_uris) graph_uris_list = [graph_uris] if isinstance(graph_uris, str) else graph_uris return parser.to_jsonld( filter_void_admin_nodes=filter_void_admin_nodes, endpoint_url=endpoint_url, dataset_name=dataset_name, graph_uris=graph_uris_list, )
[docs] def graph_to_linkml( graph: Graph, graph_uris: str | list[str] | None = None, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, ) -> str: """Convert a VoID graph to LinkML YAML schema. Args: graph: RDFLib Graph with VoID data graph_uris: Graph URIs to filter extraction filter_void_nodes: Remove VoID-specific nodes schema_name: Name for the schema schema_description: Description for the schema schema_base_uri: Base URI for the schema Returns: LinkML YAML schema string """ parser = load_parser_from_graph(graph, graph_uris=graph_uris) return parser.to_linkml_yaml( filter_void_nodes=filter_void_nodes, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, )
[docs] def graph_to_shacl( graph: Graph, graph_uris: str | list[str] | None = None, filter_void_nodes: bool = True, schema_name: str | None = None, schema_description: str | None = None, schema_base_uri: str | None = None, closed: bool = True, suffix: str | None = None, include_annotations: bool = False, ) -> str: """Convert a VoID graph to SHACL shapes. Generates SHACL (Shapes Constraint Language) shapes from a VoID graph. SHACL shapes define constraints on RDF data and can be used for validation. Args: graph: RDFLib Graph with VoID data graph_uris: Graph URIs to filter extraction filter_void_nodes: Remove VoID-specific nodes schema_name: Name for the schema schema_description: Description for the schema schema_base_uri: Base URI for the schema closed: Generate closed shapes (only allow defined properties) suffix: Optional suffix for shape names (e.g., "Shape") include_annotations: Include class/slot annotations in shapes Returns: SHACL shapes as Turtle/RDF string Example: >>> from rdflib import Graph >>> from rdfsolve.api import graph_to_shacl >>> void_graph = Graph() >>> void_graph.parse("dataset_void.ttl", format="turtle") >>> shacl_ttl = graph_to_shacl(void_graph, schema_name="my_dataset") """ parser = load_parser_from_graph(graph, graph_uris=graph_uris) return parser.to_shacl( filter_void_nodes=filter_void_nodes, schema_name=schema_name, schema_description=schema_description, schema_base_uri=schema_base_uri, closed=closed, suffix=suffix, include_annotations=include_annotations, )
[docs] def graph_to_schema( void_graph: Graph, graph_uris: str | list[str] | None = None, filter_void_admin_nodes: bool = True, ) -> pd.DataFrame: """Convert VoID graph to schema DataFrame. Args: void_graph: RDFLib graph with VoID data graph_uris: Graph URIs to extract filter_void_admin_nodes: Filter VoID or administrative nodes Returns: DataFrame with schema patterns (subject/property/object URIs) """ parser = VoidParser(void_source=void_graph, graph_uris=graph_uris) return parser.to_schema(filter_void_admin_nodes=filter_void_admin_nodes)
[docs] def mine_schema( endpoint_url: str, graph_uris: str | list[str] | None = None, dataset_name: str | None = None, chunk_size: int = 10_000, class_chunk_size: int | None = None, class_batch_size: int = 15, delay: float = 0.5, timeout: float = 120.0, counts: bool = True, two_phase: bool = True, report_path: str | None = None, filter_service_namespaces: bool = True, authors: list[dict[str, str]] | None = None, ) -> dict[str, Any]: """Mine RDF schema from a SPARQL endpoint using SELECT queries. This is a simpler, faster alternative to generate_void_from_endpoint that avoids heavy CONSTRUCT queries. Returns a MinedSchema which can export to JSON-LD or be converted to a VoID graph. Args: endpoint_url: SPARQL endpoint URL graph_uris: Graph URI(s) to restrict queries dataset_name: Human-readable dataset name chunk_size: Pagination page size class_chunk_size: Page size for Phase-1 class discovery (``None`` = single query, no pagination) class_batch_size: Number of classes to group into one VALUES query in Phase-2 (default 15) delay: Delay between pages (seconds) timeout: HTTP timeout per request counts: Whether to fetch triple counts two_phase: Use two-phase mining (default ``True``). Pass ``False`` for the legacy single-pass strategy. report_path: If given, write analytics JSON to this path filter_service_namespaces: Strip service/system namespace patterns from the result (default ``True``) Returns: JSON-LD dict with @context, @graph, and @about """ from .miner import mine_schema as _mine schema = _mine( endpoint_url=endpoint_url, graph_uris=graph_uris, dataset_name=dataset_name, chunk_size=chunk_size, class_chunk_size=class_chunk_size, class_batch_size=class_batch_size, delay=delay, timeout=timeout, counts=counts, two_phase=two_phase, report_path=report_path, filter_service_namespaces=filter_service_namespaces, authors=authors, ) return schema.to_jsonld()
[docs] def mine_all_sources( sources_csv: str | None = None, *, sources: str | None = None, output_dir: str = ".", fmt: str = "all", chunk_size: int = 10_000, class_chunk_size: int | None = None, class_batch_size: int = 15, delay: float = 0.5, timeout: float = 120.0, counts: bool = True, reports: bool = True, filter_service_namespaces: bool = True, untyped_as_classes: bool = False, authors: list[dict[str, str]] | None = None, on_progress: Callable[[str, int, int, str | None], None] | None = None, ) -> dict[str, Any]: """Mine schemas for all sources in a JSON-LD or CSV file. Reads a sources file (JSON-LD preferred, CSV still accepted) and runs :func:`mine_schema` for each entry whose *endpoint* is non-empty. Results are written to *output_dir* as ``{name}_schema.jsonld`` and / or ``{name}_void.ttl``. Per-source overrides (``chunk_size``, ``class_batch_size``, ``timeout``, etc.) in the JSON-LD file take precedence over the function-level defaults. Args: sources_csv: **Deprecated** - use *sources* instead. Path to a CSV file with data sources. Kept for backwards compatibility; ignored when *sources* is given. sources: Path to the sources file (JSON-LD or CSV). When ``None``, the default ``data/sources.jsonld`` (or ``.csv`` fallback) is used. output_dir: Directory where outputs are written. fmt: Export format - ``"jsonld"``, ``"void"``, or ``"all"``. chunk_size: Pagination page size for SPARQL queries. class_chunk_size: Page size for Phase-1 class discovery in two-phase mode. ``None`` = no pagination. Ignored for rows that are not two-phase. class_batch_size: Number of classes per VALUES query in Phase-2 of two-phase mining (default 15). delay: Delay between paginated pages (seconds). timeout: HTTP timeout per request (seconds). counts: Whether to fetch triple-count queries. reports: Write per-source analytics JSON reports. filter_service_namespaces: Strip service/system namespace patterns from each mined schema (default ``True``). untyped_as_classes: Treat untyped URI objects as ``owl:Class`` references instead of the generic ``rdfs:Resource`` sentinel (default ``False``). on_progress: Optional callback invoked after each source is processed. Signature: ``(dataset_name, index, total, status_or_error)``. *status_or_error* is ``None`` on success, or an error message string. Returns: Summary dict with keys ``"succeeded"``, ``"failed"``, and ``"skipped"`` mapping to lists of dataset names. """ from .sources import load_sources # Resolve the path: new kwarg > legacy positional > auto-detect src_path: str | None = sources or sources_csv or None out = Path(output_dir) out.mkdir(parents=True, exist_ok=True) entries = load_sources(src_path) succeeded: list[str] = [] failed: list[dict[str, str]] = [] skipped: list[str] = [] total = len(entries) for idx, entry in enumerate(entries, 1): name = entry.get("name", "") endpoint = entry.get("endpoint", "") if not endpoint: logger.info( "[%d/%d] Skipping %r: no endpoint", idx, total, name, ) skipped.append(name) if on_progress: on_progress(name, idx, total, "skipped") continue _mine_one_source( entry, idx=idx, total=total, out=out, fmt=fmt, chunk_size=chunk_size, class_chunk_size=class_chunk_size, class_batch_size=class_batch_size, delay=delay, timeout=timeout, counts=counts, reports=reports, filter_service_namespaces=filter_service_namespaces, untyped_as_classes=untyped_as_classes, authors=authors, on_progress=on_progress, succeeded=succeeded, failed=failed, ) return { "succeeded": succeeded, "failed": failed, "skipped": skipped, }
# ── SPARQL / IRI / Compose API ───────────────────────────────────
[docs] def execute_sparql( query: str, endpoint: str, method: str = "GET", timeout: int = 30, variable_map: dict[str, str] | None = None, ) -> dict[str, Any]: """Execute a SPARQL query against a remote endpoint. This is a pure-Python function- no Flask required. It delegates to :func:`rdfsolve.query.execute_sparql` which uses the robust :class:`~rdfsolve.sparql_helper.SparqlHelper` under the hood. Args: query: Full SPARQL query string. endpoint: SPARQL endpoint URL. method: HTTP method (``"GET"`` or ``"POST"``). timeout: Timeout in seconds. variable_map: Optional mapping of SPARQL ?variable -> schema URI. Returns: Dict with keys ``query``, ``endpoint``, ``variables``, ``rows``, ``variable_map``, ``row_count``, ``duration_ms``, and optionally ``error``. Example:: >>> from rdfsolve.api import execute_sparql >>> result = execute_sparql( ... query="SELECT ?s WHERE { ?s a ?o } LIMIT 5", ... endpoint="https://sparql.wikipathways.org/sparql/", ... ) >>> result["row_count"] 5 """ from rdfsolve.query import execute_sparql as _execute qr = _execute( query=query, endpoint=endpoint, method=method, timeout=timeout, variable_map=variable_map or {}, ) return qr.model_dump()
[docs] def resolve_iris( iris: list[str], endpoints: list[dict[str, Any]], timeout: int = 15, ) -> dict[str, Any]: """Resolve IRIs against SPARQL endpoints to discover their rdf:type. This is a pure-Python function- no Flask required. It delegates to :func:`rdfsolve.iri.resolve_iris`. Args: iris: List of IRI strings to resolve. endpoints: List of endpoint dicts, each with keys ``name``, ``endpoint``, and optionally ``graph``. timeout: Per-endpoint timeout in seconds. Returns: Dict with keys ``resolved``, ``not_found``, ``errors``. Example:: >>> from rdfsolve.api import resolve_iris >>> result = resolve_iris( ... iris=["http://identifiers.org/ncbigene/1234"], ... endpoints=[{ ... "name": "wikipathways", ... "endpoint": "https://sparql.wikipathways.org/sparql/", ... }], ... ) >>> result["resolved"] {...} """ from rdfsolve.iri import resolve_iris as _resolve return _resolve(iris=iris, endpoints=endpoints, timeout=timeout)
[docs] def compose_query_from_paths( paths: list[dict[str, Any]], prefixes: dict[str, str] | None = None, include_types: bool = False, include_labels: bool = True, limit: int = 100, value_bindings: dict[str, list[str]] | None = None, ) -> dict[str, Any]: """Generate a SPARQL query from diagram paths. This is a pure-Python function- no Flask required. It delegates to :func:`rdfsolve.compose.compose_query_from_paths`. Args: paths: List of path dicts, each with an ``edges`` list. Each edge has ``source``, ``target``, ``predicate``, and ``is_forward``. prefixes: Namespace prefix map (e.g. ``{"wp": "http://..."}``). include_types: Add ``rdf:type`` assertions. include_labels: Add ``OPTIONAL rdfs:label`` clauses. limit: LIMIT for the generated query. value_bindings: VALUES clause bindings ``{var: [uri, ...]}``. Returns: Dict with ``query`` (SPARQL string), ``variable_map`` (var -> schema URI), and ``jsonld`` (SPARQLExecutable JSON-LD). Example:: >>> from rdfsolve.api import compose_query_from_paths >>> result = compose_query_from_paths( ... paths=[{"edges": [{ ... "source": "http://ex.org/Gene", ... "target": "http://ex.org/Protein", ... "predicate": "http://ex.org/encodes", ... "is_forward": True, ... }]}], ... prefixes={"ex": "http://ex.org/"}, ... ) >>> print(result["query"]) PREFIX ex: <http://ex.org/> ... """ from rdfsolve.compose import compose_query_from_paths as _compose return _compose( paths=paths, prefixes=prefixes or {}, options={ "include_types": include_types, "include_labels": include_labels, "limit": limit, "value_bindings": value_bindings or {}, }, )
[docs] def probe_instance_mapping( prefix: str, sources_csv: str | None = None, *, sources: str | None = None, predicate: str = "http://www.w3.org/2004/02/skos/core#narrowMatch", dataset_names: list[str] | None = None, timeout: float = 60.0, ) -> dict[str, Any]: """Probe SPARQL endpoints for a bioregistry resource and return JSON-LD. For every dataset in *sources* (or the subset in *dataset_names*), queries the endpoint for RDF classes whose instances match the resource's known URI prefixes. Generates pairwise ``skos:narrowMatch`` edges (or *predicate* override) between classes across different datasets and returns the result as a JSON-LD mapping document. The returned dict has the same structure as a mined schema JSON-LD (``@context`` + ``@graph`` + ``@about``) and can be saved directly to ``docker/schemas/`` for auto-import on Flask startup. Args: prefix: Bioregistry prefix, e.g. ``"ensembl"``. sources_csv: **Deprecated** - use *sources* instead. sources: Path to the sources file (JSON-LD or CSV). When ``None``, auto-detects the default file. predicate: Mapping predicate URI. Defaults to ``skos:narrowMatch``. dataset_names: Restrict probing to these dataset names. timeout: SPARQL request timeout in seconds. Returns: JSON-LD ``dict`` with ``@context``, ``@graph``, ``@about``. Raises: ValueError: If *prefix* is unknown to bioregistry. """ from rdfsolve.instance_matcher import probe_resource from rdfsolve.sources import load_sources_dataframe src_path = sources or sources_csv or None datasources = load_sources_dataframe(src_path) mapping = probe_resource( prefix=prefix, datasources=datasources, predicate=predicate, dataset_names=dataset_names, timeout=timeout, ) return mapping.to_jsonld()
def _merge_instance_mapping_jsonld( existing: dict[str, Any], new: dict[str, Any], ) -> dict[str, Any]: """Merge *new* instance-mapping JSON-LD into *existing* in-place. Delegates to :func:`rdfsolve.mapping_models.instance.merge_instance_jsonld`. """ from rdfsolve.mapping_models.instance import merge_instance_jsonld return merge_instance_jsonld(existing, new)
[docs] def seed_instance_mappings( prefixes: list[str], sources_csv: str | None = None, *, sources: str | None = None, output_dir: str = "docker/mappings/instance_matching", predicate: str = "http://www.w3.org/2004/02/skos/core#narrowMatch", dataset_names: list[str] | None = None, timeout: float = 60.0, skip_existing: bool = False, ) -> dict[str, Any]: """Probe multiple bioregistry resources and write mapping JSON-LD files. Iterates over *prefixes*, runs :func:`probe_instance_mapping` for each, and writes the result to ``{output_dir}/{prefix}_instance_mapping.jsonld``. When a file already exists on disk the new probe results are **merged** into it rather than overwriting it: * New ``@graph`` nodes (source classes not yet in the file) are appended. * For existing source nodes, new predicate->target entries are added; duplicates are silently skipped. * ``uri_formats_queried`` in ``@about`` is unioned. * ``pattern_count`` and ``generated_at`` are refreshed. The default behaviour (``skip_existing=False``) is to always probe and merge. Pass ``skip_existing=True`` only when you explicitly want to skip prefixes whose output file already exists without re-probing. Args: prefixes: List of bioregistry prefixes to process. sources_csv: **Deprecated** - use *sources* instead. sources: Path to the sources file (JSON-LD or CSV). When ``None``, auto-detects the default file. output_dir: Directory where JSON-LD files are written (created if absent). predicate: Mapping predicate URI. dataset_names: Restrict probing to these dataset names. timeout: SPARQL request timeout per request. skip_existing: If ``True``, skip prefixes whose output file already exists without re-probing. Defaults to ``False`` (always probe and merge). Returns: Summary dict: ``{"succeeded": [...], "failed": [...]}``. """ import json as _json from rdfsolve.instance_matcher import probe_resource from rdfsolve.sources import load_sources_dataframe out = Path(output_dir) out.mkdir(parents=True, exist_ok=True) src_path = sources or sources_csv or None datasources = load_sources_dataframe(src_path) succeeded: list[str] = [] failed: list[dict[str, str]] = [] for prefix in prefixes: logger.info("Querying prefix: %s", prefix) outfile = out / f"{prefix}_instance_mapping.jsonld" if skip_existing and outfile.exists(): logger.info( "Skipping %s: already exists at %s (skip_existing=True)", prefix, outfile, ) succeeded.append(prefix) continue try: mapping = probe_resource( prefix=prefix, datasources=datasources, predicate=predicate, dataset_names=dataset_names, timeout=timeout, ) new_jsonld = mapping.to_jsonld() if outfile.exists(): try: existing_jsonld = _json.loads(outfile.read_text()) merged = _merge_instance_mapping_jsonld(existing_jsonld, new_jsonld) outfile.write_text(_json.dumps(merged, indent=2)) logger.info("Merged into existing: %s", outfile) except Exception as merge_exc: logger.warning( "Could not merge into %s (%s); overwriting.", outfile, merge_exc, ) outfile.write_text(_json.dumps(new_jsonld, indent=2)) logger.info("Overwritten: %s", outfile) else: outfile.write_text(_json.dumps(new_jsonld, indent=2)) logger.info("Written: %s", outfile) succeeded.append(prefix) except Exception as exc: logger.error("Failed %s: %s", prefix, exc) failed.append({"prefix": prefix, "error": str(exc)}) return {"succeeded": succeeded, "failed": failed}
# ── SeMRA import API ─────────────────────────────────────────────
[docs] def import_semra_source( source: str, keep_prefixes: list[str] | None = None, output_dir: str = "docker/mappings/semra", ) -> dict[str, Any]: """Import mappings from a SeMRA source and write one JSON-LD per prefix. Delegates to :func:`rdfsolve.semra_converter.import_source`. Args: source: SeMRA source key (e.g. ``"biomappings"``). keep_prefixes: Optional prefix filter. output_dir: Directory for output files. Returns: Summary dict ``{"succeeded", "failed", "skipped"}``. """ from rdfsolve.semra_converter import import_source return import_source( source=source, keep_prefixes=keep_prefixes, output_dir=output_dir, )
[docs] def seed_semra_mappings( sources: list[str], keep_prefixes: list[str] | None = None, output_dir: str = "docker/mappings/semra", ) -> dict[str, Any]: """Seed semra mapping files for multiple sources. Calls :func:`import_semra_source` for each entry in *sources* and aggregates the results. Args: sources: List of SeMRA source keys (e.g. ``["biomappings", "gilda"]``). keep_prefixes: Optional shared prefix filter applied to all sources. output_dir: Directory for output files. Returns: Aggregated summary with keys ``"succeeded"``, ``"failed"``, ``"skipped"``. """ succeeded: list[str] = [] failed: list[dict[str, str]] = [] skipped: list[str] = [] for source in sources: result = import_semra_source( source=source, keep_prefixes=keep_prefixes, output_dir=output_dir, ) succeeded.extend(result.get("succeeded", [])) failed.extend(result.get("failed", [])) skipped.extend(result.get("skipped", [])) return {"succeeded": succeeded, "failed": failed, "skipped": skipped}
[docs] def load_mapping_jsonld(path: str) -> dict[str, Any]: """Load a mapping JSON-LD file from disk. Args: path: Path to a ``.jsonld`` file. Returns: Parsed JSON dict. """ result: dict[str, Any] = json.loads(Path(path).read_text(encoding="utf-8")) return result
[docs] def infer_mappings( input_paths: list[str], output_path: str, *, inversion: bool = True, transitivity: bool = True, generalisation: bool = False, chain_cutoff: int = 3, dataset_name: str | None = None, ) -> dict[str, Any]: """Run the SeMRA inference pipeline over mapping JSON-LD files. Thin wrapper around :func:`rdfsolve.inference.infer_mappings`. See that function for full documentation. Args: input_paths: Paths to input mapping JSON-LD files. output_path: Path to write the inferenced mapping JSON-LD. inversion: Apply symmetric inversion. transitivity: Apply transitive chain inference. generalisation: Apply generalisation. chain_cutoff: Max chain length for transitivity. dataset_name: Override for ``@about.dataset_name``. Returns: Summary dict with ``"input_edges"``, ``"output_edges"``, ``"inference_types"``, ``"output_path"``. """ from rdfsolve.inference import infer_mappings as _infer return _infer( input_paths=input_paths, output_path=output_path, inversion=inversion, transitivity=transitivity, generalisation=generalisation, chain_cutoff=chain_cutoff, dataset_name=dataset_name, )
[docs] def seed_inferenced_mappings( input_dir: str = "docker/mappings", output_dir: str = "docker/mappings/inferenced", output_name: str = "inferenced_mappings", inversion: bool = True, transitivity: bool = True, generalisation: bool = False, chain_cutoff: int = 3, ) -> dict[str, Any]: """Infer over all mappings in *input_dir* and write to *output_dir*. Thin wrapper around :func:`rdfsolve.inference.seed_inferenced_mappings`. Args: input_dir: Directory containing mapping subdirs. output_dir: Directory for output. output_name: Stem for the output file. inversion: Apply inversion inference. transitivity: Apply transitivity inference. generalisation: Apply generalisation. chain_cutoff: Max chain length. Returns: Summary dict from :func:`infer_mappings`. """ from rdfsolve.inference import ( seed_inferenced_mappings as _seed, ) return _seed( input_dir=input_dir, output_dir=output_dir, output_name=output_name, inversion=inversion, transitivity=transitivity, generalisation=generalisation, chain_cutoff=chain_cutoff, )
[docs] def import_sssom_source( entry: dict[str, Any], output_dir: str = "docker/mappings/sssom", ) -> dict[str, Any]: """Download and convert one SSSOM source entry to JSON-LD files. Thin wrapper around :func:`rdfsolve.sssom_importer.import_sssom_source`. For each ``.sssom.tsv`` file found inside the archive at ``entry["url"]``, one JSON-LD file is written to *output_dir*:: {source_name}__{sssom_file_stem}.jsonld Args: entry: Dict with at least ``"name"`` and ``"url"`` keys, as found in ``data/sssom_sources.yaml``. output_dir: Directory to write output JSON-LD files. Returns: Summary dict with keys ``"succeeded"``, ``"failed"``, ``"skipped"``. """ from rdfsolve.sssom_importer import import_sssom_source as _import return _import(entry=entry, output_dir=output_dir)
[docs] def seed_sssom_mappings( sssom_sources_yaml: str = "data/sssom_sources.yaml", output_dir: str = "docker/mappings/sssom", names: list[str] | None = None, ) -> dict[str, Any]: """Seed SSSOM mapping files for all (or selected) sources. Thin wrapper around :func:`rdfsolve.sssom_importer.seed_sssom_mappings`. Reads *sssom_sources_yaml*, optionally filters to *names*, and calls :func:`import_sssom_source` for each entry. Args: sssom_sources_yaml: Path to the SSSOM sources YAML file (default: ``data/sssom_sources.yaml``). output_dir: Directory for output JSON-LD files (default: ``docker/mappings/sssom``). names: Optional list of source names to restrict processing; if ``None`` (default), all entries are processed. Returns: Aggregated summary with keys ``"succeeded"``, ``"failed"``, ``"skipped"``. """ from rdfsolve.sssom_importer import seed_sssom_mappings as _seed return _seed( sssom_sources_yaml=sssom_sources_yaml, output_dir=output_dir, names=names, )