Skip to content

Records

cognite_data_quality._records

DataQualityRecord

Bases: BaseModel

DMS instance upsert payload for a single validated instance.

Attributes:

Name Type Description
space str

Destination DMS space for this record.

external_id str

External ID of the record instance.

properties dict

Dict of property values to write to the Records container.

Source code in cognite_data_quality/_records.py
class DataQualityRecord(BaseModel):
    """DMS instance upsert payload for a single validated instance.

    A typed pydantic representation of one record: the destination space,
    the record's external ID, and the raw property values for the Records
    container.

    Attributes:
        space: Destination DMS space for this record.
        external_id: External ID of the record instance.
        properties: Property values to write to the Records container,
            keyed by container property name.
    """

    # NOTE(review): field is snake_case ``external_id`` while the dict payloads
    # built elsewhere in this module use the DMS key ``externalId`` — confirm
    # how this model is serialized (alias / manual mapping) before relying on it.
    space: str
    external_id: str
    properties: dict

RecordsConfig

Bases: BaseModel

Records API coordinates and rule set metadata for a validation run.

Attributes:

Name Type Description
stream_id str | None

External ID of the Records API stream to ingest into. Set to None to skip Records API ingestion.

rule_set_id str

Identifier for the SHACL rule set being applied (written to every record).

rule_set_version str

Version string of the rule set (written to every record).

data_domain_external_id str | None

Optional data domain tag written to each record for filtering in the validation dashboard.

records_space str

CDF DMS space that hosts the validation records container. Default: "dataQuality".

records_container str

External ID of the DMS container that stores records. Default: "DataQualityValidationRecord".

use_instance_space bool

Write each record into the same space as the validated instance instead of records_space. Default: False.

record_space str | None

Explicit destination space, overriding records_space when use_instance_space is False.

Source code in cognite_data_quality/_records.py
class RecordsConfig(BaseModel):
    """Where validation records go, and which rule set produced them.

    Holds the Records API stream, the DMS space/container that stores the
    records, and the rule set metadata stamped onto every record.

    Attributes:
        stream_id: External ID of the Records API stream to ingest into.
            Leave as ``None`` (the default) to skip Records API ingestion.
        rule_set_id: Identifier of the SHACL rule set being applied; it is
            written to every record.
        rule_set_version: Version string of that rule set; it is written to
            every record.
        data_domain_external_id: Optional data domain tag added to each
            record so the validation dashboard can filter on it.
        records_space: CDF DMS space hosting the validation records
            container. Defaults to ``"dataQuality"``.
        records_container: External ID of the DMS container that stores the
            records. Defaults to ``"DataQualityValidationRecord"``.
        use_instance_space: When ``True``, write each record into the space of
            the instance it describes instead of *records_space*. Defaults to
            ``False``.
        record_space: Explicit destination space that overrides
            *records_space* when *use_instance_space* is ``False``.
    """

    # Records API ingestion target (None disables ingestion).
    stream_id: str | None = None
    # Rule set metadata copied onto every record.
    rule_set_id: str
    rule_set_version: str
    data_domain_external_id: str | None = None
    # DMS storage location; plain defaults are equivalent to Field(default=...).
    records_space: str = "dataQuality"
    records_container: str = "DataQualityValidationRecord"
    use_instance_space: bool = False
    record_space: str | None = None

Violation

Bases: BaseModel

A single constraint violation extracted from a SHACL validation report.

All fields map directly to SHACL result properties and may be None when the SHACL engine did not produce that property for a given result.

Attributes:

Name Type Description
focusNode str | None

URI of the node that failed the constraint.

resultMessage str | None

Human-readable violation message from sh:message.

resultSeverity str | None

Severity as a full IRI or a prefixed name (e.g. "sh:Violation", "sh:Warning").

sourceConstraintComponent str | None

SHACL constraint component that raised the violation (e.g. "sh:MinCountConstraintComponent").

resultPath str | None

Property path that caused the violation.

value str | None

The offending value, if any.

sourceShape str | None

External ID / URI of the shape that contains the violated constraint.

sourceConstraint str | None

The specific constraint within the shape.

dimension str | None

Data quality dimension label (Completeness, Validity, etc.).

targetClass str | None

Local name of the sh:targetClass for the shape.

