# Source code for rdfsolve.tools.decompress

"""
Decompress RDF data files (.gz, .xz) in bulk.

Supports the two compression formats found across RDF data repositories:

* **.gz** used by e.g. RDFPortal (``*.ttl.gz``), Bio2RDF (``*.nq.gz``)
* **.xz** used by e.g. UniProt FTP (``*.rdf.xz``, ``*.owl.xz``)

Usage
-----
    # Decompress a single file (auto-detects format)
    python -m rdfsolve.tools.decompress data/medgen/MGCONSO.ttl.gz

    # Decompress everything under a directory
    python -m rdfsolve.tools.decompress data/uniprot_local/ --recursive

    # Dry-run: just show what would be decompressed
    python -m rdfsolve.tools.decompress data/ --recursive --dry-run

    # Keep the original compressed files
    python -m rdfsolve.tools.decompress data/ --recursive --keep
"""

from __future__ import annotations

import argparse
import gzip
import logging
import lzma
import shutil
from pathlib import Path

# Configure logging as soon as this module is imported: INFO level, with an
# HH:MM:SS timestamp and a left-aligned, 8-character level name per record.
# NOTE(review): this runs on plain library imports too, not only for the CLI —
# confirm that is intended, or move the call into main().
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(message)s",
    datefmt="%H:%M:%S",
)
# Module-level logger shared by the functions below.
log = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = [
    "SUPPORTED_EXTENSIONS",
    "decompress_directory",
    "decompress_file",
]

# ── supported formats ────────────────────────────────────────────────────────

# Maps a lowercase file suffix (leading dot included) to the name of its
# compression format. decompress_file() looks the suffix up here and uses the
# value to choose between gzip.open and lzma.open.
SUPPORTED_EXTENSIONS: dict[str, str] = {
    ".gz": "gzip",
    ".xz": "xz",
}


# ── core functions ───────────────────────────────────────────────────────────


def decompress_file(
    src: Path,
    *,
    dest: Path | None = None,
    keep: bool = False,
    chunk_size: int = 64 * 1024,
) -> Path | None:
    """Decompress a single ``.gz`` or ``.xz`` file.

    The data is first streamed into a sibling ``<dest>.part`` file and only
    renamed to *dest* once decompression has finished.  An interrupted or
    failed run therefore never leaves a truncated *dest* behind that a later
    run would mistake for an already-decompressed file.

    Parameters
    ----------
    src
        Path to the compressed file.
    dest
        Explicit output path.  When *None* the suffix is stripped
        (e.g. ``foo.ttl.gz`` -> ``foo.ttl``).
    keep
        If *True* the original compressed file is kept; otherwise it is
        removed after successful decompression.
    chunk_size
        Read/write buffer size in bytes (default 64 KiB).

    Returns
    -------
    Path | None
        Path to the decompressed file (also when *dest* already existed and
        nothing was done), or *None* if the extension is unsupported or
        decompression failed (including a missing or unreadable *src*).
    """
    suffix = src.suffix.lower()
    if suffix not in SUPPORTED_EXTENSIONS:
        log.warning("Unsupported extension %r - skipping %s", suffix, src)
        return None

    if dest is None:
        dest = src.with_suffix("")  # strip the .gz / .xz

    if dest.exists():
        log.info(" ✓ Already decompressed: %s", dest.name)
        return dest

    fmt = SUPPORTED_EXTENSIONS[suffix]
    opener = gzip.open if fmt == "gzip" else lzma.open

    # Stage the output next to *dest* so the final rename stays on the same
    # filesystem and *dest* only ever appears in its complete form.
    partial = dest.with_name(dest.name + ".part")

    try:
        # stat() lives inside the guarded region so that a missing source is
        # reported as a failure (None) instead of escaping as an exception.
        src_mb = src.stat().st_size / (1024 * 1024)
        log.info(" ⬇ Decompressing %s (%.1f MB, %s) …", src.name, src_mb, fmt)

        with opener(src, "rb") as f_in, open(partial, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out, length=chunk_size)

        dest_mb = partial.stat().st_size / (1024 * 1024)
        partial.replace(dest)
    except Exception as exc:
        # Deliberately broad: corrupt archives surface as several unrelated
        # exception types (OSError, EOFError, LZMAError, zlib errors), and bulk
        # callers rely on a None return to continue with the next file.
        log.error(" ✗ Failed to decompress %s: %s", src.name, exc)
        return None
    finally:
        # Nothing to remove after a successful rename; after a failure or an
        # interrupt (e.g. Ctrl-C) this discards the incomplete staging file.
        if partial.exists():
            partial.unlink()

    log.info(" ✓ %s -> %.1f MB", dest.name, dest_mb)

    if not keep:
        src.unlink()
        log.debug(" 🗑 Removed %s", src.name)

    return dest
