
Records output

Validation results are written to the CDF Records API as DataQualityValidationRecord records. This page describes the record schema and how to query records containing data quality validation output.

Default configuration

Records are written to:

  • Space: dataQuality (configurable via records_space in settings)
  • Container: DataQualityValidationRecord (configurable via records.container in settings)
  • Stream: Set in settings as records.stream_id (e.g. dq_validation_stream, test_stream_1)

To post results when running validation locally, set post_to_records=True and provide a stream_id in RecordsConfig when calling run_validation.

Record schema (container properties)

Each record in the stream has a properties object keyed by space and container. The data quality fields live under properties[records_space][records_container] (e.g. dataQuality / DataQualityValidationRecord). Main properties:

| Property | Type | Description |
| --- | --- | --- |
| ruleSetId | string | Rule set identifier (e.g. from view config) |
| ruleSetVersion | string | Rule set version |
| jobRunId | string | Unique identifier for the validation job run |
| passedValidation | boolean | true if no violations, false otherwise |
| resultSeverity | list of string | Severities from violations: Violation, Warning, Info |
| failedConstraints | list of string | Failed constraint identifiers (e.g. MinCountConstraintComponent::name) |
| focusNode | string | URI of the validated instance (focus node) |
| focusNodeInstance | object | Reference to the instance (space, externalId) |
| dataDomainExternalId | string | Data domain / view identifier |
| validationReport | object | Full report with violations, violationCount, summary |
| sourceShape | list of string | Source shape URIs from SHACL |

Record-level fields (outside properties) include externalId, createdTime, and lastUpdatedTime.
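
Putting the schema together, a returned record looks roughly like the sketch below. The field values are illustrative, not real output; what matters is that the container properties sit two levels down, keyed by space and then container:

```python
# Illustrative record shape; the values are made up for this sketch.
record = {
    "externalId": "dq-record-001",
    "createdTime": 1717000000000,
    "lastUpdatedTime": 1717000000000,
    "properties": {
        "dataQuality": {
            "DataQualityValidationRecord": {
                "ruleSetId": "EquipmentSHACLv1",
                "ruleSetVersion": "1",
                "jobRunId": "job-2024-06-01",
                "passedValidation": False,
                "resultSeverity": ["Violation"],
                "failedConstraints": ["MinCountConstraintComponent::name"],
                "focusNode": "http://example.com/instances/pump-42",
            }
        }
    },
}

# Container properties live under properties[space][container].
props = record["properties"]["dataQuality"]["DataQualityValidationRecord"]
print(props["ruleSetId"], props["passedValidation"])
```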

Querying records (Records API)

The Records API is in alpha. Use the CDF alpha header for all requests:

client.config.headers["cdf-version"] = "alpha"

Base URL pattern: /api/v1/projects/{project}/streams/{stream_id}/records.

Filter records

Endpoint: POST .../streams/{stream_id}/records/filter

Request body:

  • filter (optional): Filter on container properties using equals, in, etc.
  • lastUpdatedTime (optional): Time range, e.g. {"gt": start_ms} or {"gte": start_ms, "lte": end_ms} (milliseconds since epoch).
  • limit (optional): Max number of records to return (default behavior is API-specific).
  • cursor (optional): For pagination.

Property paths in filters are arrays: [space, container, propertyName], e.g. ["dataQuality", "DataQualityValidationRecord", "ruleSetId"].
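
Since every filter and aggregate references properties with the same three-element path, a tiny helper keeps them consistent. The space and container names below are the defaults from this page:

```python
SPACE = "dataQuality"
CONTAINER = "DataQualityValidationRecord"

def prop_path(name):
    """Build a [space, container, property] path for filter/aggregate bodies."""
    return [SPACE, CONTAINER, name]

print(prop_path("ruleSetId"))
```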

Example: fetch validation records for a rule set (last 7 days)

from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()
client.config.headers["cdf-version"] = "alpha"

stream_id = "dq_validation_stream"  # or your settings["records"]["stream_id"]
space = "dataQuality"
container = "DataQualityValidationRecord"
rule_set_id = "EquipmentSHACLv1"

seven_days_ago_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)

response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "lastUpdatedTime": {"gt": seven_days_ago_ms},
        "limit": 1000,
        "filter": {
            "equals": {
                "property": [space, container, "ruleSetId"],
                "value": rule_set_id,
            }
        },
    },
)
data = response.json()
records = data.get("items", [])

for record in records:
    props = record.get("properties", {}).get(space, {}).get(container, {})
    passed = props.get("passedValidation")
    focus = props.get("focusNode")
    failed = props.get("failedConstraints", [])
    print(f"  {record.get('externalId')}: passed={passed}, focus={focus}, failed={len(failed)}")

Example: filter by pass/fail

# Only failed validations
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "filter": {
            "equals": {
                "property": [space, container, "passedValidation"],
                "value": False,
            }
        },
        "limit": 500,
    },
)
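
To combine conditions, for example failed validations for a single rule set, the filter language supports boolean composition. The `and` wrapper shown here follows the common CDF filter DSL and is an assumption for the alpha Records API:

```python
# Hedged sketch: the "and" combinator mirrors the common CDF filter DSL
# and is an assumption for the alpha Records API.
space = "dataQuality"
container = "DataQualityValidationRecord"

def equals(prop_name, value):
    return {"equals": {"property": [space, container, prop_name], "value": value}}

combined_filter = {
    "and": [
        equals("ruleSetId", "EquipmentSHACLv1"),
        equals("passedValidation", False),
    ]
}
```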

Aggregate records

Endpoint: POST .../streams/{stream_id}/records/aggregate

Use this to get counts, unique values, or other aggregates without loading all records.

Request body:

  • filter (optional): Same as in filter (e.g. by ruleSetId, passedValidation).
  • lastUpdatedTime (optional): Time range in milliseconds.
  • aggregates: Map of aggregate names to definitions, e.g.:
      • "totalCount": {"count": {}}
      • "uniqueJobs": {"uniqueValues": {"property": [space, container, "jobRunId"]}}
      • "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}}
      • "severityBreakdown": {"uniqueValues": {"property": [space, container, "resultSeverity"]}}

Example: aggregate stats for a rule set

payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "filter": {
        "equals": {"property": [space, container, "ruleSetId"], "value": rule_set_id}
    },
    "aggregates": {
        "totalCount": {"count": {}},
        "jobRuns": {"uniqueValues": {"property": [space, container, "jobRunId"]}},
        "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}},
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()

total = data.get("aggregates", {}).get("totalCount", {}).get("count", 0)
job_buckets = data.get("aggregates", {}).get("jobRuns", {}).get("uniqueValueBuckets", [])
unique_jobs = len(job_buckets)
# passedBreakdown.uniqueValueBuckets: [{"value": true, "count": n}, {"value": false, "count": m}]
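
The unique-value buckets can be folded into a pass rate without another API call. This helper assumes the bucket shape shown in the comment above ({"value": ..., "count": ...}):

```python
def pass_rate(buckets):
    """Fraction of passed validations from passedValidation uniqueValueBuckets."""
    counts = {b["value"]: b["count"] for b in buckets}
    passed = counts.get(True, 0)
    failed = counts.get(False, 0)
    total = passed + failed
    return passed / total if total else None

buckets = [{"value": True, "count": 90}, {"value": False, "count": 10}]
print(pass_rate(buckets))  # → 0.9
```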

Example: list distinct rule set IDs in the stream

payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "aggregates": {
        "ruleSetIds": {"uniqueValues": {"property": [space, container, "ruleSetId"]}}
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()
buckets = data.get("aggregates", {}).get("ruleSetIds", {}).get("uniqueValueBuckets", [])
rule_set_ids = [b["value"] for b in buckets if b.get("value")]

Restoring the client default headers

If other APIs should not receive the cdf-version: alpha header, save the original headers before setting it and restore them once your Records calls are done:

original_headers = client.config.headers.copy()
client.config.headers["cdf-version"] = "alpha"
try:
    # ... Records API calls ...
finally:
    client.config.headers = original_headers
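
A small context manager makes this save/restore pattern reusable. It mutates whatever headers mapping it is given, so passing client.config.headers should work the same way as the plain dict used here:

```python
from contextlib import contextmanager

@contextmanager
def alpha_header(headers):
    """Temporarily set cdf-version: alpha on a headers mapping, restoring on exit."""
    original = headers.copy()
    headers["cdf-version"] = "alpha"
    try:
        yield headers
    finally:
        headers.clear()
        headers.update(original)

# Usage with a plain dict standing in for client.config.headers:
headers = {"accept": "application/json"}
with alpha_header(headers):
    assert headers["cdf-version"] == "alpha"
assert "cdf-version" not in headers
```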

Dashboard

You can build your own dashboard using the filter and aggregate endpoints above. Query by ruleSetId, passedValidation, jobRunId, or any other container property to visualize pass/fail rates and violations over time.
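
For time-series charts such as pass rate per day, one approach is to issue one aggregate request per day, each restricted to a lastUpdatedTime window. This helper (an illustration, not part of any SDK) builds the millisecond ranges for the last n UTC days:

```python
from datetime import datetime, timedelta, timezone

def daily_windows(days, now=None):
    """Return (start_ms, end_ms) pairs, oldest first, for the last `days` UTC days."""
    now = now or datetime.now(timezone.utc)
    end = now
    windows = []
    for _ in range(days):
        start = end - timedelta(days=1)
        windows.append((int(start.timestamp() * 1000), int(end.timestamp() * 1000)))
        end = start
    return list(reversed(windows))

# Each window becomes {"lastUpdatedTime": {"gte": start_ms, "lte": end_ms}}
# in the aggregate payload, combined with the ruleSetId filter as above.
```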