
cognite-data-quality — Deep Architecture Reference

Scope: This document covers the internal design of the cognite-data-quality Python package and the CDF infrastructure it drives. It is aimed at engineers who need to understand, extend, or debug the system. For a higher-level overview see ARCHITECTURE.md.


Table of Contents

  1. Package layout
  2. Single-function dispatch model
  3. DMS API usage patterns
  4. Instance validation — sync (real-time)
  5. Instance validation — historic (partitioned)
  6. Instance sync-cursor validation
  7. Time series validation
  8. RAW table validation
  9. Orchestrator
  10. Workflow design
  11. Records API integration
  12. State management (DMS containers)
  13. Fault-tolerance and timeout handling
  14. End-to-end data flows
  15. Combined lifecycle: historic batch + sync trigger

1. Package layout

cognite_data_quality/
├── _function_code/
│   ├── handler.py                          # CDF Function entry-point (dispatcher)
│   ├── handlers/
│   │   ├── instance_validation.py          # Real-time DMS validation
│   │   ├── instance_sync_cursor_validation.py  # Cursor-based ongoing sync validation
│   │   ├── partitioned_validation.py       # Historic DMS partition validation
│   │   ├── orchestrator.py                 # Historic orchestration controller
│   │   ├── orchestration_status.py         # Orchestration status monitoring
│   │   ├── timeseries_validation.py        # Time series SHACL validation
│   │   ├── raw_validation.py               # RAW table validation
│   │   └── test_rule.py                    # Dry-run SHACL rule testing
│   └── common/
│       ├── cursor_state.py                 # DMS-backed cursor state (partitioned + TS)
│       ├── records_api.py                  # Records API posting with batching + retry
│       ├── shacl_utils.py                  # SHACL file loading from CDF Files
│       ├── time_window.py                  # Time-window replacement in SHACL rules
│       ├── timestamp_collection_state.py   # Orchestrator timestamp-collection state
│       └── raw_state.py                    # RAW incremental state helpers

All handlers live in one deployed CDF Function. The handler.py entry-point routes by validation_type.


2. Single-function dispatch model

CDF allows only one Python entry-point per Function. Rather than deploying one Function per validation type, the package deploys a single Function (data-quality-validation) that inspects the validation_type field of its input data dict and forwards the call:

handler.handle(data, client, secrets)
  ├── "instance"              → handle_instance_validation()
  ├── "instance_sync_cursor"  → handle_instance_sync_cursor_validation()
  ├── "partitioned"           → handle_partitioned_validation()
  ├── "timeseries"            → handle_timeseries_validation()
  ├── "orchestrator"          → handle_orchestrator()
  ├── "orchestration_status"  → handle_orchestration_status()
  ├── "raw_incremental"       → handle_raw_validation_incremental()
  ├── "raw_historic"          → handle_raw_validation_historic()
  └── "test"                  → handle_test_rule()

Secrets (client-id, client-secret) are passed to all handlers but only actively consumed by the orchestrator, which uses them to create Function schedules and workflow triggers with dedicated credentials (to avoid nested OAuth token chains).

Version logging: On every invocation, handler.py logs cognite-data-quality.__version__ — this is the primary way to confirm which package version is executing inside CDF.


3. DMS API usage patterns

The system makes heavy use of the CDF Data Modeling Service (DMS) for the operations below.

3.1 Querying instances

| Operation | API call | Used by |
| --- | --- | --- |
| List instances with filter | client.data_modeling.instances.list() | TS validation, orchestrator count |
| Sync (paginated, with cursor) | client.data_modeling.instances.sync(query) | partitioned, sync-cursor handlers |
| Retrieve by NodeId | client.data_modeling.instances.retrieve() | RAW state load, cursor state load |
| Aggregate (count/min/max) | client.data_modeling.instances.aggregate() | Orchestrator data distribution |
| Apply (upsert) | client.data_modeling.instances.apply() | All cursor state writes |
| Delete | client.data_modeling.instances.delete() | Cursor cleanup after completion |

3.2 Sync query structure

The instances.sync() call is the workhorse for both historic partitioned and ongoing sync-cursor modes. A typical query looks like:

Query(
    with_={
        "nodes": NodeResultSetExpression(
            filter=And(
                HasData(views=[view_id]),
                In(["node", "space"], instance_spaces),
                Range(property=["node", partition_field], gte=min_val, lt=max_val),
            ),
            limit=chunk_size,        # adaptive; steps down on timeout
            sync_mode="two_phase",   # ensures stable cursor semantics
        )
    },
    select={
        "nodes": Select(sources=[SourceSelector(source=view_id, properties=[])])
    },
    cursors={"nodes": current_cursor} if current_cursor else None,
)
result = client.data_modeling.instances.sync(query=query)

sync_mode="two_phase" is used for historic/partitioned to get a stable snapshot with reliable cursor pagination. sync_mode="no_backfill" is used for the workflow trigger (see §10) so it watches only future changes.

3.3 Filter construction

Filters are assembled from cognite.client.data_classes.filters:

  • HasData(views=[view_id]) — restricts to nodes that have data in the view
  • In(["node", "space"], instance_spaces) — restricts to declared instance spaces
  • Range(property=[...], gte=..., lt=...) — partition range filter
  • System properties: ["node", "lastUpdatedTime"]
  • View/container properties: resolved to [container_space, container_id, container_property] via client.data_modeling.views.retrieve()

3.4 Property access pattern

When nodes come back from sync(), they are converted to plain dicts with node.dump(camel_case=True) so they can be passed to NEAT's validate_instances.with_shacl() unchanged. Properties are nested under properties → {view_id} → {property_name}.
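
A minimal sketch of this conversion (the query variable is assumed to be built as in §3.2):

result = client.data_modeling.instances.sync(query=query)
instances = [node.dump(camel_case=True) for node in result["nodes"]]
# Each dict now carries its view properties under instances[i]["properties"],
# ready to be handed to NEAT's validate_instances.with_shacl() unchanged.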


4. Instance validation — sync (real-time)

Handler: handle_instance_validation()
Validation type: "instance"

Flow

CDF Data Change Event
Workflow Trigger (DM subscription)
        │  payload: {"items": {"n": [...nodes...], "e": [...edges...]}}
CDF Workflow step → invokes Function with validation_type="instance"
handler.py → handle_instance_validation()
        ├── Extract instances from payload (items.n + items.e)
        ├── Filter deleted instances (deletedTime set → skip)
        ├── Load SHACL rules from CDF Files (by external_id)
        ├── NeatSession.validate_instances.with_shacl(
        │       instances, shacl_rules, datamodel, auto_load_depth=2)
        ├── Parse rdflib report graph → list[violation dicts]
        └── post_validation_results_to_records() → Records API

Key behaviours

  • Deleted instance filtering: Instances with deletedTime set are silently skipped. This handles delete events from CDF subscriptions.
  • SHACL engine failure handling: If pyshacl returns a non-Graph object (internal crash), synthetic SHACLEngineFailure violations are created for all instances in the batch so Records API always reflects the validation attempt.
  • 10% failure threshold: If more than 10% of Records API posts fail, the handler raises RuntimeError to mark the function call as failed (triggering CDF retry logic).
  • Auto-load depth 2: NEAT fetches the instance's direct references AND their references before running SHACL. This is configurable per view.
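
A sketch of the 10% failure-threshold check described in the list above (variable names are illustrative):

failed_posts = len(record_post_errors)
total_posts = len(instances)
if total_posts and failed_posts / total_posts > 0.10:
    raise RuntimeError(
        f"{failed_posts}/{total_posts} Records API posts failed (> 10% threshold)")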

Input payload structure

{
  "validation_type": "instance",
  "instances": {
    "items": {
      "n": [ /* node dicts */ ],
      "e": [ /* edge dicts */ ]
    }
  },
  "shacl_rules_file_external_id": "my-view-shacl-rules",
  "datamodel_space": "my_space",
  "datamodel_external_id": "MyDataModel",
  "datamodel_version": "v1",
  "auto_load_depth": 2,
  "records_config": {
    "stream_id": "dq_stream",
    "rule_set_id": "MyViewSHACLv1",
    "rule_set_version": "1.0"
  }
}

5. Instance validation — historic (partitioned)

Handler: handle_partitioned_validation()
Validation type: "partitioned"

Purpose

Validates the full historical dataset for a view. The orchestrator slices the dataset into N range-based partitions and triggers this handler once per partition. Each handler invocation runs independently and can time out and resume.

Flow

Orchestrator determines partition ranges (via discover_data_distribution())
        ├── Partition 0: [min, P1)  → Function call → partitioned handler
        ├── Partition 1: [P1,  P2)  → Function call → partitioned handler
        ├── ...
        └── Partition N: [PN,  max] → Function call → partitioned handler

Each partitioned handler:
  1. Load SHACL rules from CDF Files
  2. Build dimension lookup from SHACL graph (O(1) enrichment per violation)
  3. Load saved cursor from FunctionValidationState (if resuming)
  4. Loop:
       a. Build DMS sync query with partition range filter + current cursor
       b. client.data_modeling.instances.sync()  ← adaptive limit
       c. NeatSession.validate_instances.with_shacl()
       d. Extract violations, enrich with dimension/targetClass
       e. post_validation_results_to_records()
       f. Save cursor to FunctionValidationState (DMS upsert)
       g. gc.collect()  ← reclaim rdflib Graph memory
       h. Check elapsed time; if > 480s → save cursor, return "partial"
  5. On completion: delete cursor, return "completed"

Adaptive sync limit

On DMS timeout, the handler backs off through a limit sequence:

[chunk_size, 1000, 500, 200, 100, 10]

Each limit gets 2 attempts with a 30-second wait before stepping down. If all limits are exhausted, the handler saves the cursor and returns "sync_limits_exhausted" — the orchestrator will retry the partition.
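
A sketch of the back-off loop under these rules (the set of retried error codes and the helper shape are assumptions):

import time
from cognite.client.exceptions import CogniteAPIError

def sync_with_backoff(client, query, chunk_size):
    for limit in [chunk_size, 1000, 500, 200, 100, 10]:
        query.with_["nodes"].limit = limit
        for _ in range(2):                      # two attempts per limit
            try:
                return client.data_modeling.instances.sync(query=query)
            except CogniteAPIError as err:
                if err.code not in (408, 429, 502, 503, 504):
                    raise                       # non-timeout errors propagate
                time.sleep(30)                  # wait before retry / step-down
    return None                                 # caller returns "sync_limits_exhausted"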

Partition range filter

For system properties (lastUpdatedTime, createdTime, externalId, space):

Range(property=["node", partition_field], gte=min_val, lt=max_val)

For view/container properties, the handler resolves the container reference via:

view_def = client.data_modeling.views.retrieve((space, external_id, version))[0]
container_ref = view_def.properties[partition_field].container
Range(property=[container_ref.space, container_ref.external_id, container_property], ...)

Dimension enrichment

The SHACL file is parsed once at startup into three lookup dicts:

  • _shape_to_dim: named shape URI → dimension string (custom dq:dimension annotation)
  • _path_to_dim: property path URI → dimension string
  • _shape_to_targetclass: named shape URI → target class local name

These are resolved at O(1) per violation and written to the Records API record as dimensions and targetClass lists — enabling downstream filtering by data quality dimension.
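
A sketch of how such lookups could be built from the parsed SHACL graph (the dq namespace URI is a placeholder; only the dq:dimension annotation is taken from the description above):

from rdflib import Graph, Namespace
from rdflib.namespace import SH

DQ = Namespace("http://example.org/dq#")        # placeholder namespace for dq:dimension

def build_dimension_lookups(shacl_graph: Graph) -> tuple[dict, dict]:
    shape_to_dim, path_to_dim = {}, {}
    for shape, dim in shacl_graph.subject_objects(DQ.dimension):
        shape_to_dim[str(shape)] = str(dim)
        for path in shacl_graph.objects(shape, SH.path):
            path_to_dim[str(path)] = str(dim)
    return shape_to_dim, path_to_dim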


6. Instance sync-cursor validation

Handler: handle_instance_sync_cursor_validation()
Validation type: "instance_sync_cursor"

Purpose

This is the ongoing validation mode that runs after historic validation completes. Instead of receiving instances in the workflow payload, it maintains its own DMS sync cursor and independently fetches all changes since the last run.

How it differs from the standard instance handler

| Aspect | instance | instance_sync_cursor |
| --- | --- | --- |
| Instance source | Workflow payload | instances.sync() with saved cursor |
| Triggered by | Data-change event (payload includes instances) | Data-change event (lightweight signal only) |
| Cursor state | None | Persistent in FunctionValidationState |
| Timeout handling | No | Yes — saves cursor, returns "partial" |
| Initial cutoff | N/A | initial_sync_cutoff filters first-run changes |

First-run cutoff alignment

The orchestrator injects initial_sync_cutoff (the max lastUpdatedTime seen during historic timestamp collection) into the workflow task payload. On the first invocation (no saved cursor), this is used as a Range(lastUpdatedTime >= cutoff) filter so the sync handler picks up exactly where historic validation stopped — no gap, no double-validation.
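
A sketch of the first-run filter (variable names are illustrative):

from cognite.client.data_classes.filters import Range

if saved_cursor is None and initial_sync_cutoff is not None:
    # No cursor yet: start exactly where historic timestamp collection stopped
    extra_filters.append(Range(property=["node", "lastUpdatedTime"], gte=initial_sync_cutoff))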

Workflow trigger mode

When use_sync_cursor_mode: true in the view config, the trigger fetches no view properties (Select(sources=[])). The trigger fires as a lightweight signal; the handler fetches fresh data via instances.sync().


7. Time series validation

Handler: handle_timeseries_validation()
Validation type: "timeseries"

Modes

Normal mode — runs on a CRON schedule (e.g. hourly):

  1. Fetch time series instances via DMS filter or explicit IDs
  2. Load SHACL rules from CDF Files
  3. Replace "60m-ago" / "now" placeholders with actual hour-aligned timestamps
  4. Run NeatSession.validate_instances.with_shacl() with auto_load_depth=0
  5. Parse violations from SHACL report text
  6. Post one record per time series to Records API
  7. Save cursor state (PASSED/FAILED + timestamp) to TimeSeriesValidationCursor container

Backfill mode — processes historic data in time-aligned chunks:

  1. Parse start_time / end_time from config (supports "30d-ago" relative syntax)
  2. Align to hour boundaries
  3. Resume from backfillProgress cursor if available
  4. For each window_minutes-sized chunk:
       - Replace SHACL time placeholders with absolute chunk timestamps
       - Run validation, post records, save cursor
  5. Automatically switches to Normal mode when start >= end

DMS instance fetching

By DMS filter:

filters = [HasData(views=[view_id])]
if "space" in filter_config:
    filters.append(Equals(["node", "space"], filter_config["space"]))
if "external_id_prefix" in filter_config:
    filters.append(Prefix(["node", "externalId"], filter_config["external_id_prefix"]))
nodes = client.data_modeling.instances.list(instance_type="node", filter=combined_filter, limit=1000)

By explicit IDs:

node_ids = [NodeId(space=item["space"], external_id=item["external_id"]) for item in instance_ids]
result = client.data_modeling.instances.retrieve(nodes=node_ids)

Time window replacement

SHACL rule files contain string literals like "60m-ago" and "now". replace_time_windows_in_shacl() in common/time_window.py substitutes these with actual ISO 8601 timestamps before passing rules to pyshacl. This enables the same SHACL file to work for both normal scheduled runs and backfill processing.
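
A sketch of this substitution, assuming hour-aligned windows as described (the helper body is an assumption, not the exact implementation in common/time_window.py):

from datetime import datetime, timedelta, timezone

def replace_time_windows_in_shacl(shacl_text: str, end: datetime | None = None) -> str:
    end = (end or datetime.now(timezone.utc)).replace(minute=0, second=0, microsecond=0)
    start = end - timedelta(minutes=60)
    return (shacl_text
            .replace('"60m-ago"', f'"{start.isoformat()}"')
            .replace('"now"', f'"{end.isoformat()}"'))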

Prefetch error injection

If the INDSL or SDK SPARQL function fails to fetch time series data after retries, get_prefetch_errors() returns the affected time series IDs. The handler injects synthetic DataFetchError / Warning violations for these so Records API reflects that data was unavailable for validation.

Cursor state container

TimeSeriesValidationCursor (container in dataQuality space):

externalId: "{rule_set_id}/{space}/{ts_external_id}"
properties:
  timeseriesId, ruleSetId
  lastValidatedTimestamp  (ISO timestamp of last run)
  lastStatus              ("PASSED" | "FAILED")
  backfillProgress        (ISO timestamp of last completed chunk, backfill only)
  subscriptionCursor      (reserved)


8. RAW table validation

Handler: handle_raw_validation_incremental() and handle_raw_validation_historic()
Validation types: "raw_incremental" / "raw_historic"

RAW rows are converted to RDF triples by NEAT before SHACL validation. The URI scheme is:

Subject:   http://purl.org/cognite/raw/{db}/{table}/#{row_key}
Type:      http://purl.org/cognite/raw/{db}/{table}/#{table_name}
Property:  http://purl.org/cognite/raw/{db}/{table}/#{column_name}

Incremental mode (scheduled)

CRON trigger → Workflow → Function (raw_incremental)
  1. Load RawValidationState from DMS (→ last_processed_timestamp)
  2. NeatSession.validate_instances.with_shacl_raw(
         db_name, table_name, shacl_rules,
         min_last_updated_time=last_timestamp,   ← incremental filter
         limit=batch_size)
  3. Parse violations from SHACL report
  4. Post to Records API (one record per row)
  5. Save new timestamp (current time) to RawValidationState

State container RawValidationState (keyed {db_name}_{table_name}):

last_processed_timestamp   (ms epoch — used as min_last_updated_time next run)
last_run_time              (ISO timestamp)
rows_processed             (int)
last_validation_conforms   (bool stored as "true"/"false")
last_violation_count       (int)

Historic mode (cursor-based)

Orchestrator fetches N cursors via getCursors API
  ├── Partition 0 → cursor_0 → Function (raw_historic)
  ├── Partition 1 → cursor_1 → Function (raw_historic)
  └── ...

Each raw_historic handler:
  1. Try to resume from saved FunctionValidationState cursor
  2. Loop (timeout=480s, cursor save interval=30s):
       a. NeatSession.validate_instances.with_shacl_raw(
              cursor=current_cursor, limit=1000)
       b. Parse violations, post to Records API
       c. Periodically save cursor state
       d. Check completion (batch_size == 0 → done)
  3. On complete: delete cursor state

Note: The RAW historic handler currently breaks out of the pagination loop after one batch. A TODO in the code marks where proper nextCursor pagination should be implemented once the NEAT API exposes it.


9. Orchestrator

Handler: handle_orchestrator()
Validation type: "orchestrator"

The orchestrator operates in two modes based on whether orchestration_id is present in the input.

Initial mode (no orchestration_id)

Called once to kick off a historic validation run:

1. Generate orchestration_id  (UUID)
2. Discover data distribution:
     - client.data_modeling.instances.aggregate(count)     ← total instances
     - client.data_modeling.instances.aggregate(min/max)   ← range bounds
       (falls back to instances.sync() for datetime fields)
3. Collect all partition field values (timestamp collection):
     - Streams instances via sync() in two_phase mode
     - Saves progress to TimestampCollectionState (for resumption)
4. Compute N equal-width partition ranges from collected values
5. Save OrchestrationState to DMS
6. Create Function schedule (every 2 min) for monitor mode
7. Trigger all N partitions:
     client.functions.call(external_id="data-quality-validation",
                           data={validation_type: "partitioned", ...},
                           wait=False)
8. Create sync trigger (workflow trigger in no_backfill mode)
   with initial_sync_cutoff = max partition date seen
9. Return immediately

Monitor mode (orchestration_id present)

Called every 2 minutes by the Function schedule:

1. Load OrchestrationState from DMS
2. check_partition_statuses():
     - For each partition call_id: client.functions.calls.retrieve()
     - Map status: Completed+completed → done, partial/error → retry, Failed/Timeout → retry or fail
3. Retrigger incomplete partitions (up to max_retries=3)
4. Update OrchestrationState with progress
5. If all partitions done or max retries exhausted:
     - Delete Function schedule
     - Delete OrchestrationState
     - Return final summary
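
A sketch of the status check in monitor mode (the status mapping follows step 2 above; helper and variable names are assumptions):

def check_partition_statuses(client, partition_call_ids: dict[str, int]) -> dict[str, str]:
    statuses = {}
    for partition_id, call_id in partition_call_ids.items():
        call = client.functions.calls.retrieve(
            call_id=call_id, function_external_id="data-quality-validation")
        if call.status == "Completed":
            response = call.get_response() or {}
            statuses[partition_id] = "done" if response.get("status") == "completed" else "retry"
        elif call.status in ("Failed", "Timeout"):
            statuses[partition_id] = "retry"
        else:
            statuses[partition_id] = "running"
    return statuses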

Timestamp collection with state

Because collecting all values from a large view can take longer than a Function timeout, the orchestrator saves progress in TimestampCollectionState (DMS node). If interrupted, the next invocation resumes from where it left off rather than restarting.

Partition count selection

The orchestrator calculates partition count as:

partition_count = min(max_partitions, math.ceil(total_count / instances_per_partition))

Defaults: instances_per_partition=10000, max_partitions=50. These are configurable per view.
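For example, with the defaults, a view containing 123,456 instances yields min(50, ceil(123456 / 10000)) = min(50, 13) = 13 partitions, while a view with 2,000,000 instances is capped at 50.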


10. Workflow design

Instance validation workflows (real-time)

Workflow: dq-shacl-{view_external_id}
  └── Trigger: DataModeling subscription
        filter: HasData(views=[view_id]) AND In(space, instance_spaces)
        sync_mode: "two_phase" (standard) or "no_backfill" (sync-cursor mode)
        batch_size: 100  (configurable)
        batch_timeout: 300s
  └── Task 1: FunctionTask
        function: data-quality-validation
        data: {
          validation_type: "instance" | "instance_sync_cursor",
          shacl_rules_file_external_id: "...",
          datamodel_space: "...",
          ...
          records_config: { stream_id, rule_set_id, rule_set_version }
        }

The workflow trigger receives batches of changed instances from DMS and forwards them to the Function. In sync-cursor mode, the trigger payload carries only metadata (no view properties); the handler fetches full data via its own instances.sync() call.

Time series validation workflows (scheduled)

Workflow: dq-ts-{config_name}
  └── Trigger: CRON ("0 * * * *" for hourly)
  └── Task 1: FunctionTask
        function: data-quality-validation
        data: {
          validation_type: "timeseries",
          shacl_rules_file_external_id: "...",
          filter: { space, external_id_prefix },
          backfill: { enabled, start_time, end_time, window_minutes },
          records_config: { ... }
        }

RAW table validation workflows (scheduled)

Workflow: dq-raw-{table_name}
  └── Trigger: CRON ("*/15 * * * *" for every 15 min)
  └── Task 1: FunctionTask
        function: data-quality-validation
        data: {
          validation_type: "raw_incremental",
          db_name: "...",
          table_name: "...",
          batch_size: 10000,
          records_config: { ... }
        }

Historic validation flow (orchestrated)

Manual/API trigger → Workflow (single task)
  └── Task 1: FunctionTask → orchestrator (initial mode)
        Returns immediately after triggering N partition Functions
        and creating the monitor Function schedule

  (running in background every 2 minutes):
  Function schedule → orchestrator (monitor mode)
        Checks partition statuses, retriggers failures, cleans up on completion

Sync trigger creation

After historic completion, the orchestrator calls create_sync_trigger(), which:

  1. Updates the workflow version to embed job_run_id and initial_sync_cutoff in the task payload
  2. Upserts a WorkflowTriggerUpsert with no_backfill sync mode
  3. Uses the same external_id sync-{view_external_id} each time — so re-running historic validation idempotently replaces the existing trigger


11. Records API integration

All validation results are posted to the CDF Records API (alpha endpoint). The integration is in common/records_api.py.

Endpoint

POST /api/v1/projects/{project}/streams/{stream_id}/records
Header: cdf-version: alpha

Record structure

One record per validated instance (whether passed or failed):

{
  "space": "dataQuality",
  "externalId": "dq_{rule_set_id}_{instance_external_id}_{job_run_id}",
  "sources": [{
    "source": {
      "type": "container",
      "space": "dataQuality",
      "externalId": "DataQualityValidationRecord"
    },
    "properties": {
      "ruleSetId": "MyViewSHACLv1",
      "ruleSetVersion": "1.0",
      "jobRunId": "historic_0_1711234567",
      "passedValidation": false,
      "resultSeverity": ["Violation"],
      "failedConstraints": ["MinCountConstraintComponent::my_property"],
      "focusNode": "http://purl.org/cognite/my_space/my_instance",
      "focusNodeInstance": { "space": "my_space", "externalId": "my_instance" },
      "sourceShape": ["http://example.org/MyShape"],
      "dimensions": ["Completeness"],
      "targetClass": ["MyEquipment"],
      "validationReport": {
        "violationCount": 1,
        "violations": [{
          "sourceConstraintComponent": "MinCountConstraintComponent",
          "resultMessage": "my_property is required",
          "resultSeverity": "Violation",
          "resultPath": "http://purl.org/cognite/my_space/my_property",
          "dimension": "Completeness"
        }],
        "summary": "1 violation(s)"
      }
    }
  }]
}

For RAW rows: focusNodeInstance is omitted (space is empty string, not a DMS space). The focusNode URI uses the RAW namespace: http://purl.org/cognite/raw/{db}/{table}/#{row_key}.

Batching and retry

Records are posted in batches of 1000. Each batch gets up to 3 attempts with exponential backoff (1s → 2s → 4s) for 5xx / gateway / timeout errors. If a batch fails after retries, it falls back to individual record posting. Errors from individual records are collected and returned to the caller (which checks the 10% failure threshold).
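
A sketch of this batching and retry logic (endpoint and header as above; the fallback to per-record posting is omitted, and the set of retried error codes is an assumption):

import time
from cognite.client.exceptions import CogniteAPIError

def post_records(client, project: str, stream_id: str, records: list[dict]) -> list[dict]:
    errors = []
    url = f"/api/v1/projects/{project}/streams/{stream_id}/records"
    for i in range(0, len(records), 1000):                 # batches of 1000
        batch = records[i:i + 1000]
        for attempt in range(3):
            try:
                client.post(url, json={"items": batch}, headers={"cdf-version": "alpha"})
                break
            except CogniteAPIError as err:
                if err.code < 500 and err.code not in (408, 429):
                    raise                                   # non-retryable client error
                time.sleep(2 ** attempt)                    # 1s -> 2s -> 4s back-off
        else:
            errors.extend(batch)                            # caller applies the 10% threshold
    return errors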

Passed instances

Instances with no violations still get a record posted with passedValidation: true and failedConstraints: []. This is deliberate — it creates a complete audit trail of every instance that was checked.

Constraint string format

Failed constraint strings are formatted as "{component}::{resultPath}" when a path is available, e.g. "MinCountConstraintComponent::http://purl.org/cognite/ns/my_property". This allows downstream queries to filter by both constraint type and affected property.


12. State management (DMS containers)

All runtime state is stored in the dataQuality DMS space. Containers are created by scripts/ensure_container.py.

FunctionValidationState

Purpose: Cursor resumption for partitioned and sync-cursor validation.

| Property | Type | Description |
| --- | --- | --- |
| orchestration_id | String | Links to orchestration run |
| partitionId | String | Partition identifier (or "sync" for sync-cursor) |
| syncCursor | String | DMS sync cursor from instances.sync() |
| processedCount | Int | Instances processed so far |
| lastUpdated | Int (ms) | Epoch milliseconds of last write |

External ID patterns:

  • Partitioned: partition_state_orch_{orchestration_id}_p{partition_id}
  • Sync cursor: sync_cursor_{job_run_id}_{view_space}_{view_external_id}_{instance_space}
  • RAW historic: raw_{db_name}_{table_name}_{partition_id}
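
A sketch of a cursor write into this container between chunks (space, container, and property names as listed above; the helper shape is an assumption):

import time
from cognite.client.data_classes.data_modeling import ContainerId, NodeApply, NodeOrEdgeData

def save_cursor(client, external_id: str, cursor: str, processed: int) -> None:
    client.data_modeling.instances.apply(nodes=[NodeApply(
        space="dataQuality",
        external_id=external_id,   # one of the external ID patterns listed above
        sources=[NodeOrEdgeData(
            source=ContainerId("dataQuality", "FunctionValidationState"),
            properties={
                "syncCursor": cursor,
                "processedCount": processed,
                "lastUpdated": int(time.time() * 1000),
            },
        )],
    )])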

RawValidationState

Purpose: Incremental watermark for RAW table validation.

| Property | Type | Description |
| --- | --- | --- |
| db_name | String | RAW database name |
| table_name | String | RAW table name |
| last_processed_timestamp | Int (ms) | Min lastUpdatedTime for next run |
| last_run_time | Timestamp | ISO timestamp of last run |
| rows_processed | Int | Rows processed in last run |
| last_validation_conforms | String | "true" or "false" |
| last_violation_count | Int | Violations in last run |

External ID: {db_name}_{table_name}

OrchestrationState

Purpose: Tracks overall historic orchestration progress.

| Property | Type | Description |
| --- | --- | --- |
| orchestration_id | String | UUID for this orchestration run |
| view_external_id | String | View being validated |
| partition_count | Int | Total partitions |
| instance_space | String | JSON-encoded list of instance spaces |
| completed_partitions | JSON | Map of completed partition IDs |
| failed_partitions | JSON | Map of permanently failed partition IDs |
| partition_call_ids | JSON | Map of partition_id → function call_id |
| partition_retry_count | JSON | Map of partition_id → retry count |

TimestampCollectionState

Purpose: Checkpoint for orchestrator timestamp collection (allows resumption across Function timeouts).

TimeSeriesValidationCursor

Purpose: Per-time-series validation state (last run, backfill progress).


13. Fault-tolerance and timeout handling

Function timeout constraint

CDF Functions time out after 10 minutes. All long-running handlers use an internal max_runtime_seconds=480 (8 minutes) safety margin to save state before the timeout kills them.

Cursor save on timeout

elapsed = time.time() - start_time
if elapsed > max_runtime_seconds:
    save_cursor(...)     # persist progress to DMS
    return {"status": "partial", ...}

The cursor is saved after each successfully posted chunk (not before) so it always points to the start of unprocessed data.

Retry architecture (partitioned)

Partition Function call
  ├── Completes successfully → "completed" → orchestrator removes from retry list
  ├── Times out (status "partial") → orchestrator retriggers with same cursor
  ├── DMS sync timeout → adaptive limit reduction → eventually "sync_limits_exhausted"
  └── Unhandled exception → "error" status → orchestrator retriggers (max 3 times)
         └── After 3 failures → marked as "failed" in OrchestrationState

GC between chunks

After each chunk in partitioned and sync-cursor modes:

del neat, report_graph
gc.collect()

This explicitly releases the rdflib in-memory graph so memory doesn't accumulate across chunks. NeatSession is re-instantiated fresh per chunk.

Records API failure threshold

If > 10% of Records API posts fail within a single chunk, the handler raises RuntimeError. The cursor was saved before the post attempt, so the orchestrator will retrigger the partition from the same point.


14. End-to-end data flows

A. Real-time instance validation

User upserts DMS instance
CDF detects change in subscribed view
Workflow trigger fires with batch of changed nodes (up to batch_size=100)
Workflow executes FunctionTask → data-quality-validation
    ├── validation_type: "instance"
    ├── instances: {items: {n: [node dicts], e: [edge dicts]}}
    └── ...config...
handler.py → handle_instance_validation()
NEAT loads SHACL rules from CDF Files (cached in runtime)
NEAT auto-loads referenced nodes (depth 2) via DMS API
pyshacl validates RDF graph against SHACL shapes
Violations extracted from rdflib report graph
post_validation_results_to_records() → 1 record per instance
Records API stream (cdf-version: alpha)
graph TD
    A[DMS Instance Change] --> B[DataModelingTrigger\nbatch_size: 100, timeout: 60s]
    B --> C[Validation Workflow\ndq-shacl-view-name]
    C --> D[Cognite Function\ndata-quality-validation\ntype: instance]

    D --> E{Load SHACL Rules}
    E -->|RuleSet API| F[RuleSet References\nruleset_references]
    E -->|CDF Files| G[CDF File\nshacl_rules_file_external_id]
    E -->|Inline| H[Inline Turtle\nshacl_rules]

    D --> I[Extract Instances\nfrom workflow payload]
    I --> J[Filter deleted instances\nwith deletedTime set]
    J --> K[Fetch full instance data\nfrom DMS multi-spaces]
    K --> L[Auto-load References\ndepth: 0-3 levels]

    F --> M[NeatSession\nSHACL Validation\npyshacl engine]
    G --> M
    H --> M
    L --> M

    M --> N{Conforms?}
    N -->|Yes| O[Records API\nPASSED]
    N -->|No| P[Records API\nFAILED + violations\nconstraint, path, message, severity]
    O --> Q[DataQualityValidationRecord\ndataQuality space]
    P --> Q

B. Historic instance validation

API or manual trigger → Workflow → orchestrator (initial mode)
discover_data_distribution():
    client.instances.aggregate(count) + aggregate(min) + aggregate(max)
collect_all_timestamps():
    instances.sync() in chunks → all partition field values
    (saves progress to TimestampCollectionState)
compute N equal partition ranges
create Function schedule (monitor, every 2 min)
trigger N partition functions (async, no wait)
create_sync_trigger() with initial_sync_cutoff = max partition date
return immediately

--- meanwhile ---

Each partition function runs independently:
    instances.sync() chunk by chunk (with adaptive limit)
        → NEAT validate → post to Records API → save cursor
    On timeout: save cursor, return "partial"
    On complete: delete cursor, return "completed"

--- every 2 minutes ---

Function schedule → orchestrator (monitor mode)
    check_partition_statuses():
        functions.calls.retrieve() for each partition
        → retrigger partial/error/failed partitions (max 3x)
    When all done: delete schedule, delete OrchestrationState
graph TD
    A[Manual Trigger\nor Scheduled] --> B[Cognite Function\ntype: orchestrator]

    B --> C{First Run?}
    C -->|Yes - Initial Mode| D[Query DMS\nfor data distribution]
    D --> E[Calculate N partition ranges\nmin/max field splits]
    E --> F[Trigger N partition functions\nnon-blocking]
    F --> G[Create sync cursor trigger\nfor ongoing changes]
    F --> H[Create 2-min monitor schedule\norchestration_status]

    C -->|No - Monitor Mode| I[Load OrchestrationState\nfrom DMS container]
    I --> J[Check partition statuses]
    J --> K{All complete?}
    K -->|No| L[Retrigger incomplete\npartitions max 3x]
    K -->|Yes| M[Delete schedule & state]

    subgraph "Partition Worker (type: partitioned)"
        N[Load cursor if resuming] --> O[Query DMS range filter\nfield IN min..max]
        O --> P[Validate chunk 200 instances]
        P --> Q[Post to Records API]
        Q --> R{480s timeout?}
        R -->|No| O
        R -->|Yes| S[Save cursor state\nFunctionValidationState]
    end

    F --> N
    L --> N

C. Ongoing sync-cursor validation

Any DMS change in subscribed view
Workflow trigger fires (lightweight — no view properties in payload)
Workflow → data-quality-validation (validation_type: "instance_sync_cursor")
Load sync cursor from FunctionValidationState
Loop: instances.sync(two_phase, filter includes cutoff on first run)
    ├── Chunk → NEAT validate → post Records API → save cursor
    └── Timeout: save cursor, return "partial" (next trigger continues)
All caught up: save cursor, return "completed"
graph LR
    A[DMS Change Event\nlightweight signal] --> B[Cognite Function\ntype: instance_sync_cursor]
    B --> C[Load cursor state\nFunctionValidationState]
    C --> D[Load SHACL Rules\nRuleSet API or CDF Files]
    D --> E[instances.sync\nfetch changes since cursor]
    E --> F[Filter deleted instances]
    F --> G[Validate chunk\nNeatSession + SHACL]
    G --> H[Post to Records API]
    H --> I{More changes\nor 480s timeout?}
    I -->|More changes| E
    I -->|Timeout| J[Save cursor\nfor resumption]
    I -->|Done| K[Save final cursor]

D. Time series validation (scheduled)

CRON trigger (e.g. hourly)
Workflow → data-quality-validation (validation_type: "timeseries")
Fetch CogniteTimeSeries nodes from DMS (filter or explicit IDs)
Load SHACL rules from CDF Files
Replace time window placeholders with actual timestamps:
    "60m-ago" → ISO timestamp, "now" → ISO timestamp
NEAT.validate_instances.with_shacl(auto_load_depth=0)
    SHACL rules call CDF SDK / INDSL SPARQL functions to fetch datapoints
Parse SHACL text report for violations per time series
Post 1 record per time series to Records API (with time_window in externalId)
Save cursor state (PASSED/FAILED + timestamp) to TimeSeriesValidationCursor
graph LR
    A[Scheduled Trigger\ncron] --> B[Cognite Function\ndata-quality-validation\ntype: timeseries]
    B --> C[Load SHACL Rules\nfrom CDF Files]
    B --> D{Selection Method}
    D -->|DMS Filter| E[Filter CogniteTimeSeries\nby prefix or properties]
    D -->|Instance IDs| F[Explicit list\nof time series]
    E --> G[Fetch Time Series\nfrom DMS]
    F --> G
    C --> H[NEAT Validation\nCDF SDK + INDSL functions]
    G --> H
    H --> I{Backfill Mode?}
    I -->|Yes| J[Process historic chunks\nhour-aligned windows]
    I -->|No| K[Validate last\ncomplete hour]
    J --> L[Save cursor state\nfor resume]
    K --> M[Records API\nPer time series result]
    L --> M
    M --> N[DataQualityValidationRecord]

E. RAW table validation (incremental)

CRON trigger (e.g. every 15 min)
Workflow → data-quality-validation (validation_type: "raw_incremental")
Load last_processed_timestamp from RawValidationState
NEAT.validate_instances.with_shacl_raw(
    min_last_updated_time=last_timestamp,
    limit=batch_size)
    (fetches RAW rows updated since last run, converts to RDF, runs SHACL)
Parse violations, fetch full row data for Records API
post_validation_results_to_records() with RAW namespace_base
Save new timestamp (current time) to RawValidationState
graph LR
    A[Scheduled Trigger\ncron] --> B{Validation Mode}

    B -->|Incremental| C[Cognite Function\ntype: raw_incremental]
    C --> D[Load last_processed_timestamp\nRawValidationState container]
    D --> E[Fetch RAW rows\nupdated since timestamp]
    E --> F[Convert rows to RDF\nusing RAW URI scheme]
    F --> G[SHACL Validation\nper-row constraints]
    G --> H[Post to Records API]
    H --> I[Save new timestamp\nRawValidationState]

    B -->|Historic| J[Cognite Function\ntype: raw_historic]
    J --> K[getCursors API\ndistribute N cursors]
    K --> L[Process cursor chunk\nwith SHACL]
    L --> M{480s timeout?}
    M -->|No| L
    M -->|Yes| N[Save cursor state\nFunctionValidationState]

15. Combined lifecycle: historic batch + sync trigger

Overview

Every instance validation pipeline operates in two sequential phases that together provide complete, continuous coverage:

  1. Phase 1 — Historic batch: The orchestrator validates all existing instances at a point in time using parallel partitioned workers.
  2. Phase 2 — Ongoing sync: A DMS change-subscription trigger watches for new and updated instances and validates them in near-real-time.

The two phases are stitched together by a shared initial_sync_cutoff timestamp so no instance falls through the gap between them.


Phase 1: Historic partitioned validation

When deploy_validation_pipeline() or a manual orchestrator invocation kicks off a historic run:

  1. Timestamp collection — The orchestrator streams all partition_field values from DMS via instances.sync(two_phase) and records sync_cutoff_timestamp = max(partition_field_values_seen). Progress is checkpointed in TimestampCollectionState so a Function timeout does not restart the scan from scratch.

  2. Partition triggering — The orchestrator computes N equal-width ranges and fires N independent partitioned Function calls (without waiting). Each runs concurrently and resumes via a FunctionValidationState cursor if it times out.

  3. Sync trigger creation (Step 6) — Before returning, the orchestrator calls create_sync_trigger(), which:
       • Patches the existing view workflow version, injecting job_run_id and initial_sync_cutoff = sync_cutoff_timestamp into the task payload.
       • Creates a WorkflowTriggerUpsert with sync_mode="no_backfill" so the trigger only delivers changes going forward from the moment it is created.

The sync trigger goes live at this point — Phase 2 is active even while Phase 1 partitions are still running.


Phase 2: Ongoing sync validation

Standard instance mode (use_sync_cursor_mode: false)

The DMS change subscription delivers each batch of changed nodes directly in the workflow payload. The instance handler receives and validates them immediately. No initial_sync_cutoff filtering is applied — every arriving batch is validated as-is.

Sync-cursor mode (use_sync_cursor_mode: true)

The trigger fires as a lightweight signal (the workflow Select fetches no view properties). The instance_sync_cursor handler fetches its own data:

  1. Loads syncCursor from FunctionValidationState (key: sync_cursor_{job_run_id}_{view_external_id}).
  2. On the first invocation (no saved cursor): applies Range(lastUpdatedTime >= initial_sync_cutoff) so it starts exactly where timestamp collection ended.
  3. Pages through instances.sync(two_phase) chunk by chunk, validating and posting records to the Records API.
  4. Saves cursor after each successful chunk. If 480 s elapses it returns "partial" — the next trigger fires and continues from the saved position.

This mode requires max_concurrent_executions: 1 on the workflow to prevent two executions from racing on the same cursor.


The handoff: ensuring no gap

T0              T1                              T2
│               │                               │
│←─ Timestamp collection ──►│                   │
│               │                               │
│               │◄──── Historic partitions ─────►│
│               │                               │
│               │◄──── Sync trigger live ────────────────►
│               │       (no_backfill from T1)   │
│               ↑
│    sync_cutoff_timestamp injected as
│    initial_sync_cutoff into Phase 2 handler

| Time span | Covered by |
| --- | --- |
| Before T1 (historic data) | Phase 1 partition ranges (partition_field ≤ sync_cutoff_timestamp) |
| T1 onwards (new changes) | Phase 2 sync trigger (no_backfill, starts at T1) |
| Between T1 and T2 (changes during partition runs) | Both phases — Phase 2 record overwrites Phase 1 record |

Any instance updated before T1 has lastUpdatedTime ≤ sync_cutoff_timestamp and falls inside a partition range, so it is guaranteed to be processed by Phase 1.

Any instance updated at or after T1 is delivered by the sync trigger: - Standard mode: arrives directly in the payload. - Sync-cursor mode: first-run initial_sync_cutoff filter ensures the cursor opens at sync_cutoff_timestamp, catching all changes from T1 onwards.

Instances updated between T1 and T2 (while partitions are still running) may be validated by both phases. This is intentional — the later Phase 2 record shares the same externalId pattern as the Phase 1 record and overwrites it in the Records API, so the final state always reflects the most recent validation.


Idempotency

The sync trigger external ID is always sync-{view_external_id} (e.g. sync-pump). Re-running historic validation upserts the same trigger with a fresh job_run_id and initial_sync_cutoff, seamlessly replacing the previous one. Any FunctionValidationState cursor from the old job_run_id is orphaned but harmless; the sync handler creates a new cursor under the new job_run_id.


Configuration reference

| ViewConfig field | Effect on pipeline |
| --- | --- |
| use_sync_cursor_mode: true | Phase 2 handler fetches its own data via sync cursor. Requires max_concurrent_executions: 1. |
| use_sync_cursor_mode: false (default) | Phase 2 handler receives instances directly in the trigger payload. |
| partition_count | Number of Phase 1 parallel workers (default: 10). |
| partition_field | DMS field used to slice Phase 1 ranges (default: lastUpdatedTime). |
| sync_batch_size | Overrides trigger.batch_size for this view's sync trigger. |
| max_concurrent_executions | Max simultaneous workflow executions. Must be 1 for sync-cursor mode. |

Failure scenarios

A partition fails permanently (3 retries exhausted): The orchestrator records it in OrchestrationState.failed_partitions and continues. Phase 2 is already live — instances from the failed partition range will be validated the next time they change (sync trigger) or when historic is re-run manually.

Sync trigger fires before a sync-cursor run completes: max_concurrent_executions: 1 prevents a second execution from starting. CDF queues the trigger event and fires it after the current execution finishes.

Historic re-run while Phase 2 is active: The orchestrator upserts a new sync trigger under the same external ID. Any in-flight Phase 2 execution finishes with the old job_run_id; subsequent ones use the new job_run_id cursor, starting from the new initial_sync_cutoff.