Invoking validation functions
Local validation vs. CDF Function invocation
| | run_validation() | Invoke helpers (call_validate_instances_shacl, call_validation, etc.) |
|---|---|---|
| Where it runs | Locally in your Python environment | In a deployed Cognite Function in CDF |
| Requires deployed function | No | Yes — deploy first with deploy_validation_infrastructure() |
| Typical use | Iterating on rules, quick checks, notebooks | Triggering validation jobs from scripts or orchestration |
Use run_validation() to validate data locally, with no Cognite Function deployed. Use the invoke helpers to trigger a function that is already deployed and running in CDF.
Invoke helpers
Quick example
from cognite_data_quality import load_cognite_client_from_toml, call_validate_instances_shacl
client = load_cognite_client_from_toml("config.toml")
data = {
    "instances": {"items": {"n": [...]}},  # From workflow trigger or your own query
    "shacl_rules_file_external_id": "my_shacl_rules",
    "datamodel_space": "my_space",
    "datamodel_external_id": "MyDataModel",
    "datamodel_version": "v1",
    "records_config": {
        "stream_id": "dq_validation_stream",
        "rule_set_id": "MyViewSHACLv1",
        "rule_set_version": "1.0",
    },
}
result = call_validate_instances_shacl(client, data)
print(result.get("conforms"), result.get("instances_validated"))
Deploy validation pipeline (orchestrator)
To deploy and run the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule), use deploy_validation_pipeline() rather than a raw invoke.
From CDF Files (no local config needed)
After deployment, view configs and settings are stored in CDF Files. Reference them by external ID to run the pipeline from any environment:
from cognite_data_quality import deploy_validation_pipeline
# Simplest: fetches both view config and settings from CDF Files
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
)
# Override settings external ID (non-standard function name):
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
    settings_external_id="my-function_settings",
)
External ID scheme: {view_external_id}_view_config (e.g. Pump_view_config, HeatExchanger_view_config). Settings default external ID: data-quality-validation_settings.
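The naming scheme above can be captured in two small helpers. This is a minimal sketch: view_config_id and settings_id are hypothetical names, not part of cognite_data_quality, and the {function_name}_settings pattern is an assumption inferred from the documented default and the override example.

```python
def view_config_id(view_external_id: str) -> str:
    """Return the CDF Files external ID of a view config, per the scheme above."""
    return f"{view_external_id}_view_config"


def settings_id(function_name: str = "data-quality-validation") -> str:
    """Return the settings external ID.

    Assumption: the ID is the function name plus "_settings". This matches
    the documented default but is not confirmed for other function names.
    """
    return f"{function_name}_settings"


print(view_config_id("HeatExchanger"))
print(settings_id())
```

Deriving the IDs in one place avoids typos when the same view is referenced from several scripts.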
From local YAML files
result = deploy_validation_pipeline(
    client,
    settings_path="settings.yaml",
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, partitions_triggered, sync_trigger_external_id, ...
Unified function and convenience wrappers
Unified entry point
from cognite_data_quality import call_validation
result = call_validation(
    client=client,
    validation_type="instance",  # or "instance_sync_cursor", "timeseries", "orchestrator", "partitioned", "test"
    data=data,
)
Convenience wrappers
| Function | Validation type | Use case |
|---|---|---|
| deploy_validation_pipeline() | orchestrator | Deploy and run full pipeline (batch + incremental) |
| call_validate_instances_shacl() | instance | Validate instances (single invocation) |
| call_validate_instances_shacl_partitioned() | partitioned | Worker for partitioned validation |
| call_validate_timeseries_shacl() | timeseries | Validate time series data |
| call_validation(validation_type="instance_sync_cursor", ...) | instance_sync_cursor | Cursor-based incremental validation (sync cursor mode) |
Each helper wraps call_validation() with the appropriate validation_type. Pass a payload dict and, optionally, wait (defaults to True). See the function docstrings for payload keys.
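The wrapper pattern described above can be sketched as follows. This is an illustration only, not the library's implementation: make_wrapper and fake_call_validation are hypothetical, and forwarding wait to call_validation() is an assumption about its signature.

```python
from typing import Any, Callable

# Validation types listed for call_validation() in this document.
KNOWN_TYPES = {
    "instance", "instance_sync_cursor", "timeseries",
    "orchestrator", "partitioned", "test",
}


def make_wrapper(call_validation: Callable[..., Any], validation_type: str) -> Callable[..., Any]:
    """Build a helper that fixes validation_type and forwards everything else."""
    if validation_type not in KNOWN_TYPES:
        raise ValueError(f"unknown validation_type: {validation_type!r}")

    def wrapper(client: Any, data: dict, wait: bool = True) -> Any:
        # Assumption: call_validation() accepts a wait keyword.
        return call_validation(
            client=client, validation_type=validation_type, data=data, wait=wait
        )

    return wrapper


# Stand-in for the real call_validation(), used only to show the forwarding.
def fake_call_validation(client, validation_type, data, wait=True):
    return {"validation_type": validation_type, "wait": wait, "keys": sorted(data)}


validate_timeseries = make_wrapper(fake_call_validation, "timeseries")
print(validate_timeseries(None, {"records_config": {}}))
```

Rejecting unknown types when the wrapper is built surfaces a misspelled validation_type before anything is sent to CDF.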
Validation pipeline deployment
For deploying the full validation pipeline, the payload typically includes:
- datamodel_space, datamodel_external_id, datamodel_version
- view_space, view_external_id, view_version
- instance_space
- shacl_file_external_id
- partition_count, partition_field
- records_space, records_container, stream_id, rule_set_id, rule_set_version
Instance validation
For instance validation (single or partitioned), the payload typically includes:
- instances: Dict with items (e.g. {"items": {"n": [...]}})
- shacl_rules_file_external_id or shacl_rules (TTL string)
- datamodel_space, datamodel_external_id, datamodel_version
- records_config: stream_id, rule_set_id, rule_set_version, optional records_space, records_container
- auto_load_depth (optional, default 2)
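A pre-flight check on the payload can catch missing keys before a function call is spent. The sketch below is hypothetical (check_instance_payload is not part of the library), and it treats the keys listed above as required, which is an assumption since the text only says the payload "typically includes" them.

```python
def check_instance_payload(payload: dict) -> list[str]:
    """Return a list of problems found in an instance-validation payload."""
    problems = []

    # "instances" is a dict that carries an "items" entry.
    instances = payload.get("instances")
    if not isinstance(instances, dict) or "items" not in instances:
        problems.append("instances must be a dict with an 'items' key")

    # Rules come either from a file in CDF or inline as a TTL string.
    if not ("shacl_rules_file_external_id" in payload or "shacl_rules" in payload):
        problems.append("need shacl_rules_file_external_id or shacl_rules")

    for key in ("datamodel_space", "datamodel_external_id", "datamodel_version"):
        if key not in payload:
            problems.append(f"missing {key}")

    # records_config has three core keys; records_space/records_container are optional.
    records = payload.get("records_config") or {}
    for key in ("stream_id", "rule_set_id", "rule_set_version"):
        if key not in records:
            problems.append(f"missing records_config.{key}")

    return problems


example = {
    "instances": {"items": {"n": []}},
    "shacl_rules": "# TTL rules would go here",
    "datamodel_space": "my_space",
    "datamodel_external_id": "MyDataModel",
    "datamodel_version": "v1",
    "records_config": {"stream_id": "s", "rule_set_id": "r", "rule_set_version": "1.0"},
}
print(check_instance_payload(example))
```

Returning a list of problems rather than raising on the first one lets a caller report every gap at once.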
Sync cursor validation
Views configured with use_sync_cursor_mode: true use a cursor-based sync handler instead of receiving instances in the workflow payload. The trigger fires as a lightweight signal; the function independently fetches all changes since the last cursor from DMS and validates them.
The payload is set by the deployed workflow (populated by deploy_validation_infrastructure()) and patched with initial_sync_cutoff by the orchestrator when it creates the sync trigger. You do not normally invoke this handler directly.
Key payload parameters:
- view_space, view_external_id, view_version
- instance_space
- shacl_rules_file_external_id
- datamodel_space, datamodel_external_id, datamodel_version
- chunk_size: instances per sync batch
- auto_load_depth (optional, default 2)
- data_quality_space: space for cursor state storage
- initial_sync_cutoff: ISO timestamp; first-run filter to skip instances predating historic validation (patched by orchestrator)
- records_config
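To make the role of initial_sync_cutoff concrete, the following sketch filters out instances older than the cutoff. It is not the deployed handler's code: after_cutoff is a hypothetical helper, and comparing against a lastUpdatedTime field in epoch milliseconds is an assumption about which timestamp the real filter uses.

```python
from datetime import datetime, timezone


def after_cutoff(instances: list[dict], initial_sync_cutoff: str) -> list[dict]:
    """Keep instances updated at or after the ISO cutoff timestamp."""
    cutoff = datetime.fromisoformat(initial_sync_cutoff)
    if cutoff.tzinfo is None:
        # Treat naive timestamps as UTC (an assumption of this sketch).
        cutoff = cutoff.replace(tzinfo=timezone.utc)
    cutoff_ms = int(cutoff.timestamp() * 1000)
    # Assumption: each instance carries lastUpdatedTime in epoch milliseconds.
    return [i for i in instances if i["lastUpdatedTime"] >= cutoff_ms]


instances = [
    {"externalId": "old_pump", "lastUpdatedTime": 1704067199999},
    {"externalId": "new_pump", "lastUpdatedTime": 1704067200000},
]
kept = after_cutoff(instances, "2024-01-01T00:00:00+00:00")
print([i["externalId"] for i in kept])
```

The effect is that instances already covered by the historic partitions are not validated a second time on the first sync run.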
Time series validation
For time series validation, the payload typically includes:
- shacl_rules_file_external_id
- datamodel_space, datamodel_external_id, datamodel_version
- filter or instance_ids
- records_config