# Source code for rdfsolve.sources

"""Load data-source definitions from ``data/sources.yaml``.

The canonical source registry is a YAML file containing a flat list
of mappings, one per SPARQL data source.  Each mapping carries:

* **name** - unique human-readable identifier.
* **endpoint** - SPARQL endpoint URL.
* **graph_uris** - named graphs to query.
* **use_graph** - whether to wrap queries in a ``GRAPH`` clause.
* **two_phase** - use two-phase mining (default ``True``).
* Optional tuning knobs: *chunk_size*, *class_batch_size*,
  *class_chunk_size*, *timeout*, *delay*, *counts*, *unsafe_paging*.

Legacy CSV files (``data/sources.csv``) and JSON-LD files are still
accepted: the reader auto-detects the format by extension.

Typical usage::

    from rdfsolve.sources import load_sources

    for src in load_sources("data/sources.yaml"):
        print(src["name"], src["endpoint"])
"""

from __future__ import annotations

import csv
import json
import logging
from pathlib import Path
from typing import Any, TypedDict

import pandas as pd
import yaml

logger = logging.getLogger(__name__)


class SourceEntry(TypedDict, total=False):
    """Schema of one data-source record as produced by the loaders.

    The class is declared with ``total=False``, so every key is optional
    from the type checker's point of view.
    """

    # Identity and location of the source.
    name: str
    endpoint: str
    void_iri: str
    graph_uris: list[str]

    # Switches controlling how queries are shaped.
    use_graph: bool
    two_phase: bool

    # Optional tuning knobs.
    chunk_size: int
    class_batch_size: int
    class_chunk_size: int | None
    timeout: float
    delay: float
    counts: bool
    unsafe_paging: bool

    # Free-form remarks.
    notes: str
# ── default path ──────────────────────────────────────────────────

_REPO_ROOT = Path(__file__).resolve().parent.parent.parent
DEFAULT_SOURCES_YAML = _REPO_ROOT / "data" / "sources.yaml"
DEFAULT_SOURCES_JSONLD = _REPO_ROOT / "data" / "sources.jsonld"
DEFAULT_SOURCES_CSV = _REPO_ROOT / "data" / "sources.csv"


def _default_sources_path() -> Path:
    """Pick the default sources file.

    The YAML file wins if it exists on disk, then the JSON-LD file.
    The CSV path is returned as the last resort without checking that
    it exists.
    """
    for candidate in (DEFAULT_SOURCES_YAML, DEFAULT_SOURCES_JSONLD):
        if candidate.exists():
            return candidate
    return DEFAULT_SOURCES_CSV


# ── loading ───────────────────────────────────────────────────────
def load_sources(
    path: str | Path | None = None,
) -> list[SourceEntry]:
    """Read data-source definitions from a YAML, JSON-LD, or CSV file.

    The reader is chosen from the (case-insensitive) file extension.

    Parameters
    ----------
    path:
        Location of the sources file.  With ``None`` the default
        ``data/sources.yaml`` is used, falling back to the ``.jsonld``
        and then the ``.csv`` default.

    Returns
    -------
    list[SourceEntry]
        One dict per data source, keys normalised to snake_case.
        Sources without an ``endpoint`` are included (callers may
        skip them).

    Raises
    ------
    ValueError
        If the file extension is not one of the supported formats.
    """
    source_path = _default_sources_path() if path is None else Path(path)
    ext = source_path.suffix.lower()

    if ext == ".csv":
        return _load_csv(source_path)
    if ext in {".json", ".jsonld"}:
        return _load_jsonld(source_path)
    if ext in {".yml", ".yaml"}:
        return _load_yaml(source_path)

    raise ValueError(
        f"Unsupported sources file format {ext!r}: expected .yaml, .yml, .jsonld, .json, or .csv"
    )
# ── shared helpers ────────────────────────────────────────────────

_INT_KEYS = ("chunk_size", "class_batch_size", "class_chunk_size")
_FLOAT_KEYS = ("timeout", "delay")


