"""
Schema Miner - extract RDF schema patterns via simple SELECT queries.
Instead of building VoID on the endpoint with heavy CONSTRUCT + BIND
queries, this module runs three lightweight SELECT DISTINCT queries
and assembles the schema in Python:
1. **Typed-object patterns**::
SELECT DISTINCT ?sc ?p ?oc WHERE {
?s ?p ?o . ?s a ?sc . ?o a ?oc .
}
2. **Literal patterns** (datatype properties)::
SELECT DISTINCT ?sc ?p (DATATYPE(?o) AS ?dt) WHERE {
?s ?p ?o . ?s a ?sc . FILTER(isLiteral(?o))
}
3. **Untyped-URI patterns** (URI objects without ``rdf:type``)::
SELECT DISTINCT ?sc ?p WHERE {
?s ?p ?o . ?s a ?sc .
FILTER(isURI(?o))
FILTER NOT EXISTS { ?o a ?any }
}
All queries use OFFSET / LIMIT pagination via
:meth:`SparqlHelper.select_chunked`.
The primary export is :class:`MinedSchema` (-> JSON-LD). It can also
be converted to a VoID graph for downstream LinkML / SHACL / RDF-config
export via :class:`~rdfsolve.parser.VoidParser`.
"""
from __future__ import annotations
import json
import logging
import os
import sys
import time
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
from rdfsolve.models import (
AboutMetadata,
MinedSchema,
MiningReport,
OneShotQueryResult,
PhaseReport,
QueryStats,
SchemaPattern,
)
from rdfsolve.sparql_helper import PaginationTruncatedError, SparqlHelper
from rdfsolve.utils import get_local_name, pick_label
from rdfsolve.version import VERSION
if TYPE_CHECKING:
from rdfsolve.sources import SourceEntry
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Names exported by a star-import of this module.  Several entries are
# underscore-prefixed helpers.
# NOTE(review): apart from ``SchemaMiner`` these names are defined
# outside the portion of the file reviewed here - presumably they are
# re-exported on purpose (CLI / tests); confirm before trimming the list.
__all__ = [
"SchemaMiner",
"_mine_one_source",
"_resolve_source_overrides",
"_write_schema_outputs",
"mine_schema",
]
# -------------------------------------------------------------------
# SPARQL query templates (braces pre-escaped for str.format)
# -------------------------------------------------------------------
def _graph_clause(
graph_uris: list[str] | None,
) -> tuple[str, str]:
"""Return (open, close) strings for an optional GRAPH clause.
If *graph_uris* is ``None`` -> empty strings (default graph).
If a single URI -> ``GRAPH <uri> {`` / ``}``.
If multiple -> VALUES-based pattern.
"""
if not graph_uris:
return "", ""
if len(graph_uris) == 1:
return f"GRAPH <{graph_uris[0]}> {{", "}"
# Multiple graphs - use VALUES
values = " ".join(f"(<{u}>)" for u in graph_uris)
open_ = f"VALUES (?_g) {{ {values} }} GRAPH ?_g {{"
return open_, "}"
def _build_typed_object_query(
    graph_uris: list[str] | None,
) -> str:
    """Query 1 (paginated): typed-object patterns (``?o a ?oc``).

    The query text is the same as the one produced by
    :func:`_build_typed_object_query_plain`; it is then passed through
    :meth:`SparqlHelper.prepare_paginated_query` so it can be used as a
    paging template.
    """
    unpaged = _build_typed_object_query_plain(graph_uris)
    return SparqlHelper.prepare_paginated_query(unpaged)
def _build_literal_query(
    graph_uris: list[str] | None,
) -> str:
    """Query 2 (paginated): literal patterns together with their datatype.

    Reuses the text of :func:`_build_literal_query_plain` and passes it
    through :meth:`SparqlHelper.prepare_paginated_query` so it can be
    used as a paging template.
    """
    unpaged = _build_literal_query_plain(graph_uris)
    return SparqlHelper.prepare_paginated_query(unpaged)
def _build_untyped_uri_query(
    graph_uris: list[str] | None,
) -> str:
    """Query 3 (paginated): URI objects that carry no ``rdf:type``.

    Reuses the text of :func:`_build_untyped_uri_query_plain` and passes
    it through :meth:`SparqlHelper.prepare_paginated_query` so it can be
    used as a paging template.
    """
    unpaged = _build_untyped_uri_query_plain(graph_uris)
    return SparqlHelper.prepare_paginated_query(unpaged)
def _build_typed_object_query_plain(
    graph_uris: list[str] | None,
) -> str:
    """Query 1 without paging: ``(subject class, property, object class)``.

    Meant for one-shot execution against engines (e.g. QLever) that can
    return the whole result set in a single response.  When *graph_uris*
    is given the triple patterns are wrapped in the ``GRAPH`` clause
    produced by :func:`_graph_clause`.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?sc ?p ?oc",
        "WHERE {",
        wrap_open,
        "?s ?p ?o .",
        "?s a ?sc .",
        "?o a ?oc .",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
def _build_literal_query_plain(
    graph_uris: list[str] | None,
) -> str:
    """Query 2 without paging: ``(subject class, property, datatype)``.

    Only literal objects are kept; their datatype is bound to ``?dt``.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?sc ?p ?dt",
        "WHERE {",
        wrap_open,
        "?s ?p ?o .",
        "?s a ?sc .",
        "FILTER(isLiteral(?o))",
        "BIND(DATATYPE(?o) AS ?dt)",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
def _build_untyped_uri_query_plain(
    graph_uris: list[str] | None,
) -> str:
    """Query 3 without paging: ``(subject class, property)`` pairs.

    Keeps only URI objects for which no ``rdf:type`` statement exists.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?sc ?p",
        "WHERE {",
        wrap_open,
        "?s ?p ?o .",
        "?s a ?sc .",
        "FILTER(isURI(?o))",
        "FILTER NOT EXISTS { ?o a ?any }",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
def _build_label_query(
    uris: list[str],
    graph_uris: list[str] | None,
) -> str:
    """Build a SELECT that fetches candidate labels for a set of URIs.

    Parameters
    ----------
    uris:
        URIs to look up; each one becomes a row of the ``VALUES (?uri)``
        block.
    graph_uris:
        Optional named graph(s) to restrict the lookup to (see
        :func:`_graph_clause`).

    Returns
    -------
    str
        A query whose bindings carry ``?uri``, ``?rdfsLabel``,
        ``?dcTitle``, ``?iaoLabel``, ``?skosPrefLabel`` and
        ``?skosAltLabel``.  Priority between them is resolved in Python
        via :func:`pick_label`.
    """
    values = " ".join(f"(<{u}>)" for u in uris)
    g_open, g_close = _graph_clause(graph_uris)
    # The VALUES block must live in the same group as the OPTIONALs.
    # If it sits outside the GRAPH wrapper, the inner group consists of
    # OPTIONALs only; evaluated bottom-up, the first OPTIONAL that
    # matches anything (rdfs:label) then decides which ?uri rows exist,
    # and URIs that only have e.g. a skos:prefLabel silently lose their
    # labels.  Keeping VALUES inside also matches the placement used by
    # the batched query builders in this module.
    q = f"""\
