Deploy
What this is
This guide explains how to deploy and operate data-quality infrastructure, workflows, and lifecycle-driven configurations in CDF.
When to use it
Use this guide when moving from local validation to scheduled or event-driven production execution.
User mental model
- Persist rules in Data Product + RuleSet (primary model).
- Configure deployment/runtime via YAML (CDF Toolkit representation of Data Product + RuleSet bindings).
- Publish/Deploy infrastructure and workflow definitions.
- Operate with idempotent lifecycle updates and scheduled execution.
Minimal happy path
Quick flow
- Develop rules – Use run_validation() to validate against DMS. Iterate on rules until results are acceptable.
- Deploy – Use deploy_validation_infrastructure() to deploy the function, workflows, triggers, and data quality containers.
- Pipeline (optional) – Use deploy_validation_pipeline() to run historic validation for a view and set up sync and monitor schedules.
- Invoke – Use the invoke helpers to call deployed functions from Python when needed.
Use Data Product + RuleSet as the source of truth for persisted rules. Use YAML as the CDF Toolkit representation of those bindings and runtime settings so local validation and deployment behavior stay consistent.
Deploy validation infrastructure (recommended)
Deploy all validation infrastructure for an environment in one call:
from pathlib import Path
from cognite_data_quality import deploy_validation_infrastructure, load_cognite_client_from_toml
client = load_cognite_client_from_toml("config.toml")
config_root = Path("config/environments/my_env")
settings_path = config_root / "settings.yaml"
views_dir = config_root / "views"
# Orchestrator needs credentials to create triggers
function_secrets = {
    "client-id": "your-client-id",
    "client-secret": "your-client-secret",
}
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    function_secrets=function_secrets,
    force=False,
    dry_run=False,
)
This call:
- Ensures Records API container (dataQuality/DataQualityValidationRecord)
- Ensures state containers (OrchestrationState, FunctionValidationState) and deploys the monitoring data model and views
- Deploys the unified validation function (with embedded handler code)
- Deploys instance validation workflows and triggers
- Deploys scheduled uniqueness workflows (dq-{view}-uniqueness) when SHACL/RuleSet contains dqs:uniquenessConstraint or dqs:unique for the view
- Deploys time series validation workflows when timeseries_dir is provided
- Uploads SHACL rules to CDF Files as needed
- Uploads each view config YAML to CDF Files ({view_external_id}_view_config, e.g. Pump_view_config)
- Uploads the environment settings to CDF Files ({function_external_id}_settings, e.g. data-quality-validation_settings)
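The two CDF Files naming patterns in the list above are plain string templates. The following is a minimal sketch of how those external IDs could be derived. The helper names are hypothetical and not part of cognite_data_quality, and the default function external ID is an assumption inferred from the examples in this guide.

```python
# Hypothetical helpers; they only mirror the naming patterns described above.
DEFAULT_FUNCTION_EXTERNAL_ID = "data-quality-validation"  # assumed default


def view_config_file_external_id(view_external_id: str) -> str:
    """External ID of the uploaded view config, e.g. Pump -> Pump_view_config."""
    return f"{view_external_id}_view_config"


def settings_file_external_id(
    function_external_id: str = DEFAULT_FUNCTION_EXTERNAL_ID,
) -> str:
    """External ID of the uploaded environment settings file."""
    return f"{function_external_id}_settings"
```

Deriving the IDs this way makes it easy to check which files a deploy is expected to create before calling deploy_validation_pipeline() with view_config_external_id.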
Time series datapoint tests should be scheduled in production. Use the environment timeseries config cron cadence and validate rule behavior with the scripts documented in Time Series Datapoint Rule Examples.
You can extract credentials from the client when available:
creds = client.config.credentials
if hasattr(creds, "client_id") and hasattr(creds, "client_secret"):
    function_secrets = {"client-id": creds.client_id, "client-secret": creds.client_secret}
else:
    function_secrets = None  # Orchestrator triggers will fail without secrets
Runtime behavior
Deploy validation pipeline (historic + incremental)
After infrastructure is deployed, run the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule).
From CDF Files (recommended — no local files needed)
Because deployment uploads view configs and settings to CDF Files, you can invoke the pipeline from any environment without needing the local config directory:
from cognite_data_quality import deploy_validation_pipeline
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",  # uploaded during deploy
)
Settings are automatically fetched from data-quality-validation_settings in CDF Files. Override with settings_external_id if your function uses a non-standard external ID:
result = deploy_validation_pipeline(
    client,
    view_config_external_id="Pump_view_config",
    settings_external_id="my-function_settings",
)
From local YAML files
result = deploy_validation_pipeline(
    client,
    settings_path=str(settings_path),
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, status, partitions_triggered, partition_count,
# sync_trigger_external_id, monitor_schedule_name, distribution, ...
Settings and view config are read from settings_path and the corresponding view YAML under the same environment. The pipeline triggers partitioned validation for historic data and sets up the sync trigger and monitor schedule for incremental runs.
Advanced parameters
Runtime credentials and fallback
You can keep using function_secrets, or define grouped runtime credentials in settings.yaml:
runtime_credentials:
  functions:
    client_id: ${DQ_FUNCTION_CLIENT_ID}
    client_secret: ${DQ_FUNCTION_CLIENT_SECRET}
  workflows:
    client_id: ${DQ_WORKFLOW_CLIENT_ID}
    client_secret: ${DQ_WORKFLOW_CLIENT_SECRET}
  schedules:
    client_id: ${DQ_SCHEDULE_CLIENT_ID}
    client_secret: ${DQ_SCHEDULE_CLIENT_SECRET}
Resolution order:
- Explicit grouped credentials for the target (functions, workflows, schedules)
- Deployment client credentials (client.config.credentials)
- For function deployment secrets only: legacy env fallback COGNITE_CLIENT_ID/COGNITE_CLIENT_SECRET (or IDP_*)
This keeps old deployments backward-compatible while allowing credential separation.
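The resolution order above can be sketched as a small lookup function. This is an illustration under stated assumptions, not the library's implementation: resolve_runtime_credentials is a hypothetical name, the grouped mapping mirrors the runtime_credentials block after environment substitution, and the IDP_* variants are left out because their exact names are not given here.

```python
from typing import Mapping, Optional, Tuple

Credentials = Tuple[str, str]  # (client_id, client_secret)


def resolve_runtime_credentials(
    target: str,
    grouped: Mapping[str, Mapping[str, str]],
    client_credentials: Optional[Credentials],
    env: Mapping[str, str],
) -> Optional[Credentials]:
    """Hypothetical sketch of the three-step resolution order."""
    # 1. Explicit grouped credentials for the target.
    group = grouped.get(target) or {}
    if group.get("client_id") and group.get("client_secret"):
        return group["client_id"], group["client_secret"]
    # 2. Credentials of the deployment client.
    if client_credentials is not None:
        return client_credentials
    # 3. Legacy environment fallback, for function deployment secrets only.
    if target == "functions":
        client_id = env.get("COGNITE_CLIENT_ID")
        client_secret = env.get("COGNITE_CLIENT_SECRET")
        if client_id and client_secret:
            return client_id, client_secret
    return None
```

Passing the environment as a mapping (for example os.environ) keeps the sketch easy to exercise without touching real credentials.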
Function secrets (explicit override)
function_secrets still takes precedence when passed directly:
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    function_secrets={"client-id": "...", "client-secret": "..."},
)
The orchestrator now supports optional specialized runtime secret keys in addition to legacy keys:
- Legacy fallback keys: client-id, client-secret
- Workflow trigger keys: workflow-client-id, workflow-client-secret
- Function schedule keys: schedule-client-id, schedule-client-secret
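One way to read the key list above is that a specialized pair is preferred when both of its keys are present, and the legacy pair is used otherwise. The sketch below illustrates that reading only. pick_secret_pair is a hypothetical helper, and treating each pair as a unit is an assumption rather than documented orchestrator behavior.

```python
from typing import Mapping, Optional, Tuple

# Key prefixes for the specialized secret pairs listed above.
_PREFIXES = {"workflow": "workflow-", "schedule": "schedule-"}


def pick_secret_pair(
    secrets: Mapping[str, str], purpose: str
) -> Optional[Tuple[str, str]]:
    """Return (client_id, client_secret) for 'workflow' or 'schedule'.

    Assumption: a specialized pair is used only when both keys exist;
    otherwise the legacy client-id/client-secret pair is the fallback.
    """
    prefix = _PREFIXES[purpose]
    specialized = (
        secrets.get(f"{prefix}client-id"),
        secrets.get(f"{prefix}client-secret"),
    )
    if all(specialized):
        return specialized[0], specialized[1]
    legacy = (secrets.get("client-id"), secrets.get("client-secret"))
    if all(legacy):
        return legacy[0], legacy[1]
    return None
```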
Custom function external ID
Override the default function external ID (e.g. when multiple environments share a project):
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    function_external_id="data-quality-validation-myenv",
)
Time series and SHACL dirs
Optional directories for time series configs and SHACL rules:
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    timeseries_dir=config_root / "timeseries",
    shacl_rules_dir=config_root / "shacl_rules",
)
Dry run and force
- dry_run=True – Log what would be deployed without making changes.
- force=True – Redeploy even when content hashes match (useful after dependency or config changes).
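The interaction between content hashes, force, and dry_run can be illustrated with a small decision function. This is a sketch under assumptions: plan_deploy and content_hash are hypothetical names, and SHA-256 is chosen only for illustration since the hash algorithm used by the deploy code is not stated here.

```python
import hashlib
from typing import Optional


def content_hash(content: bytes) -> str:
    """Hash of the deployable content (SHA-256 is an assumption)."""
    return hashlib.sha256(content).hexdigest()


def plan_deploy(
    content: bytes,
    deployed_hash: Optional[str],
    force: bool = False,
    dry_run: bool = False,
) -> str:
    """Return 'skip', 'dry-run', or 'deploy' for one resource."""
    changed = deployed_hash != content_hash(content)
    if not changed and not force:
        return "skip"  # hashes match and no forced redeploy
    if dry_run:
        return "dry-run"  # only log what would be deployed
    return "deploy"
```

A resource with no recorded hash (None) is always treated as changed in this sketch.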
DataProduct and RuleSet mode (recommended default)
Set config_source: "dataproduct" in settings.yaml to publish SHACL rules to the CDF RuleSet API and view configs to the DataProduct API. This is the primary model for rule management and persistence. Each view config must declare which DataProduct(s) it belongs to via the dataproducts: list.
# views/my_view.yaml
shacl_rules:
  file: "my_view_shacl.ttl"
  external_id: "my_view_shacl"
dataproducts:
  - external_id: "my-data-product"
    version: "1.0.0"
    schema_space: "my_schema_space"
In DP mode the view YAML uses the same fields as the standard config (partition settings, validation depth, sync cursor mode, etc.) — these are needed and applied during deploy. The records: block is the one exception: rule_set_id and rule_set_version are set automatically from the DataProduct external_id and version and should be omitted.
deploy_validation_infrastructure() stores each view's config as a ViewConfigState DMS node so the data_product_sync scheduled handler uses the correct settings when it reacts to new DataProduct versions.
Publish behavior is payload-aware and idempotent:
- if the requested RuleSet/DataProduct version already exists, deploy skips version creation
- if the requested version is missing but payload is unchanged, deploy reuses the latest existing version instead of creating a new immutable version
- downstream references use the resolved version (the one that exists after publish/reuse), not just the requested version
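The three publish rules above can be sketched as a version-resolution function. This is an illustration, not the library's code: resolve_publish_version is a hypothetical name, payloads are represented by opaque hash strings, and versions are assumed to be plain MAJOR.MINOR.PATCH strings.

```python
from typing import Dict, Tuple


def _semver_key(version: str) -> Tuple[int, ...]:
    """Sort key for plain MAJOR.MINOR.PATCH versions (no pre-release tags)."""
    return tuple(int(part) for part in version.split("."))


def resolve_publish_version(
    requested: str, payload_hash: str, existing: Dict[str, str]
) -> Tuple[str, bool]:
    """Return (resolved_version, created_new_version).

    existing maps each published version to the hash of its payload.
    """
    if requested in existing:
        return requested, False  # version already exists: skip creation
    if existing:
        latest = max(existing, key=_semver_key)
        if existing[latest] == payload_hash:
            return latest, False  # unchanged payload: reuse latest version
    return requested, True  # new payload: create the requested version
```

Downstream references would then use the first element of the returned tuple, which is the version that actually exists after publish or reuse.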
Time series datapoint rules in Data Product mode
Time series validation follows the same Data Product / RuleSet model as instance validation. Each timeseries YAML under timeseries/ can declare dataproducts: (one or more DataProducts) and a local TTL file; deploy publishes the TTL to the RuleSet API, adds a CogniteTimeSeries view entry to each DataProduct version, and stores operational settings (filter, schedule, backfill, rule_set_id) in one TimeseriesConfigState DMS node per DataProduct for sync-time replay. At sync time, only RuleSets whose SHACL sh:targetClass matches the timeseries datamodel are attached (so instance RuleSets on the same Data Product are ignored).
# timeseries/demo_sensors.yaml
name: demo-sensors
description: "Datapoint quality for demo sensors"
dataproducts:
  - external_id: "iot-data-product"
    version: "1.0.0"
shacl_rules:
  file: "demo_sensors_quality.ttl"
  external_id: "demo-sensors-shacl"
datamodel:
  space: "cdf_cdm"
  external_id: "CogniteTimeSeries"
  version: "v1"
filter:
  space: "sensor-space"
  external_id_prefix: "demo_"
schedule:
  cron: "0 * * * *"  # WorkflowScheduledTriggerRule; same pattern as other scheduled DQ workflows
records:
  data_domain_external_id: "iot-data-product"  # optional; defaults to the owning DataProduct per state node
  # Omit rule_set_id and rule_set_version; they are set from the published RuleSet externalId + DP semver at deploy
In Data Product mode:
- Workflow task payloads use ruleset_references (not shacl_rules_file_external_id).
- Instance validation records use the view externalId as rule_set_id and the DataProduct external_id as data_domain_external_id.
- Timeseries validation records use the RuleSet externalId as rule_set_id and the DataProduct external_id as data_domain_external_id.
- Records use view externalId + Data Product semver for instance-validation rule_set_id/rule_set_version.
- For timeseries datapoint validation, records use RuleSet externalId + Data Product semver.
- Timeseries workflows carry [dp:{version}] tags; data_product_sync redeploys them when the DP version changes.
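The identity fields written to validation records in Data Product mode can be summarized in code. The sketch below restates the mapping from the list above. RecordIdentity and the helper functions are hypothetical names used for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RecordIdentity:
    """Identity fields of a validation record (illustrative container)."""
    rule_set_id: str
    rule_set_version: str
    data_domain_external_id: str


def instance_record_identity(
    view_external_id: str, dp_external_id: str, dp_version: str
) -> RecordIdentity:
    # Instance validation: rule_set_id is the view externalId.
    return RecordIdentity(view_external_id, dp_version, dp_external_id)


def timeseries_record_identity(
    ruleset_external_id: str, dp_external_id: str, dp_version: str
) -> RecordIdentity:
    # Timeseries validation: rule_set_id is the RuleSet externalId.
    return RecordIdentity(ruleset_external_id, dp_version, dp_external_id)


def data_product_tag(dp_version: str) -> str:
    """Tag carried by timeseries workflows, e.g. [dp:1.0.0]."""
    return f"[dp:{dp_version}]"
```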
File-based timeseries deploy (config_source: "yaml") is a transitional path and continues to upload SHACL to CDF Files during migration.
RuleEngineResult incremental listener workflows
You can define scheduled listener workflows that consume changed RuleEngineResult records incrementally (for rule chaining pipelines).
Listener configs are deploy-side YAML (not stored on the Data Product API). Filter on record fields that upstream producers already write.
- Add YAML configs under rule_engine_result_sync/ (or set rule_engine_result_sync.config_dir in settings.yaml).
- deploy_validation_infrastructure() loads that directory automatically when the section exists (or pass rule_engine_result_sync_dir=... explicitly).
- Each config deploys one scheduled workflow and trigger that calls validation_type: "rule_engine_result_sync".
- Deployed workflows enforce max_concurrent_executions=1 (checkpoint safety).
# settings.yaml (optional — enables automatic listener deploy from config dir)
rule_engine_result_sync:
  config_dir: "rule_engine_result_sync"
Example listener config for an instance-validation Data Product:
name: maintenance_order_rule_a_listener
description: "Consume new RuleA outputs for maintenance orders"
listener_id: mo-rule-a-listener
source_mode: filter
stream_id: dq_validation_stream
records_space: dataQuality
records_container: RuleEngineResult
initial_watermark_ms: 1710000000000
initialize_cursor: "7d-ago" # sync mode first-run bootstrap when no saved cursor
page_limit: 1000
limit: 1000
schedule:
  cron: "*/5 * * * *"
filter:
  rule_ids: ["RuleA"]
  data_domain_external_ids: ["equipment-data-product"]
  rule_set_ids: ["YourOrgMaintenanceOrder"]  # view externalId for instance validation
  rule_set_versions: ["1.2.0"]
For timeseries upstream producers, use the RuleSet externalId in rule_set_ids instead of the view external ID.
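To make the filter-mode listener easier to reason about, the following sketch shows one watermark-based consumption cycle over in-memory records. It is an illustration under several assumptions: the function names are hypothetical, the record field names (rule_id, last_updated_ms, and so on) are chosen for the example, and the filter is assumed to match when every configured list contains the record's value.

```python
from typing import Dict, List, Mapping, Sequence, Tuple

# Filter key in the listener config -> record field it is compared against.
FILTER_FIELDS = {
    "rule_ids": "rule_id",
    "data_domain_external_ids": "data_domain_external_id",
    "rule_set_ids": "rule_set_id",
    "rule_set_versions": "rule_set_version",
}


def matches(record: Mapping[str, object], flt: Mapping[str, Sequence[str]]) -> bool:
    """True when the record satisfies every configured filter list."""
    return all(
        record.get(field) in flt[key]
        for key, field in FILTER_FIELDS.items()
        if flt.get(key)
    )


def consume(
    records: List[Dict[str, object]],
    flt: Mapping[str, Sequence[str]],
    watermark_ms: int,
    limit: int,
) -> Tuple[List[Dict[str, object]], int]:
    """Return (matching records in this batch, new watermark)."""
    fresh = sorted(
        (r for r in records if r["last_updated_ms"] > watermark_ms),
        key=lambda r: r["last_updated_ms"],
    )
    batch = fresh[:limit]
    # The watermark advances past everything scanned, matched or not.
    # Records sharing the boundary timestamp would need a tie-breaker
    # in a real checkpoint; this sketch ignores that case.
    new_watermark = batch[-1]["last_updated_ms"] if batch else watermark_ms
    return [r for r in batch if matches(r, flt)], new_watermark
```

Because the watermark is read, advanced, and saved in one cycle, two overlapping cycles could process the same records twice, which is one reason single concurrency matters for listeners.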
For a runnable end-to-end chain test, see test_and_deploy/test_rule_engine_result_sync_chaining.py.
Operational notes:
- source_mode: sync remains the default listener mode. Set initialize_cursor explicitly for first-run bootstrap (deploy default: 7d-ago).
- Scheduled workflows omit full record payloads by default (include_items: false); use record_external_ids in the handler response.
- In environments with strict filter interval limits, prefer source_mode: filter with bounded initial_watermark_ms for first-run bootstrap.
- Listener workflows are deployed with single concurrency to protect checkpoint correctness.
To also deploy the data_product_sync workflow:
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    deploy_data_product_sync=True,
    data_product_sync_cron="0 * * * *",
)
When deploy_data_product_sync=True, deployment also provisions a second scheduled workflow:
- dq-historic-queue-manager (default cron: */5 * * * *)
- Processes HistoricJobQueue sequentially (one running orchestration at a time)
- Starts the next queued historic job after the current one completes/fails
- When data_product_sync enqueues a historic job for a new ruleset/DataProduct version, any queued or running jobs for the same view and older version are marked superseded, the monitor schedule is removed, and orchestration state is cleaned up so the new version can start on the next queue-manager cycle
- Each historic orchestration relies on a per-view function schedule named Monitor {view} validation (every 2 minutes) to retrigger the orchestrator in monitor mode. If that schedule is missing while OrchestrationState is still in_progress/monitoring, the queue manager marks the job failed as orphaned so the queue does not block forever
You can override its schedule:
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    deploy_data_product_sync=True,
    data_product_sync_cron="0 * * * *",
    historic_queue_manager_cron="*/10 * * * *",
)
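The sequential processing and supersede behavior of the historic queue manager can be modeled with a small in-memory queue. This is a simplified sketch, not the deployed handler: HistoricJob, enqueue, and run_cycle are hypothetical names, the status values other than superseded and failed are assumptions, and schedule removal and state cleanup are not modeled.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class HistoricJob:
    view: str
    version: str
    status: str = "queued"  # queued | running | completed | failed | superseded


def _semver_key(version: str) -> Tuple[int, ...]:
    return tuple(int(part) for part in version.split("."))


def enqueue(queue: List[HistoricJob], job: HistoricJob) -> None:
    """Add a job and supersede older queued/running jobs for the same view."""
    for existing in queue:
        if (
            existing.view == job.view
            and existing.status in ("queued", "running")
            and _semver_key(existing.version) < _semver_key(job.version)
        ):
            existing.status = "superseded"
    queue.append(job)


def run_cycle(queue: List[HistoricJob]) -> Optional[HistoricJob]:
    """One queue-manager cycle: start the next job if nothing is running."""
    if any(job.status == "running" for job in queue):
        return None  # only one running orchestration at a time
    for job in queue:
        if job.status == "queued":
            job.status = "running"
            return job
    return None
```

In this model a superseded running job frees the queue immediately, so the next cycle can start whichever queued job is first in line.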
Per-view control of automatic historic re-processing is available via:
Set this in a view YAML to prevent data_product_sync from auto-enqueueing historic jobs for that view on DataProduct/ruleset version updates.
Deploying workflows for externally-owned DataProducts
When another team owns the DataProducts and RuleSets in CDF, add external_dataproducts to settings.yaml. No local TTL or view YAML files are required — the deploy function reads the DataProduct from CDF and deploys workflows immediately:
# settings.yaml
config_source: "dataproduct" # or "yaml" — external_dataproducts works with both
external_dataproducts:
  - external_id: "equipment-product"
    version: "latest"  # auto-picks the highest published semver
  - external_id: "iot-sensors-product"
    version: "1.2.0"  # pin to a specific version
deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    # views_dir is optional: external_dataproducts doesn't need local view YAMLs
)
deploy_validation_infrastructure() fetches each DataProduct from CDF via DataProductClient, builds ViewConfig objects from the DataProduct's views and quality.rules, and deploys instance validation workflows directly. The rule_set_id in each validation record is set to the view's externalId; data_domain_external_id tracks the DataProduct. Since no local YAML exists, the data_product_sync handler uses DataProduct API defaults for workflow settings.
Version changes in external_dataproducts are part of workflow change detection. Updating a pinned version in settings.yaml should trigger workflow redeployment even when SHACL content is unchanged.
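Resolving version: "latest" versus a pinned version can be sketched as follows. The helper name resolve_data_product_version is hypothetical, and the sketch assumes plain MAJOR.MINOR.PATCH versions without pre-release or build suffixes. Note that the comparison must be numeric, since a string comparison would rank 1.9.3 above 1.10.0.

```python
from typing import Sequence, Tuple


def _semver_key(version: str) -> Tuple[int, ...]:
    """Numeric sort key for MAJOR.MINOR.PATCH strings."""
    return tuple(int(part) for part in version.split("."))


def resolve_data_product_version(requested: str, published: Sequence[str]) -> str:
    """Return the concrete version to deploy for one external DataProduct."""
    if not published:
        raise LookupError("DataProduct has no published versions")
    if requested == "latest":
        return max(published, key=_semver_key)  # highest published semver
    if requested not in published:
        raise LookupError(f"version {requested} is not published")
    return requested  # pinned version
```

Including the resolved version in workflow change detection means a bump from one pinned version to another is treated as a change even if the rules themselves are identical.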
For time series datapoint validation owned by an external DataProduct, operational settings must still be present as TimeseriesConfigState nodes (written during a prior deploy from local timeseries YAML). The sync handler then rebuilds TimeseriesConfig objects via load_timeseries_configs_from_data_product().
Incremental deployment (single view)
For deploying only the function and workflow for a single view config, see the API reference for deploy_incremental(). It takes a view config path and optional config_env; the main recommended path is deploy_validation_infrastructure() plus deploy_validation_pipeline() as above.
Scheduled uniqueness workflows
Global uniqueness is declared in SHACL/RuleSet — not in view YAML. When deploy scans TTL (local file or fetched RuleSet chunks) and finds dqs:uniquenessConstraint or dqs:unique for a view's target class, it creates:
| Resource | Pattern |
|---|---|
| Workflow | dq-{view}-uniqueness |
| Trigger | cron from dqs:schedule, optional YAML uniqueness_cron override, or default 0 6 * * * |
| Function payload | validation_type: shacl with same rules ref as the sync workflow |
Views without uniqueness constraints in SHACL get sync/incremental workflows only.
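As a rough illustration of the detection step, the sketch below scans Turtle text for the two uniqueness predicates and derives the workflow name from the pattern in the table. It is deliberately naive: it is a plain text scan rather than an RDF parse, it does not check the target class, and uniqueness_workflow_external_id is a hypothetical helper name.

```python
import re
from typing import Optional

# Matches dqs:uniquenessConstraint or dqs:unique as whole tokens.
_UNIQUENESS = re.compile(r"\bdqs:(?:uniquenessConstraint|unique)\b")


def uniqueness_workflow_external_id(view: str, ttl_text: str) -> Optional[str]:
    """Return dq-{view}-uniqueness when the TTL declares uniqueness, else None.

    Assumption: the TTL passed in has already been narrowed to the shapes
    that target this view; the real deploy code matches on target class.
    """
    if _UNIQUENESS.search(ttl_text):
        return f"dq-{view}-uniqueness"
    return None
```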
Sync-cursor and instance validation handlers do not run uniqueness after each sync — only the scheduled workflow (or an explicit call_validate_shacl() / validation_type: shacl invoke).
Current record behavior for uniqueness failures:
- Runtime suppresses repeated failure records for unchanged instances.
- If a prior failed record for the same focusNode is newer/equal to the instance lastUpdatedTime, no new failure record is written.
- Once the instance changes, failure records are written again if the violation remains.
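The suppression rule above reduces to a single timestamp comparison. The following sketch shows that comparison. should_write_failure is a hypothetical name, and both timestamps are assumed to be epoch milliseconds.

```python
from typing import Optional


def should_write_failure(
    prior_failure_time_ms: Optional[int], instance_last_updated_ms: int
) -> bool:
    """Decide whether a new uniqueness failure record should be written.

    A new record is written when there is no prior failure for the
    focusNode, or when the instance changed after the prior failure.
    """
    if prior_failure_time_ms is None:
        return True
    # Prior record newer than or equal to lastUpdatedTime: suppress.
    return prior_failure_time_ms < instance_last_updated_ms
```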
Optional YAML override to disable or override schedule:
# views/my_view.yaml — deployment mechanics only; rules stay in TTL/RuleSet
uniqueness_cron: null # disable scheduled uniqueness workflow
# uniqueness_cron: "0 * * * *" # override when SHACL has no dqs:schedule
See Uniqueness and Rule sources for SHACL authoring.
Best practices
- Keep deployment config and local validation config aligned to avoid behavior drift.
- Treat DataProduct/RuleSet publishing as idempotent and payload-aware.
- Use scheduled uniqueness and listener workflows for global checks and chaining.
Troubleshooting
- Deploy succeeds but workflows do not run: verify secrets/credentials and trigger schedules.
- Unexpected duplicate or reused versions: check whether the requested semver already exists and whether an unchanged payload caused deploy to reuse the latest version.
- Missing uniqueness workflow: ensure SHACL declares uniqueness for target class/property.