
Invoking validation functions

Local validation vs. CDF Function invocation

|  | run_validation() | Invoke helpers (call_validate_instances_shacl, call_validation, etc.) |
| --- | --- | --- |
| Where it runs | Locally in your Python environment | In a deployed Cognite Function in CDF |
| Requires deployed function | No | Yes; deploy first with deploy_validation_infrastructure() |
| Typical use | Iterating on rules, quick checks, notebooks | Triggering validation jobs from scripts or orchestration |

Use run_validation() to validate data locally, with no CDF Function deployed. Use the invoke helpers to trigger the function that is already deployed and running in CDF.

Invoke helpers

Quick example

from cognite_data_quality import load_cognite_client_from_toml, call_validate_instances_shacl

client = load_cognite_client_from_toml("config.toml")

data = {
    "instances": {"items": {"n": [...]}},  # From workflow trigger or your own query
    "shacl_rules_file_external_id": "my_shacl_rules",
    "datamodel_space": "my_space",
    "datamodel_external_id": "MyDataModel",
    "datamodel_version": "v1",
    "records_config": {
        "stream_id": "dq_validation_stream",
        "rule_set_id": "MyViewSHACLv1",
        "rule_set_version": "1.0",
    },
}

result = call_validate_instances_shacl(client, data)
print(result.get("conforms"), result.get("instances_validated"))
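
The example above reads two keys from the result with result.get(). A small helper can make that check explicit. This is a minimal sketch, assuming the result is a plain dict; only the keys conforms and instances_validated come from the example, while the helper name summarize_result and the handling of missing keys are assumptions for illustration.

```python
from typing import Any, Mapping


def summarize_result(result: Mapping[str, Any]) -> str:
    """Return a one-line summary of a validation result dict.

    Hypothetical helper: only "conforms" and "instances_validated" are
    taken from the quick example; the rest is an assumption.
    """
    conforms = result.get("conforms")
    count = result.get("instances_validated")
    if conforms is None:
        status = "unknown"  # key absent from the result
    else:
        status = "conforms" if conforms else "violations found"
    counted = "?" if count is None else str(count)
    return f"{status} ({counted} instances validated)"


print(summarize_result({"conforms": True, "instances_validated": 12}))
```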

Deploy validation pipeline (orchestrator)

To deploy and run the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule), use deploy_validation_pipeline() rather than a raw invoke.

From CDF Files (no local config needed)

After deployment, view configs and settings are stored in CDF Files. Reference them by external ID to run the pipeline from any environment:

from cognite_data_quality import deploy_validation_pipeline

# Simplest — fetches both view config and settings from CDF Files:
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
)

# Override settings external ID (non-standard function name):
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
    settings_external_id="my-function_settings",
)

External ID scheme: {view_external_id}_view_config (e.g. Pump_view_config, HeatExchanger_view_config). Settings default external ID: data-quality-validation_settings.
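
The naming scheme can be captured in two small helpers so that scripts do not hard-code the strings. This is a sketch only: the helper names are hypothetical, and the {function_name}_settings pattern is an assumption inferred from the default data-quality-validation_settings and the my-function_settings override shown above.

```python
DEFAULT_FUNCTION_NAME = "data-quality-validation"


def view_config_external_id(view_external_id: str) -> str:
    """Build the view config external ID: {view_external_id}_view_config."""
    return f"{view_external_id}_view_config"


def settings_external_id(function_name: str = DEFAULT_FUNCTION_NAME) -> str:
    """Build the settings external ID.

    Assumption: the pattern is {function_name}_settings, inferred from the
    documented default and override examples.
    """
    return f"{function_name}_settings"


print(view_config_external_id("Pump"))
print(settings_external_id())
```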

From local YAML files

result = deploy_validation_pipeline(
    client,
    settings_path="settings.yaml",
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, partitions_triggered, sync_trigger_external_id, ...

Unified function and convenience wrappers

Unified entry point

from cognite_data_quality import call_validation

result = call_validation(
    client=client,
    validation_type="instance",  # or "instance_sync_cursor", "timeseries", "orchestrator", "partitioned", "test"
    data=data,
)

Convenience wrappers

| Function | Validation type | Use case |
| --- | --- | --- |
| deploy_validation_pipeline() | orchestrator | Deploy and run full pipeline (batch + incremental) |
| call_validate_instances_shacl() | instance | Validate instances (single invocation) |
| call_validate_instances_shacl_partitioned() | partitioned | Worker for partitioned validation |
| call_validate_timeseries_shacl() | timeseries | Validate time series data |
| call_validation(validation_type="instance_sync_cursor", ...) | instance_sync_cursor | Cursor-based incremental validation (sync cursor mode) |

Each helper wraps call_validation() with the appropriate validation_type. Pass a payload dict and, optionally, wait (defaults to True). See the function docstrings for the payload keys.
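
To illustrate the wrapper pattern described above, here is a sketch that uses a stand-in for call_validation(). It is not the library's implementation: call_validation_stub and make_wrapper are hypothetical names, and the stub only echoes its arguments instead of invoking a function in CDF.

```python
from typing import Any, Callable, Dict

Payload = Dict[str, Any]


def call_validation_stub(client: Any, validation_type: str, data: Payload, wait: bool = True) -> Payload:
    """Hypothetical stand-in for call_validation(); it only echoes its arguments."""
    return {"validation_type": validation_type, "wait": wait, "payload_keys": sorted(data)}


def make_wrapper(validation_type: str) -> Callable[..., Payload]:
    """Build a helper that pins validation_type and forwards everything else."""

    def wrapper(client: Any, data: Payload, wait: bool = True) -> Payload:
        return call_validation_stub(
            client=client, validation_type=validation_type, data=data, wait=wait
        )

    return wrapper


# Two illustrative wrappers, each bound to one validation type.
validate_instances = make_wrapper("instance")
validate_timeseries = make_wrapper("timeseries")

print(validate_instances(None, {"instances": {}}))
```

Pinning validation_type in a closure keeps every helper's signature down to the client, the payload, and wait, which matches how the helpers are called in this page.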

Validation pipeline deployment

For deploying the full validation pipeline, the payload typically includes:

  • datamodel_space, datamodel_external_id, datamodel_version
  • view_space, view_external_id, view_version
  • instance_space
  • shacl_file_external_id
  • partition_count, partition_field
  • records_space, records_container, stream_id, rule_set_id, rule_set_version
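
Before triggering a deployment, it can help to see which of the keys listed above a payload is missing. The following is a minimal sketch: missing_pipeline_keys is a hypothetical helper, and because the list describes what a payload typically includes, an absent key is reported rather than treated as an error.

```python
from typing import Any, List, Mapping

# Keys copied from the list above; whether each one is strictly
# required is not stated, so this is a checklist, not a schema.
PIPELINE_PAYLOAD_KEYS = (
    "datamodel_space", "datamodel_external_id", "datamodel_version",
    "view_space", "view_external_id", "view_version",
    "instance_space",
    "shacl_file_external_id",
    "partition_count", "partition_field",
    "records_space", "records_container",
    "stream_id", "rule_set_id", "rule_set_version",
)


def missing_pipeline_keys(payload: Mapping[str, Any]) -> List[str]:
    """Return the typical pipeline keys that are absent from payload."""
    return [key for key in PIPELINE_PAYLOAD_KEYS if key not in payload]


print(missing_pipeline_keys({"view_space": "my_space", "view_external_id": "MyView"}))
```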

Instance validation

For instance validation (single or partitioned), the payload typically includes:

  • instances: Dict with items (e.g. {"items": {"n": [...]}})
  • shacl_rules_file_external_id or shacl_rules (TTL string)
  • datamodel_space, datamodel_external_id, datamodel_version
  • records_config: stream_id, rule_set_id, rule_set_version, optional records_space, records_container
  • auto_load_depth (optional, default 2)
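
The keys above can be assembled with a small builder. This is a sketch under stated assumptions: build_instance_payload is a hypothetical helper, and requiring exactly one of shacl_rules_file_external_id or shacl_rules is an assumption (the list only says "or"). The {"items": {"n": [...]}} shape follows the example in the list.

```python
from typing import Any, Dict, List, Optional


def build_instance_payload(
    nodes: List[Dict[str, Any]],
    datamodel_space: str,
    datamodel_external_id: str,
    datamodel_version: str,
    records_config: Dict[str, str],
    shacl_rules_file_external_id: Optional[str] = None,
    shacl_rules: Optional[str] = None,
    auto_load_depth: int = 2,
) -> Dict[str, Any]:
    """Assemble an instance-validation payload dict (hypothetical helper)."""
    # Assumption: exactly one rules source is given, a file reference or a TTL string.
    if (shacl_rules_file_external_id is None) == (shacl_rules is None):
        raise ValueError("Provide exactly one of shacl_rules_file_external_id or shacl_rules")
    payload: Dict[str, Any] = {
        "instances": {"items": {"n": nodes}},
        "datamodel_space": datamodel_space,
        "datamodel_external_id": datamodel_external_id,
        "datamodel_version": datamodel_version,
        "records_config": records_config,
        "auto_load_depth": auto_load_depth,
    }
    if shacl_rules_file_external_id is not None:
        payload["shacl_rules_file_external_id"] = shacl_rules_file_external_id
    else:
        payload["shacl_rules"] = shacl_rules
    return payload


payload = build_instance_payload(
    nodes=[],
    datamodel_space="my_space",
    datamodel_external_id="MyDataModel",
    datamodel_version="v1",
    records_config={"stream_id": "dq_validation_stream", "rule_set_id": "MyViewSHACLv1", "rule_set_version": "1.0"},
    shacl_rules_file_external_id="my_shacl_rules",
)
print(sorted(payload))
```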

Sync cursor validation

Views configured with use_sync_cursor_mode: true use a cursor-based sync handler instead of receiving instances in the workflow payload. The trigger fires as a lightweight signal; the function independently fetches all changes since the last cursor from DMS and validates them.

The payload is set by the deployed workflow (populated by deploy_validation_infrastructure()) and patched with initial_sync_cutoff by the orchestrator when it creates the sync trigger. You do not normally invoke this handler directly.

Key payload parameters:

  • view_space, view_external_id, view_version
  • instance_space
  • shacl_rules_file_external_id
  • datamodel_space, datamodel_external_id, datamodel_version
  • chunk_size – instances per sync batch
  • auto_load_depth (optional, default 2)
  • data_quality_space – space for cursor state storage
  • initial_sync_cutoff – ISO timestamp; first-run filter to skip instances predating historic validation (patched by orchestrator)
  • records_config
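
The effect of initial_sync_cutoff on the first run can be pictured as a timestamp filter. The sketch below illustrates the idea only and is not the handler's code: skip_before_cutoff, the last_updated field name, and the inclusive boundary are all assumptions.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def skip_before_cutoff(
    instances: Iterable[Dict[str, Any]],
    initial_sync_cutoff: str,
    timestamp_key: str = "last_updated",  # hypothetical field name
) -> List[Dict[str, Any]]:
    """Keep instances changed at or after the ISO cutoff (illustrative only)."""
    cutoff = datetime.fromisoformat(initial_sync_cutoff)
    return [
        item
        for item in instances
        if datetime.fromisoformat(item[timestamp_key]) >= cutoff
    ]


changes = [
    {"external_id": "pump-1", "last_updated": "2024-01-01T00:00:00+00:00"},
    {"external_id": "pump-2", "last_updated": "2024-03-01T00:00:00+00:00"},
]
kept = skip_before_cutoff(changes, "2024-02-01T00:00:00+00:00")
print([item["external_id"] for item in kept])
```

Timestamps here carry an explicit +00:00 offset so that datetime.fromisoformat() parses them on older Python versions and compares timezone-aware values.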

Time series validation

For time series validation, the payload typically includes:

  • shacl_rules_file_external_id
  • datamodel_space, datamodel_external_id, datamodel_version
  • filter or instance_ids
  • records_config