Source code in cognite_data_quality/_records.py
class Violation(BaseModel):
    """A single constraint violation extracted from a SHACL validation report.

    Every field is optional and defaults to ``None``, because the SHACL engine
    does not emit every property for every result. Field names keep the
    camelCase spelling of the SHACL vocabulary (``sh:focusNode``,
    ``sh:resultPath``, ...) rather than snake_case.

    Attributes:
        focusNode: URI of the node that failed the constraint.
        resultMessage: Human-readable violation message from ``sh:message``.
        resultSeverity: Severity as a full IRI or a prefixed name
            (e.g. ``"sh:Violation"``, ``"sh:Warning"``).
        sourceConstraintComponent: SHACL constraint component that raised the
            violation (e.g. ``"sh:MinCountConstraintComponent"``).
        resultPath: Property path that caused the violation.
        value: The offending value, if any.
        sourceShape: External ID / URI of the shape that contains the
            violated constraint.
        sourceConstraint: The specific constraint within the shape.
        dimension: Data quality dimension label (Completeness, Validity, etc.).
            Not a SHACL result property; presumably attached from shape
            metadata during extraction — TODO confirm.
        targetClass: Local name of the ``sh:targetClass`` for the shape
            (also shape metadata rather than a SHACL result property).
    """

    # SHACL validation-result properties (sh:focusNode, sh:resultMessage, ...).
    focusNode: str | None = None
    resultMessage: str | None = None
    resultSeverity: str | None = None
    sourceConstraintComponent: str | None = None
    resultPath: str | None = None
    value: str | None = None
    sourceShape: str | None = None
    sourceConstraint: str | None = None
    # Shape-level metadata carried alongside the result.
    dimension: str | None = None
    targetClass: str | None = None

build_records(instances, violations, *, job_run_id, records_config, namespace_base)

Build Records API payloads for all instances.

Source code in cognite_data_quality/_records.py
def _severity_local_name(raw_severity: str) -> str:
    """Return the local name of a SHACL severity identifier.

    Full IRIs (``http://www.w3.org/ns/shacl#Violation``), prefixed names
    (``sh:Violation``) and bare names (``Violation``) all map to
    ``"Violation"``. Falls back to *raw_severity* if stripping would leave
    an empty string (e.g. a trailing separator).
    """
    local_name = raw_severity
    for separator in ("#", "/", ":"):
        local_name = local_name.rpartition(separator)[2]
    return local_name or raw_severity


def build_records(
    instances: Iterable[dict],
    violations: list[Violation],
    *,
    job_run_id: str,
    records_config: RecordsConfig,
    namespace_base: str,
) -> list[dict]:
    """Build Records API payloads for all instances.

    One record is produced per instance, whether or not it has violations.
    Instances without violations get an empty violation list and a
    ``"Passed all constraints"`` summary.

    Args:
        instances: DMS instance dicts. Only ``space`` and ``externalId`` are
            read; missing, ``None`` or empty values become ``"unknown"``.
        violations: Violations from the SHACL report. Each one is matched to
            an instance by parsing its ``focusNode`` with
            :func:`parse_focus_node_uri`. Violations whose focus node cannot
            be parsed are dropped.
        job_run_id: Identifier of this validation run. It is written to each
            record and embedded in the record external ID.
        records_config: Rule set metadata plus the destination space and
            container.
        namespace_base: URI prefix used both to parse violation focus nodes
            and to build each record's ``focusNode``.

    Returns:
        DMS instance upsert payloads (``space``, ``externalId``, ``sources``)
        that write to ``records_config.records_container``.
    """
    violations_by_instance: dict[tuple[str, str], list[Violation]] = defaultdict(list)
    for violation in violations:
        parsed = parse_focus_node_uri(violation.focusNode or "", namespace_base)
        if parsed:
            violations_by_instance[parsed].append(violation)

    items: list[dict] = []
    for instance in instances:
        # ``or`` rather than a .get() default, so that explicit None/"" values
        # also become "unknown" instead of leaking "None" into URIs and IDs.
        inst_space = instance.get("space") or "unknown"
        inst_external_id = instance.get("externalId") or "unknown"
        instance_violations = violations_by_instance.get((inst_space, inst_external_id), [])
        focus_node = f"{namespace_base}{inst_space}/{inst_external_id}"

        failed_constraints: list[str] = []
        source_shapes: set[str] = set()
        constraint_details: list[dict] = []
        severities: list[str] = []
        dimensions: set[str] = set()
        target_classes: set[str] = set()

        for violation in instance_violations:
            constraint_component = violation.sourceConstraintComponent or "Unknown"
            result_path = violation.resultPath
            failed_constraints.append(
                f"{constraint_component}::{result_path}" if result_path else constraint_component
            )

            if violation.sourceShape:
                source_shapes.add(violation.sourceShape)
            if violation.dimension:
                dimensions.add(violation.dimension)
            if violation.targetClass:
                target_classes.add(violation.targetClass)

            # Normalise IRIs *and* prefixed names ("sh:Violation") to the local
            # name. Otherwise a prefixed severity never matches the violation
            # check below and the record is wrongly marked as passed.
            severity = _severity_local_name(violation.resultSeverity or "Violation")
            severities.append(severity)

            detail = {
                "sourceConstraintComponent": constraint_component,
                "resultMessage": violation.resultMessage or "No message",
                "resultSeverity": severity,
                "resultPath": violation.resultPath,
                "value": violation.value,
                "sourceConstraint": violation.sourceConstraint,
                "sourceShape": violation.sourceShape,
                "dimension": violation.dimension,
            }
            constraint_details.append({k: v for k, v in detail.items() if v is not None})

        # NOTE(review): the ID does not include inst_space. Instances sharing an
        # externalId in different spaces therefore collide when both records go
        # to the same destination space. Confirm before relying on uniqueness.
        record_external_id = f"dq_{records_config.rule_set_id}_{inst_external_id}_{job_run_id}"
        properties: dict[str, object] = {
            "ruleSetId": records_config.rule_set_id,
            "ruleSetVersion": records_config.rule_set_version,
            "jobRunId": job_run_id,
            # _VIOLATION_SEVERITY is a module-level constant defined elsewhere;
            # presumably the lower-cased local name "violation". Warnings and
            # other severities do not fail the record.
            "passedValidation": not any(s.lower() == _VIOLATION_SEVERITY for s in severities),
            "resultSeverity": severities,
            "failedConstraints": failed_constraints,
            "focusNode": focus_node,
            "focusNodeInstance": {"space": inst_space, "externalId": inst_external_id},
            "validationReport": {
                "violationCount": len(instance_violations),
                "violations": constraint_details,
                "summary": (
                    f"{len(instance_violations)} violation(s)" if instance_violations else "Passed all constraints"
                ),
            },
        }

        # Sorted so that repeated runs emit identical payloads; set iteration
        # order for strings varies with hash randomisation.
        if source_shapes:
            properties["sourceShape"] = sorted(source_shapes)
        if dimensions:
            properties["dimensions"] = sorted(dimensions)
        if target_classes:
            properties["targetClass"] = sorted(target_classes)
        if records_config.data_domain_external_id:
            properties["dataDomainExternalId"] = records_config.data_domain_external_id

        # The instance's own space is used only when requested and known.
        # Otherwise use the explicit override, then the configured records space.
        dest_space = (
            inst_space
            if records_config.use_instance_space and inst_space != "unknown"
            else (records_config.record_space or records_config.records_space)
        )
        items.append(
            {
                "space": dest_space,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": records_config.records_space,
                            "externalId": records_config.records_container,
                        },
                        "properties": properties,
                    }
                ],
            }
        )

    return items