def _require_mapping(node: Any, kind: str) -> None:
    """Raise ``ValueError`` unless *node* is a dict (a parsed mapping)."""
    if not isinstance(node, dict):
        raise ValueError(
            f"Expected each {kind} source entry to be a mapping, got {type(node).__name__}"
        )


def _listify(value: Any, scalars: tuple[type, ...]) -> list[Any]:
    """Normalise *value* to a list.

    ``None`` becomes ``[]``, an instance of one of *scalars* is wrapped
    in a one-element list, and anything else is copied with ``list()``.
    """
    if value is None:
        return []
    if isinstance(value, scalars):
        return [value]
    return list(value)


def _fill_shared_fields(entry: SourceEntry, node: dict[str, Any]) -> None:
    """Copy the boolean flags, numeric knobs and notes from *node* into *entry*.

    Booleans are always written (with their defaults); numeric knobs are
    written only when present and not ``None``; ``notes`` only when present.
    """
    entry["use_graph"] = bool(node.get("use_graph", False))
    entry["two_phase"] = bool(node.get("two_phase", True))
    entry["counts"] = bool(node.get("counts", True))
    entry["unsafe_paging"] = bool(node.get("unsafe_paging", False))

    for key in _INT_KEYS:
        if node.get(key) is not None:
            entry[key] = int(node[key])  # type: ignore[literal-required]
    for key in _FLOAT_KEYS:
        if node.get(key) is not None:
            entry[key] = float(node[key])  # type: ignore[literal-required]

    if "notes" in node:
        entry["notes"] = str(node["notes"])


# ── YAML reader ───────────────────────────────────────────────────


def _load_yaml(path: Path) -> list[SourceEntry]:
    """Parse a YAML sources file into a list of entries.

    Raises
    ------
    ValueError
        If the document is not a list, or a list item is not a mapping.
    """
    with open(path, encoding="utf-8") as fh:
        nodes = yaml.safe_load(fh)
    if not isinstance(nodes, list):
        raise ValueError(f"Expected a YAML list of source mappings in {path}")
    entries = [_yaml_node_to_entry(node) for node in nodes]
    logger.info("Loaded %d sources from %s", len(entries), path)
    return entries


def _yaml_node_to_entry(node: dict[str, Any]) -> SourceEntry:
    """Convert a single YAML mapping to a SourceEntry.

    ``graph_uris`` may be a list, a single string, or null/absent (which
    yields an empty list).

    Raises
    ------
    ValueError
        If *node* is not a mapping.
    """
    _require_mapping(node, "YAML")

    e: SourceEntry = {}
    e["name"] = node.get("name", "")
    e["endpoint"] = node.get("endpoint", "")
    e["void_iri"] = node.get("void_iri", "")
    # A bare ``graph_uris:`` key parses as None; treat it like an empty list.
    e["graph_uris"] = _listify(node.get("graph_uris"), (str,))

    _fill_shared_fields(e, node)

    # Pass through download_* and local_endpoint fields so that
    # scripts (e.g. mine_local.py generate-qleverfile) can see them.
    for key in node:
        if key.startswith("download_") or key == "local_endpoint":
            e[key] = node[key]  # type: ignore[literal-required]

    return e


# ── JSON-LD reader ────────────────────────────────────────────────


def _unwrap_iri(value: Any) -> Any:
    """Return the ``@id`` of an ``{"@id": …}`` object, or *value* unchanged."""
    if isinstance(value, dict):
        return value.get("@id", "")
    return value


def _load_jsonld(path: Path) -> list[SourceEntry]:
    """Parse a JSON-LD sources file, reading nodes from its ``@graph``."""
    with open(path, encoding="utf-8") as fh:
        doc = json.load(fh)
    # ``@graph`` may hold a single node object instead of an array.
    graph = _listify(doc.get("@graph"), (dict,))
    entries = [_node_to_entry(node) for node in graph]
    logger.info("Loaded %d sources from %s", len(entries), path)
    return entries


