Invoking validation functions
What this is
This guide explains how to invoke already-deployed validation handlers in CDF Functions from Python.
When to use it
Use invoke helpers when you need remote execution in CDF (scheduled, orchestrated, or triggered flows).
Use run_validation() for local iteration without deployed infrastructure.
User mental model
- Local: run_validation() executes in your Python process.
- Remote: invoke helpers call deployed handlers through a unified validation entry point.
Minimal happy path
Local validation vs. CDF Function invocation
| | run_validation() | Invoke helpers (call_validate_instances_shacl, call_validation, etc.) |
|---|---|---|
| Where it runs | Locally in your Python environment | In a deployed Cognite Function in CDF |
| Requires deployed function | No | Yes — deploy first with deploy_validation_infrastructure() |
| Typical use | Iterating on rules, quick checks, notebooks | Triggering validation jobs from scripts or orchestration |
Use run_validation() when you want to validate data without any CDF Functions. Use the invoke helpers to trigger the already-deployed function running in CDF.
Invoke helpers
The following helpers wrap call_validation() for specific validation types. All require a deployed function — run deploy_validation_infrastructure() first.
Prefer payloads that use ruleset_references (Data Product + RuleSet). Use shacl_rules_file_external_id only for legacy/file-based deployments.
Quick example
from cognite_data_quality import load_cognite_client_from_toml, call_validate_instances_shacl

client = load_cognite_client_from_toml("config.toml")

data = {
    "instances": {"items": {"n": [...]}},  # From workflow trigger or your own query
    "shacl_rules_file_external_id": "my_shacl_rules",
    "datamodel_space": "my_space",
    "datamodel_external_id": "MyDataModel",
    "datamodel_version": "v1",
    "records_config": {
        "stream_id": "dq_validation_stream",
        "rule_set_id": "MyViewSHACLv1",
        "rule_set_version": "1.0",
    },
}

result = call_validate_instances_shacl(client, data)
print(result.get("conforms"), result.get("instances_validated"))
Runtime behavior
Deploy validation pipeline (orchestrator)
To deploy and run the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule), use deploy_validation_pipeline() rather than a raw invoke.
From CDF Files (no local config needed)
After deployment, view configs and settings are stored in CDF Files. Reference them by external ID to run the pipeline from any environment:
from cognite_data_quality import deploy_validation_pipeline

# Simplest: fetches both view config and settings from CDF Files.
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
)

# Override settings external ID (non-standard function name):
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
    settings_external_id="my-function_settings",
)
External ID scheme: {view_external_id}_view_config (e.g. Pump_view_config, HeatExchanger_view_config). Settings default external ID: data-quality-validation_settings.
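The naming scheme above can be captured in a small helper so scripts do not hand-type external IDs. This is a minimal sketch, not part of the library: `view_config_external_id` and `pipeline_kwargs` are hypothetical names, and only the `{view_external_id}_view_config` pattern and the default settings ID come from this guide.

```python
# Default settings external ID quoted in this guide.
DEFAULT_SETTINGS_EXTERNAL_ID = "data-quality-validation_settings"


def view_config_external_id(view_external_id: str) -> str:
    """Return the CDF Files external ID for a view's config file."""
    return f"{view_external_id}_view_config"


def pipeline_kwargs(
    view_external_id: str,
    settings_external_id: str = DEFAULT_SETTINGS_EXTERNAL_ID,
) -> dict:
    """Assemble keyword arguments for deploy_validation_pipeline() (hypothetical helper)."""
    kwargs = {"view_config_external_id": view_config_external_id(view_external_id)}
    # Only pass settings_external_id when it differs from the documented default.
    if settings_external_id != DEFAULT_SETTINGS_EXTERNAL_ID:
        kwargs["settings_external_id"] = settings_external_id
    return kwargs


print(pipeline_kwargs("Pump"))
```

The resulting dict could then be unpacked into the call, for example `deploy_validation_pipeline(client, **pipeline_kwargs("Pump"))`.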
From local YAML files
result = deploy_validation_pipeline(
    client,
    settings_path="settings.yaml",
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, partitions_triggered, sync_trigger_external_id, ...
Unified function and convenience wrappers
Unified entry point
from cognite_data_quality import call_validation

# validation_type may also be "instance_sync_cursor", "timeseries", "orchestrator",
# "partitioned", "test", "aggregate_uniqueness", or "shacl".
result = call_validation(
    client=client,
    validation_type="instance",
    data=data,
)
Convenience wrappers
| Function | Validation type | Use case |
|---|---|---|
| deploy_validation_pipeline() | orchestrator | Deploy and run full pipeline (batch + incremental) |
| call_validate_instances_shacl() | instance | Validate instances (single invocation) |
| call_validate_instances_shacl_partitioned() | partitioned | Worker for partitioned validation |
| call_validate_timeseries_shacl() | timeseries | Validate time series data |
| call_validate_shacl() | shacl | Discover dqs:unique from SHACL and run aggregate uniqueness (scheduled workflow uses this) |
| call_validate_aggregate_uniqueness() | aggregate_uniqueness | Low-level aggregate uniqueness executor (explicit payload; no SHACL discovery) |
| call_validation(validation_type="instance_sync_cursor", ...) | instance_sync_cursor | Cursor-based incremental validation (sync cursor mode) |
Each helper is a wrapper around call_validation() with the appropriate validation_type. Pass a payload dict and, optionally, wait (defaults to True). See the function docstrings for payload keys.
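To illustrate the wrapper relationship described above, here is a rough sketch of how a helper can pin validation_type and forward the rest. It runs offline against a stand-in: `fake_call_validation` and `make_wrapper` are hypothetical, and forwarding `wait` to call_validation() is an assumption; the library's actual wrappers may be written differently.

```python
from typing import Any, Callable, Dict

Payload = Dict[str, Any]


def fake_call_validation(
    client: Any, validation_type: str, data: Payload, wait: bool = True
) -> Payload:
    """Offline stand-in for call_validation(); echoes what it was asked to run."""
    return {
        "validation_type": validation_type,
        "wait": wait,
        "payload_keys": sorted(data),
    }


def make_wrapper(
    call_fn: Callable[..., Payload], validation_type: str
) -> Callable[..., Payload]:
    """Return a helper that pins validation_type and forwards everything else."""

    def wrapper(client: Any, data: Payload, wait: bool = True) -> Payload:
        return call_fn(
            client=client, validation_type=validation_type, data=data, wait=wait
        )

    return wrapper


# A helper equivalent in spirit to call_validate_instances_shacl().
validate_instances = make_wrapper(fake_call_validation, "instance")
result = validate_instances(None, {"instances": {"items": {"n": []}}})
print(result)
```

Because the only difference between helpers is the pinned type, falling back to call_validation() directly should be a small change when a script needs a type that has no wrapper.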
Validation pipeline deployment
For deploying the full validation pipeline, the payload typically includes:
- datamodel_space, datamodel_external_id, datamodel_version
- view_space, view_external_id, view_version
- instance_space
- shacl_file_external_id
- partition_count, partition_field
- records_space, records_container, stream_id, rule_set_id, rule_set_version
Instance validation
For instance validation (single or partitioned), the payload typically includes:
- instances: Dict with items (e.g. {"items": {"n": [...]}})
- shacl_rules_file_external_id or shacl_rules (TTL string)
- datamodel_space, datamodel_external_id, datamodel_version
- records_config: stream_id, rule_set_id, rule_set_version, optional records_space, records_container
- auto_load_depth (optional, default 2)
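The keys listed above can be assembled with a small builder that catches a missing rules source before the function is invoked. This is a sketch under assumptions: `build_instance_payload` is a hypothetical helper, the `"n"` key mirrors the example shape in this guide, and requiring exactly one of the two rules fields is this sketch's choice rather than a documented constraint.

```python
from typing import Any, Dict, List, Optional


def build_instance_payload(
    nodes: List[Dict[str, Any]],
    datamodel_space: str,
    datamodel_external_id: str,
    datamodel_version: str,
    records_config: Dict[str, str],
    shacl_rules_file_external_id: Optional[str] = None,
    shacl_rules: Optional[str] = None,
    auto_load_depth: int = 2,
) -> Dict[str, Any]:
    """Assemble an instance-validation payload from the keys listed in this guide."""
    # Sketch-level rule: exactly one rules source, to avoid ambiguous payloads.
    if (shacl_rules_file_external_id is None) == (shacl_rules is None):
        raise ValueError(
            "Provide exactly one of shacl_rules_file_external_id or shacl_rules"
        )
    payload: Dict[str, Any] = {
        "instances": {"items": {"n": nodes}},
        "datamodel_space": datamodel_space,
        "datamodel_external_id": datamodel_external_id,
        "datamodel_version": datamodel_version,
        "records_config": records_config,
        "auto_load_depth": auto_load_depth,
    }
    if shacl_rules_file_external_id is not None:
        payload["shacl_rules_file_external_id"] = shacl_rules_file_external_id
    else:
        payload["shacl_rules"] = shacl_rules
    return payload


payload = build_instance_payload(
    nodes=[],
    datamodel_space="my_space",
    datamodel_external_id="MyDataModel",
    datamodel_version="v1",
    records_config={
        "stream_id": "dq_validation_stream",
        "rule_set_id": "MyViewSHACLv1",
        "rule_set_version": "1.0",
    },
    shacl_rules_file_external_id="my_shacl_rules",
)
print(sorted(payload))
```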
Sync cursor validation
Views configured with use_sync_cursor_mode: true use a cursor-based sync handler instead of receiving instances in the workflow payload. The trigger fires as a lightweight signal; the function independently fetches all changes since the last cursor from DMS and validates them.
The payload is set by the deployed workflow (populated by deploy_validation_infrastructure()) and patched with initial_sync_cutoff by the orchestrator when it creates the sync trigger. You do not normally invoke this handler directly.
Key payload parameters:
- view_space, view_external_id, view_version
- instance_space
- shacl_rules_file_external_id
- datamodel_space, datamodel_external_id, datamodel_version
- chunk_size: instances per sync batch
- auto_load_depth (optional, default 2)
- data_quality_space: space for cursor state storage
- initial_sync_cutoff: ISO timestamp; first-run filter to skip instances predating historic validation (patched by orchestrator)
- records_config
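The cursor-based mode described above can be pictured as a loop that resumes from a saved cursor, reads changes in chunk_size batches, and stores the new cursor at the end. The following is a conceptual illustration only, not the deployed handler's code: `CHANGES`, `fetch_changes`, and the integer cursor are hypothetical stand-ins for the DMS sync API and its opaque cursors.

```python
from typing import Callable, Dict, List, Optional, Tuple

Change = Dict[str, str]

# In-memory stand-in for the stream of changed instances in DMS.
CHANGES: List[Change] = [{"externalId": f"pump-{i}"} for i in range(5)]


def fetch_changes(cursor: Optional[int], chunk_size: int) -> Tuple[List[Change], int]:
    """Return the next batch after `cursor` and the cursor to resume from."""
    start = cursor or 0
    batch = CHANGES[start:start + chunk_size]
    return batch, start + len(batch)


def sync_once(
    saved_cursor: Optional[int],
    chunk_size: int,
    validate: Callable[[List[Change]], None],
) -> Tuple[int, int]:
    """Validate everything changed since `saved_cursor`; return (new_cursor, count)."""
    cursor = saved_cursor
    validated = 0
    while True:
        batch, cursor = fetch_changes(cursor, chunk_size)
        if not batch:
            break  # Caught up: nothing new since the last cursor.
        validate(batch)
        validated += len(batch)
    return cursor, validated


seen: List[str] = []
new_cursor, count = sync_once(None, 2, lambda b: seen.extend(c["externalId"] for c in b))
print(new_cursor, count)
```

A second run that starts from the returned cursor finds nothing to validate, which is why the trigger can act as a lightweight signal instead of carrying instances in its payload.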
Aggregate uniqueness (shacl / aggregate_uniqueness)
Production uniqueness runs on the scheduled dq-{view}-uniqueness workflow with validation_type: shacl. The handler loads SHACL from file or RuleSet refs, discovers dqs:uniquenessConstraint / dqs:unique, and runs the aggregate executor.
from cognite_data_quality import call_validate_shacl

result = call_validate_shacl(
    client,
    data={
        "ruleset_references": [{"externalId": "my-ruleset", "version": "1.0.0"}],
        "instance_spaces": ["my_instances"],
        "datamodel_space": "my_space",
        "datamodel_external_id": "WorkOrder",
        "datamodel_version": "v1",
        "records_config": {
            "stream_id": "dq_validation_stream",
            "rule_set_id": "Uniqueness_WorkOrderNumber",
            "rule_set_version": "1.0",
        },
    },
)
Key payload parameters:
- ruleset_references (recommended) or shacl_rules_file_external_id (legacy/file mode) or inline shacl_rules
- datamodel_space, datamodel_external_id, datamodel_version (for SHACL discovery)
- instance_spaces: scope aggregate and detail queries to these spaces
- records_config: posts RuleEngineResult records per violating instance
Use call_validate_aggregate_uniqueness() only when calling the low-level executor directly with an explicit property/view payload (no SHACL discovery). See Uniqueness.
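Since the payload accepts three alternative rules sources, a script can pick one explicitly instead of sending several. The helper below is a hypothetical sketch: `rules_source_fields` is not a library function, and its precedence order simply follows this guide's recommendation; how the handler resolves a payload that carries more than one source is not covered here.

```python
from typing import Any, Dict, List, Optional


def rules_source_fields(
    ruleset_references: Optional[List[Dict[str, str]]] = None,
    shacl_rules_file_external_id: Optional[str] = None,
    shacl_rules: Optional[str] = None,
) -> Dict[str, Any]:
    """Return a single rules-source field, preferring RuleSet references."""
    if ruleset_references:
        return {"ruleset_references": ruleset_references}
    if shacl_rules_file_external_id:
        # Legacy / file-based deployments.
        return {"shacl_rules_file_external_id": shacl_rules_file_external_id}
    if shacl_rules:
        # Inline TTL string.
        return {"shacl_rules": shacl_rules}
    raise ValueError("No SHACL rules source provided")


fields = rules_source_fields(
    ruleset_references=[{"externalId": "my-ruleset", "version": "1.0.0"}],
    shacl_rules_file_external_id="my_shacl_rules",
)
print(fields)
```

The returned dict can be merged into the rest of the payload, for example `data = {**fields, "instance_spaces": ["my_instances"]}`.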
Time series validation
For time series validation, the payload typically includes:
- ruleset_references (recommended, Data Product + RuleSet mode) or shacl_rules_file_external_id (legacy / file mode)
- datamodel_space, datamodel_external_id, datamodel_version
- filter or instance_ids
- records_config (rule_set_id = RuleSet external ID, rule_set_version = DP semver in DP mode)
For concrete YourOrgTimeSeries datapoint examples and an end-to-end test script, see Time Series Datapoint Rule Examples.
RuleEngineResult incremental consumption (rule_engine_result_sync)
Use this validation type when downstream rules or pipelines should consume only new/changed RuleEngineResult records.
Required payload:
- validation_type: "rule_engine_result_sync"
- listener_id
- stream_id
Optional payload:
- source_mode: "sync" (default) or "filter"
- filter: rule-scoped filters
  - rule_ids
  - rule_set_ids
  - rule_set_versions
  - focus_nodes
  - focus_node_instances
- records_space (default dataQuality)
- records_container (default RuleEngineResult)
- initialize_cursor (default 7d-ago, sync mode only when no saved cursor exists)
- initial_watermark_ms (filter mode bootstrap; milliseconds since epoch)
- include_items (default false): return full record payloads; probes and tests should set true
- limit (legacy alias for max_records)
- page_limit (default 1000, max 1000)
- max_records (default 10000): enables multi-page reads (>1000) in one run
The handler persists cursor/watermark state per listener_id in FunctionValidationState and returns record_external_ids (and items when include_items is true).
Notes:
- sync mode advances by API cursor only; watermark does not drive progression.
- filter mode always uses lastUpdatedTime and advances watermark to the max seen timestamp.
- For first-run filter mode, use a bounded initial_watermark_ms (for example now-6d) when your environment enforces max interval windows.
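The bounded first-run watermark from the notes above is just "now minus N days" expressed in epoch milliseconds. Here is a minimal sketch of building such a payload; `bounded_watermark_ms` and `build_result_sync_payload` are hypothetical helpers, and the six-day default mirrors the now-6d example rather than any documented limit.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def bounded_watermark_ms(now: datetime, days_back: int = 6) -> int:
    """Return epoch milliseconds for `now` minus `days_back` days."""
    return int((now - timedelta(days=days_back)).timestamp() * 1000)


def build_result_sync_payload(
    listener_id: str,
    stream_id: str,
    source_mode: str = "sync",
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Assemble the required keys, adding a bounded watermark in filter mode."""
    if source_mode not in ("sync", "filter"):
        raise ValueError("source_mode must be 'sync' or 'filter'")
    payload: Dict[str, Any] = {
        "validation_type": "rule_engine_result_sync",
        "listener_id": listener_id,
        "stream_id": stream_id,
        "source_mode": source_mode,
    }
    if source_mode == "filter":
        # Bootstrap value for the first run; later runs use the persisted watermark.
        reference = now or datetime.now(timezone.utc)
        payload["initial_watermark_ms"] = bounded_watermark_ms(reference)
    return payload


print(build_result_sync_payload("my-listener", "dq_validation_stream", "filter"))
```

Passing an explicit, timezone-aware `now` keeps the watermark reproducible in tests.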
For a full cross-run chaining pattern and working script, see Chained Conditional Logic.
Best practices
- Prefer specific helper wrappers for readability, then fall back to call_validation() for advanced routing.
- Keep payloads sourced from deployed config artifacts where possible to reduce drift.
- Use wait=True for scripts that need immediate result inspection.
Troubleshooting
- Invocation fails: ensure infrastructure/function is deployed and credentials allow function calls.
- Handler returns empty outputs: verify payload keys for selected validation_type.
- Unexpected rule scope: check datamodel/view identifiers and rules source fields.
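For the payload-key check mentioned above, a small pre-flight function can report missing keys before a remote call is made. This is a rough sketch: `TYPICAL_KEYS` and `missing_keys` are hypothetical, and the key sets are copied from the "typically includes" lists in this guide, so treat them as a starting point rather than the handlers' authoritative schema.

```python
from typing import Any, Dict, List, Set

# Typical keys per validation type, taken from the lists in this guide.
TYPICAL_KEYS: Dict[str, Set[str]] = {
    "instance": {
        "instances",
        "datamodel_space",
        "datamodel_external_id",
        "datamodel_version",
        "records_config",
    },
    "shacl": {
        "datamodel_space",
        "datamodel_external_id",
        "datamodel_version",
        "instance_spaces",
        "records_config",
    },
    "rule_engine_result_sync": {"listener_id", "stream_id"},
}


def missing_keys(validation_type: str, data: Dict[str, Any]) -> List[str]:
    """Return the typical keys absent from `data`, sorted for stable output."""
    # Types not listed in TYPICAL_KEYS are not checked and yield an empty list.
    expected = TYPICAL_KEYS.get(validation_type, set())
    return sorted(expected - data.keys())


print(missing_keys("instance", {"instances": {"items": {"n": []}}}))
```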