Invoking validation functions

What this is

This guide explains how to invoke already-deployed validation handlers in CDF Functions from Python.

When to use it

Use invoke helpers when you need remote execution in CDF (scheduled, orchestrated, or triggered flows).

Use run_validation() for local iteration without deployed infrastructure.

User mental model

  • Local: run_validation() executes in your Python process.
  • Remote: invoke helpers call deployed handlers through a unified validation entry point.

Minimal happy path

Local validation vs. CDF Function invocation

| | run_validation() | Invoke helpers (call_validate_instances_shacl, call_validation, etc.) |
| --- | --- | --- |
| Where it runs | Locally in your Python environment | In a deployed Cognite Function in CDF |
| Requires deployed function | No | Yes; deploy first with deploy_validation_infrastructure() |
| Typical use | Iterating on rules, quick checks, notebooks | Triggering validation jobs from scripts or orchestration |

Use run_validation() when you want to validate data without any CDF Functions. Use the invoke helpers to trigger the already-deployed function running in CDF.

Invoke helpers

The following helpers wrap call_validation() for specific validation types. All require a deployed function — run deploy_validation_infrastructure() first.

Prefer payloads that use ruleset_references (Data Product + RuleSet). Use shacl_rules_file_external_id only for legacy/file-based deployments.

Quick example

from cognite_data_quality import load_cognite_client_from_toml, call_validate_instances_shacl

client = load_cognite_client_from_toml("config.toml")

data = {
    "instances": {"items": {"n": [...]}},  # From workflow trigger or your own query
    # Legacy/file mode shown here; prefer ruleset_references where available
    "shacl_rules_file_external_id": "my_shacl_rules",
    "datamodel_space": "my_space",
    "datamodel_external_id": "MyDataModel",
    "datamodel_version": "v1",
    "records_config": {
        "stream_id": "dq_validation_stream",
        "rule_set_id": "MyViewSHACLv1",
        "rule_set_version": "1.0",
    },
}

result = call_validate_instances_shacl(client, data)
print(result.get("conforms"), result.get("instances_validated"))

Runtime behavior

Deploy validation pipeline (orchestrator)

To deploy and run the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule), use deploy_validation_pipeline() rather than a raw invoke.

From CDF Files (no local config needed)

After deployment, view configs and settings are stored in CDF Files. Reference them by external ID to run the pipeline from any environment:

from cognite_data_quality import deploy_validation_pipeline

# Simplest — fetches both view config and settings from CDF Files:
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
)

# Override settings external ID (non-standard function name):
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
    settings_external_id="my-function_settings",
)

External ID scheme: {view_external_id}_view_config (e.g. Pump_view_config, HeatExchanger_view_config). Settings default external ID: data-quality-validation_settings.
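The naming scheme above can be captured in a small helper so that scripts derive the external IDs from a view or function name instead of hard-coding them. This is a minimal sketch: `view_config_external_id` and `settings_external_id` are hypothetical names, not part of cognite_data_quality, and the `{function_name}_settings` pattern is inferred from the two examples above rather than documented.

```python
# Hypothetical helpers (not part of cognite_data_quality) that mirror the
# external ID scheme described above.
DEFAULT_FUNCTION_NAME = "data-quality-validation"


def view_config_external_id(view_external_id: str) -> str:
    """Return the CDF Files external ID for a view's config."""
    if not view_external_id:
        raise ValueError("view_external_id must be a non-empty string")
    return f"{view_external_id}_view_config"


def settings_external_id(function_name: str = DEFAULT_FUNCTION_NAME) -> str:
    """Return the settings external ID.

    Assumption: the pattern is {function_name}_settings, inferred from
    "data-quality-validation_settings" and "my-function_settings".
    """
    return f"{function_name}_settings"


print(view_config_external_id("Pump"))       # Pump_view_config
print(settings_external_id())                # data-quality-validation_settings
print(settings_external_id("my-function"))   # my-function_settings
```

The returned strings can then be passed as view_config_external_id and settings_external_id to deploy_validation_pipeline().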

From local YAML files

result = deploy_validation_pipeline(
    client,
    settings_path="settings.yaml",
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, partitions_triggered, sync_trigger_external_id, ...

Unified function and convenience wrappers

Unified entry point

from cognite_data_quality import call_validation

result = call_validation(
    client=client,
    # validation_type is one of: "instance", "instance_sync_cursor", "timeseries",
    # "orchestrator", "partitioned", "test", "aggregate_uniqueness", "shacl"
    validation_type="instance",
    data=data,
)

Convenience wrappers

| Function | Validation type | Use case |
| --- | --- | --- |
| deploy_validation_pipeline() | orchestrator | Deploy and run full pipeline (batch + incremental) |
| call_validate_instances_shacl() | instance | Validate instances (single invocation) |
| call_validate_instances_shacl_partitioned() | partitioned | Worker for partitioned validation |
| call_validate_timeseries_shacl() | timeseries | Validate time series data |
| call_validate_shacl() | shacl | Discover dqs:unique from SHACL and run aggregate uniqueness (scheduled workflow uses this) |
| call_validate_aggregate_uniqueness() | aggregate_uniqueness | Low-level aggregate uniqueness executor (explicit payload; no SHACL discovery) |
| call_validation(validation_type="instance_sync_cursor", ...) | instance_sync_cursor | Cursor-based incremental validation (sync cursor mode) |

Each helper is a wrapper around call_validation() with the appropriate validation_type. Pass a payload dict and, optionally, wait (defaults to True). See the function docstrings for payload keys.
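When a script has to choose between a dedicated wrapper and call_validation(), the table above can be kept as a plain lookup. The sketch below only transcribes that table; `HELPER_VALIDATION_TYPES` and `helper_for` are illustrative names and are not exported by cognite_data_quality.

```python
from typing import Optional

# Helper name -> validation_type, transcribed from the table above.
# Illustrative only; this dict is not an object provided by the library.
HELPER_VALIDATION_TYPES = {
    "deploy_validation_pipeline": "orchestrator",
    "call_validate_instances_shacl": "instance",
    "call_validate_instances_shacl_partitioned": "partitioned",
    "call_validate_timeseries_shacl": "timeseries",
    "call_validate_shacl": "shacl",
    "call_validate_aggregate_uniqueness": "aggregate_uniqueness",
}


def helper_for(validation_type: str) -> Optional[str]:
    """Return the wrapper name for a validation type.

    Returns None when the table lists no dedicated wrapper, in which case
    call_validation(validation_type=...) is the way in.
    """
    for helper, vtype in HELPER_VALIDATION_TYPES.items():
        if vtype == validation_type:
            return helper
    return None


print(helper_for("timeseries"))            # call_validate_timeseries_shacl
print(helper_for("instance_sync_cursor"))  # None
```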

Validation pipeline deployment

For deploying the full validation pipeline, the payload typically includes:

  • datamodel_space, datamodel_external_id, datamodel_version
  • view_space, view_external_id, view_version
  • instance_space
  • shacl_file_external_id
  • partition_count, partition_field
  • records_space, records_container, stream_id, rule_set_id, rule_set_version

Instance validation

For instance validation (single or partitioned), the payload typically includes:

  • instances: Dict with items (e.g. {"items": {"n": [...]}})
  • shacl_rules_file_external_id or shacl_rules (TTL string)
  • datamodel_space, datamodel_external_id, datamodel_version
  • records_config: stream_id, rule_set_id, rule_set_version, optional records_space, records_container
  • auto_load_depth (optional, default 2)
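A local pre-flight check can catch missing keys before a remote invocation is spent on them. The following is a sketch under stated assumptions: `check_instance_payload` is a hypothetical helper, it treats the keys listed above as required for illustration (the guide only says "typically includes"), and it accepts ruleset_references as a rules source alongside the two file/inline options. The deployed handler's own validation may differ.

```python
from typing import Any, Dict, List

# Key names come from the list above; treating them as required is an
# assumption made for this sketch.
EXPECTED_KEYS = (
    "instances",
    "datamodel_space",
    "datamodel_external_id",
    "datamodel_version",
    "records_config",
)
RULE_SOURCE_KEYS = ("ruleset_references", "shacl_rules_file_external_id", "shacl_rules")
RECORDS_CONFIG_KEYS = ("stream_id", "rule_set_id", "rule_set_version")


def check_instance_payload(data: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; empty means no issue found."""
    problems = [f"missing key: {key}" for key in EXPECTED_KEYS if key not in data]
    if not any(key in data for key in RULE_SOURCE_KEYS):
        problems.append("missing rules source: one of " + ", ".join(RULE_SOURCE_KEYS))
    records_config = data.get("records_config")
    if isinstance(records_config, dict):
        problems.extend(
            f"missing records_config key: {key}"
            for key in RECORDS_CONFIG_KEYS
            if key not in records_config
        )
    return problems


payload = {
    "instances": {"items": {"n": []}},
    "shacl_rules_file_external_id": "my_shacl_rules",
    "datamodel_space": "my_space",
    "datamodel_external_id": "MyDataModel",
    "records_config": {"stream_id": "dq_validation_stream", "rule_set_id": "MyViewSHACLv1"},
}
for problem in check_instance_payload(payload):
    print(problem)
# missing key: datamodel_version
# missing records_config key: rule_set_version
```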

Sync cursor validation

Views configured with use_sync_cursor_mode: true use a cursor-based sync handler instead of receiving instances in the workflow payload. The trigger fires as a lightweight signal; the function independently fetches all changes since the last cursor from DMS and validates them.

The payload is set by the deployed workflow (populated by deploy_validation_infrastructure()) and patched with initial_sync_cutoff by the orchestrator when it creates the sync trigger. You do not normally invoke this handler directly.

Key payload parameters:

  • view_space, view_external_id, view_version
  • instance_space
  • shacl_rules_file_external_id
  • datamodel_space, datamodel_external_id, datamodel_version
  • chunk_size – instances per sync batch
  • auto_load_depth (optional, default 2)
  • data_quality_space – space for cursor state storage
  • initial_sync_cutoff – ISO timestamp; first-run filter to skip instances predating historic validation (patched by orchestrator)
  • records_config

Aggregate uniqueness (shacl / aggregate_uniqueness)

Production uniqueness runs on the scheduled dq-{view}-uniqueness workflow with validation_type: shacl. The handler loads SHACL from file or RuleSet refs, discovers dqs:uniquenessConstraint / dqs:unique, and runs the aggregate executor.

from cognite_data_quality import call_validate_shacl

result = call_validate_shacl(
    client,
    data={
        "ruleset_references": [{"externalId": "my-ruleset", "version": "1.0.0"}],
        "instance_spaces": ["my_instances"],
        "datamodel_space": "my_space",
        "datamodel_external_id": "WorkOrder",
        "datamodel_version": "v1",
        "records_config": {
            "stream_id": "dq_validation_stream",
            "rule_set_id": "Uniqueness_WorkOrderNumber",
            "rule_set_version": "1.0",
        },
    },
)

Key payload parameters:

  • ruleset_references (recommended) or shacl_rules_file_external_id (legacy/file mode) or inline shacl_rules
  • datamodel_space, datamodel_external_id, datamodel_version (for SHACL discovery)
  • instance_spaces — scope aggregate and detail queries to these spaces
  • records_config — posts RuleEngineResult records per violating instance

Use call_validate_aggregate_uniqueness() only when calling the low-level executor directly with an explicit property/view payload (no SHACL discovery). See Uniqueness.

Time series validation

For time series validation, the payload typically includes:

  • ruleset_references (recommended, Data Product + RuleSet mode) or shacl_rules_file_external_id (legacy / file mode)
  • datamodel_space, datamodel_external_id, datamodel_version
  • filter or instance_ids
  • records_config (rule_set_id = RuleSet external ID; rule_set_version = Data Product semver when using Data Product mode)

For concrete YourOrgTimeSeries datapoint examples and an end-to-end test script, see Time Series Datapoint Rule Examples.

RuleEngineResult incremental consumption (rule_engine_result_sync)

Use this validation type when downstream rules or pipelines should consume only new/changed RuleEngineResult records.

Required payload:

  • validation_type: "rule_engine_result_sync"
  • listener_id
  • stream_id

Optional payload:

  • source_mode: "sync" (default) or "filter"
  • filter: rule-scoped filters
  • rule_ids
  • rule_set_ids
  • rule_set_versions
  • focus_nodes
  • focus_node_instances
  • records_space (default dataQuality)
  • records_container (default RuleEngineResult)
  • initialize_cursor (default 7d-ago; applies in sync mode only, when no saved cursor exists)
  • initial_watermark_ms (filter mode bootstrap; milliseconds since epoch)
  • include_items (default false) — return full record payloads; probes and tests should set true
  • limit (legacy alias for max_records)
  • page_limit (default 1000, max 1000)
  • max_records (default 10000) — enables multi-page reads (>1000) in one run

The handler persists cursor/watermark state per listener_id in FunctionValidationState and returns record_external_ids (and items when include_items is true).
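The required and optional keys above can be assembled with a small builder that applies the documented defaults and rejects a page_limit above the documented maximum. This is a sketch, not library code: `build_result_sync_payload` and `max_pages` are hypothetical names, the listener ID is made up, and the value shape of rule_set_ids (a list of strings) is an assumption.

```python
import math

MAX_PAGE_LIMIT = 1000  # documented upper bound for page_limit


def build_result_sync_payload(listener_id, stream_id, *, source_mode="sync",
                              page_limit=1000, max_records=10000,
                              include_items=False, **optional):
    """Assemble a rule_engine_result_sync payload (hypothetical helper)."""
    if source_mode not in ("sync", "filter"):
        raise ValueError("source_mode must be 'sync' or 'filter'")
    if not 1 <= page_limit <= MAX_PAGE_LIMIT:
        raise ValueError(f"page_limit must be between 1 and {MAX_PAGE_LIMIT}")
    payload = {
        "validation_type": "rule_engine_result_sync",
        "listener_id": listener_id,
        "stream_id": stream_id,
        "source_mode": source_mode,
        "page_limit": page_limit,
        "max_records": max_records,
        "include_items": include_items,
    }
    payload.update(optional)  # e.g. rule_ids, rule_set_ids, records_space
    return payload


def max_pages(payload):
    """Upper bound on page reads in one run, assuming every page is full."""
    return math.ceil(payload["max_records"] / payload["page_limit"])


payload = build_result_sync_payload(
    "downstream-listener",           # hypothetical listener_id
    "dq_validation_stream",
    rule_set_ids=["MyViewSHACLv1"],  # value shape is an assumption
)
print(max_pages(payload))  # 10
```

With the defaults, a single run reads at most ten pages of 1000 records, which is what max_records above 1000 makes possible.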

Notes:

  • sync mode advances by API cursor only; watermark does not drive progression.
  • filter mode always uses lastUpdatedTime and advances watermark to the max seen timestamp.
  • For first-run filter mode, use a bounded initial_watermark_ms (for example now-6d) when your environment enforces max interval windows.
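The bounded first-run watermark mentioned in the notes (for example now-6d) is a plain timestamp calculation. A minimal sketch using only the standard library follows; `bounded_initial_watermark_ms` is a hypothetical helper name, and the six-day default simply mirrors the example above rather than any documented limit.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def bounded_initial_watermark_ms(days_back=6, now=None):
    """Return (now - days_back) as integer milliseconds since the Unix epoch.

    `now` must be timezone-aware; it defaults to the current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    start = now - timedelta(days=days_back)
    # Integer division of timedeltas avoids float rounding.
    return (start - EPOCH) // timedelta(milliseconds=1)


fixed_now = datetime(2024, 1, 7, tzinfo=timezone.utc)
print(bounded_initial_watermark_ms(now=fixed_now))  # 1704067200000 (2024-01-01T00:00:00Z)
```

The result can be supplied as initial_watermark_ms in a filter-mode payload.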

For a full cross-run chaining pattern and working script, see Chained Conditional Logic.

Best practices

  • Prefer specific helper wrappers for readability, then fall back to call_validation() for advanced routing.
  • Keep payloads sourced from deployed config artifacts where possible to reduce drift.
  • Use wait=True for scripts that need immediate result inspection.

Troubleshooting

  • Invocation fails: ensure infrastructure/function is deployed and credentials allow function calls.
  • Handler returns empty outputs: verify payload keys for selected validation_type.
  • Unexpected rule scope: check datamodel/view identifiers and rules source fields.