def _node_to_entry(node: dict[str, Any]) -> SourceEntry:
    """Convert a single JSON-LD ``@graph`` node to a SourceEntry.

    ``endpoint`` and ``void_iri`` may be plain strings or ``{"@id": …}``
    objects.  ``graph_uris`` may be null/absent, a single string, a single
    ``{"@id": …}`` object, or a list mixing strings and such objects.

    Raises
    ------
    ValueError
        If *node* is not a mapping.
    """
    _require_mapping(node, "JSON-LD")

    e: SourceEntry = {}
    e["name"] = node.get("name", "")
    e["endpoint"] = _unwrap_iri(node.get("endpoint", ""))
    e["void_iri"] = _unwrap_iri(node.get("void_iri", ""))

    # Wrap a lone string or a lone {"@id": …} object before iterating;
    # iterating a dict directly would yield its keys ("@id"), not the IRI.
    raw_graphs = _listify(node.get("graph_uris"), (str, dict))
    e["graph_uris"] = [(g["@id"] if isinstance(g, dict) else g) for g in raw_graphs]

    _fill_shared_fields(e, node)
    return e


# ── CSV reader (legacy) ──────────────────────────────────────────


def _load_csv(path: Path) -> list[SourceEntry]:
    """Parse the legacy CSV sources file.

    Reads the columns ``dataset_name``, ``endpoint_url``, ``void_iri``,
    ``graph_uri``, ``use_graph`` and ``two_phase``; missing or empty cells
    are treated as empty strings.
    """
    with open(path, newline="", encoding="utf-8") as fh:
        rows = list(csv.DictReader(fh))

    def cell(row: dict[str, Any], column: str) -> str:
        # DictReader yields None for short rows; normalise to "".
        return (row.get(column) or "").strip()

    entries: list[SourceEntry] = []
    for row in rows:
        graph_uri = cell(row, "graph_uri")
        e: SourceEntry = {}
        e["name"] = cell(row, "dataset_name")
        e["endpoint"] = cell(row, "endpoint_url")
        e["void_iri"] = cell(row, "void_iri")
        e["graph_uris"] = [graph_uri] if graph_uri else []
        e["use_graph"] = cell(row, "use_graph").lower() in ("true", "1", "yes")
        # two_phase defaults to True unless explicitly off
        e["two_phase"] = cell(row, "two_phase").lower() not in ("false", "0", "no")
        entries.append(e)

    logger.info("Loaded %d sources from CSV %s", len(entries), path)
    return entries


# ── DataFrame conversion (for instance_matcher compat) ────────────
def load_sources_dataframe(
    path: str | Path | None = None,
) -> pd.DataFrame:
    """Load sources and return a :class:`~pandas.DataFrame`.

    The DataFrame has columns compatible with
    :func:`~rdfsolve.instance_matcher.probe_resource`:
    ``dataset_name``, ``endpoint_url``, ``graph_uri``,
    ``void_iri``, ``use_graph``.  Only the first entry of each
    source's ``graph_uris`` is kept (empty string when there is none).

    The columns are always present, even when the sources file
    defines no sources.

    Parameters
    ----------
    path:
        Path to the sources file.  ``None`` = auto-detect default.
    """
    columns = ["dataset_name", "endpoint_url", "graph_uri", "void_iri", "use_graph"]
    rows = [
        {
            "dataset_name": entry.get("name", ""),
            "endpoint_url": entry.get("endpoint", ""),
            "graph_uri": entry["graph_uris"][0] if entry.get("graph_uris") else "",
            "void_iri": entry.get("void_iri", ""),
            "use_graph": entry.get("use_graph", False),
        }
        for entry in load_sources(path)
    ]
    # Passing columns explicitly keeps the schema stable for an empty list,
    # where pandas would otherwise build a frame with no columns at all.
    return pd.DataFrame(rows, columns=columns)