def decompress_directory(
    directory: Path,
    *,
    recursive: bool = False,
    keep: bool = False,
    extensions: set[str] | None = None,
) -> list[Path]:
    """Decompress all supported compressed files in a directory.

    Parameters
    ----------
    directory
        Root directory to scan.
    recursive
        If *True*, walk subdirectories as well.
    keep
        Passed to :func:`decompress_file`.
    extensions
        Restrict to a subset of extensions (e.g. ``{".gz"}``).
        Defaults to all supported extensions.

    Returns
    -------
    list[Path]
        Paths for which :func:`decompress_file` returned an output file,
        in sorted order of the compressed inputs.  Empty when no matching
        file was found.
    """
    if extensions is None:
        extensions = set(SUPPORTED_EXTENSIONS)

    pattern_fn = directory.rglob if recursive else directory.glob
    # Collect into a set first: overlapping patterns (e.g. ".gz" together with
    # ".ttl.gz") would otherwise yield the same file more than once, making it
    # get processed and counted twice.
    files = sorted(
        {f for ext in extensions for f in pattern_fn(f"*{ext}") if f.is_file()}
    )

    if not files:
        log.info("No compressed files found in %s", directory)
        return []

    log.info("Found %d compressed file(s) in %s", len(files), directory)

    results: list[Path] = []
    for f in files:
        out = decompress_file(f, keep=keep)
        if out is not None:
            results.append(out)

    log.info("Decompressed %d / %d files", len(results), len(files))
    return results
# ── CLI entry-point ──────────────────────────────────────────────────────────


def main() -> None:
    """CLI entry point — parse arguments and decompress the target path.

    The positional ``path`` may be a single compressed file or a directory.
    With ``--dry-run`` nothing is written or removed; the files that would be
    decompressed are only logged.

    Raises
    ------
    SystemExit
        With exit status 1 when ``path`` is neither an existing file nor an
        existing directory.
    """
    parser = argparse.ArgumentParser(
        description="Decompress RDF data files (.gz, .xz).",
    )
    parser.add_argument(
        "path",
        type=Path,
        help="File or directory to decompress.",
    )
    parser.add_argument(
        "--recursive",
        "-r",
        action="store_true",
        help="Recurse into subdirectories.",
    )
    parser.add_argument(
        "--keep",
        "-k",
        action="store_true",
        help="Keep the original compressed files.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only list files that would be decompressed.",
    )
    parser.add_argument(
        "--extensions",
        nargs="*",
        default=None,
        help="Restrict to specific extensions (e.g. .gz .xz).",
    )
    args = parser.parse_args()

    # An omitted or empty --extensions means "all supported formats".
    exts = set(args.extensions) if args.extensions else None
    target: Path = args.path.resolve()

    if target.is_file():
        if args.dry_run:
            # Mirror decompress_file's own extension check so the preview
            # matches what a real run would do.
            if target.suffix.lower() in SUPPORTED_EXTENSIONS:
                log.info("Would decompress: %s", target)
            else:
                log.warning(
                    "Unsupported extension %r - would skip %s",
                    target.suffix.lower(),
                    target,
                )
        else:
            decompress_file(target, keep=args.keep)
    elif target.is_dir():
        if exts is None:
            exts = set(SUPPORTED_EXTENSIONS)
        if args.dry_run:
            # Scan only for the preview; a real run lets
            # decompress_directory do its own scan.
            pattern_fn = target.rglob if args.recursive else target.glob
            files = sorted(
                {f for ext in exts for f in pattern_fn(f"*{ext}") if f.is_file()}
            )
            for candidate in files:
                log.info("Would decompress: %s", candidate)
            log.info(
                "Dry run: %d file(s) would be decompressed in %s",
                len(files),
                target,
            )
        else:
            decompress_directory(
                target,
                recursive=args.recursive,
                keep=args.keep,
                extensions=exts,
            )
    else:
        log.error("Path does not exist: %s", target)
        raise SystemExit(1)


if __name__ == "__main__":
    main()