Records output

What this is

Validation produces two types of records in the CDF Records API:

Container | Purpose
--- | ---
DataQualityValidationRecord | One record per focus node per run: pass/fail, violation details, severity
RuleEngineResult | One record per inference result: derived state from SHACL-AF sh:rule blocks

By default, both containers live in the same space (dataQuality, configurable) and their records are written to the same stream.

When to use it

Use this guide when building dashboards, triage workflows, or automations that consume validation and inference outputs.

User mental model

  • DataQualityValidationRecord captures validation outcomes.
  • RuleEngineResult captures inferred/derived outputs.
  • Both are queryable through the same Records API filter/aggregate endpoints.

Runtime behavior

Grouped uniqueness record mode

To support high-volume grouped violations (uniqueness overflow and green pass markers), DataQualityValidationRecord includes an explicit record mode:

  • validationType = "single_violation" for existing per-instance records
  • validationType = "group_violation" for grouped summary records

Grouped records also include:

  • groupViolationType, a subtype with one of these values:
      • global_overflow
      • value_overflow
      • pass

For group_violation records, focusNode and focusNodeInstance may be intentionally omitted. Consumers (apps/agents) should branch on validationType and groupViolationType rather than parsing free text.
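
The branching recommended above can be sketched as a small classifier over a record's container properties. The returned labels (group_pass, instance_fail, and so on) are hypothetical names chosen for illustration, and treating a missing validationType as a per-instance record is an assumption, not documented behavior.

```python
from typing import Any, Mapping


def classify_record(props: Mapping[str, Any]) -> str:
    """Return a coarse label for one DataQualityValidationRecord property dict.

    The label strings are illustrative only; they are not part of the schema.
    """
    if props.get("validationType") == "group_violation":
        subtype = props.get("groupViolationType")
        if subtype == "pass":
            return "group_pass"
        if subtype in ("global_overflow", "value_overflow"):
            return f"group_{subtype}"
        return "group_unknown"

    # Assumption: records without validationType are per-instance records.
    return "instance_pass" if props.get("passedValidation") else "instance_fail"
```

Because grouped records may omit focusNode and focusNodeInstance, a consumer would typically read those fields only on the instance_* branches.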

Default configuration

Records are written to:

  • Space: dataQuality (configurable via records_space in settings)
  • Container: DataQualityValidationRecord (configurable via records.container in settings)
  • Stream: Set in settings as records.stream_id (e.g. dq_validation_stream, test_stream_1)

To post results when running validation locally, set post_to_records=True and provide a stream_id in RecordsConfig when calling run_validation.

Record schema (container properties)

Each record in the stream has a properties object keyed by space and container. The data quality fields live under properties[records_space][records_container] (e.g. dataQuality / DataQualityValidationRecord). Main properties:

Property | Type | Description
--- | --- | ---
ruleSetId | string | Rule set identifier (e.g. from view config)
ruleSetVersion | string | Rule set version
jobRunId | string | Unique identifier for the validation job run
passedValidation | boolean | true if no violations, false otherwise
resultSeverity | list of string | Severities from violations: Violation, Warning, Info
failedConstraints | list of string | Failed constraint identifiers (e.g. MinCountConstraintComponent::name)
focusNode | string | URI of the validated instance (focus node)
focusNodeInstance | object | Reference to the instance (space, externalId)
dataDomainExternalId | string | Data domain / view identifier
validationReport | object | Full report with violations, violationCount, summary
sourceShape | list of string | Source shape URIs from SHACL

Record-level fields (outside properties) include externalId, createdTime, and lastUpdatedTime.
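
To make the nesting concrete, here is a minimal sketch of one record and two helpers that read it. The sample values are invented for illustration, the helper names are hypothetical, and the sketch assumes createdTime is expressed in milliseconds since epoch like the lastUpdatedTime filter values.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional

# Illustrative record only; real records carry more properties.
sample_record = {
    "externalId": "example-record-1",
    "createdTime": 1704067200000,
    "lastUpdatedTime": 1704067200000,
    "properties": {
        "dataQuality": {
            "DataQualityValidationRecord": {
                "ruleSetId": "EquipmentSHACLv1",
                "passedValidation": False,
                "failedConstraints": ["MinCountConstraintComponent::name"],
            }
        }
    },
}


def extract_props(
    record: Mapping[str, Any],
    space: str = "dataQuality",
    container: str = "DataQualityValidationRecord",
) -> Mapping[str, Any]:
    """Return properties[space][container], or an empty dict if absent."""
    return record.get("properties", {}).get(space, {}).get(container, {})


def created_at(record: Mapping[str, Any]) -> Optional[datetime]:
    """Convert createdTime (assumed epoch milliseconds) to an aware UTC datetime."""
    ms = record.get("createdTime")
    return None if ms is None else datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
```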

Minimal happy path

Querying records (Records API)

The Records API is in alpha. Use the CDF alpha header for all requests:

client.config.headers["cdf-version"] = "alpha"

Base URL pattern: /api/v1/projects/{project}/streams/{stream_id}/records.

Filter records

Endpoint: POST .../streams/{stream_id}/records/filter

Request body:

  • filter (optional): Filter on container properties using equals, in, etc.
  • lastUpdatedTime (optional): Time range, e.g. {"gt": start_ms} or {"gte": start_ms, "lte": end_ms} (milliseconds since epoch).
  • limit (optional): Maximum number of records to return. If omitted, the API's own default applies.
  • cursor (optional): For pagination.

Property paths in filters are arrays: [space, container, propertyName], e.g. ["dataQuality", "DataQualityValidationRecord", "ruleSetId"].
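
Since every filter clause repeats the same [space, container, propertyName] path, a few small builders can cut down on typos. This is a sketch with hypothetical helper names. The shapes of the in clause ("values" key) and the and wrapper are assumed to follow the usual CDF filter conventions, so check them against the Records API reference before relying on them.

```python
from typing import Any, Dict, Iterable, List


def prop(
    name: str,
    space: str = "dataQuality",
    container: str = "DataQualityValidationRecord",
) -> List[str]:
    """Build the three-element property path used in filters and aggregates."""
    return [space, container, name]


def equals(name: str, value: Any) -> Dict[str, Any]:
    return {"equals": {"property": prop(name), "value": value}}


def is_in(name: str, values: Iterable[Any]) -> Dict[str, Any]:
    # Assumption: the "in" clause takes a "values" list.
    return {"in": {"property": prop(name), "values": list(values)}}


def all_of(*clauses: Dict[str, Any]) -> Dict[str, Any]:
    # Assumption: clauses are combined with an "and" list.
    return {"and": list(clauses)}


# Failed validations for either of two rule sets (second ID is illustrative).
failed_for_rule_sets = all_of(
    equals("passedValidation", False),
    is_in("ruleSetId", ["EquipmentSHACLv1", "PumpSHACLv1"]),
)
```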

Example: fetch validation records for a rule set (last 7 days)

from cognite.client import CogniteClient
from datetime import datetime, timedelta

client = CogniteClient()
client.config.headers["cdf-version"] = "alpha"

stream_id = "dq_validation_stream"  # or your settings["records"]["stream_id"]
space = "dataQuality"
container = "DataQualityValidationRecord"
rule_set_id = "EquipmentSHACLv1"

seven_days_ago_ms = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)

response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "lastUpdatedTime": {"gt": seven_days_ago_ms},
        "limit": 1000,
        "filter": {
            "equals": {
                "property": [space, container, "ruleSetId"],
                "value": rule_set_id,
            }
        },
    },
)
data = response.json()
records = data.get("items", [])

for record in records:
    props = record.get("properties", {}).get(space, {}).get(container, {})
    passed = props.get("passedValidation")
    focus = props.get("focusNode")
    failed = props.get("failedConstraints", [])
    print(f"  {record.get('externalId')}: passed={passed}, focus={focus}, failed={len(failed)}")

Example: filter by pass/fail

# Only failed validations
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "filter": {
            "equals": {
                "property": [space, container, "passedValidation"],
                "value": False,
            }
        },
        "limit": 500,
    },
)
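
When a result set exceeds one page, the cursor field from the request body can drive a simple loop. The sketch below takes a post callable that returns parsed JSON, so it can be exercised without a live project. The nextCursor response field is an assumption about the response shape, and iter_records is a hypothetical helper name.

```python
from typing import Any, Callable, Dict, Iterator, Optional

PostFn = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def iter_records(
    post: PostFn,
    url: str,
    body: Dict[str, Any],
    max_pages: int = 10,
) -> Iterator[Dict[str, Any]]:
    """Yield items page by page until no cursor is returned or max_pages is hit."""
    cursor: Optional[str] = None
    for _ in range(max_pages):
        payload = dict(body)  # copy so the caller's body is not mutated
        if cursor:
            payload["cursor"] = cursor
        data = post(url, payload)
        yield from data.get("items", [])
        cursor = data.get("nextCursor")  # assumed field name
        if not cursor:
            break
```

With the client from the examples above, post could be `lambda url, payload: client.post(url, json=payload).json()`. The max_pages guard keeps a dashboard query from paging indefinitely.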

Aggregate records

Endpoint: POST .../streams/{stream_id}/records/aggregate

Use this to get counts, unique values, or other aggregates without loading all records.

Request body:

  • filter (optional): Same as in filter (e.g. by ruleSetId, passedValidation).
  • lastUpdatedTime (optional): Time range in milliseconds.
  • aggregates: Map of aggregate names to definitions, e.g.:
      • "totalCount": {"count": {}}
      • "uniqueJobs": {"uniqueValues": {"property": [space, container, "jobRunId"]}}
      • "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}}
      • "severityBreakdown": {"uniqueValues": {"property": [space, container, "resultSeverity"]}}

Example: aggregate stats for a rule set

payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "filter": {
        "equals": {"property": [space, container, "ruleSetId"], "value": rule_set_id}
    },
    "aggregates": {
        "totalCount": {"count": {}},
        "jobRuns": {"uniqueValues": {"property": [space, container, "jobRunId"]}},
        "passedBreakdown": {"uniqueValues": {"property": [space, container, "passedValidation"]}},
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()

total = data.get("aggregates", {}).get("totalCount", {}).get("count", 0)
job_buckets = data.get("aggregates", {}).get("jobRuns", {}).get("uniqueValueBuckets", [])
unique_jobs = len(job_buckets)
# passedBreakdown.uniqueValueBuckets: [{"value": true, "count": n}, {"value": false, "count": m}]
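
The passedBreakdown buckets can be reduced to a single pass rate for a KPI tile. A minimal sketch, assuming bucket values arrive as JSON booleans as in the comment above; pass_rate is a hypothetical helper name.

```python
from typing import Any, Iterable, Mapping, Optional


def pass_rate(buckets: Iterable[Mapping[str, Any]]) -> Optional[float]:
    """Return passed / total from uniqueValueBuckets, or None when there is no data."""
    passed = 0
    total = 0
    for bucket in buckets:
        count = bucket.get("count", 0)
        total += count
        if bucket.get("value") is True:
            passed += count
    return passed / total if total else None
```

Returning None rather than 0.0 for an empty window lets a dashboard distinguish "no runs" from "every run failed".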

Example: list distinct rule set IDs in the stream

payload = {
    "lastUpdatedTime": {"gt": seven_days_ago_ms},
    "aggregates": {
        "ruleSetIds": {"uniqueValues": {"property": [space, container, "ruleSetId"]}}
    },
}
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/aggregate",
    json=payload,
)
data = response.json()
buckets = data.get("aggregates", {}).get("ruleSetIds", {}).get("uniqueValueBuckets", [])
rule_set_ids = [b["value"] for b in buckets if b.get("value")]

Restoring the client default headers

After querying Records, restore the client headers if you need to avoid sending cdf-version: alpha to other APIs:

original_headers = client.config.headers.copy()
client.config.headers["cdf-version"] = "alpha"
try:
    ...  # Records API calls go here
finally:
    # Restore the headers even if a request raises
    client.config.headers = original_headers
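
If the header is toggled in several places, the same save-and-restore pattern can be wrapped in a context manager. This is a sketch: alpha_headers is a hypothetical helper, and the demo uses a stand-in object instead of a real CogniteClient so it runs without credentials.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def alpha_headers(client):
    """Send cdf-version: alpha inside the block, then restore the previous headers."""
    original = dict(client.config.headers)
    client.config.headers["cdf-version"] = "alpha"
    try:
        yield client
    finally:
        client.config.headers = original


# Stand-in for a client; only the config.headers attribute is modelled.
fake_client = SimpleNamespace(config=SimpleNamespace(headers={"x-app": "demo"}))
with alpha_headers(fake_client):
    inside = dict(fake_client.config.headers)
after = dict(fake_client.config.headers)
```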

RuleEngineResult records (inference output)

When a SHACL file contains sh:rule / sh:SPARQLRule shapes (SHACL-AF), the engine runs inference automatically alongside quality validation. Each CONSTRUCT triple typed dqs:RuleEngineResult is written as a separate record in the RuleEngineResult container.

Container properties

Property | Type | Description
--- | --- | ---
ruleSetId | string | RuleSet external ID the rule came from
ruleSetVersion | string | RuleSet version
ruleId | string | Value of dqs:ruleId on the SHACL rule
runId | string | Shared job run ID (same as DataQualityValidationRecord.jobRunId)
resultType | string | Always "Inference" for rule engine results
focusNode | string | URI of the focus node
focusNodeInstance | DirectRelation | Hard link to the DMS instance (space, externalId)
resultValue | string | Optional scalar value (e.g. "Critical", "Overdue")
resultPayload | JSON | Optional free-form JSON body
causedBy | DirectRelation list | Lineage: upstream RuleEngineResult records this result depends on
dataDomainExternalId | string | Optional domain tag
producedAt | Timestamp | Materialisation time
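
Because runId carries the same value as DataQualityValidationRecord.jobRunId, outputs from the two containers can be correlated per run on the client side. A minimal sketch over already-extracted property dicts; group_by_run and the result keys are hypothetical names.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Mapping


def group_by_run(
    validation_props: Iterable[Mapping[str, Any]],
    inference_props: Iterable[Mapping[str, Any]],
) -> Dict[Any, Dict[str, List[Mapping[str, Any]]]]:
    """Group validation records (jobRunId) and inference results (runId) by run."""
    runs: Dict[Any, Dict[str, List[Mapping[str, Any]]]] = defaultdict(
        lambda: {"validations": [], "inferences": []}
    )
    for props in validation_props:
        runs[props.get("jobRunId")]["validations"].append(props)
    for props in inference_props:
        runs[props.get("runId")]["inferences"].append(props)
    return dict(runs)
```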

Querying RuleEngineResult records

client.config.headers["cdf-version"] = "alpha"

rule_engine_stream_id = "dq_validation_stream"  # same stream as DQ records by default
space = "dataQuality"
container = "RuleEngineResult"

# Fetch up to 1,000 "MO_Criticality" inferences (add lastUpdatedTime to restrict to a time window, e.g. today)
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{rule_engine_stream_id}/records/filter",
    json={
        "filter": {
            "equals": {
                "property": [space, container, "ruleId"],
                "value": "MO_Criticality",
            }
        },
        "limit": 1000,
    },
)
for record in response.json().get("items", []):
    props = record["properties"][space][container]
    print(f"{props['focusNode']}: {props.get('resultValue')}")

Incremental reads for rule chaining

For downstream rule stages, use the rule_engine_result_sync function handler.

  • It supports both Records sync mode and latest-changed filter mode.
  • It persists per-listener checkpoint state (cursor + watermark).
  • It supports first-class rule-scoped filters (rule_ids, rule_set_ids, rule_set_versions, focus_nodes, focus_node_instances) in both modes.
  • It supports sync pagination beyond 1,000 records per run via page_limit + max_records.
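
To illustrate what a cursor-plus-watermark checkpoint means, the sketch below models the idea for a latest-changed style read. It is not the rule_engine_result_sync implementation: Checkpoint, advance, and filter_body are hypothetical names, and the handler's real state format and parameters may differ.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional


@dataclass(frozen=True)
class Checkpoint:
    cursor: Optional[str] = None  # resume token for cursor-based reads
    watermark_ms: int = 0         # highest lastUpdatedTime processed so far


def filter_body(checkpoint: Checkpoint, limit: int = 1000) -> Dict[str, Any]:
    """Build a filter request that only asks for records newer than the watermark."""
    return {"lastUpdatedTime": {"gt": checkpoint.watermark_ms}, "limit": limit}


def advance(
    checkpoint: Checkpoint,
    records: List[Mapping[str, Any]],
    next_cursor: Optional[str] = None,
) -> Checkpoint:
    """Return a new checkpoint after processing one batch; the watermark never moves back."""
    latest = max(
        (r.get("lastUpdatedTime", 0) for r in records),
        default=checkpoint.watermark_ms,
    )
    return Checkpoint(cursor=next_cursor, watermark_ms=max(checkpoint.watermark_ms, latest))
```

A strict gt comparison can skip records that share the watermark's exact millisecond, which is one reason a real implementation might track a cursor alongside the watermark.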

For end-to-end chained pipeline usage, see Chained Conditional Logic.

Writing inference rules (SHACL-AF)

See Rule sources for the full authoring guide.


Best practices

  • Use typed fields (validationType, groupViolationType, ruleId) for automation instead of parsing message text.
  • Keep dashboard queries time-bounded with lastUpdatedTime.
  • Separate operational and exploratory queries by stream and filter scope.

Troubleshooting

  • Empty query results: verify stream/space/container and alpha header usage.
  • Missing grouped uniqueness signals: confirm uniqueness workflow is scheduled and active.
  • Missing inference lineage: verify downstream rules populate causedBy.

Dashboard

You can build your own dashboard using the filter and aggregate endpoints above. Query by ruleSetId, passedValidation, jobRunId, or any other container property to visualize pass/fail rates and violations over time. The RuleEngineResult container supports the same filter/aggregate API and can feed KPI dashboards directly.
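
For a triage panel, one simple starting point is to rank the constraints that fail most often across the records returned by a filter call. A minimal sketch over extracted property dicts; top_failed_constraints is a hypothetical helper name.

```python
from collections import Counter
from typing import Any, Iterable, List, Mapping, Tuple


def top_failed_constraints(
    props_list: Iterable[Mapping[str, Any]],
    n: int = 5,
) -> List[Tuple[str, int]]:
    """Count failedConstraints entries across records and return the n most frequent."""
    counts: Counter = Counter()
    for props in props_list:
        # Records that passed may have no failedConstraints at all.
        counts.update(props.get("failedConstraints") or [])
    return counts.most_common(n)
```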