build_schema_issue_records(schema_issues, *, job_run_id, records_config, datamodel_space, datamodel_external_id, datamodel_version)

Build Records API payloads for schema inconsistencies.

Parameters:

Name Type Description Default
schema_issues list

List of SchemaIssue objects from SHACL validation

required
job_run_id str

Unique identifier for this validation run

required
records_config RecordsConfig

Records API configuration

required
datamodel_space str

Data model space being validated

required
datamodel_external_id str

Data model external ID

required
datamodel_version str

Data model version

required

Returns:

Type Description
list[dict]

List of record payloads ready for Records API

Source code in cognite_data_quality/_records.py
def _schema_issue_as_dict(issue: object) -> dict:
    """Return the fields of a schema issue as a dict.

    Accepts a plain dict (copied), an object exposing ``to_dict()``, or any
    object with a ``__dict__`` (read via ``vars``).
    """
    if isinstance(issue, dict):
        return dict(issue)
    if hasattr(issue, "to_dict"):
        return issue.to_dict()
    return vars(issue)


def build_schema_issue_records(
    schema_issues: list,  # List of SchemaIssue from SHACL validation
    *,
    job_run_id: str,
    records_config: RecordsConfig,
    datamodel_space: str,
    datamodel_external_id: str,
    datamodel_version: str,
) -> list[dict]:
    """Build Records API payloads for schema inconsistencies.

    One record is produced per issue. Schema issues are not tied to a DMS
    instance, so every record goes to ``records_config.record_space`` when
    it is set and to ``records_config.records_space`` otherwise.

    Args:
        schema_issues: SchemaIssue objects from SHACL validation. Plain dicts,
            objects with a ``to_dict()`` method, and plain objects are all
            accepted. Fields read: ``issue_type``, ``view_id``,
            ``property_name``, ``severity``, ``message``, ``source_view_id``,
            ``error_code``, ``details``.
        job_run_id: Unique identifier for this validation run.
        records_config: Records API configuration.
        datamodel_space: Data model space being validated.
        datamodel_external_id: Data model external ID.
        datamodel_version: Data model version.

    Returns:
        List of record payloads ready for the Records API. The list is empty
        when *schema_issues* is empty.
    """
    if not schema_issues:
        return []

    # Same override rule build_records applies when not writing to instance spaces.
    dest_space = records_config.record_space or records_config.records_space

    items: list[dict] = []
    for idx, issue in enumerate(schema_issues):
        issue_dict = _schema_issue_as_dict(issue)

        # ``or`` fallbacks also cover keys that are present but None. Those
        # would otherwise crash on ``.replace`` or leak "None" into the ID.
        issue_type = issue_dict.get("issue_type") or "unknown"
        raw_view_id = issue_dict.get("view_id") or "unknown"
        view_id = str(raw_view_id).replace("/", "_")
        property_name = issue_dict.get("property_name") or "unknown"
        severity = issue_dict.get("severity") or "Warning"
        message = issue_dict.get("message") or ""
        # idx keeps IDs unique when the same type/view/property repeats in a run.
        record_external_id = (
            f"schema_{records_config.rule_set_id}_{issue_type}_{view_id}_{property_name}_{job_run_id}_{idx}"
        )

        properties: dict[str, object] = {
            "ruleSetId": records_config.rule_set_id,
            "ruleSetVersion": records_config.rule_set_version,
            "jobRunId": job_run_id,
            "passedValidation": False,  # Schema issues always mean validation didn't fully pass
            "resultSeverity": [severity],
            "failedConstraints": [f"schema::{issue_type}"],
            "focusNode": raw_view_id,
            "validationReport": {
                "violationCount": 1,
                "violations": [
                    {
                        "issueType": issue_type,
                        "severity": severity,
                        "message": message,
                        "viewId": issue_dict.get("view_id"),
                        "propertyName": issue_dict.get("property_name"),
                        "sourceViewId": issue_dict.get("source_view_id"),
                        "errorCode": issue_dict.get("error_code"),
                        "details": issue_dict.get("details") or {},
                    }
                ],
                "summary": message or "Schema inconsistency detected",
                # Data model context lives inside the report, not as separate
                # container properties.
                "dataModel": {
                    "space": datamodel_space,
                    "externalId": datamodel_external_id,
                    "version": datamodel_version,
                },
            },
        }

        if records_config.data_domain_external_id:
            properties["dataDomainExternalId"] = records_config.data_domain_external_id

        items.append(
            {
                "space": dest_space,
                "externalId": record_external_id,
                "sources": [
                    {
                        "source": {
                            "type": "container",
                            "space": records_config.records_space,
                            "externalId": records_config.records_container,
                        },
                        "properties": properties,
                    }
                ],
            }
        )

    return items