SELECT ?uri ?rdfsLabel ?dcTitle ?iaoLabel ?skosPrefLabel ?skosAltLabel
WHERE {{
{g_open}
VALUES (?uri) {{ {values} }}
OPTIONAL {{ ?uri <http://www.w3.org/2000/01/rdf-schema#label> ?rdfsLabel . }}
OPTIONAL {{ ?uri <http://purl.org/dc/elements/1.1/title> ?dcTitle . }}
OPTIONAL {{ ?uri <http://purl.org/dc/terms/title> ?dcTitle . }}
OPTIONAL {{ ?uri <http://purl.obolibrary.org/obo/IAO_0000118> ?iaoLabel . }}
OPTIONAL {{ ?uri <http://www.w3.org/2004/02/skos/core#prefLabel> ?skosPrefLabel . }}
OPTIONAL {{ ?uri <http://www.w3.org/2004/02/skos/core#altLabel> ?skosAltLabel . }}
{g_close}
}}"""
    return q
# -------------------------------------------------------------------
# Two-phase query builders (class-scoped, for large endpoints)
# -------------------------------------------------------------------
def _build_class_discovery_query(
    graph_uris: list[str] | None,
) -> str:
    """List every distinct ``rdf:type`` class (paginated template).

    Reuses the text of :func:`_build_class_discovery_query_plain` and
    passes it through :meth:`SparqlHelper.prepare_paginated_query`.
    """
    unpaged = _build_class_discovery_query_plain(graph_uris)
    return SparqlHelper.prepare_paginated_query(unpaged)
def _build_class_discovery_query_plain(
    graph_uris: list[str] | None,
) -> str:
    """List every distinct ``rdf:type`` class in a single, unpaged query."""
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?class",
        "WHERE {",
        wrap_open,
        "?s a ?class .",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
def _build_class_typed_object_query(
    class_uri: str,
    graph_uris: list[str] | None,
) -> str:
    """Typed-object patterns for one class: ``(property, object class)``.

    Subjects are restricted to instances of *class_uri*; objects must
    carry an ``rdf:type`` of their own.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?p ?oc",
        "WHERE {",
        wrap_open,
        f"?s a <{class_uri}> .",
        "?s ?p ?o .",
        "?o a ?oc .",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
def _build_class_literal_query(
    class_uri: str,
    graph_uris: list[str] | None,
) -> str:
    """Literal patterns for one class: ``(property, datatype)``.

    Subjects are restricted to instances of *class_uri*; only literal
    objects are kept and their datatype is projected as ``?dt``.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?p (DATATYPE(?o) AS ?dt)",
        "WHERE {",
        wrap_open,
        f"?s a <{class_uri}> .",
        "?s ?p ?o .",
        "FILTER(isLiteral(?o))",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
def _build_class_untyped_uri_query(
    class_uri: str,
    graph_uris: list[str] | None,
) -> str:
    """Untyped-URI patterns for one class: the properties involved.

    Subjects are restricted to instances of *class_uri*; objects must be
    URIs for which no ``rdf:type`` statement exists.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    pieces = [
        "SELECT DISTINCT ?p",
        "WHERE {",
        wrap_open,
        f"?s a <{class_uri}> .",
        "?s ?p ?o .",
        "FILTER(isURI(?o))",
        "FILTER NOT EXISTS { ?o a ?any }",
        wrap_close,
        "}",
    ]
    return "\n".join(pieces)
# ---- batched two-phase query builders (VALUES) --------------------
def _build_properties_for_class_query(
    class_uri: str,
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,
) -> str:
    """List the properties used by instances of *class_uri*.

    A single-join query, used as the first step of the property-first
    decomposition fallback for expensive classes.

    Parameters
    ----------
    class_uri:
        Class whose instances are inspected.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` the text is passed through
        :meth:`SparqlHelper.prepare_paginated_query` and the result is a
        template with ``{offset}`` / ``{limit}`` placeholders.
    drop_distinct:
        When ``True`` *and* ``paginated`` is ``True``, ``DISTINCT`` is
        left out and the caller deduplicates in Python.  See
        :func:`_build_batched_typed_object_query` for the caveats.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    # DISTINCT is only ever dropped for paginated queries.
    if paginated and drop_distinct:
        head = "SELECT ?p"
    else:
        head = "SELECT DISTINCT ?p"
    text = "\n".join(
        [
            head,
            "WHERE {",
            wrap_open,
            f"?s a <{class_uri}> .",
            "?s ?p ?o .",
            wrap_close,
            "}",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
def _build_typed_object_for_class_property_query(
    class_uri: str,
    prop_uri: str,
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,
) -> str:
    """List object classes reached from *class_uri* via *prop_uri*.

    A two-hop query, but pinned to a single property so that (per the
    original author's note) Virtuoso can use its property index and stay
    under the cost limit.

    Parameters
    ----------
    class_uri:
        Class whose instances are the subjects.
    prop_uri:
        The one property to follow.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` the text is passed through
        :meth:`SparqlHelper.prepare_paginated_query` and the result is a
        template with ``{offset}`` / ``{limit}`` placeholders.
    drop_distinct:
        When ``True`` *and* ``paginated`` is ``True``, ``DISTINCT`` is
        left out and the caller deduplicates in Python.  See
        :func:`_build_batched_typed_object_query` for the caveats.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    # DISTINCT is only ever dropped for paginated queries.
    if paginated and drop_distinct:
        head = "SELECT ?oc"
    else:
        head = "SELECT DISTINCT ?oc"
    text = "\n".join(
        [
            head,
            "WHERE {",
            wrap_open,
            f"?s a <{class_uri}> .",
            f"?s <{prop_uri}> ?o .",
            "?o a ?oc .",
            wrap_close,
            "}",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
# Page size used by the property-first decomposition fallback.  Chosen
# to be large enough that most per-class property lists arrive in one
# or two pages, yet small enough to stay under per-page cost limits on
# Virtuoso-style endpoints.
# NOTE(review): the trailing ``lmin`` marker on the assignment is not
# explained anywhere in view - confirm its meaning with the author.
_DECOMP_CHUNK = 1_000 # lmin
def _values_block(class_uris: list[str]) -> str:
"""Build a ``VALUES ?class { <u1> <u2> … }`` clause."""
entries = " ".join(f"<{u}>" for u in class_uris)
return f"VALUES ?class {{ {entries} }}"
def _build_batched_typed_object_query(
    class_uris: list[str],
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,
) -> str:
    """Typed-object patterns for a batch of classes.

    The ``VALUES`` clause is emitted **inside** the ``GRAPH`` block so
    that its binding is visible to the triple patterns on endpoints
    that enforce strict scoping (IDSM, QLever, ...).

    Parameters
    ----------
    class_uris:
        Classes of this batch; rendered by :func:`_values_block`.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` the returned string is a template with
        ``{offset}`` / ``{limit}`` placeholders suitable for
        :meth:`~rdfsolve.sparql_helper.SparqlHelper.select_chunked`.
    drop_distinct:
        When ``True`` (and only together with ``paginated=True``),
        ``DISTINCT`` is omitted.  On some endpoints (e.g. Virtuoso)
        ``DISTINCT`` combined with ``OFFSET`` forces a full scan on
        every page; dropping it lets the engine page cheaply, at the
        price of duplicate rows the caller must deduplicate in Python.
        This is **unsafe** in general (join cardinality may increase on
        other engines) and is only activated via ``--unsafe-paging``.
        The single-shot query always keeps ``DISTINCT``.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    batch = _values_block(class_uris)
    if paginated and drop_distinct:
        head = "SELECT ?class ?p ?oc"
    else:
        head = "SELECT DISTINCT ?class ?p ?oc"
    text = "\n".join(
        [
            head,
            "WHERE {",
            wrap_open,
            batch,
            "?s a ?class .",
            "?s ?p ?o .",
            "?o a ?oc .",
            wrap_close,
            "}",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
def _build_batched_literal_query(
    class_uris: list[str],
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,
) -> str:
    """Literal patterns (with datatype) for a batch of classes.

    ``VALUES`` is emitted inside the ``GRAPH`` block - see
    :func:`_build_batched_typed_object_query` for the rationale.

    Parameters
    ----------
    class_uris:
        Classes of this batch; rendered by :func:`_values_block`.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` returns a template with ``{offset}`` /
        ``{limit}`` placeholders for
        :meth:`~rdfsolve.sparql_helper.SparqlHelper.select_chunked`.
    drop_distinct:
        When ``True`` omits ``DISTINCT`` - only honoured together with
        ``paginated=True``.  See
        :func:`_build_batched_typed_object_query` for caveats.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    batch = _values_block(class_uris)
    if paginated and drop_distinct:
        head = "SELECT ?class ?p (DATATYPE(?o) AS ?dt)"
    else:
        head = "SELECT DISTINCT ?class ?p (DATATYPE(?o) AS ?dt)"
    text = "\n".join(
        [
            head,
            "WHERE {",
            wrap_open,
            batch,
            "?s a ?class .",
            "?s ?p ?o .",
            "FILTER(isLiteral(?o))",
            wrap_close,
            "}",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
def _build_batched_untyped_uri_query(
    class_uris: list[str],
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,
) -> str:
    """Untyped-URI patterns for a batch of classes.

    ``VALUES`` is emitted inside the ``GRAPH`` block - see
    :func:`_build_batched_typed_object_query` for the rationale.

    Parameters
    ----------
    class_uris:
        Classes of this batch; rendered by :func:`_values_block`.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` returns a template with ``{offset}`` /
        ``{limit}`` placeholders for
        :meth:`~rdfsolve.sparql_helper.SparqlHelper.select_chunked`.
    drop_distinct:
        When ``True`` omits ``DISTINCT`` - only honoured together with
        ``paginated=True``.  See
        :func:`_build_batched_typed_object_query` for caveats.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    batch = _values_block(class_uris)
    if paginated and drop_distinct:
        head = "SELECT ?class ?p"
    else:
        head = "SELECT DISTINCT ?class ?p"
    text = "\n".join(
        [
            head,
            "WHERE {",
            wrap_open,
            batch,
            "?s a ?class .",
            "?s ?p ?o .",
            "FILTER(isURI(?o))",
            "FILTER NOT EXISTS { ?o a ?any }",
            wrap_close,
            "}",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
# ---- batched count query builders (VALUES) -----------------------
def _build_batched_typed_count_query(
    class_uris: list[str],
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,  # accepted for API compat
) -> str:
    """Typed-object counts per ``(class, p, oc)`` for a batch of classes.

    Grouping takes the place of ``DISTINCT`` here, so *drop_distinct* is
    accepted only for signature parity with the pattern builders and is
    never read.

    Parameters
    ----------
    class_uris:
        Classes of this batch; rendered by :func:`_values_block`.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` returns a template with ``{offset}`` /
        ``{limit}`` placeholders for
        :meth:`~rdfsolve.sparql_helper.SparqlHelper.select_chunked`.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    batch = _values_block(class_uris)
    text = "\n".join(
        [
            "SELECT ?class ?p ?oc (COUNT(*) AS ?cnt)",
            "WHERE {",
            wrap_open,
            batch,
            "?s a ?class .",
            "?s ?p ?o .",
            "?o a ?oc .",
            wrap_close,
            "}",
            "GROUP BY ?class ?p ?oc",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
def _build_batched_literal_count_query(
    class_uris: list[str],
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,  # accepted for API compat
) -> str:
    """Literal counts per ``(class, p, dt)`` for a batch of classes.

    *drop_distinct* is accepted for signature parity only and is never
    read.

    Parameters
    ----------
    class_uris:
        Classes of this batch; rendered by :func:`_values_block`.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` returns a template with ``{offset}`` /
        ``{limit}`` placeholders.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    batch = _values_block(class_uris)
    text = "\n".join(
        [
            "SELECT ?class ?p ?dt (COUNT(*) AS ?cnt)",
            "WHERE {",
            wrap_open,
            batch,
            "?s a ?class .",
            "?s ?p ?o .",
            "FILTER(isLiteral(?o))",
            "BIND(DATATYPE(?o) AS ?dt)",
            wrap_close,
            "}",
            "GROUP BY ?class ?p ?dt",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
def _build_batched_untyped_count_query(
    class_uris: list[str],
    graph_uris: list[str] | None,
    paginated: bool = False,
    drop_distinct: bool = False,  # accepted for API compat
) -> str:
    """Untyped-URI counts per ``(class, p)`` for a batch of classes.

    *drop_distinct* is accepted for signature parity only and is never
    read.

    Parameters
    ----------
    class_uris:
        Classes of this batch; rendered by :func:`_values_block`.
    graph_uris:
        Optional named graph(s), see :func:`_graph_clause`.
    paginated:
        When ``True`` returns a template with ``{offset}`` /
        ``{limit}`` placeholders.
    """
    wrap_open, wrap_close = _graph_clause(graph_uris)
    batch = _values_block(class_uris)
    text = "\n".join(
        [
            "SELECT ?class ?p (COUNT(*) AS ?cnt)",
            "WHERE {",
            wrap_open,
            batch,
            "?s a ?class .",
            "?s ?p ?o .",
            "FILTER(isURI(?o))",
            "FILTER NOT EXISTS { ?o a ?any }",
            wrap_close,
            "}",
            "GROUP BY ?class ?p",
        ]
    )
    if not paginated:
        return text
    return SparqlHelper.prepare_paginated_query(text)
# -------------------------------------------------------------------
# Report collector
# -------------------------------------------------------------------
class _ReportCollector:
"""Accumulates analytics during a mining run.
Writes the JSON report to *report_path* after each phase so
partial data is persisted even if the process crashes.
Also captures resource-usage snapshots (CPU, memory, disk I/O)
at init and finalise time so that every report is self-contained.
"""
def __init__(
self,
report: MiningReport,
report_path: Path | None = None,
) -> None:
"""Set up the collector with *report* and optional *report_path*."""
self._report = report
self._path = report_path
# Resource-usage snapshots (populated in _snapshot_start)
self._t0: float = 0.0
self._cpu0_user: float = 0.0
self._cpu0_sys: float = 0.0
self._io0: dict[str, int] = {}
self._snapshot_start()
# ── Resource snapshots ─────────────────────────────────────────
@staticmethod
def _read_proc_io() -> dict[str, int]:
result: dict[str, int] = {}
try:
with open("/proc/self/io", encoding="utf-8") as f:
for line in f:
key, _, val = line.partition(":")
result[key.strip()] = int(val.strip())
except OSError:
pass
return result
@staticmethod
def _get_rusage() -> tuple[float, float, float]:
import platform as _platform
import resource as _resource
r = _resource.getrusage(_resource.RUSAGE_SELF)
div = 1024 if _platform.system() == "Linux" else 1048576
return r.ru_utime, r.ru_stime, r.ru_maxrss / div
def _snapshot_start(self) -> None:
import time as _time
self._t0 = _time.monotonic()
self._cpu0_user, self._cpu0_sys, _ = self._get_rusage()
self._io0 = self._read_proc_io()
def _collect_machine_info(self) -> dict[str, Any]:
"""Gather static machine info (lightweight)."""
import platform as _platform
info: dict[str, Any] = {
"hostname": _platform.node(),
"os_name": _platform.system(),
"os_release": _platform.release(),
"architecture": _platform.machine(),
"cpu_model": _platform.processor() or "",
"cpu_count_logical": os.cpu_count() or 0,
"python_version": _platform.python_version(),
}
try:
with open("/proc/cpuinfo", encoding="utf-8") as f:
for line in f:
if line.startswith("model name"):
info["cpu_model"] = line.split(":", 1)[1].strip()
break
except OSError:
pass
try:
with open("/proc/meminfo", encoding="utf-8") as f:
for line in f:
if line.startswith("MemTotal"):
kb = int(line.split()[1])
info["ram_total_gb"] = round(
kb / 1048576,
2,
)
break
except OSError:
pass
return info
def _collect_resource_usage(self) -> dict[str, Any]:
"""Capture delta resource usage since init."""
import time as _time
t1 = _time.monotonic()
cpu1_user, cpu1_sys, peak_rss = self._get_rusage()
io1 = self._read_proc_io()
return {
"wall_time_s": round(t1 - self._t0, 3),
"cpu_user_s": round(cpu1_user - self._cpu0_user, 3),
"cpu_system_s": round(cpu1_sys - self._cpu0_sys, 3),
"peak_rss_mb": round(peak_rss, 2),
"read_bytes": (io1.get("read_bytes", 0) - self._io0.get("read_bytes", 0)),
"write_bytes": (io1.get("write_bytes", 0) - self._io0.get("write_bytes", 0)),
}
# ── Query tracking ─────────────────────────────────────────────
def record_query(
self,
purpose: str,
duration_s: float,
success: bool = True,
) -> None:
"""Record one query execution."""
stats = self._report.query_stats.setdefault(
purpose,
QueryStats(),
)
stats.sent += 1
stats.total_time_s += duration_s
self._report.total_queries_sent += 1
if not success:
stats.failed += 1
self._report.total_queries_failed += 1
# ── Phase tracking ─────────────────────────────────────────────
def start_phase(self, name: str) -> PhaseReport:
"""Start a new phase and return its report object."""
phase = PhaseReport(
name=name,
started_at=datetime.now(timezone.utc).isoformat(),
)
self._report.phases.append(phase)
return phase
def finish_phase(
self,
phase: PhaseReport,
items: int = 0,
error: str | None = None,
) -> None:
"""Mark a phase as finished and flush the report."""
now = datetime.now(timezone.utc)
phase.finished_at = now.isoformat()
if phase.started_at:
started = datetime.fromisoformat(phase.started_at)
phase.duration_s = round(
(now - started).total_seconds(),
3,
)
phase.items_discovered = items
phase.error = error
self.flush()
def set_abort_reason(self, reason: str) -> None:
"""Record why mining was cut short and flush."""
self._report.abort_reason = reason
self.flush()
# ── Finalisation ───────────────────────────────────────────────
def finalise(
self,
pattern_count: int,
class_count: int,
property_count: int,
uris_labelled: int,
) -> MiningReport:
"""Set final summary fields and flush."""
r = self._report
r.finished_at = datetime.now(
timezone.utc,
).isoformat()
if r.started_at:
started = datetime.fromisoformat(r.started_at)
finished = datetime.fromisoformat(r.finished_at)
r.total_duration_s = round(
(finished - started).total_seconds(),
3,
)
r.pattern_count = pattern_count
r.class_count = class_count
r.property_count = property_count
r.unique_uris_labelled = uris_labelled
# Embed machine info and resource usage
r.machine = self._collect_machine_info()
r.benchmark = self._collect_resource_usage()
self.flush()
return r
# ── I/O ────────────────────────────────────────────────────────
def flush(self) -> None:
"""Write current state to disk (if a path was given)."""
if self._path is None:
return
try:
self._path.parent.mkdir(parents=True, exist_ok=True)
self._path.write_text(
json.dumps(
self._report.model_dump(),
indent=2,
default=str,
)
+ "\n",
encoding="utf-8",
)
except OSError as exc:
logger.warning("Could not write report: %s", exc)
@property
def report(self) -> MiningReport:
"""Return the accumulated :class:`MiningReport`."""
return self._report
# -------------------------------------------------------------------
# SchemaMiner
# -------------------------------------------------------------------
class SchemaMiner:
"""Mine RDF schema patterns from a SPARQL endpoint.
Parameters
----------
endpoint_url:
SPARQL endpoint URL.
graph_uris:
Optional named-graph URI(s) to restrict queries to.
chunk_size:
Number of rows per paginated request.
class_chunk_size:
Page size for Phase-1 class discovery in two-phase mode.
``None`` disables pagination (single query).
class_batch_size:
Number of classes grouped into one ``VALUES`` query in
Phase-2 of two-phase mining. Default ``15``. Higher
values send fewer queries but each query is heavier.
delay:
Seconds to sleep between pagination requests.
timeout:
HTTP timeout per request (seconds).
counts:
Whether to also run COUNT queries for triple counts.
two_phase:
Use two-phase mining (default). Phase 1 discovers all
``rdf:type`` classes; phase 2 queries properties per
class. Much gentler on heavyweight endpoints like
QLever/PubChem/UniProt. Pass ``False`` for the legacy
single-pass strategy.
filter_service_namespaces:
When ``True`` (the default), remove patterns whose
subject, property, or object URI belongs to a
service/system namespace (Virtuoso, OpenLink, etc.)
from the final result.
untyped_as_classes:
When ``True``, treat untyped URI objects (those without
an explicit ``rdf:type``) as ``owl:Class`` references
instead of the generic ``rdfs:Resource`` sentinel.
Default ``False``.
"""
def __init__(
    self,
    endpoint_url: str,
    graph_uris: str | list[str] | None = None,
    chunk_size: int = 10_000,
    class_chunk_size: int | None = None,
    class_batch_size: int = 15,
    delay: float = 0.5,
    timeout: float = 120.0,
    counts: bool = True,
    two_phase: bool = True,
    unsafe_paging: bool = False,
    report_path: str | Path | None = None,
    filter_service_namespaces: bool = True,
    untyped_as_classes: bool = False,
    authors: list[dict[str, str]] | None = None,
    qlever_version: dict[str, str] | None = None,
    one_shot: bool = False,
) -> None:
    """Store the mining configuration and create the SPARQL helper.

    Every argument is kept on the instance under the same name, with
    three normalisations: a single *graph_uris* string becomes a
    one-element list, *class_batch_size* is clamped to at least ``1``,
    and a truthy *report_path* is converted to :class:`~pathlib.Path`
    (stored as ``_report_path``).

    Options not covered by the class docstring:

    unsafe_paging:
        Stored as ``self.unsafe_paging``.  NOTE(review): presumably the
        switch behind the ``drop_distinct`` option of the query
        builders - confirm where it is consumed.
    report_path:
        Where the JSON mining report is written; ``None`` (or any
        falsy value) disables writing.
    authors, qlever_version:
        Passed through unchanged to the report and metadata objects.
    one_shot:
        When ``True``, :meth:`mine` uses the one-shot strategy instead
        of the two-phase / single-pass ones.
    """
    # Accept a bare string as shorthand for a single named graph.
    if isinstance(graph_uris, str):
        graph_uris = [graph_uris]

    # Endpoint and scope.
    self.endpoint_url = endpoint_url
    self.graph_uris: list[str] | None = graph_uris

    # Paging / batching knobs.
    self.chunk_size = chunk_size
    self.class_chunk_size = class_chunk_size
    self.class_batch_size = max(1, class_batch_size)
    self.delay = delay
    self.timeout = timeout

    # Strategy switches.
    self.counts = counts
    self.two_phase = two_phase
    self.unsafe_paging = unsafe_paging
    self.filter_service_namespaces = filter_service_namespaces
    self.untyped_as_classes = untyped_as_classes

    # Provenance carried into the outputs.
    self.authors = authors
    self.qlever_version = qlever_version
    self.one_shot = one_shot

    self._helper = SparqlHelper(endpoint_url, timeout=timeout)

    # Reporting state; the collector is only created once mine() runs.
    if report_path:
        self._report_path = Path(report_path)
    else:
        self._report_path = None
    self._rc: _ReportCollector | None = None
    self.last_report: MiningReport | None = None
@property
def _report(self) -> _ReportCollector:
"""Return the active report collector (raises if not set)."""
if self._rc is None:
raise RuntimeError("mine() must be called first")
return self._rc
# ---- public API -----------------------------------------------
def _build_strategy_string(self) -> str:
"""Return the strategy tag that describes the active mining flags."""
base = "miner/two-phase" if self.two_phase else "miner"
suffix = ""
if self.counts:
suffix += "+counts"
if self.untyped_as_classes:
suffix += "+untyped-as-classes"
return base + suffix
def _init_report(
    self,
    dataset_name: str | None,
    strategy: str,
    started_at: str,
) -> None:
    """Create a fresh :class:`MiningReport` and attach its collector.

    All counters start at zero; the report also stores a snapshot of
    the miner configuration.  The resulting collector is stored in
    ``self._rc`` and writes to ``self._report_path`` (if any).
    """
    # NOTE(review): class_batch_size, unsafe_paging and
    # filter_service_namespaces are not part of this snapshot.
    settings = {
        "chunk_size": self.chunk_size,
        "class_chunk_size": self.class_chunk_size,
        "delay": self.delay,
        "timeout": self.timeout,
        "counts": self.counts,
        "two_phase": self.two_phase,
        "one_shot": self.one_shot,
        "untyped_as_classes": self.untyped_as_classes,
    }
    blank_report = MiningReport(
        # Identity of the run.
        dataset_name=dataset_name,
        endpoint_url=self.endpoint_url,
        graph_uris=self.graph_uris,
        strategy=strategy,
        rdfsolve_version=VERSION,
        python_version=sys.version,
        # Timing.
        started_at=started_at,
        finished_at=None,
        total_duration_s=0.0,
        # Counters, all starting from zero.
        total_queries_sent=0,
        total_queries_failed=0,
        abort_reason=None,
        pattern_count=0,
        class_count=0,
        property_count=0,
        unique_uris_labelled=0,
        # Provenance and configuration.
        authors=self.authors,
        qlever_version=self.qlever_version,
        config=settings,
    )
    self._rc = _ReportCollector(blank_report, self._report_path)
def _run_patterns_phase(
self,
) -> tuple[list[SchemaPattern], list[OneShotQueryResult] | None]:
"""Dispatch to the appropriate pattern-mining strategy.
Returns
-------
(patterns, one_shot_results)
*one_shot_results* is ``None`` unless ``self.one_shot`` is
``True``.
"""
if self.one_shot:
patterns, one_shot_results = self._mine_one_shot()
return patterns, one_shot_results
if self.two_phase:
return self._mine_two_phase(), None
return self._mine_single_pass(), None
def _run_counts_phase(
    self,
    patterns: list[SchemaPattern],
) -> list[SchemaPattern]:
    """Run the "counts" phase and return the enriched patterns.

    On failure the phase is closed with the error message and the
    exception is re-raised.
    """
    collector = self._report
    counts_phase = collector.start_phase("counts")
    logger.info("Fetching triple counts …")
    try:
        enriched = self._enrich_counts(patterns)
        collector.finish_phase(counts_phase, items=len(enriched))
    except Exception as exc:
        collector.finish_phase(counts_phase, error=str(exc))
        raise
    return enriched
def _run_labels_phase(
    self,
    patterns: list[SchemaPattern],
) -> tuple[list[SchemaPattern], set[str]]:
    """Run the "labels" phase.

    Returns
    -------
    (patterns, uris)
        The label-enriched patterns and the set of unique URIs that
        were collected from the patterns *before* enrichment.

    Raises
    ------
    Exception
        Whatever label enrichment raises is propagated unchanged; the
        phase is first closed with the error message (and the report
        flushed), mirroring :meth:`_run_counts_phase`, so a failed run
        does not leave a dangling, unfinished phase in the report.
    """
    phase = self._report.start_phase("labels")
    logger.info("Fetching labels …")
    try:
        uris_before = self._unique_uris(patterns)
        patterns = self._enrich_labels(patterns)
    except Exception as exc:
        self._report.finish_phase(phase, error=str(exc))
        raise
    self._report.finish_phase(phase, items=len(uris_before))
    return patterns, uris_before
@staticmethod
def _collect_class_property_sets(
patterns: list[SchemaPattern],
) -> tuple[set[str], set[str]]:
"""Return (classes, properties) sets from *patterns*."""
classes: set[str] = set()
properties: set[str] = set()
for p in patterns:
classes.add(p.subject_class)
if p.object_class not in ("Literal", "Resource"):
classes.add(p.object_class)
properties.add(p.property_uri)
return classes, properties
def _build_about_metadata(
    self,
    dataset_name: str | None,
    strategy: str,
    started_at: str,
    patterns: list[SchemaPattern],
) -> AboutMetadata:
    """Build the :class:`AboutMetadata` describing the finished run.

    Finish time and total duration are read from the current mining
    report; *patterns* only contributes its length.
    """
    final_report = self._report.report
    finished_at = final_report.finished_at
    duration = self._report.report.total_duration_s
    return AboutMetadata.build(
        endpoint=self.endpoint_url,
        dataset_name=dataset_name,
        graph_uris=self.graph_uris,
        pattern_count=len(patterns),
        strategy=strategy,
        started_at=started_at,
        finished_at=finished_at,
        total_duration_s=duration,
        authors=self.authors,
        qlever_version=self.qlever_version,
    )
@staticmethod
def _apply_namespace_filter(schema: MinedSchema) -> MinedSchema:
"""Strip service-namespace patterns and log if any were dropped."""
before = len(schema.patterns)
schema = schema.filter_service_namespaces()
dropped = before - len(schema.patterns)
if dropped:
logger.info(
"Filtered %d service-namespace patterns (%d -> %d)",
dropped,
before,
len(schema.patterns),
)
return schema
# ---- main entry point -----------------------------------------
def mine(
    self,
    dataset_name: str | None = None,
) -> MinedSchema:
    """Run the full mining pipeline and return a :class:`MinedSchema`.

    Steps, in order: pattern mining, optional counts, labels, report
    finalisation, metadata construction and (optionally) removal of
    service-namespace patterns.

    Parameters
    ----------
    dataset_name:
        Optional human-readable name attached to the metadata.

    Notes
    -----
    A :class:`MiningReport` with per-phase timing, query counts and
    failure stats is populated along the way and exposed afterwards as
    ``self.last_report``.  If a *report_path* was given at
    construction time, the JSON is flushed to disk after each phase
    completes.
    """
    strategy = self._build_strategy_string()
    started_at = datetime.now(timezone.utc).isoformat()
    self._init_report(dataset_name, strategy, started_at)

    clock_start = time.monotonic()
    patterns, one_shot_results = self._run_patterns_phase()
    if self.counts:
        patterns = self._run_counts_phase(patterns)
    patterns, labelled_uris = self._run_labels_phase(patterns)
    elapsed = time.monotonic() - clock_start
    logger.info(
        "Mining complete: %d patterns in %.1fs",
        len(patterns),
        elapsed,
    )

    # NOTE(review): the summary below is computed before the optional
    # service-namespace filter, so report counts can exceed the number
    # of patterns in the returned schema.
    classes, properties = self._collect_class_property_sets(patterns)
    collector = self._report
    collector.finalise(
        pattern_count=len(patterns),
        class_count=len(classes),
        property_count=len(properties),
        uris_labelled=len(labelled_uris),
    )
    if one_shot_results is not None:
        # Attached after finalise(), hence the extra flush.
        collector.report.one_shot_results = one_shot_results
        collector.flush()
    self.last_report = collector.report

    about = self._build_about_metadata(
        dataset_name,
        strategy,
        started_at,
        patterns,
    )
    schema = MinedSchema(patterns=patterns, about=about)
    if not self.filter_service_namespaces:
        return schema
    return self._apply_namespace_filter(schema)
# ---- helpers ---------------------------------------------------
@staticmethod
def _unique_uris(
patterns: list[SchemaPattern],
) -> set[str]:
"""Collect all unique URIs from patterns."""
uris: set[str] = set()
for p in patterns:
uris.add(p.subject_class)
uris.add(p.property_uri)
if p.object_class not in ("Literal", "Resource"):
uris.add(p.object_class)
return uris
# ---- single-pass mining (original) ----------------------------
def _mine_single_pass(self) -> list[SchemaPattern]:
    """Run the original three-query mining approach.

    The typed-object, literal and untyped-URI queries are executed in
    that order, each inside its own report phase, and their patterns
    are concatenated in the same order.

    Raises
    ------
    Exception
        Whatever a query runner raises is propagated unchanged; the
        phase that was running is first closed with the error message
        so the (flushed) report shows where mining stopped.
    """
    # (phase name in the report, wording used in log lines, runner)
    steps = (
        ("typed-object", "typed-object", self._run_typed_object),
        ("literal", "literal", self._run_literal),
        ("untyped-uri", "untyped-URI", self._run_untyped_uri),
    )
    patterns: list[SchemaPattern] = []
    for phase_name, wording, run_query in steps:
        phase = self._report.start_phase(phase_name)
        logger.info("Mining %s patterns …", wording)
        try:
            found = run_query()
        except Exception as exc:
            self._report.finish_phase(phase, error=str(exc))
            raise
        patterns.extend(found)
        # Lazy %-formatting, as in the rest of this module's log calls.
        logger.info(" -> %d %s patterns", len(found), wording)
        self._report.finish_phase(phase, items=len(found))
    return patterns
# ---- one-shot mining (baseline for QLever comparison) ---------
def _parse_one_shot_bindings(
self,
qtype: str,
bindings: list[dict[str, Any]],
oc_default: str,
) -> list[SchemaPattern]:
"""Convert raw SPARQL bindings to :class:`SchemaPattern` objects.
Parameters
----------
qtype:
One of ``"typed-object"``, ``"literal"``, ``"untyped-uri"``.
bindings:
The ``results.bindings`` list from a SPARQL JSON response.
oc_default:
Object-class sentinel used for untyped-URI patterns
(``"Resource"`` or the OWL Class URI).
"""
patterns: list[SchemaPattern] = []
if qtype == "typed-object":
for b in bindings:
sc = b.get("sc", {}).get("value", "")
p = b.get("p", {}).get("value", "")
oc = b.get("oc", {}).get("value", "")
if sc and p and oc:
patterns.append(
SchemaPattern(
subject_class=sc,
property_uri=p,
object_class=oc,
)
)
elif qtype == "literal":
for b in bindings:
sc = b.get("sc", {}).get("value", "")
p = b.get("p", {}).get("value", "")
dt = b.get("dt", {}).get("value")
if sc and p:
patterns.append(
SchemaPattern(
subject_class=sc,
property_uri=p,
object_class="Literal",
datatype=dt if dt else None,
)
)
else: # untyped-uri
for b in bindings:
sc = b.get("sc", {}).get("value", "")
p = b.get("p", {}).get("value", "")
if sc and p:
patterns.append(
SchemaPattern(
subject_class=sc,
property_uri=p,
object_class=oc_default,
)
)
return patterns
def _run_one_shot_query(
    self,
    qtype: str,
    query: str,
) -> tuple[list[dict[str, Any]], OneShotQueryResult]:
    """Run one unpaginated SELECT and record how it went.

    Parameters
    ----------
    qtype:
        Query type label (e.g. ``"typed-object"``).  ``one-shot/<qtype>``
        is used as phase name, query purpose and report key.
    query:
        SPARQL SELECT text to execute.

    Returns
    -------
    (bindings, result)
        *bindings* is the ``results.bindings`` list of the response, or an
        empty list when anything in the attempt raised.  *result* is a
        :class:`OneShotQueryResult` holding the success flag, the duration
        rounded to milliseconds, the row count (``None`` on failure) and,
        on failure, the error text.

    Notes
    -----
    Failures are never re-raised: every exception is logged as a warning
    and reported through *result* and the mining report.
    """
    tag = f"one-shot/{qtype}"
    phase = self._report.start_phase(tag)
    started = time.monotonic()
    try:
        response = self._helper.select(query, purpose=tag)
        elapsed = time.monotonic() - started
        rows = response.get("results", {}).get("bindings", [])
        n_rows = len(rows)
        logger.info("One-shot %s: %d rows in %.1fs", qtype, n_rows, elapsed)
        self._report.record_query(tag, elapsed)
        self._report.finish_phase(phase, items=n_rows)
        outcome = OneShotQueryResult(
            query_type=qtype,
            success=True,
            duration_s=round(elapsed, 3),
            row_count=n_rows,
        )
        return rows, outcome
    except Exception as exc:
        # Deliberately broad: a one-shot run is a best-effort baseline, so
        # any failure is turned into a result record instead of aborting.
        elapsed = time.monotonic() - started
        reason = str(exc)
        logger.warning("One-shot %s failed after %.1fs: %s", qtype, elapsed, exc)
        self._report.record_query(tag, elapsed, success=False)
        self._report.finish_phase(phase, items=0, error=reason)
        failure = OneShotQueryResult(
            query_type=qtype,
            success=False,
            duration_s=round(elapsed, 3),
            row_count=None,
            error=reason,
        )
        return [], failure
def _mine_one_shot(
    self,
) -> tuple[list[SchemaPattern], list[OneShotQueryResult]]:
    """Mine all three pattern kinds with one unbounded SELECT each.

    The typed-object, literal and untyped-URI queries are built up front
    from ``self.graph_uris`` and then executed in that order through
    :meth:`_run_one_shot_query`; there is no LIMIT/OFFSET and no fallback.
    Intended for local QLever endpoints that can return the whole result
    set in a single response.

    Returns
    -------
    patterns:
        Patterns parsed from the queries that succeeded.  A failed query
        contributes nothing and is not retried.
    results:
        One :class:`OneShotQueryResult` per query type, in execution order.
    """
    if self.untyped_as_classes:
        oc_default = "http://www.w3.org/2002/07/owl#Class"
    else:
        oc_default = "Resource"

    # Insertion order of the dict is the execution order.
    queries: dict[str, str] = {
        "typed-object": _build_typed_object_query_plain(self.graph_uris),
        "literal": _build_literal_query_plain(self.graph_uris),
        "untyped-uri": _build_untyped_uri_query_plain(self.graph_uris),
    }

    mined: list[SchemaPattern] = []
    outcomes: list[OneShotQueryResult] = []
    for qtype, query in queries.items():
        rows, outcome = self._run_one_shot_query(qtype, query)
        outcomes.append(outcome)
        if not outcome.success:
            continue
        mined.extend(self._parse_one_shot_bindings(qtype, rows, oc_default))
    return mined, outcomes
# ---- two-phase mining (for large endpoints) -------------------
def _mine_two_phase(self) -> list[SchemaPattern]:
    """Mine patterns in two phases: list the classes, then query per batch.

    Phase 1 fetches the distinct classes.  With ``class_chunk_size`` set to
    ``None`` this is a single unpaginated SELECT; otherwise the paginated
    query template is walked with that page size.

    Phase 2 hands the classes to :meth:`_run_phase2_batches`, scoped to
    ``self.graph_uris``.  If that yields no patterns although classes were
    found, graphs are configured and no abort reason was reported, Phase 2
    is repeated once without graph restriction (the configured graph is
    then assumed to hold only class definitions, with instances elsewhere).

    Returns
    -------
    list[SchemaPattern]
        All patterns collected in Phase 2.

    Raises
    ------
    Exception
        Whatever the unpaginated class-discovery query raises; it is
        recorded as a failed query and re-raised.
    """
    # Phase 1 - discover classes
    discovery = self._report.start_phase("class-discovery")
    page_size = self.class_chunk_size
    if page_size is not None:
        logger.info(
            "Phase 1: discovering classes (chunk_size=%d) …",
            page_size,
        )
        template = _build_class_discovery_query(self.graph_uris)
        rows = self._collect_bindings(
            template,
            purpose="two-phase/classes",
            chunk_size=page_size,
        )
    else:
        logger.info("Phase 1: discovering classes (no pagination) …")
        query = _build_class_discovery_query_plain(self.graph_uris)
        started = time.monotonic()
        try:
            response = self._helper.select(query, purpose="two-phase/classes")
            rows = response.get("results", {}).get("bindings", [])
            self._report.record_query(
                "two-phase/classes",
                time.monotonic() - started,
            )
        except Exception:
            self._report.record_query(
                "two-phase/classes",
                time.monotonic() - started,
                success=False,
            )
            raise

    # Keep only rows that actually bind a non-empty ?class value.
    classes: list[str] = []
    for row in rows:
        class_uri = row.get("class", {}).get("value", "")
        if class_uri:
            classes.append(class_uri)
    logger.info(f" -> {len(classes)} classes found")
    self._report.finish_phase(discovery, items=len(classes))

    # Phase 2 - batched per-class pattern discovery
    per_class = self._report.start_phase("per-class-patterns")
    patterns, abort_reason = self._run_phase2_batches(classes, self.graph_uris)

    # Ontology-graph fallback: classes exist in the configured graph but no
    # instance patterns were found there, so look across all graphs instead.
    graph_scoped_miss = bool(
        not patterns and classes and self.graph_uris and abort_reason is None
    )
    if graph_scoped_miss:
        logger.warning(
            "Phase 2 returned 0 patterns with GRAPH <%s> "
            "- retrying without GRAPH restriction "
            "(ontology-graph fallback)",
            ", ".join(self.graph_uris),
        )
        patterns, abort_reason = self._run_phase2_batches(classes, None)

    logger.info(f" -> {len(patterns)} total patterns from {len(classes)} classes")
    self._report.finish_phase(per_class, items=len(patterns))
    if abort_reason:
        self._report.set_abort_reason(abort_reason)
    return patterns
# ---- Bisecting query helper -----------------------------------
def _query_with_bisect(
self,
classes: list[str],
graph_uris: list[str] | None,
build_fn: Any,
purpose: str,
) -> list[dict[str, Any]]:
"""Run a batched VALUES query with automatic bisection fallback.
Strategy (in order):
1. Single SELECT for the full *classes* list.
2. Recursively bisect the list and retry each half independently,
down to individual single-class queries.
3. For a single-class batch that still fails: paginated SELECT
(LIMIT/OFFSET) - the result set itself is just large, so
pagination (not batch splitting) is the right tool.
Bisecting is tried before pagination for multi-class batches
because pagination does not reduce join cardinality: every page
still joins over the full VALUES block. Splitting the batch
into smaller groups immediately lowers query cost.
This guarantees that a persistently expensive batch is eventually
broken down into single-class queries which always complete,
rather than silently dropping data.
Parameters
----------
classes:
The class URIs to include in the VALUES block.
graph_uris:
Named graphs to restrict queries to.
build_fn:
One of the ``_build_batched_*`` module-level functions.
Called as ``build_fn(classes, graph_uris)`` or
``build_fn(classes, graph_uris, paginated=True)``.
purpose:
Label used for logging and report tracking.
Returns
-------
list[dict]
All SPARQL result bindings collected across all sub-queries.
"""
# ── attempt 1: single-shot SELECT ──────────────────────────
label = f"{classes[0]}…" if len(classes) > 1 else classes[0]
q = build_fn(classes, graph_uris)
try:
result = self._helper.select(q, purpose=purpose)
bindings: list[dict[str, Any]] = result.get("results", {}).get("bindings", [])
return bindings
except Exception:
logger.warning(
" %s single-shot failed for [%s] (%d classes) - %s\n query: %s",
purpose,
label,
len(classes),
"trying paginated" if len(classes) == 1 else "bisecting",
q,
)
# ── attempt 2: bisect (multi-class) or paginate (single-class) ──
#
# For multi-class batches, bisect immediately: pagination does not
# reduce join cardinality (every page still scans the full VALUES
# block), so splitting the batch is strictly cheaper.
#
# For single-class batches the batch can't shrink further, so the
# only lever left is pagination of the (potentially large) result set.
if len(classes) > 1:
mid = len(classes) // 2
left = self._query_with_bisect(
classes[:mid],
graph_uris,
build_fn,
purpose,
)
right = self._query_with_bisect(
classes[mid:],
graph_uris,
build_fn,
purpose,
)
return left + right
# ── attempt 3: paginated SELECT (single-class only) ──────────
qt = build_fn(
classes,
graph_uris,
paginated=True,
drop_distinct=self.unsafe_paging,
)
try:
raw = self._collect_bindings(
qt,
purpose=purpose,
chunk_size=self.chunk_size,
)
# Deduplicate in Python (results may contain duplicates when
# unsafe_paging drops DISTINCT, or from page-boundary overlaps)
seen: set[tuple[tuple[str, str], ...]] = set()
deduped: list[dict[str, Any]] = []
for b in raw:
key = tuple(sorted((k, v.get("value", "")) for k, v in b.items()))
if key not in seen:
seen.add(key)
deduped.append(b)
return deduped
except PaginationTruncatedError as e2:
logger.warning(
" %s pagination truncated at offset %d"
" for <%s> - trying property decomposition\n"
" query template: %s",
purpose,
e2.offset,
classes[0],
qt,
)
except Exception as e2:
logger.warning(
" %s paginated fallback failed for <%s>: %s"
" - trying property decomposition\n query template: %s",
purpose,
classes[0],
e2,
qt,
)
# ── attempt 4: property-first decomposition (typed-object only) ──
# Can't bisect or paginate further. For typed-object queries, enumerate
# ?p (cheap, 1-hop), then look up ?oc per property (cheap, 2-hop).
# This sidesteps the 3-way join that exceeds Virtuoso's cost limit.
if build_fn is _build_batched_typed_object_query:
return self._typed_object_by_property(
classes[0],
graph_uris,
purpose,
)
logger.warning(
" %s: all strategies exhausted for <%s> - skipping",
purpose,
classes[0],
)
return []
# ---- Property-first typed-object decomposition ---------------
def _enumerate_properties_for_class(
    self,
    class_uri: str,
    graph_uris: list[str] | None,
    purpose: str,
) -> list[str] | None:
    """List the distinct property URIs used by instances of *class_uri*.

    A single unpaginated SELECT is tried first.  If it raises, the
    paginated query template is walked with a page size of
    ``_DECOMP_CHUNK`` and the values are de-duplicated in first-seen order.

    Returns
    -------
    list[str] | None
        The property URIs (possibly empty), or ``None`` when the paginated
        attempt failed as well, so that callers can tell "no properties"
        apart from "could not find out".
    """
    single_shot = _build_properties_for_class_query(class_uri, graph_uris)
    try:
        response = self._helper.select(single_shot, purpose=purpose)
        found: list[str] = []
        for row in response.get("results", {}).get("bindings", []):
            if row.get("p", {}).get("value"):
                found.append(row["p"]["value"])
        return found
    except Exception as exc:
        logger.warning(
            " %s: property single-shot for <%s> failed: %s - retrying paginated",
            purpose,
            class_uri,
            exc,
        )

    paged = _build_properties_for_class_query(
        class_uri,
        graph_uris,
        paginated=True,
        drop_distinct=self.unsafe_paging,
    )
    try:
        rows = self._collect_bindings(
            paged,
            purpose=purpose,
            chunk_size=_DECOMP_CHUNK,
        )
        values = (row.get("p", {}).get("value", "") for row in rows)
        # dict keys keep insertion order, giving an order-preserving dedupe.
        return list(dict.fromkeys(v for v in values if v))
    except Exception as exc:
        logger.warning(
            " %s: property enumeration for <%s> failed even paginated: %s - skipping",
            purpose,
            class_uri,
            exc,
        )
        return None
def _enumerate_oc_for_class_property(
    self,
    class_uri: str,
    prop_uri: str,
    graph_uris: list[str] | None,
    purpose: str,
) -> list[str]:
    """List the distinct object classes reached from *class_uri* via *prop_uri*.

    A single unpaginated SELECT is tried first.  If it raises, the
    paginated query template is walked with a page size of
    ``_DECOMP_CHUNK`` and the values are de-duplicated in first-seen order.

    Returns
    -------
    list[str]
        The object-class URIs.  When the paginated attempt fails too, a
        warning is logged and an empty list is returned, which is not
        distinguishable from "no typed objects".
    """
    single_shot = _build_typed_object_for_class_property_query(
        class_uri,
        prop_uri,
        graph_uris,
    )
    try:
        response = self._helper.select(single_shot, purpose=purpose)
        found: list[str] = []
        for row in response.get("results", {}).get("bindings", []):
            if row.get("oc", {}).get("value"):
                found.append(row.get("oc", {}).get("value", ""))
        return found
    except Exception as exc:
        logger.warning(
            " %s: oc single-shot for <%s>/<%s> failed: %s - retrying paginated",
            purpose,
            class_uri,
            prop_uri,
            exc,
        )

    paged = _build_typed_object_for_class_property_query(
        class_uri,
        prop_uri,
        graph_uris,
        paginated=True,
        drop_distinct=self.unsafe_paging,
    )
    try:
        rows = self._collect_bindings(
            paged,
            purpose=purpose,
            chunk_size=_DECOMP_CHUNK,
        )
        values = (row.get("oc", {}).get("value", "") for row in rows)
        # dict keys keep insertion order, giving an order-preserving dedupe.
        return list(dict.fromkeys(v for v in values if v))
    except Exception as exc:
        logger.warning(
            " %s: oc query for <%s>/<%s> failed even paginated: %s - skipping this property",
            purpose,
            class_uri,
            prop_uri,
            exc,
        )
        return []
def _typed_object_by_property(
    self,
    class_uri: str,
    graph_uris: list[str] | None,
    purpose: str,
) -> list[dict[str, Any]]:
    """Collect typed-object rows for one class, one property at a time.

    Last-resort path of :meth:`_query_with_bisect` for a class whose full
    subject/property/object-class join could not be answered:

    1. Enumerate the distinct properties of the class.
    2. For each property, enumerate the distinct object classes.

    Returns
    -------
    list[dict]
        Synthetic bindings with the variables ``class``, ``p`` and ``oc``
        (each ``{"type": "uri", "value": ...}``), i.e. the shape the
        regular typed-object rows are read with, so callers need no
        special handling.  Empty when the property enumeration failed.
    """
    logger.info(
        " %s: <%s> too expensive for 3-way join - trying property-first decomposition",
        purpose,
        class_uri,
    )

    # Step 1: enumerate properties (None signals that enumeration failed)
    props = self._enumerate_properties_for_class(class_uri, graph_uris, purpose)
    if props is None:
        return []
    logger.info(
        " %s: <%s> has %d distinct properties - querying each",
        purpose,
        class_uri,
        len(props),
    )

    # Step 2: one object-class lookup per property, flattened into rows
    synthetic: list[dict[str, Any]] = [
        {
            "class": {"type": "uri", "value": class_uri},
            "p": {"type": "uri", "value": prop_uri},
            "oc": {"type": "uri", "value": oc},
        }
        for prop_uri in props
        for oc in self._enumerate_oc_for_class_property(
            class_uri,
            prop_uri,
            graph_uris,
            purpose,
        )
    ]
    logger.info(
        " %s: property-first decomposition for <%s> yielded %d bindings",
        purpose,
        class_uri,
        len(synthetic),
    )
    return synthetic
# ---- Phase 2 batch runner -------------------------------------
def _run_phase2_batches(
    self,
    classes: list[str],
    graph_uris: list[str] | None,
) -> tuple[list[SchemaPattern], str | None]:
    """Mine patterns for *classes* in batches of ``class_batch_size``.

    For each batch the typed-object, literal and untyped-URI queries are
    run (in that order) through :meth:`_query_with_bisect`, each timed and
    recorded in the report, and their rows are converted to patterns.
    When ``self.delay`` is positive the method sleeps that long after
    every batch.

    Parameters
    ----------
    classes:
        Class URIs discovered in Phase 1.
    graph_uris:
        Graph URIs to restrict the queries to, or ``None`` for no graph
        restriction (used when the discovery graph holds no instances).

    Returns
    -------
    (patterns, abort_reason)
        *patterns* in batch order.  *abort_reason* is never assigned in
        this method, so it is always ``None`` here.
    """
    per_batch = self.class_batch_size
    total = len(classes)
    n_batches = (total + per_batch - 1) // per_batch  # ceiling division
    scope = f"GRAPH <{', '.join(graph_uris)}>" if graph_uris else "default graph"
    logger.info(
        "Phase 2: mining patterns in %d batches of ≤%d classes (%d classes total, scope: %s) …",
        n_batches,
        per_batch,
        total,
        scope,
    )

    if self.untyped_as_classes:
        untyped_oc = "http://www.w3.org/2002/07/owl#Class"
    else:
        untyped_oc = "Resource"

    def timed_rows(chunk: list[str], build_fn: Any, purpose: str) -> list[dict[str, Any]]:
        # Run one query kind for *chunk* and record its wall-clock time.
        started = time.monotonic()
        rows = self._query_with_bisect(chunk, graph_uris, build_fn, purpose)
        self._report.record_query(purpose, time.monotonic() - started)
        return rows

    def cell(row: dict[str, Any], var: str) -> Any:
        # Value bound to *var*; "" when the variable is unbound in this row.
        return row.get(var, {}).get("value", "")

    patterns: list[SchemaPattern] = []
    abort_reason: str | None = None
    for batch_idx in range(n_batches):
        first = batch_idx * per_batch
        batch = classes[first : first + per_batch]
        logger.info(
            " %s",
            f"batch {batch_idx + 1}/{n_batches} "
            f"(classes {first + 1}-{first + len(batch)}/{total})",
        )

        # 2a. Typed-object patterns for this batch
        typed_rows = timed_rows(
            batch,
            _build_batched_typed_object_query,
            "two-phase/typed-object",
        )
        for row in typed_rows:
            cls, prop, oc = cell(row, "class"), cell(row, "p"), cell(row, "oc")
            if not (cls and prop and oc):
                continue
            patterns.append(
                SchemaPattern(
                    subject_class=cls,
                    property_uri=prop,
                    object_class=oc,
                )
            )

        # 2b. Literal patterns for this batch
        literal_rows = timed_rows(
            batch,
            _build_batched_literal_query,
            "two-phase/literal",
        )
        for row in literal_rows:
            cls, prop = cell(row, "class"), cell(row, "p")
            datatype = row.get("dt", {}).get("value")
            if not (cls and prop):
                continue
            patterns.append(
                SchemaPattern(
                    subject_class=cls,
                    property_uri=prop,
                    object_class="Literal",
                    datatype=datatype or None,
                )
            )

        # 2c. Untyped-URI patterns for this batch
        untyped_rows = timed_rows(
            batch,
            _build_batched_untyped_uri_query,
            "two-phase/untyped-uri",
        )
        for row in untyped_rows:
            cls, prop = cell(row, "class"), cell(row, "p")
            if not (cls and prop):
                continue
            patterns.append(
                SchemaPattern(
                    subject_class=cls,
                    property_uri=prop,
                    object_class=untyped_oc,
                )
            )

        # Polite delay between batches
        if self.delay > 0:
            time.sleep(self.delay)
    return patterns, abort_reason
# ---- private query runners ------------------------------------
def _collect_bindings(
    self,
    query_template: str,
    purpose: str = "",
    chunk_size: int | None = None,
) -> list[dict[str, Any]]:
    """Walk a paginated SELECT and return all rows as one list.

    Parameters
    ----------
    query_template:
        SPARQL query with ``{offset}`` / ``{limit}`` placeholders.
    purpose:
        Tag for logging and report tracking.
    chunk_size:
        Page size for this call; ``None`` falls back to
        ``self.chunk_size``.

    Returns
    -------
    list[dict]
        The rows of every page yielded by
        ``self._helper.select_chunked``, in page order.

    Notes
    -----
    ``self.delay`` is passed on as the delay between pages.  A report
    entry with duration ``0.0`` is recorded per page, but only when the
    instance has an ``_rc`` attribute.
    NOTE(review): the guard tests ``_rc`` while the call goes to
    ``_report`` - possibly a leftover attribute name; confirm.
    """
    page_size = self.chunk_size if chunk_size is None else chunk_size
    collected: list[dict[str, Any]] = []
    track_pages = hasattr(self, "_rc")
    pages = self._helper.select_chunked(
        query_template,
        chunk_size=page_size,
        delay_between_chunks=self.delay,
        purpose=purpose,
    )
    for page_no, rows in enumerate(pages, start=1):
        collected.extend(rows)
        if track_pages:
            self._report.record_query(purpose, 0.0)
        logger.info(
            " %s page %d: +%d rows (%d total)",
            purpose,
            page_no,
            len(rows),
            len(collected),
        )
    return collected
def _run_typed_object(self) -> list[SchemaPattern]:
    """Run the paginated typed-object query and build its patterns.

    Rows with a missing or empty ``sc``, ``p`` or ``oc`` value are
    dropped; the rest become one :class:`SchemaPattern` each, in row order.
    """
    template = _build_typed_object_query(self.graph_uris)
    rows = self._collect_bindings(template, purpose="mining/typed-object")

    mined: list[SchemaPattern] = []
    for row in rows:
        subject = row.get("sc", {}).get("value", "")
        prop = row.get("p", {}).get("value", "")
        obj = row.get("oc", {}).get("value", "")
        if not (subject and prop and obj):
            continue
        mined.append(
            SchemaPattern(
                subject_class=subject,
                property_uri=prop,
                object_class=obj,
            )
        )
    return mined
def _run_literal(self) -> list[SchemaPattern]:
    """Run the paginated literal-property query and build its patterns.

    Rows with a missing or empty ``sc`` or ``p`` value are dropped.  Every
    pattern gets the object class ``"Literal"``; an absent or empty ``dt``
    value is stored as ``datatype=None``.
    """
    template = _build_literal_query(self.graph_uris)
    rows = self._collect_bindings(template, purpose="mining/literal")

    mined: list[SchemaPattern] = []
    for row in rows:
        subject = row.get("sc", {}).get("value", "")
        prop = row.get("p", {}).get("value", "")
        datatype = row.get("dt", {}).get("value")
        if not (subject and prop):
            continue
        mined.append(
            SchemaPattern(
                subject_class=subject,
                property_uri=prop,
                object_class="Literal",
                datatype=datatype or None,
            )
        )
    return mined
def _run_untyped_uri(self) -> list[SchemaPattern]:
    """Run the paginated untyped-URI query and build its patterns.

    Rows with a missing or empty ``sc`` or ``p`` value are dropped.  The
    object class of every pattern is the OWL Class URI when
    ``self.untyped_as_classes`` is set and ``"Resource"`` otherwise; all
    remaining pattern fields are passed explicitly as ``None``.
    """
    template = _build_untyped_uri_query(self.graph_uris)
    rows = self._collect_bindings(template, purpose="mining/untyped-uri")

    if self.untyped_as_classes:
        object_class = "http://www.w3.org/2002/07/owl#Class"
    else:
        object_class = "Resource"

    mined: list[SchemaPattern] = []
    for row in rows:
        subject = row.get("sc", {}).get("value", "")
        prop = row.get("p", {}).get("value", "")
        if not (subject and prop):
            continue
        mined.append(
            SchemaPattern(
                subject_class=subject,
                property_uri=prop,
                object_class=object_class,
                count=None,
                datatype=None,
                subject_label=None,
                object_label=None,
                property_label=None,
            )
        )
    return mined
def _fetch_typed_count_batch(
    self,
    batch: list[str],
    label: str,
    counts: dict[tuple[str, str, str], int],
) -> None:
    """Fetch typed-object counts for one class batch into *counts*.

    Parameters
    ----------
    batch:
        Subject-class URIs to count for.
    label:
        Batch label, used only in the failure warning.
    counts:
        Updated in place: ``(class, property, object class) -> count``.
        Rows with an absent or empty ``cnt`` value are ignored.

    Notes
    -----
    Best effort: any exception is logged as a warning and swallowed;
    entries stored before the failure stay in *counts*.  The query time is
    recorded only when the instance has an ``_rc`` attribute
    (NOTE(review): the recording itself goes to ``_report`` - confirm the
    guard is intended).
    """
    purpose = "counts/typed-object"
    try:
        started = time.monotonic()
        rows = self._query_with_bisect(
            batch,
            self.graph_uris,
            _build_batched_typed_count_query,
            purpose,
        )
        if hasattr(self, "_rc"):
            self._report.record_query(purpose, time.monotonic() - started)
        for row in rows:
            key = tuple(row.get(var, {}).get("value", "") for var in ("class", "p", "oc"))
            raw_count = row.get("cnt", {}).get("value")
            if not raw_count:
                continue
            # Via float so that values such as "12.0" are accepted too.
            counts[key] = int(float(raw_count))
    except Exception as exc:
        logger.warning(
            "Typed-object count query failed (%s): %s",
            label,
            exc,
        )
def _fetch_literal_count_batch(
    self,
    batch: list[str],
    label: str,
    counts: dict[tuple[str, str, str], int],
) -> None:
    """Fetch literal counts for one class batch into *counts*.

    Parameters
    ----------
    batch:
        Subject-class URIs to count for.
    label:
        Batch label, used only in the failure warning.
    counts:
        Updated in place.  The third key element is ``"Literal:<dt>"``
        when the row has a datatype and plain ``"Literal"`` otherwise.
        Rows with an absent or empty ``cnt`` value are ignored.

    Notes
    -----
    Best effort: any exception is logged as a warning and swallowed;
    entries stored before the failure stay in *counts*.  The query time is
    recorded only when the instance has an ``_rc`` attribute
    (NOTE(review): the recording itself goes to ``_report`` - confirm the
    guard is intended).
    """
    purpose = "counts/literal"
    try:
        started = time.monotonic()
        rows = self._query_with_bisect(
            batch,
            self.graph_uris,
            _build_batched_literal_count_query,
            purpose,
        )
        if hasattr(self, "_rc"):
            self._report.record_query(purpose, time.monotonic() - started)
        for row in rows:
            datatype = row.get("dt", {}).get("value", "")
            object_key = f"Literal:{datatype}" if datatype else "Literal"
            key = (
                row.get("class", {}).get("value", ""),
                row.get("p", {}).get("value", ""),
                object_key,
            )
            raw_count = row.get("cnt", {}).get("value")
            if not raw_count:
                continue
            # Via float so that values such as "12.0" are accepted too.
            counts[key] = int(float(raw_count))
    except Exception as exc:
        logger.warning(
            "Literal count query failed (%s): %s",
            label,
            exc,
        )
def _fetch_untyped_count_batch(
    self,
    batch: list[str],
    label: str,
    counts: dict[tuple[str, str, str], int],
) -> None:
    """Fetch untyped-URI counts for one class batch into *counts*.

    Parameters
    ----------
    batch:
        Subject-class URIs to count for.
    label:
        Batch label, used only in the failure warning.
    counts:
        Updated in place: ``(class, property, sentinel) -> count``, where
        *sentinel* is the object class that the mining methods assign to
        untyped-URI patterns (the OWL Class URI when
        ``self.untyped_as_classes`` is set, ``"Resource"`` otherwise).
        Rows with an absent or empty ``cnt`` value are ignored.

    Notes
    -----
    The sentinel must equal the ``object_class`` of the mined patterns
    because the counts are later looked up by
    ``(subject_class, property_uri, object_class)``.  A hard-coded
    ``"Resource"`` never matches when ``untyped_as_classes`` is enabled.
    With ``untyped_as_classes`` enabled the key can coincide with a
    typed-object entry whose object class really is ``owl:Class``; the
    value written here then replaces it.

    Best effort: any exception is logged as a warning and swallowed;
    entries stored before the failure stay in *counts*.  The query time is
    recorded only when the instance has an ``_rc`` attribute
    (NOTE(review): the recording itself goes to ``_report`` - confirm the
    guard is intended).
    """
    try:
        t0 = time.monotonic()
        bindings = self._query_with_bisect(
            batch,
            self.graph_uris,
            _build_batched_untyped_count_query,
            "counts/untyped-uri",
        )
        if hasattr(self, "_rc"):
            self._report.record_query(
                "counts/untyped-uri",
                time.monotonic() - t0,
            )
        # Same sentinel as used when the untyped-URI patterns were mined.
        untyped_oc = (
            "http://www.w3.org/2002/07/owl#Class" if self.untyped_as_classes else "Resource"
        )
        for b in bindings:
            key = (
                b.get("class", {}).get("value", ""),
                b.get("p", {}).get("value", ""),
                untyped_oc,
            )
            cnt = b.get("cnt", {}).get("value")
            if cnt:
                # Via float so that values such as "12.0" are accepted too.
                counts[key] = int(float(cnt))
    except Exception as e:
        logger.warning(
            "Untyped-URI count query failed (%s): %s",
            label,
            e,
        )
def _enrich_counts(
    self,
    patterns: list[SchemaPattern],
) -> list[SchemaPattern]:
    """Attach triple counts to *patterns*.

    The distinct subject classes of *patterns* are sorted and processed in
    batches of ``class_batch_size``.  For each batch the typed-object,
    literal and untyped-URI count helpers fill a shared lookup table; each
    helper handles its own failures, so one failed batch does not abort
    the others.  When ``self.delay`` is positive the method sleeps that
    long after every batch.

    Returns
    -------
    list[SchemaPattern]
        Copies of the input patterns with ``count`` set to the looked-up
        value, or to ``None`` when no count was found.  When *patterns*
        yields no subject classes the input list itself is returned.
    """
    subject_classes = sorted({pat.subject_class for pat in patterns})
    if not subject_classes:
        return patterns

    per_batch = self.class_batch_size
    total = len(subject_classes)
    n_batches = (total + per_batch - 1) // per_batch  # ceiling division
    logger.info(
        "Counting phase: %d classes in %d batches of ≤%d …",
        total,
        n_batches,
        per_batch,
    )

    # Lookup table: (subject class, property, object key) -> count
    counts: dict[tuple[str, str, str], int] = {}
    for batch_idx in range(n_batches):
        offset = batch_idx * per_batch
        batch = subject_classes[offset : offset + per_batch]
        label = f"batch {batch_idx + 1}/{n_batches}"
        for fetch in (
            self._fetch_typed_count_batch,
            self._fetch_literal_count_batch,
            self._fetch_untyped_count_batch,
        ):
            fetch(batch, label, counts)
        # Polite delay between batches
        if self.delay > 0:
            time.sleep(self.delay)
    logger.info(
        "Counting phase: collected %d count entries",
        len(counts),
    )

    def lookup_key(pat: SchemaPattern) -> tuple[str, str, str]:
        # Literal counts are stored per datatype as "Literal:<dt>".
        if pat.object_class == "Literal":
            object_key = f"Literal:{pat.datatype}" if pat.datatype else "Literal"
        else:
            object_key = pat.object_class
        return (pat.subject_class, pat.property_uri, object_key)

    return [
        pat.model_copy(update={"count": counts.get(lookup_key(pat))})
        for pat in patterns
    ]
def _fetch_label_batch(
    self,
    batch: list[str],
    label_map: dict[str, str],
) -> None:
    """Look up labels for one batch of URIs and store them in *label_map*.

    Parameters
    ----------
    batch:
        URIs to look up labels for.
    label_map:
        Updated in place with ``{uri: label}``.  Result rows without a
        URI, and URIs that already have an entry, are skipped, so the
        first row seen for a URI decides its label.

    Notes
    -----
    The label for a row is chosen by ``pick_label`` from the row's
    ``rdfsLabel``, ``dcTitle``, ``iaoLabel``, ``skosPrefLabel`` and
    ``skosAltLabel`` values.  Best effort: any exception is logged as a
    warning and swallowed.  Query timing is recorded only when the
    instance has an ``_rc`` attribute (NOTE(review): the recording itself
    goes to ``_report`` - confirm the guard is intended).
    """
    started = time.monotonic()
    try:
        query = _build_label_query(batch, self.graph_uris)
        response = self._helper.select(query, purpose="labels")
        if hasattr(self, "_rc"):
            self._report.record_query("labels", time.monotonic() - started)
        for row in response.get("results", {}).get("bindings", []):
            uri = row.get("uri", {}).get("value", "")
            if not uri or uri in label_map:
                continue

            def optional(var: str) -> Any:
                # Value bound to *var* in the current row, None if unbound.
                return row.get(var, {}).get("value")

            rdfs_label = optional("rdfsLabel")
            dc_title = optional("dcTitle")
            iao_label = optional("iaoLabel")
            skos_pref = optional("skosPrefLabel")
            skos_alt = optional("skosAltLabel")
            label_map[uri] = pick_label(
                rdfs_label,
                dc_title,
                uri,
                iao_label=iao_label,
                skos_pref_label=skos_pref,
                skos_alt_label=skos_alt,
            )
    except Exception as exc:
        if hasattr(self, "_rc"):
            self._report.record_query(
                "labels",
                time.monotonic() - started,
                success=False,
            )
        logger.warning("Label batch failed (%d URIs) : %s", len(batch), exc)
def _enrich_labels(
    self,
    patterns: list[SchemaPattern],
) -> list[SchemaPattern]:
    """Add human-readable labels to *patterns*.

    The subject classes, properties and object classes (except the
    ``"Literal"`` / ``"Resource"`` sentinels) are collected, sorted and
    looked up 50 URIs at a time to keep each label query small.  URIs
    without a fetched label fall back to their local name in
    ``_enrich_with_local``.

    Returns
    -------
    list[SchemaPattern]
        Labelled copies of the patterns, or the input list itself when
        there is nothing to label.
    """
    sentinels = ("Literal", "Resource")
    uris: set[str] = set()
    for pat in patterns:
        uris.update((pat.subject_class, pat.property_uri))
        if pat.object_class not in sentinels:
            uris.add(pat.object_class)
    if not uris:
        return patterns

    label_map: dict[str, str] = {}
    per_request = 50
    ordered = sorted(uris)
    for offset in range(0, len(ordered), per_request):
        self._fetch_label_batch(ordered[offset : offset + per_request], label_map)

    return _enrich_with_local(patterns, label_map)
def _enrich_with_local(
    patterns: list[SchemaPattern], label_map: dict[str, str]
) -> list[SchemaPattern]:
    """Return copies of *patterns* with subject/property/object labels set.

    A label comes from *label_map* when the URI has an entry there and
    from ``get_local_name`` otherwise.  The ``"Literal"`` and
    ``"Resource"`` object sentinels are used as their own label.
    """

    def label_for(uri: str) -> str:
        # The fallback is computed up front, as the argument of dict.get.
        return label_map.get(uri, get_local_name(uri))

    labelled: list[SchemaPattern] = []
    for pat in patterns:
        subject_label = label_for(pat.subject_class)
        property_label = label_for(pat.property_uri)
        if pat.object_class in ("Literal", "Resource"):
            object_label = pat.object_class
        else:
            object_label = label_for(pat.object_class)
        labelled.append(
            pat.model_copy(
                update={
                    "subject_label": subject_label,
                    "property_label": property_label,
                    "object_label": object_label,
                }
            )
        )
    return labelled
# -------------------------------------------------------------------
# Convenience function
# -------------------------------------------------------------------
def mine_schema(
    endpoint_url: str,
    graph_uris: str | list[str] | None = None,
    dataset_name: str | None = None,
    chunk_size: int = 10_000,
    class_chunk_size: int | None = None,
    class_batch_size: int = 15,
    delay: float = 0.5,
    timeout: float = 120.0,
    counts: bool = True,
    two_phase: bool = True,
    report_path: str | Path | None = None,
    filter_service_namespaces: bool = True,
    untyped_as_classes: bool = False,
    authors: list[dict[str, str]] | None = None,
    qlever_version: dict[str, str] | None = None,
    one_shot: bool = False,
) -> MinedSchema:
    """Mine the schema of one endpoint and return it as :class:`MinedSchema`.

    Convenience wrapper: builds a :class:`SchemaMiner` from the arguments
    and calls its ``mine`` method with *dataset_name*.

    Parameters
    ----------
    endpoint_url:
        SPARQL endpoint URL.
    graph_uris:
        Named-graph URI(s) to restrict queries to.
    dataset_name:
        Human-readable name for the dataset.
    chunk_size:
        Pagination page size for pattern queries.
    class_chunk_size:
        Page size for the Phase-1 class-discovery query in two-phase
        mode.  ``None`` (default) fetches the class list in a single
        query; use a positive integer when the endpoint has too many
        classes for one response.
    class_batch_size:
        Number of classes grouped into one VALUES query in Phase 2 of
        two-phase mining (default ``15``).  Larger values mean fewer but
        heavier queries.
    delay:
        Delay between pages, in seconds.
    timeout:
        HTTP timeout per request.
    counts:
        Fetch triple counts per pattern.
    two_phase:
        Use two-phase mining (default ``True``); ``False`` selects the
        legacy single-pass strategy.
    report_path:
        If given, an analytics JSON report is written to this path.
    filter_service_namespaces:
        Strip patterns whose URIs belong to service / system namespaces
        (Virtuoso, OpenLink, etc.) from the result.  Default ``True``.
    untyped_as_classes:
        Give untyped URI objects the ``owl:Class`` URI as object class
        instead of the generic ``"Resource"`` sentinel.  Default ``False``.
    authors:
        Forwarded unchanged to :class:`SchemaMiner` (presumably author
        metadata for the result - confirm against ``SchemaMiner``).
    qlever_version:
        Forwarded unchanged to :class:`SchemaMiner` (presumably version
        metadata of the QLever endpoint - confirm against ``SchemaMiner``).
    one_shot:
        Run each pattern query as a single unbounded SELECT with no
        LIMIT/OFFSET and no fallback chain.  Intended for local QLever
        endpoints.  When ``True``, ``two_phase`` is ignored.

    Returns
    -------
    MinedSchema
        Contains patterns and provenance metadata.
    """
    return SchemaMiner(
        endpoint_url=endpoint_url,
        graph_uris=graph_uris,
        chunk_size=chunk_size,
        class_chunk_size=class_chunk_size,
        class_batch_size=class_batch_size,
        delay=delay,
        timeout=timeout,
        counts=counts,
        two_phase=two_phase,
        report_path=report_path,
        filter_service_namespaces=filter_service_namespaces,
        untyped_as_classes=untyped_as_classes,
        authors=authors,
        qlever_version=qlever_version,
        one_shot=one_shot,
    ).mine(dataset_name=dataset_name)
# -------------------------------------------------------------------
# Batch-mining helpers (used by api.mine_all_sources)
# -------------------------------------------------------------------
def _resolve_source_overrides(
entry: SourceEntry,
*,
chunk_size: int,
class_chunk_size: int | None,
class_batch_size: int,
delay: float,
timeout: float,
counts: bool,
two_phase: bool,
idx: int,
total: int,
name: str,
) -> dict[str, Any]:
"""Return effective per-source mining parameters.
Values from *entry* take precedence over the function-level
defaults. ``class_chunk_size`` is only forwarded for two-phase
rows; a warning is emitted otherwise.
"""
effective_ccs: int | None = None
if two_phase:
effective_ccs = entry.get("class_chunk_size", class_chunk_size)
elif class_chunk_size is not None:
logger.info(
"[%d/%d] --class-chunk-size ignored for %r (not two-phase)",
idx,
total,
name,
)
return {
"chunk_size": entry.get("chunk_size", chunk_size),
"class_chunk_size": effective_ccs,
"class_batch_size": entry.get(
"class_batch_size",
class_batch_size,
),
"delay": entry.get("delay", delay),
"timeout": entry.get("timeout", timeout),
"counts": entry.get("counts", counts),
}
def _write_schema_outputs(
schema: MinedSchema,
*,
out: Path,
name: str,
tag: str,
fmt: str,
) -> None:
"""Serialise *schema* to the requested format(s) under *out*."""
if fmt in ("jsonld", "all"):
jsonld_path = out / f"{name}_{tag}_schema.jsonld"
with open(jsonld_path, "w") as fh:
json.dump(schema.to_jsonld(), fh, indent=2)
logger.info(" -> %s", jsonld_path)
if fmt in ("void", "all"):
void_path = out / f"{name}_{tag}_void.ttl"
void_g = schema.to_void_graph()
void_g.serialize(destination=str(void_path), format="turtle")
logger.info(" -> %s (%d triples)", void_path, len(void_g))
def _mine_one_source(
    entry: SourceEntry,
    *,
    idx: int,
    total: int,
    out: Path,
    fmt: str,
    chunk_size: int,
    class_chunk_size: int | None,
    class_batch_size: int,
    delay: float,
    timeout: float,
    counts: bool,
    reports: bool,
    filter_service_namespaces: bool,
    untyped_as_classes: bool,
    authors: list[dict[str, str]] | None,
    on_progress: (Callable[[str, int, int, str | None], None] | None),
    succeeded: list[str],
    failed: list[dict[str, str]],
) -> None:
    """Mine one source entry, write outputs, update *succeeded*/*failed*.

    All complex logic (parameter resolution, path building, error
    handling) lives here so that :func:`~rdfsolve.api.mine_all_sources`
    stays a thin loop.

    *entry* supplies ``name``, ``endpoint``, ``use_graph``,
    ``two_phase`` (defaults to ``True``) and ``graph_uris``, plus any
    per-source overrides of the tuning arguments (*chunk_size*,
    *class_chunk_size*, *class_batch_size*, *delay*, *timeout*,
    *counts*).  *idx* / *total* label log lines and are passed through
    to *on_progress*.

    Output files are written under *out* in the format(s) selected by
    *fmt*.  When *reports* is true a report path
    ``<name>_<tag>_report.json`` under *out* is handed to
    :func:`mine_schema`.  ``<tag>`` is ``mined_remote_untyped`` when
    *untyped_as_classes* is set and ``mined_remote`` otherwise.

    Any exception raised while mining or writing is caught, logged as
    a warning and recorded in *failed* as
    ``{"dataset": name, "error": str(exc)}``; otherwise *name* is
    appended to *succeeded*.  A source is recorded in exactly one of
    the two lists.

    *on_progress*, if given, is called once as
    ``on_progress(name, idx, total, error)`` where *error* is ``None``
    on success and the error message on failure.  Exceptions raised by
    the callback itself are not handled here and propagate to the
    caller.
    """
    name: str = entry.get("name", "")
    endpoint: str = entry.get("endpoint", "")
    use_graph: bool = entry.get("use_graph", False)
    row_two_phase: bool = entry.get("two_phase", True)

    # Named graphs are forwarded only when the row opts in and lists some.
    entry_graphs = entry.get("graph_uris", [])
    graph_uris_arg: list[str] | None = (
        list(entry_graphs) if use_graph and entry_graphs else None
    )

    logger.info("[%d/%d] Mining %r (%s)", idx, total, name, endpoint)

    params = _resolve_source_overrides(
        entry,
        chunk_size=chunk_size,
        class_chunk_size=class_chunk_size,
        class_batch_size=class_batch_size,
        delay=delay,
        timeout=timeout,
        counts=counts,
        two_phase=row_two_phase,
        idx=idx,
        total=total,
        name=name,
    )

    tag = "mined_remote_untyped" if untyped_as_classes else "mined_remote"
    rpt_path: Path | None = out / f"{name}_{tag}_report.json" if reports else None

    # Only mining and serialisation are guarded.  Success bookkeeping lives
    # in the ``else`` clause so that a failing progress callback cannot put
    # an already-succeeded source into *failed* as well.
    try:
        schema = mine_schema(
            endpoint_url=endpoint,
            graph_uris=graph_uris_arg,
            dataset_name=name,
            two_phase=row_two_phase,
            report_path=rpt_path,
            filter_service_namespaces=filter_service_namespaces,
            untyped_as_classes=untyped_as_classes,
            authors=authors,
            **params,
        )
        _write_schema_outputs(
            schema,
            out=out,
            name=name,
            tag=tag,
            fmt=fmt,
        )
    except Exception as exc:
        # Batch boundary: one bad source must not abort the whole run.
        msg = str(exc)
        logger.warning(" FAIL %s: %s", name, msg)
        failed.append({"dataset": name, "error": msg})
        if on_progress:
            on_progress(name, idx, total, msg)
    else:
        succeeded.append(name)
        if on_progress:
            on_progress(name, idx, total, None)