Tools

Utility tools for RDFSolve: download helpers, decompression, etc.

Benchmark

Benchmarking and metrics collection for RDFSolve mining runs.

Captures system-level performance data suitable for computer-science papers and reproducibility analysis:

  • Machine specifications (CPU, RAM, OS, kernel, hostname)

  • Per-run resource usage (wall time, peak RSS, CPU time, disk I/O)

  • QLever index/server metrics (index size, indexing time)

  • Environment info (Python version, rdfsolve version, QLever version)

  • Per-dataset statistics (triple count, file sizes, errors)

All data is written to benchmarks.jsonl (one JSON object per line) for easy aggregation with pandas / polars.
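As an illustration of the "one JSON object per line" layout, the following minimal sketch parses such a file with the standard library and averages wall time per method. It assumes that `method` and `wall_time_s` appear as top-level keys in each record, which this page does not confirm; the helper names `load_records` and `mean_wall_time_by_method` are hypothetical and not part of rdfsolve.

```python
import json
from collections import defaultdict
from pathlib import Path


def load_records(path: Path) -> list[dict]:
    """Parse a JSON Lines file: one JSON object per non-empty line."""
    with path.open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


def mean_wall_time_by_method(records: list[dict]) -> dict[str, float]:
    """Average wall_time_s per method (assumes both keys are top-level)."""
    buckets: dict[str, list[float]] = defaultdict(list)
    for rec in records:
        method = rec.get("method", "unknown")
        buckets[method].append(float(rec.get("wall_time_s", 0.0)))
    return {method: sum(vals) / len(vals) for method, vals in buckets.items()}
```

With pandas installed, `pandas.read_json(path, lines=True)` loads the same file into a DataFrame directly.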

Usage from mine_local.py:

from pathlib import Path

from rdfsolve.tools.benchmark import BenchmarkCollector

bench = BenchmarkCollector(output_dir=Path("mined_schemas"))
with bench.track("affymetrix", method="local-mine") as run:
    _mine_single_local(endpoint, name, out, args)
    run.add_extra("triples_indexed", 123456)
# run data auto-flushed to benchmarks.jsonl
class MachineInfo(hostname: str = '', os_name: str = '', os_release: str = '', os_version: str = '', architecture: str = '', cpu_model: str = '', cpu_count_logical: int = 0, cpu_count_physical: int = 0, ram_total_gb: float = 0.0, python_version: str = '', rdfsolve_version: str = '', qlever_version: str = '')

Bases: object

Static information about the host machine.

hostname: str = ''
os_name: str = ''
os_release: str = ''
os_version: str = ''
architecture: str = ''
cpu_model: str = ''
cpu_count_logical: int = 0
cpu_count_physical: int = 0
ram_total_gb: float = 0.0
python_version: str = ''
rdfsolve_version: str = ''
qlever_version: str = ''
collect_machine_info() -> MachineInfo

Gather static machine specifications.
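How rdfsolve gathers these values is not documented here. As a rough sketch, several of the MachineInfo fields can be filled from the standard library alone. `HostSketch` and `collect_host_sketch` are hypothetical names used only for illustration and cover a subset of the fields.

```python
import os
import platform
from dataclasses import dataclass


@dataclass
class HostSketch:
    """Illustrative subset of the MachineInfo fields (not the real class)."""

    hostname: str = ""
    os_name: str = ""
    os_release: str = ""
    architecture: str = ""
    cpu_count_logical: int = 0
    python_version: str = ""


def collect_host_sketch() -> HostSketch:
    """Fill the sketch from the platform and os modules."""
    return HostSketch(
        hostname=platform.node(),
        os_name=platform.system(),          # e.g. "Linux"
        os_release=platform.release(),      # kernel release string
        architecture=platform.machine(),    # e.g. "x86_64"
        cpu_count_logical=os.cpu_count() or 0,  # os.cpu_count() may return None
        python_version=platform.python_version(),
    )
```

Physical core count and total RAM have no portable standard-library source; a third-party package such as psutil is a common choice for them, though whether rdfsolve uses it is an assumption.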

class RunMetrics(dataset: str = '', method: str = '', endpoint: str = '', started_at: str = '', finished_at: str = '', wall_time_s: float = 0.0, cpu_user_s: float = 0.0, cpu_system_s: float = 0.0, peak_rss_mb: float = 0.0, read_bytes: int = 0, write_bytes: int = 0, success: bool = False, error: str = '', classes_found: int = 0, properties_found: int = 0, triples_count: int = 0, output_files: dict[str, str] = <factory>, output_sizes_mb: dict[str, float] = <factory>, extra: dict[str, Any] = <factory>)

Bases: object

Resource usage captured for a single mining run.

dataset: str = ''
method: str = ''
endpoint: str = ''
started_at: str = ''
finished_at: str = ''
wall_time_s: float = 0.0
cpu_user_s: float = 0.0
cpu_system_s: float = 0.0
peak_rss_mb: float = 0.0
read_bytes: int = 0
write_bytes: int = 0
success: bool = False
error: str = ''
classes_found: int = 0
properties_found: int = 0
triples_count: int = 0
output_files: dict[str, str] = <factory>
output_sizes_mb: dict[str, float] = <factory>
extra: dict[str, Any] = <factory>
add_extra(key: str, value: Any) -> None

Store an additional metric.
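The `<factory>` defaults on the dict fields indicate that each instance gets its own fresh container. The sketch below shows that pattern on a reduced, hypothetical `RunSketch` dataclass; the assumption that `add_extra` simply stores the value in `extra` under the given key is ours and is not confirmed by this page.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class RunSketch:
    """Reduced stand-in for RunMetrics, for illustration only."""

    dataset: str = ""
    success: bool = False
    # default_factory gives every instance its own dict instead of a shared one
    extra: dict[str, Any] = field(default_factory=dict)

    def add_extra(self, key: str, value: Any) -> None:
        """Store an additional metric under the given key (assumed behavior)."""
        self.extra[key] = value
```

Because the record is a plain dataclass, `asdict(run)` yields a dictionary that can be serialized as one JSON line.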

class BenchmarkCollector(output_dir: Path)

Bases: object

Collects and persists benchmark data for mining runs.

Usage:

collector = BenchmarkCollector(output_dir)
with collector.track("drugbank", method="local-mine") as run:
    # … do the actual mining …
    run.add_extra("index_size_mb", 420)
# run metrics auto-saved to benchmarks.jsonl

Initialize a BenchmarkCollector with an output directory Path.

Parameters:

output_dir – Path to output directory

property machine_info: MachineInfo

Return the static machine info.

track(dataset: str, method: str = 'unknown', endpoint: str = '') -> Generator[RunMetrics, None, None]

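Context manager that captures resource usage for a single run. It yields a RunMetrics object to which extra metrics can be added; the run data is written to benchmarks.jsonl when the block exits.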
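The general shape of such a tracking context manager can be sketched as follows. This is not the rdfsolve implementation: `track_sketch` is a hypothetical function that records only wall time and CPU time into a plain dict, and the choice to re-raise exceptions after logging them is an assumption.

```python
import json
import os
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator


@contextmanager
def track_sketch(dataset: str, log_path: Path) -> Iterator[dict[str, Any]]:
    """Time the enclosed block and append one JSON line to log_path."""
    record: dict[str, Any] = {
        "dataset": dataset,
        "started_at": datetime.now(timezone.utc).isoformat(),
        "success": False,
        "error": "",
    }
    wall_start = time.perf_counter()
    cpu_start = os.times()
    try:
        yield record              # caller may add its own keys here
        record["success"] = True
    except Exception as exc:
        record["error"] = f"{type(exc).__name__}: {exc}"
        raise                     # assumption: failures still propagate
    finally:
        cpu_end = os.times()
        record["wall_time_s"] = time.perf_counter() - wall_start
        record["cpu_user_s"] = cpu_end.user - cpu_start.user
        record["cpu_system_s"] = cpu_end.system - cpu_start.system
        record["finished_at"] = datetime.now(timezone.utc).isoformat()
        # Flush in `finally` so failed runs are recorded as well
        with log_path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(record) + "\n")
```

Writing the record in the `finally` clause means a failed run still leaves a line in the log, with `success` set to false and the error message filled in.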

write_summary_csv() -> Path

Read benchmarks.jsonl and produce a summary CSV.

Returns the path to the generated CSV file.
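The column set and file name of the real summary are not specified on this page. Purely as an illustration of the JSONL-to-CSV step, the sketch below copies a few assumed top-level keys into a CSV file; `write_summary_sketch` and `SUMMARY_COLUMNS` are hypothetical.

```python
import csv
import json
from pathlib import Path

# Assumed column selection; the real summary may contain different fields.
SUMMARY_COLUMNS = ["dataset", "method", "wall_time_s", "peak_rss_mb", "success"]


def write_summary_sketch(jsonl_path: Path, csv_path: Path) -> Path:
    """Copy selected keys from each JSON line into one CSV row."""
    with jsonl_path.open(encoding="utf-8") as src, \
            csv_path.open("w", newline="", encoding="utf-8") as dst:
        # extrasaction="ignore" drops keys outside SUMMARY_COLUMNS;
        # keys missing from a record are written as empty cells.
        writer = csv.DictWriter(dst, fieldnames=SUMMARY_COLUMNS, extrasaction="ignore")
        writer.writeheader()
        for line in src:
            if line.strip():
                writer.writerow(json.loads(line))
    return csv_path
```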

Decompress

Decompress RDF data files (.gz, .xz) in bulk.

Supports the two compression formats found across RDF data repositories:

  • .gz, used by e.g. RDFPortal (*.ttl.gz) and Bio2RDF (*.nq.gz)

  • .xz, used by e.g. UniProt FTP (*.rdf.xz, *.owl.xz)

Usage

# Decompress a single file (auto-detects format)
python -m rdfsolve.tools.decompress data/medgen/MGCONSO.ttl.gz

# Decompress everything under a directory
python -m rdfsolve.tools.decompress data/uniprot_local/ --recursive

# Dry-run: just show what would be decompressed
python -m rdfsolve.tools.decompress data/ --recursive --dry-run

# Keep the original compressed files
python -m rdfsolve.tools.decompress data/ --recursive --keep

decompress_directory(directory: Path, *, recursive: bool = False, keep: bool = False, extensions: set[str] | None = None) -> list[Path]

Decompress all supported compressed files in a directory.

Parameters:
  • directory – Root directory to scan.

  • recursive – If True, walk subdirectories as well.

  • keep – Passed to decompress_file(); if True, the original compressed files are kept.

  • extensions – Restrict to a subset of extensions (e.g. {".gz"}). Defaults to all supported extensions.

Returns:

Paths to successfully decompressed files.

Return type:

list[Path]
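The scanning step described by the `recursive` and `extensions` parameters can be sketched with `pathlib` globbing. `find_compressed` is a hypothetical helper, and the assumption that unsupported extensions passed in `extensions` are silently ignored is ours.

```python
from pathlib import Path
from typing import Optional

SUPPORTED = {".gz", ".xz"}


def find_compressed(
    directory: Path,
    recursive: bool = False,
    extensions: Optional[set[str]] = None,
) -> list[Path]:
    """List compressed files under directory, optionally walking subdirectories."""
    # Assumption: anything outside the supported set is dropped, not an error
    wanted = SUPPORTED if extensions is None else extensions & SUPPORTED
    pattern = "**/*" if recursive else "*"
    return sorted(
        p for p in directory.glob(pattern)
        if p.is_file() and p.suffix in wanted
    )
```

`Path.suffix` returns only the last extension, so `MGCONSO.ttl.gz` is matched on `.gz`.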

decompress_file(src: Path, *, dest: Path | None = None, keep: bool = False, chunk_size: int = 65536) -> Path | None

Decompress a single .gz or .xz file.

Parameters:
  • src – Path to the compressed file.

  • dest – Explicit output path. When None the suffix is stripped (e.g. foo.ttl.gz -> foo.ttl).

  • keep – If True the original compressed file is kept; otherwise it is removed after successful decompression.

  • chunk_size – Read/write buffer size in bytes (default 64 KiB).

Returns:

Path to the decompressed file, or None on error.

Return type:

Path | None
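A chunked decompression loop of the kind these parameters describe can be sketched with the standard `gzip` and `lzma` modules. `decompress_sketch` is a hypothetical stand-in, not the rdfsolve function: it omits the `dest` parameter, and the set of exceptions treated as "error" is an assumption.

```python
import gzip
import lzma
import zlib
from pathlib import Path
from typing import Optional

# Map each supported suffix to the matching stdlib opener
OPENERS = {".gz": gzip.open, ".xz": lzma.open}


def decompress_sketch(src: Path, keep: bool = False, chunk_size: int = 65536) -> Optional[Path]:
    """Stream src into a file without the compression suffix; None on error."""
    opener = OPENERS.get(src.suffix)
    if opener is None:
        return None                      # unsupported format
    dest = src.with_suffix("")           # foo.ttl.gz -> foo.ttl
    try:
        with opener(src, "rb") as fin, dest.open("wb") as fout:
            # Read fixed-size chunks so large dumps never sit fully in memory
            while chunk := fin.read(chunk_size):
                fout.write(chunk)
    except (OSError, EOFError, lzma.LZMAError, zlib.error):
        dest.unlink(missing_ok=True)     # do not leave a partial output behind
        return None
    if not keep:
        src.unlink()                     # remove the original only after success
    return dest
```

Deleting the source only after the output has been fully written keeps a failed run from losing data.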