"""LinkML schema generation from JSON-LD.
Converts a rdfsolve JSON-LD schema dict (``@context`` + ``@graph``)
into a LinkML ``SchemaDefinition`` or its YAML serialisation.
"""
from __future__ import annotations
import logging
import re
from typing import Any, cast
from bioregistry import curie_from_iri
from linkml.generators.yamlgen import YAMLGenerator
from linkml_runtime.linkml_model import (
ClassDefinition,
SchemaDefinition,
SlotDefinition,
TypeDefinition,
)
logger = logging.getLogger(__name__)
__all__ = [
"make_valid_linkml_name",
"to_linkml",
"to_linkml_yaml",
]
# ── Name-cleaning helpers ────────────────────────────────────────
def _clean_local_part(local: str) -> str:
"""Clean the local part of a name while preserving structure.
Examples::
"KeyEvent" -> "KeyEvent"
"data1025" -> "data_1025"
"edam.data1025" -> "edam_data_1025"
"C123456" -> "C_123456"
"""
local = local.replace(".", "_")
local = re.sub(r"([a-zA-Z])(\d)", r"\1_\2", local)
local = re.sub(r"([a-z])([A-Z])", r"\1_\2", local)
local = re.sub(r"[^a-zA-Z0-9_]", "_", local)
return local
def _finalize_linkml_name(name: str) -> str:
"""Apply final cleanup rules to ensure valid LinkML identifier."""
name = re.sub(r"_+", "_", name)
name = name.strip("_")
if name and name[0].isdigit():
name = f"item_{name}"
elif not name or not name[0].isalpha():
name = f"item_{name}" if name else "unknown_item"
if not name:
name = "unknown_item"
return name
[docs]
def make_valid_linkml_name(uri_or_curie: str) -> str:
"""Convert a URI or CURIE to a valid LinkML identifier.
LinkML identifiers must start with a letter and contain only
letters, digits, and underscores.
Examples::
"aopo:KeyEvent" -> "aopo_KeyEvent"
"edam.data1025" -> "edam_data_1025"
"http://example.org/Cls" -> prefix_Cls (via bioregistry)
"""
if uri_or_curie.startswith(("http://", "https://")):
curie = curie_from_iri(uri_or_curie)
if curie:
uri_or_curie = curie
if ":" in uri_or_curie:
prefix, local = uri_or_curie.split(":", 1)
prefix = re.sub(r"[^a-zA-Z0-9_]", "_", prefix)
local = _clean_local_part(local)
name = f"{prefix}_{local}"
else:
name = _clean_local_part(uri_or_curie)
return _finalize_linkml_name(name)
# ── Core conversion ──────────────────────────────────────────────
def _derive_schema_meta(
jsonld: dict[str, Any],
schema_name: str | None,
schema_description: str | None,
schema_base_uri: str | None,
) -> tuple[str, str, str]:
"""Return ``(schema_name, schema_uri, description)``."""
if not schema_name:
about = jsonld.get("@about", {})
schema_name = about.get("dataset_name", "rdf_schema")
schema_name = re.sub(r"[^a-zA-Z0-9_]", "_", schema_name)
schema_uri = (
f"https://w3id.org/{schema_name}/"
if not schema_base_uri
else schema_base_uri.rstrip("/") + "/"
)
description = schema_description or f"LinkML schema generated from JSON-LD for {schema_name}"
return schema_name, schema_uri, description
def _build_prefixes(
schema_name: str,
schema_uri: str,
jsonld_context: dict[str, Any],
) -> dict[str, str]:
"""Merge base prefixes with JSON-LD ``@context`` entries."""
base: dict[str, str] = {
schema_name: schema_uri,
"linkml": "https://w3id.org/linkml/",
"schema": "http://schema.org/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
}
return {**base, **jsonld_context}
def _build_empty_schema(
schema_name: str,
schema_uri: str,
description: str,
prefixes: dict[str, str],
) -> SchemaDefinition:
"""Construct a :class:`SchemaDefinition` with types pre-populated."""
return SchemaDefinition(
id=schema_uri,
name=schema_name,
description=description,
default_prefix=schema_name,
prefixes=prefixes,
types={
"string": TypeDefinition(
name="string",
uri="xsd:string",
base="str",
),
"uriorcurie": TypeDefinition(
name="uriorcurie",
uri="xsd:anyURI",
base="URIorCURIE",
),
},
)
def _collect_graph_items(
jsonld: dict[str, Any],
) -> list[dict[str, Any]] | None:
"""Return non-dataset ``@graph`` items, or ``None`` if absent/empty."""
if "@graph" not in jsonld:
logger.warning(
"No @graph found in JSON-LD, returning empty schema",
)
return None
items = [item for item in jsonld["@graph"] if item.get("@type") != "void:Dataset"]
if not items:
logger.warning("No schema triples found in JSON-LD @graph")
return None
return items
def _scan_graph_items(
items: list[dict[str, Any]],
label_map: dict[str, str],
) -> tuple[
set[str],
set[str],
dict[str, list[str]],
dict[str, str],
dict[str, str],
dict[str, str],
dict[str, str],
]:
"""Single pass over *items* collecting all schema metadata.
Returns
-------
(all_class_names, all_slot_names, class_properties,
property_ranges, property_descriptions,
original_class_uris, original_slot_uris)
"""
all_class_names: set[str] = set()
all_slot_names: set[str] = set()
class_properties: dict[str, list[str]] = {}
property_ranges: dict[str, str] = {}
property_descriptions: dict[str, str] = {}
original_class_uris: dict[str, str] = {}
original_slot_uris: dict[str, str] = {}
for item in items:
if "@id" not in item:
continue
subject = item["@id"]
subject_clean = make_valid_linkml_name(subject)
all_class_names.add(subject_clean)
original_class_uris.setdefault(subject_clean, subject)
class_properties.setdefault(subject_clean, [])
for prop, value in item.items():
if prop.startswith("@") or prop == "_counts":
continue
prop_clean = make_valid_linkml_name(prop)
all_slot_names.add(prop_clean)
original_slot_uris.setdefault(prop_clean, prop)
if prop_clean not in class_properties[subject_clean]:
class_properties[subject_clean].append(prop_clean)
_update_property_range(
prop_clean,
value,
all_class_names,
original_class_uris,
property_ranges,
)
if prop_clean not in property_descriptions:
lbl = label_map.get(prop)
property_descriptions[prop_clean] = lbl if lbl else f"Property {prop}"
return (
all_class_names,
all_slot_names,
class_properties,
property_ranges,
property_descriptions,
original_class_uris,
original_slot_uris,
)
def _update_property_range(
prop_clean: str,
value: Any,
all_class_names: set[str],
original_class_uris: dict[str, str],
property_ranges: dict[str, str],
) -> None:
"""Infer and record the range for *prop_clean* from *value*."""
val = value[0] if isinstance(value, list) and value else value
if isinstance(val, dict):
if "@id" in val:
target = make_valid_linkml_name(val["@id"])
all_class_names.add(target)
original_class_uris.setdefault(target, val["@id"])
property_ranges[prop_clean] = target
elif "@value" in val:
property_ranges[prop_clean] = "string"
elif isinstance(val, str):
property_ranges[prop_clean] = "string"
else:
property_ranges.setdefault(prop_clean, "string")
def _build_classes(
all_class_names: set[str],
class_properties: dict[str, list[str]],
original_class_uris: dict[str, str],
slot_name_mapping: dict[str, str],
label_map: dict[str, str],
prefixes: dict[str, str],
) -> dict[str, ClassDefinition]:
"""Build :class:`ClassDefinition` objects for every class."""
classes: dict[str, ClassDefinition] = {}
for class_name in all_class_names:
class_slots = [slot_name_mapping.get(p, p) for p in class_properties.get(class_name, [])]
original_uri = original_class_uris.get(class_name, class_name)
class_uri = _expand_uri(original_uri, prefixes)
classes[class_name] = ClassDefinition(
name=class_name,
description=label_map.get(
original_class_uris.get(class_name, ""),
f"Class representing {class_name}",
),
slots=class_slots,
class_uri=class_uri,
)
return classes
def _build_slots(
all_slot_names: set[str],
slot_name_mapping: dict[str, str],
property_ranges: dict[str, str],
property_descriptions: dict[str, str],
original_slot_uris: dict[str, str],
class_properties: dict[str, list[str]],
all_class_names: set[str],
prefixes: dict[str, str],
) -> dict[str, SlotDefinition]:
"""Build :class:`SlotDefinition` objects for every slot."""
slots: dict[str, SlotDefinition] = {}
for orig_slot in all_slot_names:
final = slot_name_mapping[orig_slot]
rng = property_ranges.get(orig_slot, "string")
if rng not in all_class_names and rng not in (
"string",
"uriorcurie",
):
rng = "string"
original_uri = original_slot_uris.get(orig_slot, orig_slot)
slot_uri = _expand_uri(original_uri, prefixes)
slot_def = SlotDefinition(
name=final,
description=property_descriptions.get(
orig_slot,
f"Property {orig_slot}",
),
range=rng,
slot_uri=slot_uri,
)
domain_classes = [c for c, props in class_properties.items() if orig_slot in props]
if domain_classes:
slot_def.domain_of = domain_classes
slot_def.owner = domain_classes[0]
slots[final] = slot_def
return slots
[docs]
def to_linkml(
jsonld: dict[str, Any],
*,
schema_name: str | None = None,
schema_description: str | None = None,
schema_base_uri: str | None = None,
) -> SchemaDefinition:
"""Generate a LinkML ``SchemaDefinition`` from a JSON-LD dict.
Parameters
----------
jsonld:
JSON-LD document with ``@context``, ``@graph``, and
optionally ``_labels``.
schema_name:
Name for the schema (also used as default prefix).
schema_description:
Human-readable description.
schema_base_uri:
Base URI; defaults to ``https://w3id.org/{schema_name}/``.
Returns
-------
SchemaDefinition
"""
schema_name, schema_uri, description = _derive_schema_meta(
jsonld,
schema_name,
schema_description,
schema_base_uri,
)
prefixes = _build_prefixes(
schema_name,
schema_uri,
jsonld.get("@context", {}),
)
schema = _build_empty_schema(
schema_name,
schema_uri,
description,
prefixes,
)
items = _collect_graph_items(jsonld)
if items is None:
return schema
label_map: dict[str, str] = jsonld.get("_labels", {})
(
all_class_names,
all_slot_names,
class_properties,
property_ranges,
property_descriptions,
original_class_uris,
original_slot_uris,
) = _scan_graph_items(items, label_map)
conflicts = all_class_names & all_slot_names
slot_name_mapping = {s: (f"has_{s}" if s in conflicts else s) for s in all_slot_names}
schema.classes = _build_classes(
all_class_names,
class_properties,
original_class_uris,
slot_name_mapping,
label_map,
prefixes,
)
schema.slots = _build_slots(
all_slot_names,
slot_name_mapping,
property_ranges,
property_descriptions,
original_slot_uris,
class_properties,
all_class_names,
prefixes,
)
return schema
def _expand_uri(
uri_or_curie: str,
prefixes: dict[str, str],
) -> str:
"""Expand a CURIE to a full URI using *prefixes*, or return as-is."""
if uri_or_curie.startswith(("http://", "https://")):
return uri_or_curie
if ":" in uri_or_curie:
prefix, local = uri_or_curie.split(":", 1)
ns = prefixes.get(prefix)
if ns:
return ns + local
return uri_or_curie
# ── YAML serialisation ───────────────────────────────────────────
[docs]
def to_linkml_yaml(
jsonld: dict[str, Any],
*,
schema_name: str | None = None,
schema_description: str | None = None,
schema_base_uri: str | None = None,
) -> str:
"""Return the LinkML schema as a YAML string.
Parameters are the same as :func:`to_linkml`.
"""
linkml_schema = to_linkml(
jsonld,
schema_name=schema_name,
schema_description=schema_description,
schema_base_uri=schema_base_uri,
)
return cast(str, YAMLGenerator(linkml_schema).serialize())