Records output
What this is
Validation produces two types of records in the CDF Records API:
| Container | Purpose |
|---|---|
| DataQualityValidationRecord | One record per focus node per run: pass/fail, violation details, severity |
| RuleEngineResult | One record per inference result: derived state from SHACL-AF sh:rule blocks |
Both containers live in the same configurable space (default dataQuality) and, by default, in the same stream.
When to use it
Use this guide when building dashboards, triage workflows, or automations that consume validation and inference outputs.
User mental model
- DataQualityValidationRecord captures validation outcomes.
- RuleEngineResult captures inferred/derived outputs.
- Both are queryable through the same Records API filter/aggregate endpoints.
Runtime behavior
Grouped uniqueness record mode
To support high-volume grouped violations (uniqueness overflow and green pass markers), DataQualityValidationRecord includes an explicit record mode:
- validationType = "single_violation" for existing per-instance records
- validationType = "group_violation" for grouped summary records
Grouped records also include:
- groupViolationType, with subtype global_overflow, value_overflow, or pass
For group_violation records, focusNode and focusNodeInstance may be intentionally omitted.
Consumers (apps/agents) should branch on validationType and groupViolationType rather than parsing free text.
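The sketch below shows one way to branch on these typed fields. The function name and the returned labels are hypothetical. It also assumes that a record with no validationType is a legacy per-instance record. Adapt both assumptions to your own triage categories.

```python
from typing import Any, Mapping

GROUP_SUBTYPES = {"global_overflow", "value_overflow", "pass"}


def classify_validation_record(props: Mapping[str, Any]) -> str:
    """Return a hypothetical triage label for one record's container properties."""
    # ASSUMPTION: records without validationType are legacy per-instance records.
    validation_type = props.get("validationType", "single_violation")

    if validation_type == "group_violation":
        # Grouped records may omit focusNode/focusNodeInstance, so do not rely on them here.
        subtype = props.get("groupViolationType")
        return f"group_{subtype}" if subtype in GROUP_SUBTYPES else "group_unknown"

    if validation_type == "single_violation":
        return "pass" if props.get("passedValidation") else "fail"

    # Unrecognized record mode: surface it instead of guessing.
    return "unknown"
```

An unknown mode is reported as "unknown" rather than folded into pass/fail. This keeps a future record mode from silently skewing dashboards.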
Default configuration
Records are written to:
- Space: dataQuality (configurable via records_space in settings)
- Container: DataQualityValidationRecord (configurable via records.container in settings)
- Stream: set in settings as records.stream_id (e.g. dq_validation_stream, test_stream_1)
To post results when running validation locally, set post_to_records=True and provide a stream_id in RecordsConfig when calling run_validation.
Record schema (container properties)
Each record in the stream has a properties object keyed by space and container. The data quality fields live under properties[records_space][records_container] (e.g. dataQuality / DataQualityValidationRecord). Main properties:
| Property | Type | Description |
|---|---|---|
| ruleSetId | string | Rule set identifier (e.g. from view config) |
| ruleSetVersion | string | Rule set version |
| jobRunId | string | Unique identifier for the validation job run |
| validationType | string | Record mode: "single_violation" (per-instance) or "group_violation" (grouped summary) |
| groupViolationType | string | Grouped records only: global_overflow, value_overflow, or pass |
| passedValidation | boolean | true if no violations, false otherwise |
| resultSeverity | list of string | Severities from violations: Violation, Warning, Info |
| failedConstraints | list of string | Failed constraint identifiers (e.g. MinCountConstraintComponent::name) |
| focusNode | string | URI of the validated instance (focus node); may be omitted for group_violation records |
| focusNodeInstance | object | Reference to the instance (space, externalId); may be omitted for group_violation records |
| dataDomainExternalId | string | Data domain / view identifier |
| validationReport | object | Full report with violations, violationCount, summary |
| sourceShape | list of string | Source shape URIs from SHACL |
Record-level fields (outside properties) include externalId, createdTime, and lastUpdatedTime.
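Dashboards and dataframes are usually easier to feed from one flat dict per record. The hypothetical helper below merges the nested container properties with the record-level fields. It assumes the nesting described above and defaults to the dataQuality space and the DataQualityValidationRecord container.

```python
from typing import Any, Dict


def flatten_record(
    record: Dict[str, Any],
    space: str = "dataQuality",
    container: str = "DataQualityValidationRecord",
) -> Dict[str, Any]:
    """Merge container properties with record-level fields into one flat dict."""
    # Container properties live under properties[space][container]; copy so the input is untouched.
    flat = dict(record.get("properties", {}).get(space, {}).get(container, {}))
    # Add the record-level fields that sit outside properties.
    for field in ("externalId", "createdTime", "lastUpdatedTime"):
        if field in record:
            flat[field] = record[field]
    return flat
```

Pass container="RuleEngineResult" to flatten inference records the same way.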
Minimal happy path
Querying records (Records API)
The Records API is in alpha. Send the CDF alpha header, cdf-version: alpha, with every request.
Base URL pattern: /api/v1/projects/{project}/streams/{stream_id}/records.
Filter records
Endpoint: POST .../streams/{stream_id}/records/filter
Request body:
- filter (optional): Filter on container properties using equals, in, etc.
- lastUpdatedTime (optional): Time range, e.g. {"gt": start_ms} or {"gte": start_ms, "lte": end_ms} (milliseconds since epoch).
- limit (optional): Maximum number of records to return (the default is API-specific).
- cursor (optional): For pagination.
Property paths in filters are arrays: [space, container, propertyName], e.g. ["dataQuality", "DataQualityValidationRecord", "ruleSetId"].
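The request-body fields above can be assembled by a small builder. This is a hypothetical helper that uses only the shapes described in this guide: an equals filter on a [space, container, propertyName] path, an optional lastUpdatedTime window in milliseconds, limit, and cursor.

```python
from typing import Any, Dict, Optional


def build_filter_body(
    space: str,
    container: str,
    prop: str,
    value: Any,
    start_ms: Optional[int] = None,
    end_ms: Optional[int] = None,
    limit: int = 1000,
    cursor: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a records/filter body with one equals filter and an optional time window."""
    body: Dict[str, Any] = {
        # Property paths are [space, container, propertyName].
        "filter": {"equals": {"property": [space, container, prop], "value": value}},
        "limit": limit,
    }
    time_range: Dict[str, int] = {}
    if start_ms is not None:
        time_range["gte"] = start_ms
    if end_ms is not None:
        time_range["lte"] = end_ms
    if time_range:
        body["lastUpdatedTime"] = time_range  # milliseconds since epoch
    if cursor is not None:
        body["cursor"] = cursor
    return body
```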
Example: fetch validation records for a rule set (last 7 days)
```python
from datetime import datetime, timedelta

from cognite.client import CogniteClient

client = CogniteClient()
# The Records API is alpha: send the alpha version header
client.config.headers["cdf-version"] = "alpha"

stream_id = "dq_validation_stream"  # or your settings["records"]["stream_id"]
space = "dataQuality"
container = "DataQualityValidationRecord"
rule_set_id = "EquipmentSHACLv1"

# lastUpdatedTime filters use milliseconds since epoch
seven_days_ago_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)

response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "lastUpdatedTime": {"gt": seven_days_ago_ms},
        "limit": 1000,
        "filter": {
            "equals": {
                "property": [space, container, "ruleSetId"],
                "value": rule_set_id,
            }
        },
    },
)
data = response.json()
records = data.get("items", [])

for record in records:
    # Container properties are nested under properties[space][container]
    props = record.get("properties", {}).get(space, {}).get(container, {})
    passed = props.get("passedValidation")
    focus = props.get("focusNode")
    failed = props.get("failedConstraints") or []
    print(f" {record.get('externalId')}: passed={passed}, focus={focus}, failed={len(failed)}")
```
Example: filter by pass/fail
```python
# Only failed validations (reuses client, stream_id, space, container from the previous example)
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "filter": {
            "equals": {
                "property": [space, container, "passedValidation"],
                "value": False,
            }
        },
        "limit": 500,
    },
)
```
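A single filter call returns at most limit records. The generator below is a minimal pagination sketch built on the cursor field. It makes two assumptions to check against the alpha API reference. First, each response carries a continuation token named nextCursor; that field name is assumed. Second, sending that token back as cursor returns the next page. post_json is a caller-supplied function, so the loop does not depend on any particular HTTP client.

```python
from typing import Any, Callable, Dict, Iterator

PostJson = Callable[[Dict[str, Any]], Dict[str, Any]]


def iter_filtered_records(
    post_json: PostJson, body: Dict[str, Any], max_pages: int = 100
) -> Iterator[Dict[str, Any]]:
    """Yield records page by page until no continuation cursor is returned."""
    request = dict(body)  # never mutate the caller's body
    for _ in range(max_pages):  # hard stop as a safety net
        page = post_json(request)
        yield from page.get("items", [])
        cursor = page.get("nextCursor")  # ASSUMPTION: name of the continuation field
        if not cursor:
            return
        request = {**request, "cursor": cursor}
```

With the SDK client, post_json could be lambda b: client.post(url, json=b).json(), where url is the records/filter endpoint used above.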
Aggregate records
Endpoint: POST .../streams/{stream_id}/records/aggregate
Use this to get counts, unique values, or other aggregates without loading all records.
Request body:
- filter (optional): Same syntax as for the filter endpoint (e.g. by ruleSetId, passedValidation).
- lastUpdatedTime (optional): Time range in milliseconds since epoch.
- aggregates: Map of aggregate names to definitions, e.g.:
  - "totalCount": {"count": {}}
  - "uniqueJobs": {"uniqueValues": {"property": [space, container, "jobRunId"]}}
  - "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}}
  - "severityBreakdown": {"uniqueValues": {"property": [space, container, "resultSeverity"]}}
Example: aggregate stats for a rule set
```python
payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "filter": {
        "equals": {"property": [space, container, "ruleSetId"], "value": rule_set_id}
    },
    "aggregates": {
        "totalCount": {"count": {}},
        "jobRuns": {"uniqueValues": {"property": [space, container, "jobRunId"]}},
        "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}},
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()
total = data.get("aggregates", {}).get("totalCount", {}).get("count", 0)
job_buckets = data.get("aggregates", {}).get("jobRuns", {}).get("uniqueValueBuckets", [])
unique_jobs = len(job_buckets)
# passedBreakdown.uniqueValueBuckets: [{"value": true, "count": n}, {"value": false, "count": m}]
```
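The passedBreakdown buckets reduce to a single pass rate for a KPI tile. The sketch below assumes the bucket shape shown in the comment above, with bucket values arriving as JSON booleans. It returns None rather than dividing by zero when no records matched.

```python
from typing import Any, Dict, List, Optional


def pass_rate(buckets: List[Dict[str, Any]]) -> Optional[float]:
    """Share of passed records from passedBreakdown.uniqueValueBuckets, or None if empty."""
    passed = sum(b.get("count", 0) for b in buckets if b.get("value") is True)
    failed = sum(b.get("count", 0) for b in buckets if b.get("value") is False)
    total = passed + failed
    return passed / total if total else None
```

Against the response above this would be called as pass_rate(data.get("aggregates", {}).get("passedBreakdown", {}).get("uniqueValueBuckets", [])).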
Example: list distinct rule set IDs in the stream
```python
payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "aggregates": {
        "ruleSetIds": {"uniqueValues": {"property": [space, container, "ruleSetId"]}}
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()
buckets = data.get("aggregates", {}).get("ruleSetIds", {}).get("uniqueValueBuckets", [])
rule_set_ids = [b["value"] for b in buckets if b.get("value")]
```
Restoring the client default headers
The examples above set the alpha header on the shared client config. If other API calls should not send cdf-version: alpha, save the original headers first and restore them afterwards:
```python
original_headers = client.config.headers.copy()
client.config.headers["cdf-version"] = "alpha"
try:
    ...  # Records API calls go here
finally:
    client.config.headers = original_headers
```
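The same save-and-restore pattern can be packaged as a context manager, so the restore cannot be forgotten. This is a hypothetical helper. It assumes config.headers is a plain, reassignable dict, which matches how the examples above treat client.config.headers. It assigns a new dict instead of mutating the original one, so the restored mapping is exactly what was there before.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def alpha_headers(config: Any) -> Iterator[Any]:
    """Temporarily add cdf-version: alpha to config.headers, then restore the original dict."""
    original = config.headers
    # Assign a new dict so the original mapping is never modified.
    config.headers = {**original, "cdf-version": "alpha"}
    try:
        yield config
    finally:
        config.headers = original
```

Usage would look like with alpha_headers(client.config): response = client.post(...). If your cognite-sdk version's client.post accepts a headers argument, passing the alpha header per call avoids touching shared config at all.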
RuleEngineResult records (inference output)
When a SHACL file contains shapes with sh:rule declarations such as sh:SPARQLRule (SHACL-AF), the engine runs inference automatically alongside quality validation. Each result that a rule's CONSTRUCT output types as dqs:RuleEngineResult is written as a separate record in the RuleEngineResult container.
Container properties
| Property | Type | Description |
|---|---|---|
| ruleSetId | string | RuleSet external ID the rule came from |
| ruleSetVersion | string | RuleSet version |
| ruleId | string | Value of dqs:ruleId on the SHACL rule |
| runId | string | Shared job run ID (same as DataQualityValidationRecord.jobRunId) |
| resultType | string | Always "Inference" for rule engine results |
| focusNode | string | URI of the focus node |
| focusNodeInstance | DirectRelation | Hard link to the DMS instance (space, externalId) |
| resultValue | string | Optional scalar value (e.g. "Critical", "Overdue") |
| resultPayload | JSON | Optional free-form JSON body |
| causedBy | DirectRelation list | Lineage: upstream RuleEngineResult records this result depends on |
| dataDomainExternalId | string | Optional domain tag |
| producedAt | Timestamp | Materialisation time |
Querying RuleEngineResult records
```python
from datetime import datetime

client.config.headers["cdf-version"] = "alpha"

rule_engine_stream_id = "dq_validation_stream"  # same stream as DQ records by default
space = "dataQuality"
container = "RuleEngineResult"

# Start of today (local time) in milliseconds since epoch
start_of_today_ms = int(
    datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000
)

# Fetch "MO_Criticality" inferences written today
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{rule_engine_stream_id}/records/filter",
    json={
        "lastUpdatedTime": {"gte": start_of_today_ms},
        "filter": {
            "equals": {
                "property": [space, container, "ruleId"],
                "value": "MO_Criticality",
            }
        },
        "limit": 1000,
    },
)

for record in response.json().get("items", []):
    props = record["properties"][space][container]
    print(f"{props.get('focusNode')}: {props.get('resultValue')}")
```
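Repeated runs can write several results for the same focus node. The sketch below keeps only the most recently updated result per focusNode, using the record-level lastUpdatedTime in milliseconds. It is meant for results of a single ruleId, as in the query above; results from different rules would need grouping by ruleId as well. The function name is hypothetical.

```python
from typing import Any, Dict, Iterable, Tuple


def latest_by_focus_node(
    records: Iterable[Dict[str, Any]],
    space: str = "dataQuality",
    container: str = "RuleEngineResult",
) -> Dict[str, Dict[str, Any]]:
    """Keep only the most recently updated result per focusNode."""
    latest: Dict[str, Tuple[int, Dict[str, Any]]] = {}
    for record in records:
        props = record.get("properties", {}).get(space, {}).get(container, {})
        focus = props.get("focusNode")
        if focus is None:
            continue  # skip records without a focus node
        updated = record.get("lastUpdatedTime", 0)  # record-level field, milliseconds
        if focus not in latest or updated > latest[focus][0]:
            latest[focus] = (updated, props)
    return {focus: props for focus, (_, props) in latest.items()}
```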
Incremental reads for rule chaining
For downstream rule stages, use the rule_engine_result_sync function handler.
- It supports both Records sync mode and latest-changed filter mode.
- It persists per-listener checkpoint state (cursor and watermark).
- It supports first-class rule-scoped filters (rule_ids, rule_set_ids, rule_set_versions, focus_nodes, focus_node_instances) in both modes.
- It supports sync pagination beyond 1,000 records per run via page_limit and max_records.
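To illustrate the watermark half of the checkpoint idea, here is a generic sketch of a latest-changed read for one rule. It is not the rule_engine_result_sync handler's implementation. Checkpoint, next_request and advance are hypothetical names, and the request body only reuses the filter shapes documented above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass(frozen=True)
class Checkpoint:
    watermark_ms: int = 0  # highest lastUpdatedTime already processed


def next_request(checkpoint: Checkpoint, space: str, container: str, rule_id: str) -> Dict[str, Any]:
    """Filter body for results of one rule updated after the watermark."""
    return {
        "lastUpdatedTime": {"gt": checkpoint.watermark_ms},
        "filter": {"equals": {"property": [space, container, "ruleId"], "value": rule_id}},
        "limit": 1000,
    }


def advance(
    checkpoint: Checkpoint, records: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Checkpoint]:
    """Drop already-seen records and move the watermark to the newest one."""
    fresh = [r for r in records if r.get("lastUpdatedTime", 0) > checkpoint.watermark_ms]
    if not fresh:
        return [], checkpoint
    return fresh, Checkpoint(max(r["lastUpdatedTime"] for r in fresh))
```

A strict gt bound can skip a record that lands in the same millisecond as the watermark after a read. Tracking the external IDs already seen at the watermark timestamp avoids that.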
For end-to-end chained pipeline usage, see Chained Conditional Logic.
Writing inference rules (SHACL-AF)
See Rule sources for the full authoring guide.
Best practices
- Use typed fields (validationType, groupViolationType, ruleId) for automation instead of parsing message text.
- Keep dashboard queries time-bounded with lastUpdatedTime.
- Separate operational and exploratory queries by stream and filter scope.
Troubleshooting
- Empty query results: verify the stream ID, space, and container names, and that the cdf-version: alpha header is sent.
- Missing grouped uniqueness signals: confirm that the uniqueness workflow is scheduled and active.
- Missing inference lineage: verify that downstream rules populate causedBy.
Dashboard
You can build your own dashboard using the filter and aggregate endpoints above. Query by ruleSetId, passedValidation, jobRunId, or any other container property to visualize pass/fail rates and violations over time. The RuleEngineResult container supports the same filter/aggregate API and can feed KPI dashboards directly.
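For a pass-rate-over-time chart, records can be bucketed by day on the client. The sketch below takes flat dicts that combine container properties with the record-level createdTime. It assumes createdTime is in milliseconds since epoch, like the lastUpdatedTime filter. Grouped summary records are left out because they are not per-instance outcomes. The function name is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List


def daily_pass_rates(rows: Iterable[Dict[str, Any]]) -> Dict[str, float]:
    """Per-UTC-day pass rate over flattened DataQualityValidationRecord rows."""
    counts: Dict[str, List[int]] = defaultdict(lambda: [0, 0])  # day -> [passed, total]
    for row in rows:
        # Grouped summary records are not per-instance outcomes, so leave them out.
        if row.get("validationType") == "group_violation":
            continue
        # ASSUMPTION: createdTime is milliseconds since epoch.
        day = datetime.fromtimestamp(row["createdTime"] / 1000, tz=timezone.utc).date().isoformat()
        counts[day][1] += 1
        if row.get("passedValidation"):
            counts[day][0] += 1
    return {day: passed / total for day, (passed, total) in sorted(counts.items())}
```

For large volumes, pushing the counting to the aggregate endpoint with one time-bounded query per day avoids loading every record.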