ensure_records_infrastructure(client, records_config)

Ensure Records API stream and container exist.

Source code in cognite_data_quality/_records.py
def ensure_records_infrastructure(client: CogniteClient, records_config: RecordsConfig) -> None:
    """Ensure the Records API stream and the records container exist.

    Returns immediately, without touching CDF, when
    ``records_config.stream_id`` is ``None`` (Records API ingestion is
    disabled). Otherwise the stream helper runs first, followed by the
    container helper for ``records_space`` / ``records_container``.
    """
    configured_stream = records_config.stream_id
    if configured_stream is not None:
        _ensure_records_stream(client, configured_stream)
        _ensure_records_container(
            client,
            records_config.records_space,
            records_config.records_container,
        )

parse_focus_node_uri(uri, namespace_base='http://purl.org/cognite/')

Parse focusNode URI into (space, external_id), matching NEAT and simple formats.

NEAT: {namespace_base}space/{instance_space}#{urlencoded(external_id)}

Simple: {namespace_base}{space}/{external_id}

Source code in cognite_data_quality/_records.py
def parse_focus_node_uri(
    uri: str,
    namespace_base: str = "http://purl.org/cognite/",
) -> tuple[str, str] | None:
    """Parse focusNode URI into (space, external_id), matching NEAT and simple formats.

    NEAT:   {namespace_base}space/{instance_space}#{urlencoded(external_id)}
    Simple:     {namespace_base}{space}/{external_id}
    """
    if not uri or not uri.startswith(namespace_base):
        return None
    suffix = uri[len(namespace_base) :].rstrip("/")
    if not suffix:
        return None
    if suffix.startswith("space/"):
        idx = suffix.find("#")
        if idx < 0:
            return None
        instance_space = suffix[6:idx].rstrip("/")
        external_id = urllib.parse.unquote(suffix[idx + 1 :].strip())
        return (instance_space, external_id) if instance_space and external_id is not None else None
    parts = suffix.split("/")
    if len(parts) >= 2:
        return (parts[0], parts[1])
    if len(parts) == 1:
        return ("unknown", parts[0])
    return None