Records output
Validation results are written to the CDF Records API as DataQualityValidationRecord records. This page describes the record schema and how to query records containing data quality validation output.
Default configuration
Records are written to:
- Space: dataQuality (configurable via records_space in settings)
- Container: DataQualityValidationRecord (configurable via records.container in settings)
- Stream: set in settings as records.stream_id (e.g. dq_validation_stream, test_stream_1)
To post results when running validation locally, set post_to_records=True and provide a stream_id in RecordsConfig when calling run_validation.
Record schema (container properties)
Each record in the stream has a properties object keyed by space and container. The data quality fields live under properties[records_space][records_container] (e.g. dataQuality / DataQualityValidationRecord). Main properties:
| Property | Type | Description |
|---|---|---|
| ruleSetId | string | Rule set identifier (e.g. from view config) |
| ruleSetVersion | string | Rule set version |
| jobRunId | string | Unique identifier for the validation job run |
| passedValidation | boolean | true if no violations, false otherwise |
| resultSeverity | list of string | Severities from violations: Violation, Warning, Info |
| failedConstraints | list of string | Failed constraint identifiers (e.g. MinCountConstraintComponent::name) |
| focusNode | string | URI of the validated instance (focus node) |
| focusNodeInstance | object | Reference to the instance (space, externalId) |
| dataDomainExternalId | string | Data domain / view identifier |
| validationReport | object | Full report with violations, violationCount, summary |
| sourceShape | list of string | Source shape URIs from SHACL |
Record-level fields (outside properties) include externalId, createdTime, and lastUpdatedTime.
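Reading the nested properties object can be wrapped in a small helper. The sketch below is illustrative only: extract_dq_properties is a hypothetical name (not part of any SDK), and the sample record is made up to mirror the schema described above.

```python
from typing import Any, Dict


def extract_dq_properties(
    record: Dict[str, Any],
    space: str = "dataQuality",
    container: str = "DataQualityValidationRecord",
) -> Dict[str, Any]:
    """Return the container properties of one record, or {} if they are absent."""
    return record.get("properties", {}).get(space, {}).get(container, {})


# Made-up record shaped like the schema described above (not real data).
sample_record = {
    "externalId": "example-record-1",
    "properties": {
        "dataQuality": {
            "DataQualityValidationRecord": {
                "ruleSetId": "EquipmentSHACLv1",
                "passedValidation": False,
                "failedConstraints": ["MinCountConstraintComponent::name"],
            }
        }
    },
}

props = extract_dq_properties(sample_record)
print(props["ruleSetId"], props["passedValidation"], props["failedConstraints"])
```

If you changed the space or container in settings, pass those values instead of relying on the defaults.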
Querying records (Records API)
The Records API is in alpha. Send the CDF alpha header (cdf-version: alpha) with every request.
Base URL pattern: /api/v1/projects/{project}/streams/{stream_id}/records.
Filter records
Endpoint: POST .../streams/{stream_id}/records/filter
Request body:
- filter (optional): Filter on container properties using equals, in, etc.
- lastUpdatedTime (optional): Time range, e.g. {"gt": start_ms} or {"gte": start_ms, "lte": end_ms} (milliseconds since epoch).
- limit (optional): Max number of records to return (default behavior is API-specific).
- cursor (optional): For pagination.
Property paths in filters are arrays: [space, container, propertyName], e.g. ["dataQuality", "DataQualityValidationRecord", "ruleSetId"].
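To make the request body shape concrete, here is a minimal sketch of helpers that assemble property paths, an equals filter, and a lastUpdatedTime range. The helper names (prop_path, equals_filter, to_epoch_ms, build_filter_body) are hypothetical and only produce plain dictionaries; they do not call the API.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

SPACE = "dataQuality"
CONTAINER = "DataQualityValidationRecord"


def prop_path(name: str, space: str = SPACE, container: str = CONTAINER) -> List[str]:
    """Property paths are arrays: [space, container, propertyName]."""
    return [space, container, name]


def equals_filter(name: str, value: Any) -> Dict[str, Any]:
    """Build an equals filter on one container property."""
    return {"equals": {"property": prop_path(name), "value": value}}


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since epoch."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return int(dt.timestamp() * 1000)


def build_filter_body(
    filter_: Optional[Dict[str, Any]] = None,
    since: Optional[datetime] = None,
    limit: Optional[int] = None,
) -> Dict[str, Any]:
    """Assemble a request body, leaving out any optional field that is not set."""
    body: Dict[str, Any] = {}
    if filter_ is not None:
        body["filter"] = filter_
    if since is not None:
        body["lastUpdatedTime"] = {"gt": to_epoch_ms(since)}
    if limit is not None:
        body["limit"] = limit
    return body


body = build_filter_body(
    filter_=equals_filter("ruleSetId", "EquipmentSHACLv1"),
    since=datetime(2024, 1, 1, tzinfo=timezone.utc),
    limit=1000,
)
print(body)
```

Requiring timezone-aware datetimes is a design choice of this sketch: it keeps the millisecond value independent of the machine's local time zone.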
Example: fetch validation records for a rule set (last 7 days)
from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()
client.config.headers["cdf-version"] = "alpha"

stream_id = "dq_validation_stream"  # or your settings["records"]["stream_id"]
space = "dataQuality"
container = "DataQualityValidationRecord"
rule_set_id = "EquipmentSHACLv1"
seven_days_ago_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)

response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "lastUpdatedTime": {"gt": seven_days_ago_ms},
        "limit": 1000,
        "filter": {
            "equals": {
                "property": [space, container, "ruleSetId"],
                "value": rule_set_id,
            }
        },
    },
)
data = response.json()
records = data.get("items", [])

for record in records:
    props = record.get("properties", {}).get(space, {}).get(container, {})
    passed = props.get("passedValidation")
    focus = props.get("focusNode")
    failed = props.get("failedConstraints", [])
    print(f" {record.get('externalId')}: passed={passed}, focus={focus}, failed={len(failed)}")
Example: filter by pass/fail
# Only failed validations
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "filter": {
            "equals": {
                "property": [space, container, "passedValidation"],
                "value": False,
            }
        },
        "limit": 500,
    },
)
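When a result set exceeds limit, the cursor field can be used to page through it. The sketch below is a generic loop under stated assumptions: it assumes the response returns the next cursor in a nextCursor field, which this page does not confirm, so check the response you actually receive. iter_records and fake_post are hypothetical names; fake_post stands in for the real HTTP call so the loop can be run without a CDF project.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_records(
    post_page: Callable[[Dict[str, Any]], Dict[str, Any]],
    body: Dict[str, Any],
    max_pages: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until no cursor is returned or max_pages is hit."""
    cursor: Optional[str] = None
    for _ in range(max_pages):
        page_body = dict(body)  # copy so the caller's body is not mutated
        if cursor is not None:
            page_body["cursor"] = cursor
        data = post_page(page_body)
        yield from data.get("items", [])
        cursor = data.get("nextCursor")  # ASSUMPTION: response field name
        if not cursor:
            break


def fake_post(page_body: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for the HTTP call: returns two pages of made-up records."""
    if "cursor" not in page_body:
        return {"items": [{"externalId": "a"}, {"externalId": "b"}], "nextCursor": "page-2"}
    return {"items": [{"externalId": "c"}]}


external_ids = [r["externalId"] for r in iter_records(fake_post, {"limit": 2})]
print(external_ids)
```

With the client from the examples above, post_page could be a small function that posts the body to the records/filter endpoint and returns response.json().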
Aggregate records
Endpoint: POST .../streams/{stream_id}/records/aggregate
Use this to get counts, unique values, or other aggregates without loading all records.
Request body:
- filter (optional): Same as in filter (e.g. by ruleSetId, passedValidation).
- lastUpdatedTime (optional): Time range in milliseconds.
- aggregates: Map of aggregate names to definitions, e.g.:
  - "totalCount": {"count": {}}
  - "uniqueJobs": {"uniqueValues": {"property": [space, container, "jobRunId"]}}
  - "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}}
  - "severityBreakdown": {"uniqueValues": {"property": [space, container, "resultSeverity"]}}
Example: aggregate stats for a rule set
payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "filter": {
        "equals": {"property": [space, container, "ruleSetId"], "value": rule_set_id}
    },
    "aggregates": {
        "totalCount": {"count": {}},
        "jobRuns": {"uniqueValues": {"property": [space, container, "jobRunId"]}},
        "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}},
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()
total = data.get("aggregates", {}).get("totalCount", {}).get("count", 0)
job_buckets = data.get("aggregates", {}).get("jobRuns", {}).get("uniqueValueBuckets", [])
unique_jobs = len(job_buckets)
# passedBreakdown.uniqueValueBuckets: [{"value": true, "count": n}, {"value": false, "count": m}]
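The passedBreakdown buckets can be reduced to a pass rate. The following is a minimal sketch that assumes the bucket shape noted in the comment above ({"value": ..., "count": ...}); pass_fail_counts and pass_rate are hypothetical helper names, and the sample response is made up.

```python
from typing import Any, Dict, Optional


def pass_fail_counts(data: Dict[str, Any], aggregate_name: str = "passedBreakdown") -> Dict[bool, int]:
    """Sum bucket counts per boolean value; non-boolean bucket values are ignored."""
    buckets = data.get("aggregates", {}).get(aggregate_name, {}).get("uniqueValueBuckets", [])
    counts = {True: 0, False: 0}
    for bucket in buckets:
        value = bucket.get("value")
        if isinstance(value, bool):
            counts[value] += bucket.get("count", 0)
    return counts


def pass_rate(counts: Dict[bool, int]) -> Optional[float]:
    """Fraction of passed records, or None when there are no records."""
    total = counts[True] + counts[False]
    return counts[True] / total if total else None


# Made-up aggregate response for illustration only.
sample = {
    "aggregates": {
        "passedBreakdown": {
            "uniqueValueBuckets": [
                {"value": True, "count": 42},
                {"value": False, "count": 8},
            ]
        }
    }
}

counts = pass_fail_counts(sample)
rate = pass_rate(counts)
print(counts, rate)
```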
Example: list distinct rule set IDs in the stream
payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "aggregates": {
        "ruleSetIds": {"uniqueValues": {"property": [space, container, "ruleSetId"]}}
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()
buckets = data.get("aggregates", {}).get("ruleSetIds", {}).get("uniqueValueBuckets", [])
rule_set_ids = [b["value"] for b in buckets if b.get("value")]
Restoring the client default headers
After querying Records, restore the client headers if you need to avoid sending cdf-version: alpha to other APIs:
original_headers = client.config.headers.copy()
client.config.headers["cdf-version"] = "alpha"
try:
    # ... Records API calls ...
    pass  # placeholder so the try block is valid Python
finally:
    client.config.headers = original_headers
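The same save-and-restore pattern can be packaged as a context manager so the header cannot be left set by accident. This is a sketch, not part of the SDK: temporary_header is a hypothetical helper that works on any mutable headers mapping, and fake_client is a stand-in object used only to show the behavior.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Iterator, MutableMapping

_MISSING = object()


@contextmanager
def temporary_header(
    headers: MutableMapping[str, str], name: str, value: str
) -> Iterator[MutableMapping[str, str]]:
    """Set headers[name] inside the with-block, then restore the previous state."""
    previous = headers.get(name, _MISSING)
    headers[name] = value
    try:
        yield headers
    finally:
        if previous is _MISSING:
            headers.pop(name, None)  # header did not exist before: remove it
        else:
            headers[name] = previous  # header existed: put the old value back


# Stand-in for a client whose config exposes a headers dict.
fake_client = SimpleNamespace(config=SimpleNamespace(headers={"x-example": "1"}))

with temporary_header(fake_client.config.headers, "cdf-version", "alpha"):
    inside = dict(fake_client.config.headers)  # ... Records API calls go here ...
after = dict(fake_client.config.headers)

print(inside, after)
```

Unlike replacing the whole headers dictionary, this restores only the one header, so other header changes made inside the block are preserved.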
Dashboard
You can build your own dashboard using the filter and aggregate endpoints above. Query by ruleSetId, passedValidation, jobRunId, or any other container property to visualize pass/fail rates and violations over time.
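As one possible starting point for a pass-rate-over-time chart, the sketch below groups records returned by the filter endpoint by UTC day. It assumes the record-level lastUpdatedTime is in milliseconds since epoch, consistent with the time-range filters on this page; daily_pass_rates is a hypothetical helper and the sample records are made up.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, Iterable


def daily_pass_rates(
    records: Iterable[Dict[str, Any]],
    space: str = "dataQuality",
    container: str = "DataQualityValidationRecord",
) -> Dict[str, float]:
    """Map each UTC day (YYYY-MM-DD) to the fraction of records that passed."""
    totals = defaultdict(lambda: [0, 0])  # day -> [passed, total]
    for record in records:
        props = record.get("properties", {}).get(space, {}).get(container, {})
        updated_ms = record.get("lastUpdatedTime")  # ASSUMPTION: ms since epoch
        passed = props.get("passedValidation")
        if updated_ms is None or passed is None:
            continue  # skip records missing either field
        day = datetime.fromtimestamp(updated_ms / 1000, tz=timezone.utc).date().isoformat()
        totals[day][1] += 1
        if passed:
            totals[day][0] += 1
    return {day: p / t for day, (p, t) in sorted(totals.items())}


def _record(updated_ms: int, passed: bool) -> Dict[str, Any]:
    """Build a made-up record for the demonstration below."""
    return {
        "lastUpdatedTime": updated_ms,
        "properties": {"dataQuality": {"DataQualityValidationRecord": {"passedValidation": passed}}},
    }


sample_records = [
    _record(1704067200000, True),   # 2024-01-01 00:00 UTC
    _record(1704070800000, False),  # 2024-01-01 01:00 UTC
    _record(1704153600000, True),   # 2024-01-02 00:00 UTC
]
rates = daily_pass_rates(sample_records)
print(rates)
```

For large streams, the aggregate endpoint is likely a better fit than fetching every record; this client-side grouping is only meant to show the shape of the data a chart would need.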