cognite-data-quality — Deep Architecture Reference
Scope: This document covers the internal design of the
cognite-data-quality Python package and the CDF infrastructure it drives. It is aimed at engineers who need to understand, extend, or debug the system. For a higher-level overview see ARCHITECTURE.md.
Table of Contents
- Package layout
- Single-function dispatch model
- DMS API usage patterns
- Instance validation — sync (real-time)
- Instance validation — historic (partitioned)
- Instance sync-cursor validation
- Time series validation
- RAW table validation
- Orchestrator
- Workflow design
- Records API integration
- State management (DMS containers)
- Fault-tolerance and timeout handling
- End-to-end data flows
- Combined lifecycle: historic batch + sync trigger
1. Package layout
cognite_data_quality/
├── _function_code/
│ ├── handler.py # CDF Function entry-point (dispatcher)
│ ├── handlers/
│ │ ├── instance_validation.py # Real-time DMS validation
│ │ ├── instance_sync_cursor_validation.py # Cursor-based ongoing sync validation
│ │ ├── partitioned_validation.py # Historic DMS partition validation
│ │ ├── orchestrator.py # Historic orchestration controller
│ │ ├── orchestration_status.py # Orchestration status monitoring
│ │ ├── timeseries_validation.py # Time series SHACL validation
│ │ ├── raw_validation.py # RAW table validation
│ │ └── test_rule.py # Dry-run SHACL rule testing
│ └── common/
│ ├── cursor_state.py # DMS-backed cursor state (partitioned + TS)
│ ├── records_api.py # Records API posting with batching + retry
│ ├── shacl_utils.py # SHACL file loading from CDF Files
│ ├── time_window.py # Time-window replacement in SHACL rules
│ ├── timestamp_collection_state.py # Orchestrator timestamp-collection state
│ └── raw_state.py # RAW incremental state helpers
All handlers live in one deployed CDF Function. The handler.py entry-point routes by validation_type.
2. Single-function dispatch model
CDF allows only one Python entry-point per Function. Rather than deploying one Function per validation type, the package deploys a single Function (data-quality-validation) that inspects the validation_type field of its input data dict and forwards the call:
handler.handle(data, client, secrets)
├── "instance" → handle_instance_validation()
├── "instance_sync_cursor" → handle_instance_sync_cursor_validation()
├── "partitioned" → handle_partitioned_validation()
├── "timeseries" → handle_timeseries_validation()
├── "orchestrator" → handle_orchestrator()
├── "orchestration_status" → handle_orchestration_status()
├── "raw_incremental" → handle_raw_validation_incremental()
├── "raw_historic" → handle_raw_validation_historic()
└── "test" → handle_test_rule()
Secrets (client-id, client-secret) are passed to all handlers but only actively consumed by the orchestrator, which uses them to create Function schedules and workflow triggers with dedicated credentials (to avoid nested OAuth token chains).
Version logging: On every invocation, handler.py logs cognite-data-quality.__version__ — this is the primary way to confirm which package version is executing inside CDF.
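The dispatch model above can be sketched as a plain lookup table. This is a simplified stand-in — the real handler functions are those listed in the tree, and error handling, logging, and secrets plumbing are omitted:

```python
# Sketch of the single-function dispatch in handler.py (simplified;
# handler bodies here are stand-ins for the real implementations).

def handle_instance_validation(data, client, secrets):
    return {"handled_by": "instance"}

def handle_partitioned_validation(data, client, secrets):
    return {"handled_by": "partitioned"}

_DISPATCH = {
    "instance": handle_instance_validation,
    "partitioned": handle_partitioned_validation,
    # ... one entry per validation_type listed above
}

def handle(data, client=None, secrets=None):
    """Route a Function invocation by its validation_type field."""
    vtype = data.get("validation_type")
    handler = _DISPATCH.get(vtype)
    if handler is None:
        raise ValueError(f"Unknown validation_type: {vtype!r}")
    return handler(data, client, secrets)
```

A dict dispatch keeps the entry-point flat: adding a new validation type means one new module under handlers/ and one new key here.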
3. DMS API usage patterns
The system makes heavy use of the CDF Data Modeling Service (DMS). The subsections below cover the main usage patterns.
3.1 Querying instances
| Operation | API call | Used by |
|---|---|---|
| List instances with filter | client.data_modeling.instances.list() | TS validation, orchestrator count |
| Sync (paginated, with cursor) | client.data_modeling.instances.sync(query) | partitioned, sync-cursor handlers |
| Retrieve by NodeId | client.data_modeling.instances.retrieve() | RAW state load, cursor state load |
| Aggregate (count/min/max) | client.data_modeling.instances.aggregate() | Orchestrator data distribution |
| Apply (upsert) | client.data_modeling.instances.apply() | All cursor state writes |
| Delete | client.data_modeling.instances.delete() | Cursor cleanup after completion |
3.2 Sync query structure
The instances.sync() call is the workhorse for both historic partitioned and ongoing sync-cursor modes. A typical query looks like:
Query(
with_={
"nodes": NodeResultSetExpression(
filter=And(
HasData(views=[view_id]),
In(["node", "space"], instance_spaces),
Range(property=["node", partition_field], gte=min_val, lt=max_val),
),
limit=chunk_size, # adaptive; steps down on timeout
sync_mode="two_phase", # ensures stable cursor semantics
)
},
select={
"nodes": Select(sources=[SourceSelector(source=view_id, properties=[])])
},
cursors={"nodes": current_cursor} if current_cursor else None,
)
result = client.data_modeling.instances.sync(query=query)
sync_mode="two_phase" is used for historic/partitioned to get a stable snapshot with reliable cursor pagination. sync_mode="no_backfill" is used for the workflow trigger (see §10) so it watches only future changes.
3.3 Filter construction
Filters are assembled from cognite.client.data_classes.filters:
- HasData(views=[view_id]) — restricts to nodes that have data in the view
- In(["node", "space"], instance_spaces) — restricts to declared instance spaces
- Range(property=[...], gte=..., lt=...) — partition range filter
- System properties: ["node", "lastUpdatedTime"]
- View/container properties: resolved to [container_space, container_id, container_property] via client.data_modeling.views.retrieve()
3.4 Property access pattern
When nodes come back from sync(), they are converted to plain dicts with node.dump(camel_case=True) so they can be passed to NEAT's validate_instances.with_shacl() unchanged. Properties are nested under properties → {view_id} → {property_name}.
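The nested property layout can be illustrated with a small accessor over a dumped node. The view key shown ("my_space/MyView/v1") is an illustrative shape — the exact key format depends on the SDK's dump output:

```python
# Sketch of the property access pattern for a node dumped with
# node.dump(camel_case=True). The view key format is illustrative.

def get_view_properties(node_dict: dict, view_key: str) -> dict:
    """Return the property dict for one view, or {} if absent."""
    return node_dict.get("properties", {}).get(view_key, {})

dumped = {
    "space": "my_space",
    "externalId": "pump-001",
    "properties": {
        "my_space/MyView/v1": {"name": "Pump 001", "capacity": 42},
    },
}
props = get_view_properties(dumped, "my_space/MyView/v1")
```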
4. Instance validation — sync (real-time)
Handler: handle_instance_validation()
Validation type: "instance"
Flow
CDF Data Change Event
│
▼
Workflow Trigger (DM subscription)
│ payload: {"items": {"n": [...nodes...], "e": [...edges...]}}
▼
CDF Workflow step → invokes Function with validation_type="instance"
│
▼
handler.py → handle_instance_validation()
│
├── Extract instances from payload (items.n + items.e)
├── Filter deleted instances (deletedTime set → skip)
├── Load SHACL rules from CDF Files (by external_id)
├── NeatSession.validate_instances.with_shacl(
│ instances, shacl_rules, datamodel, auto_load_depth=2)
├── Parse rdflib report graph → list[violation dicts]
└── post_validation_results_to_records() → Records API
Key behaviours
- Deleted instance filtering: Instances with deletedTime set are silently skipped. This handles delete events from CDF subscriptions.
- SHACL engine failure handling: If pyshacl returns a non-Graph object (internal crash), synthetic SHACLEngineFailure violations are created for all instances in the batch so the Records API always reflects the validation attempt.
- 10% failure threshold: If more than 10% of Records API posts fail, the handler raises RuntimeError to mark the function call as failed (triggering CDF retry logic).
- Auto-load depth 2: NEAT fetches the instance's direct references AND their references before running SHACL. This is configurable per view.
Input payload structure
{
"validation_type": "instance",
"instances": {
"items": {
"n": [ /* node dicts */ ],
"e": [ /* edge dicts */ ]
}
},
"shacl_rules_file_external_id": "my-view-shacl-rules",
"datamodel_space": "my_space",
"datamodel_external_id": "MyDataModel",
"datamodel_version": "v1",
"auto_load_depth": 2,
"records_config": {
"stream_id": "dq_stream",
"rule_set_id": "MyViewSHACLv1",
"rule_set_version": "1.0"
}
}
5. Instance validation — historic (partitioned)
Handler: handle_partitioned_validation()
Validation type: "partitioned"
Purpose
Validates the full historical dataset for a view. The orchestrator slices the dataset into N range-based partitions and triggers this handler once per partition. Each handler invocation runs independently and can time out and resume.
Flow
Orchestrator determines partition ranges (via discover_data_distribution())
│
├── Partition 0: [min, P1) → Function call → partitioned handler
├── Partition 1: [P1, P2) → Function call → partitioned handler
├── ...
└── Partition N: [PN, max] → Function call → partitioned handler
Each partitioned handler:
1. Load SHACL rules from CDF Files
2. Build dimension lookup from SHACL graph (O(1) enrichment per violation)
3. Load saved cursor from FunctionValidationState (if resuming)
4. Loop:
a. Build DMS sync query with partition range filter + current cursor
b. client.data_modeling.instances.sync() ← adaptive limit
c. NeatSession.validate_instances.with_shacl()
d. Extract violations, enrich with dimension/targetClass
e. post_validation_results_to_records()
f. Save cursor to FunctionValidationState (DMS upsert)
g. gc.collect() ← reclaim rdflib Graph memory
h. Check elapsed time; if > 480s → save cursor, return "partial"
5. On completion: delete cursor, return "completed"
Adaptive sync limit
On DMS timeout, the handler backs off through a decreasing sequence of sync limits. Each limit gets 2 attempts with a 30-second wait before stepping down. If all limits are exhausted, the handler saves the cursor and returns "sync_limits_exhausted" — the orchestrator will retry the partition.
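The back-off loop can be sketched as follows. The limit values are illustrative (the real sequence lives in the partitioned handler), and fetch_chunk is a stand-in for the instances.sync() call:

```python
# Sketch of the adaptive-limit back-off: 2 attempts per limit, a 30 s
# pause between attempts, step down on timeout, signal exhaustion.
import time

LIMIT_SEQUENCE = [1000, 500, 200, 50]   # illustrative step-down values
ATTEMPTS_PER_LIMIT = 2
WAIT_SECONDS = 30

def sync_with_backoff(fetch_chunk, wait=time.sleep):
    """fetch_chunk(limit) is a stand-in that raises TimeoutError on DMS timeout."""
    for limit in LIMIT_SEQUENCE:
        for attempt in range(ATTEMPTS_PER_LIMIT):
            try:
                return fetch_chunk(limit)
            except TimeoutError:
                if attempt + 1 < ATTEMPTS_PER_LIMIT:
                    wait(WAIT_SECONDS)   # pause before the retry
    return "sync_limits_exhausted"       # orchestrator retries the partition
```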
Partition range filter
For system properties (lastUpdatedTime, createdTime, externalId, space), the range filter addresses the property directly, e.g. Range(property=["node", "lastUpdatedTime"], gte=..., lt=...).
For view/container properties, the handler resolves the container reference via:
view_def = client.data_modeling.views.retrieve((space, external_id, version))[0]
container_ref = view_def.properties[partition_field].container
Range(property=[container_ref.space, container_ref.external_id, container_property], ...)
Dimension enrichment
The SHACL file is parsed once at startup into three lookup dicts:
- _shape_to_dim: named shape URI → dimension string (custom dq:dimension annotation)
- _path_to_dim: property path URI → dimension string
- _shape_to_targetclass: named shape URI → target class local name
These are resolved at O(1) per violation and written to the Records API record as dimensions and targetClass lists — enabling downstream filtering by data quality dimension.
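The lookup construction can be sketched over plain triples. The real code walks an rdflib Graph, and the dq:dimension predicate IRI here is illustrative:

```python
# Sketch of building the three O(1) lookup dicts from SHACL triples.
# Triples are plain (subject, predicate, object) tuples for illustration.

DQ_DIMENSION = "http://example.org/dq#dimension"   # illustrative IRI
SH_PATH = "http://www.w3.org/ns/shacl#path"
SH_TARGETCLASS = "http://www.w3.org/ns/shacl#targetClass"

def build_lookups(triples):
    shape_to_dim, path_to_dim, shape_to_targetclass = {}, {}, {}
    for s, p, o in triples:
        if p == DQ_DIMENSION:
            shape_to_dim[s] = o
        elif p == SH_TARGETCLASS:
            # keep only the local name of the target class
            shape_to_targetclass[s] = o.rsplit("/", 1)[-1].rsplit("#", 1)[-1]
    # second pass: propagate each shape's dimension to its sh:path
    for s, p, o in triples:
        if p == SH_PATH and s in shape_to_dim:
            path_to_dim[o] = shape_to_dim[s]
    return shape_to_dim, path_to_dim, shape_to_targetclass
```

Parsing once at startup keeps violation enrichment at dictionary-lookup cost regardless of batch size.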
6. Instance sync-cursor validation
Handler: handle_instance_sync_cursor_validation()
Validation type: "instance_sync_cursor"
Purpose
This is the ongoing validation mode that runs after historic validation completes. Instead of receiving instances in the workflow payload, it maintains its own DMS sync cursor and independently fetches all changes since the last run.
How it differs from the standard instance handler
| Aspect | instance | instance_sync_cursor |
|---|---|---|
| Instance source | Workflow payload | instances.sync() with saved cursor |
| Triggered by | Data-change event (payload includes instances) | Data-change event (lightweight signal only) |
| Cursor state | None | Persistent in FunctionValidationState |
| Timeout handling | No | Yes — saves cursor, returns "partial" |
| Initial cutoff | N/A | initial_sync_cutoff filters first-run changes |
First-run cutoff alignment
The orchestrator injects initial_sync_cutoff (the max lastUpdatedTime seen during historic timestamp collection) into the workflow task payload. On the first invocation (no saved cursor), this is used as a Range(lastUpdatedTime >= cutoff) filter so the sync handler picks up exactly where historic validation stopped — no gap, no double-validation.
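The cutoff logic can be sketched with filters shown as plain dicts (the real handler builds cognite filter objects; helper name is illustrative):

```python
# Sketch of the first-run cutoff alignment: with no saved cursor, add a
# lastUpdatedTime >= cutoff range so sync starts exactly where historic
# validation stopped.

def build_sync_filters(saved_cursor, initial_sync_cutoff, base_filters):
    filters = list(base_filters)
    if saved_cursor is None and initial_sync_cutoff is not None:
        filters.append(
            {"range": {"property": ["node", "lastUpdatedTime"],
                       "gte": initial_sync_cutoff}}
        )
    return filters
```

On later runs the saved cursor alone defines the starting point, so the cutoff filter is deliberately omitted.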
Workflow trigger mode
When use_sync_cursor_mode: true in the view config, the trigger fetches no view properties (Select(sources=[])). The trigger fires as a lightweight signal; the handler fetches fresh data via instances.sync().
7. Time series validation
Handler: handle_timeseries_validation()
Validation type: "timeseries"
Modes
Normal mode — runs on a CRON schedule (e.g. hourly):
1. Fetch time series instances via DMS filter or explicit IDs
2. Load SHACL rules from CDF Files
3. Replace "60m-ago" / "now" placeholders with actual hour-aligned timestamps
4. Run NeatSession.validate_instances.with_shacl() with auto_load_depth=0
5. Parse violations from SHACL report text
6. Post one record per time series to Records API
7. Save cursor state (PASSED/FAILED + timestamp) to TimeSeriesValidationCursor container
Backfill mode — processes historic data in time-aligned chunks:
1. Parse start_time / end_time from config (supports "30d-ago" relative syntax)
2. Align to hour boundaries
3. Resume from backfillProgress cursor if available
4. For each window_minutes-sized chunk:
- Replace SHACL time placeholders with absolute chunk timestamps
- Run validation, post records, save cursor
5. Automatically switches to Normal mode when start >= end
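The backfill windowing above can be sketched with standard datetime arithmetic (helper names are illustrative; the real handler also persists backfillProgress between chunks):

```python
# Sketch of backfill chunking: align to hour boundaries, then walk the
# range in window_minutes-sized chunks.
from datetime import datetime, timedelta, timezone

def align_to_hour(dt: datetime) -> datetime:
    return dt.replace(minute=0, second=0, microsecond=0)

def backfill_chunks(start: datetime, end: datetime, window_minutes: int):
    """Yield (chunk_start, chunk_end) pairs covering the aligned range."""
    start, end = align_to_hour(start), align_to_hour(end)
    step = timedelta(minutes=window_minutes)
    cur = start
    while cur < end:
        yield cur, min(cur + step, end)
        cur += step
```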
DMS instance fetching
By DMS filter:
filters = [HasData(views=[view_id])]
if "space" in filter_config:
filters.append(Equals(["node", "space"], filter_config["space"]))
if "external_id_prefix" in filter_config:
filters.append(Prefix(["node", "externalId"], filter_config["external_id_prefix"]))
nodes = client.data_modeling.instances.list(instance_type="node", filter=combined_filter, limit=1000)
By explicit IDs:
node_ids = [NodeId(space=item["space"], external_id=item["external_id"]) for item in instance_ids]
result = client.data_modeling.instances.retrieve(nodes=node_ids)
Time window replacement
SHACL rule files contain string literals like "60m-ago" and "now". replace_time_windows_in_shacl() in common/time_window.py substitutes these with actual ISO 8601 timestamps before passing rules to pyshacl. This enables the same SHACL file to work for both normal scheduled runs and backfill processing.
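A minimal sketch of the substitution, assuming the placeholders appear as quoted literals in the Turtle text (the real helper is replace_time_windows_in_shacl() in common/time_window.py; this version fixes the window end explicitly so the same logic serves scheduled runs and backfill chunks):

```python
# Sketch of time-window replacement: swap the "60m-ago" / "now" quoted
# literals for absolute ISO 8601 timestamps before running pyshacl.
from datetime import datetime, timedelta

def replace_time_windows(shacl_text: str, window_end: datetime,
                         window_minutes: int = 60) -> str:
    window_start = window_end - timedelta(minutes=window_minutes)
    return (shacl_text
            .replace('"60m-ago"', f'"{window_start.isoformat()}"')
            .replace('"now"', f'"{window_end.isoformat()}"'))
```

Replacing the quoted form rather than the bare substring avoids accidentally rewriting words that merely contain "now".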
Prefetch error injection
If the INDSL or SDK SPARQL function fails to fetch time series data after retries, get_prefetch_errors() returns the affected time series IDs. The handler injects synthetic DataFetchError / Warning violations for these so Records API reflects that data was unavailable for validation.
Cursor state container
TimeSeriesValidationCursor (container in dataQuality space):
externalId: "{rule_set_id}/{space}/{ts_external_id}"
properties:
timeseriesId, ruleSetId
lastValidatedTimestamp (ISO timestamp of last run)
lastStatus ("PASSED" | "FAILED")
backfillProgress (ISO timestamp of last completed chunk, backfill only)
subscriptionCursor (reserved)
8. RAW table validation
Handler: handle_raw_validation_incremental() and handle_raw_validation_historic()
Validation types: "raw_incremental" / "raw_historic"
RAW rows are converted to RDF triples by NEAT before SHACL validation. The URI scheme is:
Subject: http://purl.org/cognite/raw/{db}/{table}/#{row_key}
Type: http://purl.org/cognite/raw/{db}/{table}/#{table_name}
Property: http://purl.org/cognite/raw/{db}/{table}/#{column_name}
Incremental mode (scheduled)
CRON trigger → Workflow → Function (raw_incremental)
1. Load RawValidationState from DMS (→ last_processed_timestamp)
2. NeatSession.validate_instances.with_shacl_raw(
db_name, table_name, shacl_rules,
min_last_updated_time=last_timestamp, ← incremental filter
limit=batch_size)
3. Parse violations from SHACL report
4. Post to Records API (one record per row)
5. Save new timestamp (current time) to RawValidationState
State container RawValidationState (keyed {db_name}_{table_name}):
last_processed_timestamp (ms epoch — used as min_last_updated_time next run)
last_run_time (ISO timestamp)
rows_processed (int)
last_validation_conforms (bool stored as "true"/"false")
last_violation_count (int)
Historic mode (cursor-based)
Orchestrator fetches N cursors via getCursors API
├── Partition 0 → cursor_0 → Function (raw_historic)
├── Partition 1 → cursor_1 → Function (raw_historic)
└── ...
Each raw_historic handler:
1. Try to resume from saved FunctionValidationState cursor
2. Loop (timeout=480s, cursor save interval=30s):
a. NeatSession.validate_instances.with_shacl_raw(
cursor=current_cursor, limit=1000)
b. Parse violations, post to Records API
c. Periodically save cursor state
d. Check completion (batch_size == 0 → done)
3. On complete: delete cursor state
Note: The RAW historic handler currently breaks out of the pagination loop after one batch. A TODO in the code marks where proper nextCursor pagination should be implemented once the NEAT API exposes it.
9. Orchestrator
Handler: handle_orchestrator()
Validation type: "orchestrator"
The orchestrator operates in two modes based on whether orchestration_id is present in the input.
Initial mode (no orchestration_id)
Called once to kick off a historic validation run:
1. Generate orchestration_id (UUID)
2. Discover data distribution:
- client.data_modeling.instances.aggregate(count) ← total instances
- client.data_modeling.instances.aggregate(min/max) ← range bounds
(falls back to instances.sync() for datetime fields)
3. Collect all partition field values (timestamp collection):
- Streams instances via sync() in two_phase mode
- Saves progress to TimestampCollectionState (for resumption)
4. Compute N equal-width partition ranges from collected values
5. Save OrchestrationState to DMS
6. Create Function schedule (every 2 min) for monitor mode
7. Trigger all N partitions:
client.functions.call(external_id="data-quality-validation",
data={validation_type: "partitioned", ...},
wait=False)
8. Create sync trigger (workflow trigger in no_backfill mode)
with initial_sync_cutoff = max partition date seen
9. Return immediately
Monitor mode (orchestration_id present)
Called every 2 minutes by the Function schedule:
1. Load OrchestrationState from DMS
2. check_partition_statuses():
- For each partition call_id: client.functions.calls.retrieve()
- Map status: Completed+completed → done, partial/error → retry, Failed/Timeout → retry or fail
3. Retrigger incomplete partitions (up to max_retries=3)
4. Update OrchestrationState with progress
5. If all partitions done or max retries exhausted:
- Delete Function schedule
- Delete OrchestrationState
- Return final summary
Timestamp collection with state
Because collecting all values from a large view can take longer than a Function timeout, the orchestrator saves progress in TimestampCollectionState (DMS node). If interrupted, the next invocation resumes from where it left off rather than restarting.
Partition count selection
The orchestrator derives the partition count from the total instance count and two configurable bounds. Defaults: instances_per_partition=10000, max_partitions=50. Both are configurable per view.
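The exact expression is not reproduced here, but given the two defaults the calculation likely has this shape (a reconstruction, not the verbatim orchestrator code):

```python
# Likely shape of the partition-count calculation: one partition per
# instances_per_partition instances, capped at max_partitions.
import math

def compute_partition_count(total_instances: int,
                            instances_per_partition: int = 10_000,
                            max_partitions: int = 50) -> int:
    if total_instances <= 0:
        return 1
    wanted = math.ceil(total_instances / instances_per_partition)
    return min(wanted, max_partitions)
```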
10. Workflow design
Instance validation workflows (real-time)
Workflow: dq-shacl-{view_external_id}
└── Trigger: DataModeling subscription
filter: HasData(views=[view_id]) AND In(space, instance_spaces)
sync_mode: "two_phase" (standard) or "no_backfill" (sync-cursor mode)
batch_size: 100 (configurable)
batch_timeout: 300s
└── Task 1: FunctionTask
function: data-quality-validation
data: {
validation_type: "instance" | "instance_sync_cursor",
shacl_rules_file_external_id: "...",
datamodel_space: "...",
...
records_config: { stream_id, rule_set_id, rule_set_version }
}
The workflow trigger receives batches of changed instances from DMS and forwards them to the Function. In sync-cursor mode, the trigger payload carries only metadata (no view properties); the handler fetches full data via its own instances.sync() call.
Time series validation workflows (scheduled)
Workflow: dq-ts-{config_name}
└── Trigger: CRON ("0 * * * *" for hourly)
└── Task 1: FunctionTask
function: data-quality-validation
data: {
validation_type: "timeseries",
shacl_rules_file_external_id: "...",
filter: { space, external_id_prefix },
backfill: { enabled, start_time, end_time, window_minutes },
records_config: { ... }
}
RAW table validation workflows (scheduled)
Workflow: dq-raw-{table_name}
└── Trigger: CRON ("*/15 * * * *" for every 15 min)
└── Task 1: FunctionTask
function: data-quality-validation
data: {
validation_type: "raw_incremental",
db_name: "...",
table_name: "...",
batch_size: 10000,
records_config: { ... }
}
Historic validation flow (orchestrated)
Manual/API trigger → Workflow (single task)
└── Task 1: FunctionTask → orchestrator (initial mode)
Returns immediately after triggering N partition Functions
and creating the monitor Function schedule
(running in background every 2 minutes):
Function schedule → orchestrator (monitor mode)
Checks partition statuses, retriggers failures, cleans up on completion
Sync trigger creation
After historic completion, the orchestrator calls create_sync_trigger() which:
1. Updates the workflow version to embed job_run_id and initial_sync_cutoff in the task payload
2. Upserts a WorkflowTriggerUpsert with no_backfill sync mode
3. Uses the same external_id sync-{view_external_id} each time — so re-running historic validation idempotently replaces the existing trigger
11. Records API integration
All validation results are posted to the CDF Records API (alpha endpoint). The integration is in common/records_api.py.
Endpoint
Record structure
One record per validated instance (whether passed or failed):
{
"space": "dataQuality",
"externalId": "dq_{rule_set_id}_{instance_external_id}_{job_run_id}",
"sources": [{
"source": {
"type": "container",
"space": "dataQuality",
"externalId": "DataQualityValidationRecord"
},
"properties": {
"ruleSetId": "MyViewSHACLv1",
"ruleSetVersion": "1.0",
"jobRunId": "historic_0_1711234567",
"passedValidation": false,
"resultSeverity": ["Violation"],
"failedConstraints": ["MinCountConstraintComponent::my_property"],
"focusNode": "http://purl.org/cognite/my_space/my_instance",
"focusNodeInstance": { "space": "my_space", "externalId": "my_instance" },
"sourceShape": ["http://example.org/MyShape"],
"dimensions": ["Completeness"],
"targetClass": ["MyEquipment"],
"validationReport": {
"violationCount": 1,
"violations": [{
"sourceConstraintComponent": "MinCountConstraintComponent",
"resultMessage": "my_property is required",
"resultSeverity": "Violation",
"resultPath": "http://purl.org/cognite/my_space/my_property",
"dimension": "Completeness"
}],
"summary": "1 violation(s)"
}
}
}]
}
For RAW rows: focusNodeInstance is omitted (space is empty string, not a DMS space). The focusNode URI uses the RAW namespace: http://purl.org/cognite/raw/{db}/{table}/#{row_key}.
Batching and retry
Records are posted in batches of 1000. Each batch gets up to 3 attempts with exponential backoff (1s → 2s → 4s) for 5xx / gateway / timeout errors. If a batch fails after retries, it falls back to individual record posting. Errors from individual records are collected and returned to the caller (which checks the 10% failure threshold).
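The scheme can be sketched as follows; post_batch and post_one are stand-ins for the real HTTP calls in common/records_api.py, and the real code retries only 5xx / gateway / timeout errors rather than every exception:

```python
# Sketch of batch posting with exponential backoff and per-record fallback.
import time

BATCH_SIZE = 1000
MAX_ATTEMPTS = 3

def post_with_retry(records, post_batch, post_one, wait=time.sleep):
    """Return the list of records that ultimately failed to post."""
    failed = []
    for i in range(0, len(records), BATCH_SIZE):
        batch = records[i:i + BATCH_SIZE]
        for attempt in range(MAX_ATTEMPTS):
            try:
                post_batch(batch)
                break
            except Exception:
                if attempt + 1 < MAX_ATTEMPTS:
                    wait(2 ** attempt)        # 1s -> 2s -> 4s
        else:
            # batch exhausted its retries: fall back to individual posts
            for record in batch:
                try:
                    post_one(record)
                except Exception:
                    failed.append(record)
    return failed
```

The returned failure list is what the caller checks against the 10% threshold.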
Passed instances
Instances with no violations still get a record posted with passedValidation: true and failedConstraints: []. This is deliberate — it creates a complete audit trail of every instance that was checked.
Constraint string format
Failed constraint strings are formatted as "{component}::{resultPath}" when a path is available, e.g. "MinCountConstraintComponent::http://purl.org/cognite/ns/my_property". This allows downstream queries to filter by both constraint type and affected property.
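A minimal sketch of the formatter (the function name is illustrative; the output format matches the description above):

```python
# Sketch of the "{component}::{resultPath}" constraint string formatter.

def format_failed_constraint(component, result_path=None):
    """Join component and path with '::' when a path is available."""
    return f"{component}::{result_path}" if result_path else component
```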
12. State management (DMS containers)
All runtime state is stored in the dataQuality DMS space. Containers are created by scripts/ensure_container.py.
FunctionValidationState
Purpose: Cursor resumption for partitioned and sync-cursor validation.
| Property | Type | Description |
|---|---|---|
| orchestration_id | String | Links to orchestration run |
| partitionId | String | Partition identifier (or "sync" for sync-cursor) |
| syncCursor | String | DMS sync cursor from instances.sync() |
| processedCount | Int | Instances processed so far |
| lastUpdated | Int (ms) | Epoch milliseconds of last write |
External ID patterns:
- Partitioned: partition_state_orch_{orchestration_id}_p{partition_id}
- Sync cursor: sync_cursor_{job_run_id}_{view_space}_{view_external_id}_{instance_space}
- RAW historic: raw_{db_name}_{table_name}_{partition_id}
RawValidationState
Purpose: Incremental watermark for RAW table validation.
| Property | Type | Description |
|---|---|---|
| db_name | String | RAW database name |
| table_name | String | RAW table name |
| last_processed_timestamp | Int (ms) | Min lastUpdatedTime for next run |
| last_run_time | Timestamp | ISO timestamp of last run |
| rows_processed | Int | Rows processed in last run |
| last_validation_conforms | String | "true" or "false" |
| last_violation_count | Int | Violations in last run |
External ID: {db_name}_{table_name}
OrchestrationState
Purpose: Tracks overall historic orchestration progress.
| Property | Type | Description |
|---|---|---|
| orchestration_id | String | UUID for this orchestration run |
| view_external_id | String | View being validated |
| partition_count | Int | Total partitions |
| instance_space | String | JSON-encoded list of instance spaces |
| completed_partitions | JSON | Map of completed partition IDs |
| failed_partitions | JSON | Map of permanently failed partition IDs |
| partition_call_ids | JSON | Map of partition_id → function call_id |
| partition_retry_count | JSON | Map of partition_id → retry count |
TimestampCollectionState
Purpose: Checkpoint for orchestrator timestamp collection (allows resumption across Function timeouts).
TimeSeriesValidationCursor
Purpose: Per-time-series validation state (last run, backfill progress).
13. Fault-tolerance and timeout handling
Function timeout constraint
CDF Functions time out after 10 minutes. All long-running handlers use an internal max_runtime_seconds=480 (8 minutes) safety margin to save state before the timeout kills them.
Cursor save on timeout
elapsed = time.time() - start_time
if elapsed > max_runtime_seconds:
save_cursor(...) # persist progress to DMS
return {"status": "partial", ...}
The cursor is saved after each successfully posted chunk (not before) so it always points to the start of unprocessed data.
Retry architecture (partitioned)
Partition Function call
├── Completes successfully → "completed" → orchestrator removes from retry list
├── Times out (status "partial") → orchestrator retriggers with same cursor
├── DMS sync timeout → adaptive limit reduction → eventually "sync_limits_exhausted"
└── Unhandled exception → "error" status → orchestrator retriggers (max 3 times)
└── After 3 failures → marked as "failed" in OrchestrationState
GC between chunks
After each chunk in partitioned and sync-cursor modes, the handler calls gc.collect(). This explicitly releases the rdflib in-memory graph so memory doesn't accumulate across chunks. NeatSession is re-instantiated fresh per chunk.
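The per-chunk cleanup pattern looks roughly like this (make_session and validate are stand-ins for constructing a fresh NeatSession and running SHACL; the point is dropping the reference before collecting):

```python
# Sketch of the per-chunk cleanup pattern: fresh session per chunk,
# explicit reference drop, then a forced garbage collection pass.
import gc

def process_chunks(chunks, make_session, validate):
    results = []
    for chunk in chunks:
        session = make_session()          # fresh NeatSession per chunk
        results.append(validate(session, chunk))
        del session                       # drop the rdflib graph reference
        gc.collect()                      # reclaim graph memory now
    return results
```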
Records API failure threshold
If > 10% of Records API posts fail within a single chunk, the handler raises RuntimeError. The cursor was saved before the post attempt, so the orchestrator will retrigger the partition from the same point.
14. End-to-end data flows
A. Real-time instance validation
User upserts DMS instance
↓
CDF detects change in subscribed view
↓
Workflow trigger fires with batch of changed nodes (up to batch_size=100)
↓
Workflow executes FunctionTask → data-quality-validation
├── validation_type: "instance"
├── instances: {items: {n: [node dicts], e: [edge dicts]}}
└── ...config...
↓
handler.py → handle_instance_validation()
↓
NEAT loads SHACL rules from CDF Files (cached in runtime)
↓
NEAT auto-loads referenced nodes (depth 2) via DMS API
↓
pyshacl validates RDF graph against SHACL shapes
↓
Violations extracted from rdflib report graph
↓
post_validation_results_to_records() → 1 record per instance
↓
Records API stream (cdf-version: alpha)
graph TD
A[DMS Instance Change] --> B[DataModelingTrigger\nbatch_size: 100, timeout: 60s]
B --> C[Validation Workflow\ndq-shacl-view-name]
C --> D[Cognite Function\ndata-quality-validation\ntype: instance]
D --> E{Load SHACL Rules}
E -->|RuleSet API| F[RuleSet References\nruleset_references]
E -->|CDF Files| G[CDF File\nshacl_rules_file_external_id]
E -->|Inline| H[Inline Turtle\nshacl_rules]
D --> I[Extract Instances\nfrom workflow payload]
I --> J[Filter deleted instances\nwith deletedTime set]
J --> K[Fetch full instance data\nfrom DMS multi-spaces]
K --> L[Auto-load References\ndepth: 0-3 levels]
F --> M[NeatSession\nSHACL Validation\npyshacl engine]
G --> M
H --> M
L --> M
M --> N{Conforms?}
N -->|Yes| O[Records API\nPASSED]
N -->|No| P[Records API\nFAILED + violations\nconstraint, path, message, severity]
O --> Q[DataQualityValidationRecord\ndataQuality space]
P --> Q
B. Historic instance validation
API or manual trigger → Workflow → orchestrator (initial mode)
↓
discover_data_distribution():
client.instances.aggregate(count) + aggregate(min) + aggregate(max)
↓
collect_all_timestamps():
instances.sync() in chunks → all partition field values
(saves progress to TimestampCollectionState)
↓
compute N equal partition ranges
↓
create Function schedule (monitor, every 2 min)
↓
trigger N partition functions (async, no wait)
↓
create_sync_trigger() with initial_sync_cutoff = max partition date
↓
return immediately
--- meanwhile ---
Each partition function runs independently:
instances.sync() chunk by chunk (with adaptive limit)
→ NEAT validate → post to Records API → save cursor
On timeout: save cursor, return "partial"
On complete: delete cursor, return "completed"
--- every 2 minutes ---
Function schedule → orchestrator (monitor mode)
check_partition_statuses():
functions.calls.retrieve() for each partition
→ retrigger partial/error/failed partitions (max 3x)
When all done: delete schedule, delete OrchestrationState
graph TD
A[Manual Trigger\nor Scheduled] --> B[Cognite Function\ntype: orchestrator]
B --> C{First Run?}
C -->|Yes - Initial Mode| D[Query DMS\nfor data distribution]
D --> E[Calculate N partition ranges\nmin/max field splits]
E --> F[Trigger N partition functions\nnon-blocking]
F --> G[Create sync cursor trigger\nfor ongoing changes]
F --> H[Create 2-min monitor schedule\norchestration_status]
C -->|No - Monitor Mode| I[Load OrchestrationState\nfrom DMS container]
I --> J[Check partition statuses]
J --> K{All complete?}
K -->|No| L[Retrigger incomplete\npartitions max 3x]
K -->|Yes| M[Delete schedule & state]
subgraph "Partition Worker (type: partitioned)"
N[Load cursor if resuming] --> O[Query DMS range filter\nfield IN min..max]
O --> P[Validate chunk 200 instances]
P --> Q[Post to Records API]
Q --> R{480s timeout?}
R -->|No| O
R -->|Yes| S[Save cursor state\nFunctionValidationState]
end
F --> N
L --> N
C. Ongoing sync-cursor validation
Any DMS change in subscribed view
↓
Workflow trigger fires (lightweight — no view properties in payload)
↓
Workflow → data-quality-validation (validation_type: "instance_sync_cursor")
↓
Load sync cursor from FunctionValidationState
↓
Loop: instances.sync(two_phase, filter includes cutoff on first run)
├── Chunk → NEAT validate → post Records API → save cursor
└── Timeout: save cursor, return "partial" (next trigger continues)
↓
All caught up: save cursor, return "completed"
graph LR
A[DMS Change Event\nlightweight signal] --> B[Cognite Function\ntype: instance_sync_cursor]
B --> C[Load cursor state\nFunctionValidationState]
C --> D[Load SHACL Rules\nRuleSet API or CDF Files]
D --> E[instances.sync\nfetch changes since cursor]
E --> F[Filter deleted instances]
F --> G[Validate chunk\nNeatSession + SHACL]
G --> H[Post to Records API]
H --> I{More changes\nor 480s timeout?}
I -->|More changes| E
I -->|Timeout| J[Save cursor\nfor resumption]
I -->|Done| K[Save final cursor]
D. Time series validation (scheduled)
CRON trigger (e.g. hourly)
↓
Workflow → data-quality-validation (validation_type: "timeseries")
↓
Fetch CogniteTimeSeries nodes from DMS (filter or explicit IDs)
↓
Load SHACL rules from CDF Files
↓
Replace time window placeholders with actual timestamps:
"60m-ago" → ISO timestamp, "now" → ISO timestamp
↓
NEAT.validate_instances.with_shacl(auto_load_depth=0)
SHACL rules call CDF SDK / INDSL SPARQL functions to fetch datapoints
↓
Parse SHACL text report for violations per time series
↓
Post 1 record per time series to Records API (with time_window in externalId)
↓
Save cursor state (PASSED/FAILED + timestamp) to TimeSeriesValidationCursor
```mermaid
graph LR
    A[Scheduled Trigger\ncron] --> B[Cognite Function\ndata-quality-validation\ntype: timeseries]
    B --> C[Load SHACL Rules\nfrom CDF Files]
    B --> D{Selection Method}
    D -->|DMS Filter| E[Filter CogniteTimeSeries\nby prefix or properties]
    D -->|Instance IDs| F[Explicit list\nof time series]
    E --> G[Fetch Time Series\nfrom DMS]
    F --> G
    C --> H[NEAT Validation\nCDF SDK + INDSL functions]
    G --> H
    H --> I{Backfill Mode?}
    I -->|Yes| J[Process historic chunks\nhour-aligned windows]
    I -->|No| K[Validate last\ncomplete hour]
    J --> L[Save cursor state\nfor resume]
    K --> M[Records API\nPer time series result]
    L --> M
    M --> N[DataQualityValidationRecord]
```
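The placeholder-substitution step ("60m-ago" / "now" to ISO timestamps) can be sketched as below. The regex, function name, and supported units are assumptions; the real time_window helper may accept a different placeholder grammar.

```python
import re
from datetime import datetime, timedelta, timezone

_AGO = re.compile(r"(\d+)([mhd])-ago")
_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def replace_time_window(rule_text, now=None):
    """Rewrite relative time placeholders in a SHACL rule to absolute
    ISO timestamps. Sketch only: a naive str.replace on "now" would
    also hit the word inside longer tokens, which a real implementation
    must guard against."""
    now = now or datetime.now(timezone.utc)

    def _sub(match):
        delta = timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})
        return (now - delta).isoformat()

    return _AGO.sub(_sub, rule_text).replace("now", now.isoformat())
```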
E. RAW table validation (incremental)
CRON trigger (e.g. every 15 min)
↓
Workflow → data-quality-validation (validation_type: "raw_incremental")
↓
Load last_processed_timestamp from RawValidationState
↓
NEAT.validate_instances.with_shacl_raw(
min_last_updated_time=last_timestamp,
limit=batch_size)
(fetches RAW rows updated since last run, converts to RDF, runs SHACL)
↓
Parse violations, fetch full row data for Records API
↓
post_validation_results_to_records() with RAW namespace_base
↓
Save new timestamp (current time) to RawValidationState
```mermaid
graph LR
    A[Scheduled Trigger\ncron] --> B{Validation Mode}
    B -->|Incremental| C[Cognite Function\ntype: raw_incremental]
    C --> D[Load last_processed_timestamp\nRawValidationState container]
    D --> E[Fetch RAW rows\nupdated since timestamp]
    E --> F[Convert rows to RDF\nusing RAW URI scheme]
    F --> G[SHACL Validation\nper-row constraints]
    G --> H[Post to Records API]
    H --> I[Save new timestamp\nRawValidationState]
    B -->|Historic| J[Cognite Function\ntype: raw_historic]
    J --> K[getCursors API\ndistribute N cursors]
    K --> L[Process cursor chunk\nwith SHACL]
    L --> M{480s timeout?}
    M -->|No| L
    M -->|Yes| N[Save cursor state\nFunctionValidationState]
```
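The incremental load/validate/save sequence can be sketched as follows. The three helpers are hypothetical stand-ins for RawValidationState access and the NEAT call; only the checkpointing order matches the flow above.

```python
import time


def run_raw_incremental(fetch_state, validate_rows, save_state,
                        batch_size=1000, clock=time.time):
    """One incremental run: validate RAW rows updated since the last
    checkpoint, then advance the checkpoint."""
    # Capture "now" BEFORE fetching, so rows updated while this run is
    # validating are picked up again next run rather than skipped.
    run_started_ms = int(clock() * 1000)
    last_ts = fetch_state() or 0  # epoch ms; 0 => validate everything
    validated = validate_rows(min_last_updated_time=last_ts,
                              limit=batch_size)
    save_state(run_started_ms)
    return validated
```

Taking the timestamp before the fetch trades a small amount of re-validation (rows updated mid-run are seen twice) for a guarantee that no update falls between two runs.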
15. Combined lifecycle: historic batch + sync trigger
Overview
Every instance validation pipeline operates in two sequential phases that together provide complete, continuous coverage:
- Phase 1 — Historic batch: The orchestrator validates all existing instances at a point in time using parallel partitioned workers.
- Phase 2 — Ongoing sync: A DMS change-subscription trigger watches for new and updated instances and validates them in near-real-time.
The two phases are stitched together by a shared initial_sync_cutoff timestamp so no instance falls through the gap between them.
Phase 1: Historic partitioned validation
When deploy_validation_pipeline() or a manual orchestrator invocation kicks off a historic run:
- Timestamp collection — The orchestrator streams all partition_field values from DMS via instances.sync(two_phase) and records sync_cutoff_timestamp = max(partition_field_values_seen). Progress is checkpointed in TimestampCollectionState so a Function timeout does not restart the scan from scratch.
- Partition triggering — The orchestrator computes N equal-width ranges and fires N independent partitioned Function calls (without waiting). Each runs concurrently and resumes via a FunctionValidationState cursor if it times out.
- Sync trigger creation (Step 6) — Before returning, the orchestrator calls create_sync_trigger(), which:
  - Patches the existing view workflow version, injecting job_run_id and initial_sync_cutoff = sync_cutoff_timestamp into the task payload.
  - Creates a WorkflowTriggerUpsert with sync_mode="no_backfill" so the trigger only delivers changes going forward from the moment it is created.
The sync trigger goes live at this point — Phase 2 is active even while Phase 1 partitions are still running.
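The equal-width range computation in the partition-triggering step can be sketched as below. The helper name and the half-open boundary convention are assumptions, not the orchestrator's actual code.

```python
def compute_partition_ranges(min_value, max_value, n):
    """Split [min_value, max_value] into up to n contiguous half-open
    ranges [start, end) whose union covers every value, so each
    partitioned worker can be given one range to validate."""
    width = max(1, (max_value - min_value + n) // n)  # ceiling division
    ranges = []
    start = min_value
    while start <= max_value:
        end = min(start + width, max_value + 1)
        ranges.append((start, end))
        start = end
    return ranges
```

With lastUpdatedTime as the partition field, min/max come from the timestamp-collection phase and each (start, end) pair becomes one partitioned Function call.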
Phase 2: Ongoing sync validation
Standard instance mode (use_sync_cursor_mode: false)
The DMS change subscription delivers each batch of changed nodes directly in the workflow payload. The instance handler receives and validates them immediately. No initial_sync_cutoff filtering is applied — every arriving batch is validated as-is.
Sync-cursor mode (use_sync_cursor_mode: true)
The trigger fires as a lightweight signal (the workflow Select fetches no view properties). The instance_sync_cursor handler fetches its own data:
- Loads syncCursor from FunctionValidationState (key: sync_cursor_{job_run_id}_{view_external_id}).
- On the first invocation (no saved cursor): applies Range(lastUpdatedTime >= initial_sync_cutoff) so it starts exactly where timestamp collection ended.
- Pages through instances.sync(two_phase) chunk by chunk, validating and posting records to the Records API.
- Saves the cursor after each successful chunk. If 480 s elapses it returns "partial" — the next trigger fires and continues from the saved position.
This mode requires max_concurrent_executions: 1 on the workflow to prevent two executions from racing on the same cursor.
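Illustratively, the cursor key and the first-run filter look like this. The key format is taken from the text above; the filter is a plain-dict stand-in for the DMS SDK's Range filter, not a confirmed API shape.

```python
def sync_cursor_key(job_run_id, view_external_id):
    """FunctionValidationState key scoping one cursor per job + view."""
    return f"sync_cursor_{job_run_id}_{view_external_id}"


def first_run_filter(initial_sync_cutoff_ms):
    """Applied only when no saved cursor exists, so the sync stream
    opens exactly where Phase 1 timestamp collection ended."""
    return {"range": {"property": ["node", "lastUpdatedTime"],
                      "gte": initial_sync_cutoff_ms}}
```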
The handoff: ensuring no gap
T0 T1 T2
│ │ │
│←─ Timestamp collection ──►│ │
│ │ │
│ │◄──── Historic partitions ─────►│
│ │ │
│ │◄──── Sync trigger live ────────────────►
│ │ (no_backfill from T1) │
│ ↑
│ sync_cutoff_timestamp injected as
│ initial_sync_cutoff into Phase 2 handler
| Time span | Covered by |
|---|---|
| Before T1 (historic data) | Phase 1 partition ranges (partition_field ≤ sync_cutoff_timestamp) |
| T1 onwards (new changes) | Phase 2 sync trigger (no_backfill, starts at T1) |
| Between T1 and T2 (changes during partition runs) | Both phases — Phase 2 record overwrites Phase 1 record |
Any instance updated before T1 has lastUpdatedTime ≤ sync_cutoff_timestamp and falls inside a partition range, so it is guaranteed to be processed by Phase 1.
Any instance updated at or after T1 is delivered by the sync trigger:
- Standard mode: arrives directly in the payload.
- Sync-cursor mode: first-run initial_sync_cutoff filter ensures the cursor opens at sync_cutoff_timestamp, catching all changes from T1 onwards.
Instances updated between T1 and T2 (while partitions are still running) may be validated by both phases. This is intentional — the later Phase 2 record shares the same externalId pattern as the Phase 1 record and overwrites it in the Records API, so the final state always reflects the most recent validation.
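The double-validation overlap is harmless because of last-write-wins semantics on externalId. A toy model (the record shape here is hypothetical; only the overwrite behaviour is the point):

```python
def upsert_records(store, records):
    """Records sharing an externalId collapse to the most recent
    write, so a Phase 2 record replaces the Phase 1 record for the
    same instance."""
    for rec in records:
        store[rec["externalId"]] = rec  # same key => overwrite
    return store
```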
Idempotency
The sync trigger external ID is always sync-{view_external_id} (e.g. sync-pump). Re-running historic validation upserts the same trigger with a fresh job_run_id and initial_sync_cutoff, seamlessly replacing the previous one. Any FunctionValidationState cursor from the old job_run_id is orphaned but harmless; the sync handler creates a new cursor under the new job_run_id.
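The stable trigger ID is taken directly from the text above; keeping it deterministic is what turns a historic re-run into an upsert of the existing trigger rather than a new trigger per run:

```python
def sync_trigger_external_id(view_external_id):
    """One sync trigger per view, regardless of how many times
    historic validation is re-run."""
    return f"sync-{view_external_id}"
```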
Configuration reference
| ViewConfig field | Effect on pipeline |
|---|---|
| use_sync_cursor_mode: true | Phase 2 handler fetches its own data via sync cursor. Requires max_concurrent_executions: 1. |
| use_sync_cursor_mode: false (default) | Phase 2 handler receives instances directly in the trigger payload. |
| partition_count | Number of Phase 1 parallel workers (default: 10). |
| partition_field | DMS field used to slice Phase 1 ranges (default: lastUpdatedTime). |
| sync_batch_size | Overrides trigger.batch_size for this view's sync trigger. |
| max_concurrent_executions | Max simultaneous workflow executions. Must be 1 for sync-cursor mode. |
Failure scenarios
A partition fails permanently (3 retries exhausted):
The orchestrator records it in OrchestrationState.failed_partitions and continues. Phase 2 is already live — instances from the failed partition range will be validated the next time they change (sync trigger) or when historic is re-run manually.
Sync trigger fires before a sync-cursor run completes:
max_concurrent_executions: 1 prevents a second execution from starting. CDF queues the trigger event and fires it after the current execution finishes.
Historic re-run while Phase 2 is active:
The orchestrator upserts a new sync trigger under the same external ID. Any in-flight Phase 2 execution finishes with the old job_run_id; subsequent ones use the new job_run_id cursor, starting from the new initial_sync_cutoff.