Deployment API

The cognite_data_quality.deploy module provides a unified API for deploying data quality validation infrastructure to CDF. Function handler code is embedded in the package; no separate function/ directory or function_source_root is required.

Overview

  • deploy_validation_infrastructure() – Ensures the Records and state containers (OrchestrationState, FunctionValidationState) exist, then deploys the monitoring data model and views, the unified validation function, and the instance and (optionally) time series workflows and triggers.
  • deploy_validation_pipeline() – Runs the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule). Call after infrastructure is deployed.
  • deploy_incremental() – Deploys function and workflow for a single view config (alternative to full infrastructure deploy).
  • Component helpers: deploy_functions(), deploy_instance_workflows(), deploy_timeseries_workflows(), deploy_data_models().

Main Functions

deploy_validation_infrastructure()

Unified deployment of all validation infrastructure (recommended). Uses settings_path and views_dir; function code is taken from the package.

from pathlib import Path
from cognite_data_quality import deploy_validation_infrastructure, load_cognite_client_from_toml

client = load_cognite_client_from_toml("config.toml")

settings_path = Path("config/environments/my_env/settings.yaml")
views_dir = Path("config/environments/my_env/views")

deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    function_secrets={"client-id": "...", "client-secret": "..."},  # optional but required for orchestrator triggers
    force=False,
    dry_run=False,
)

deploy_validation_pipeline()

Deploy and run the full validation pipeline for a view (historic data, sync trigger, monitor schedule):

from cognite_data_quality import deploy_validation_pipeline

result = deploy_validation_pipeline(
    client,
    settings_path="config/environments/my_env/settings.yaml",
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, partitions_triggered, sync_trigger_external_id, monitor_schedule_name, distribution, ...

deploy_incremental()

Deploy function and workflow for a single view config (takes view config path and optional config_env):

from cognite_data_quality import deploy_incremental

deploy_incremental(
    view_config_path="config/environments/my_env/views/my_view.yaml",
    client=client,
    config_env="my_env",  # optional; defaults to the CONFIG_ENV environment variable
    force=False,
    dry_run=False,
)

Component-Specific Functions

For fine-grained control, you can deploy individual components:

deploy_functions()

Deploy the unified SHACL validation function:

from cognite_data_quality import deploy_functions, load_settings

settings = load_settings("config/environments/my_env/settings.yaml")

deploy_functions(
    client=client,
    settings=settings,
    force=False,
    dry_run=False,
)

deploy_instance_workflows()

Deploy instance validation workflows:

from cognite_data_quality import deploy_instance_workflows, load_view_configs

view_configs = load_view_configs("config/environments/my_env/views")

deploy_instance_workflows(
    client=client,
    view_configs=view_configs,
    settings=settings,
    env_name="my_env",
    skip_trigger=False,
)

deploy_timeseries_workflows()

Deploy time series validation workflows:

from pathlib import Path
from cognite_data_quality import deploy_timeseries_workflows, load_timeseries_configs

ts_configs = load_timeseries_configs("config/environments/my_env/timeseries")

deploy_timeseries_workflows(
    client=client,
    settings=settings,
    configs=list(ts_configs.values()),
    shacl_dir=Path("config/environments/my_env/shacl"),  # settings.shacl_rules.source_dir, resolved relative to settings.yaml
)

deploy_data_models()

Deploy the monitoring data model with views auto-generated from the state containers (optional):

from cognite_data_quality import deploy_data_models

result = deploy_data_models(
    client=client,
    space="dataQuality",
    containers=["OrchestrationState", "FunctionValidationState"],
    data_model_id="DataQualityMonitoring",
    data_model_version="1",
    dry_run=False,
)

Configuration Loaders

load_settings()

Load environment settings from YAML:

from cognite_data_quality import load_settings, DeploymentSettings

settings: DeploymentSettings = load_settings("config/environments/my_env/settings.yaml")

load_view_configs()

Load instance validation configurations:

from cognite_data_quality import load_view_configs

configs = load_view_configs("config/environments/my_env/views")
# Returns: dict[str, ViewConfig]

load_timeseries_configs()

Load time series validation configurations:

from cognite_data_quality import load_timeseries_configs

configs = load_timeseries_configs("config/environments/my_env/timeseries")
# Returns: dict[str, TimeseriesConfig]

load_views_from_cdf()

Load views directly from CDF DMS:

from cognite_data_quality import load_views_from_cdf

views = load_views_from_cdf(
    client=client,
    space="my_space",
    data_model_external_id="MyDataModel",
    version="v1",
)

Data Classes

DeploymentSettings

Environment-specific deployment settings loaded from settings.yaml.

DataModelId

Identifier for a CDF data model:

from cognite_data_quality import DataModelId

dm_id = DataModelId(
    space="my_space",
    external_id="MyDataModel",
    version="v1",
)

Module Reference

cognite_data_quality.deploy

DataProductRef

Bases: BaseModel

Reference to a CDF DataProduct + version that a view belongs to.

Used inside :attr:ViewConfig.dataproducts to declare which DataProducts should include this view when config_source: "dataproduct" is active.

Attributes:

Name Type Description
external_id str

External ID of the DataProduct.

version str

Semantic version string of the DataProduct version to publish. Default: "1.0.0".

schema_space str | None

DMS space for the DataProduct's schemaSpace field. When omitted, the first view's space is used.

datamodel_external_id str | None

Deprecated — the DataProducts API no longer uses a dataModel wrapper. Ignored.

Source code in cognite_data_quality/deploy.py
class DataProductRef(BaseModel):
    """Reference to a CDF DataProduct + version that a view belongs to.

    Used inside :attr:`ViewConfig.dataproducts` to declare which DataProducts
    should include this view when ``config_source: "dataproduct"`` is active.

    Attributes:
        external_id: External ID of the DataProduct.
        version: Semantic version string of the DataProduct version to publish.
            Default: ``"1.0.0"``.
        schema_space: DMS space for the DataProduct's ``schemaSpace`` field.
            When omitted, the first view's space is used.
        datamodel_external_id: Deprecated — the DataProducts API no longer
            uses a ``dataModel`` wrapper.  Ignored.
    """

    external_id: str
    version: str = "1.0.0"
    schema_space: str | None = None
    datamodel_external_id: str | None = None  # Deprecated: the DataProducts API no longer uses a dataModel wrapper.
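
For illustration, a minimal sketch of declaring DataProduct membership on a view config; all identifiers are placeholders, and it assumes the models are importable from cognite_data_quality.deploy:

from cognite_data_quality.deploy import DataProductRef, ShaclRulesRef, ViewConfig, ViewRef

view_config = ViewConfig(
    view=ViewRef(space="my_space", external_id="Pump", version="v1"),
    instance_spaces=["my_instance_space"],
    shacl_rules=ShaclRulesRef(file="pump_rules.ttl", external_id="dq-shacl-pump-rules"),
    dataproducts=[DataProductRef(external_id="PumpDataProduct", version="1.0.0")],
)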

DeploymentSettings

Bases: BaseModel

Top-level deployment settings loaded from settings.yaml.

Attributes:

Name Type Description
function FunctionSettings | None

Cognite Function settings. Auto-populated with defaults if omitted from the YAML file.

workflow WorkflowSettings

Workflow settings for instance validation workflows.

timeseries_workflow WorkflowSettings | None

Separate workflow settings for time series workflows. Falls back to workflow when None.

trigger TriggerSettings

Data-change trigger settings for instance validation.

records RecordsSettings

Default Records API coordinates (space, container, stream).

shacl_rules ShaclRulesSettings

Location settings for SHACL rule files.

timeseries TimeSeriesSettings | None

Time series config directory settings. None if no time series validation is configured.

partition_retries int

Maximum partition retry attempts managed by the orchestrator function (no CDF API limit). Default: 3.

function_code_dataset_external_id str | None

Dataset for function code zip uploads. Required when CDF enforces dataset-scoped file access.

config_source Literal['yaml', 'dataproduct']

Where view configs are sourced from. "yaml" (default) reads views/*.yaml and uploads SHACL to CDF Files. "dataproduct" reads the same YAML/TTL files but publishes them to the DataProduct and RuleSet APIs instead. Each view's dataproducts list controls which DataProducts it is included in.

Source code in cognite_data_quality/deploy.py
class DeploymentSettings(BaseModel):
    """Top-level deployment settings loaded from ``settings.yaml``.

    Attributes:
        function: Cognite Function settings. Auto-populated with defaults
            if omitted from the YAML file.
        workflow: Workflow settings for instance validation workflows.
        timeseries_workflow: Separate workflow settings for time series
            workflows.  Falls back to *workflow* when ``None``.
        trigger: Data-change trigger settings for instance validation.
        records: Default Records API coordinates (space, container, stream).
        shacl_rules: Location settings for SHACL rule files.
        timeseries: Time series config directory settings. ``None`` if no
            time series validation is configured.
        partition_retries: Maximum partition retry attempts managed by the
            orchestrator function (no CDF API limit). Default: 3.
        function_code_dataset_external_id: Dataset for function code zip
            uploads.  Required when CDF enforces dataset-scoped file access.
        config_source: Where view configs are sourced from.  ``"yaml"``
            (default) reads ``views/*.yaml`` and uploads SHACL to CDF Files.
            ``"dataproduct"`` reads the same YAML/TTL files but publishes them
            to the DataProduct and RuleSet APIs instead.  Each view's
            ``dataproducts`` list controls which DataProducts it is included in.
    """

    function: FunctionSettings | None = None
    workflow: WorkflowSettings
    timeseries_workflow: WorkflowSettings | None = None
    trigger: TriggerSettings
    records: RecordsSettings = Field(default_factory=RecordsSettings)
    shacl_rules: ShaclRulesSettings
    timeseries: TimeSeriesSettings | None = None
    partition_retries: int = Field(default=3, description="Orchestrator partition retry limit (no CDF API constraint)")
    function_code_dataset_external_id: str | None = Field(
        default=None,
        description="Dataset for function code uploads. Required if CDF enforces dataset-scoped file access.",
    )
    config_source: Literal["yaml", "dataproduct"] = Field(
        default="yaml",
        description=(
            "Config source: 'yaml' uploads SHACL to CDF Files; 'dataproduct' publishes to RuleSet/DataProduct APIs."
        ),
    )

    def model_post_init(self, __context) -> None:
        """Ensure function settings are populated with defaults if not provided."""
        if self.function is None:
            self.function = FunctionSettings()

model_post_init(__context)

Ensure function settings are populated with defaults if not provided.

Source code in cognite_data_quality/deploy.py
def model_post_init(self, __context) -> None:
    """Ensure function settings are populated with defaults if not provided."""
    if self.function is None:
        self.function = FunctionSettings()
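
For example, when the function block is omitted from settings.yaml, model_post_init fills it with FunctionSettings defaults; a minimal sketch (placeholder settings path):

from cognite_data_quality import load_settings

settings = load_settings("config/environments/my_env/settings.yaml")
assert settings.function is not None   # auto-populated with FunctionSettings defaults when omitted
print(settings.function.external_id)   # "data-quality-validation" unless overridden in the YAML
print(settings.records.space)          # "dataQuality" unless overridden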

FunctionSettings

Bases: BaseModel

Cognite Function settings.

Attributes:

Name Type Description
external_id str

CDF external ID for the function. Default: "data-quality-validation".

name str

Display name shown in CDF Console. Default: "Data Quality Validation".

description str

Short description stored on the function resource.

runtime str

Python runtime version string (e.g. "py311").

source_dir str

Local directory name used as the function root inside the zip archive.

include_files list[str]

List of paths (relative to source_dir) to bundle.

Source code in cognite_data_quality/deploy.py
class FunctionSettings(BaseModel):
    """Cognite Function settings.

    Attributes:
        external_id: CDF external ID for the function. Default: ``"data-quality-validation"``.
        name: Display name shown in CDF Console. Default: ``"Data Quality Validation"``.
        description: Short description stored on the function resource.
        runtime: Python runtime version string (e.g. ``"py311"``).
        source_dir: Local directory name used as the function root inside the zip archive.
        include_files: List of paths (relative to *source_dir*) to bundle.
    """

    external_id: str = Field(default="data-quality-validation")
    name: str = Field(default="Data Quality Validation")
    description: str = Field(
        default="Unified function for all SHACL validation types (instance, partitioned, timeseries, orchestrator)"
    )
    runtime: str = Field(default="py311")
    source_dir: str = Field(default="function")
    include_files: list[str] = Field(
        default_factory=lambda: [
            "handler.py",
            "handlers/__init__.py",
            "handlers/instance_validation.py",
            "handlers/partitioned_validation.py",
            "handlers/raw_validation.py",
            "handlers/timeseries_validation.py",
            "handlers/orchestrator.py",
            "handlers/orchestration_status.py",
            "handlers/test_rule.py",
            "handlers/data_product_sync.py",
            "common/__init__.py",
            "common/shacl_utils.py",
            "common/records_api.py",
            "common/cursor_state.py",
            "common/time_window.py",
            "common/timestamp_collection_state.py",
            "requirements.txt",
        ]
    )

RawTableConfig

Bases: BaseModel

Full configuration for one RAW table validation deployment.

Attributes:

Name Type Description
table RawTableRef

Reference to the CDF RAW database and table.

shacl_rules ShaclRulesRef

SHACL rules file reference.

validation ValidationSettings

Validation behaviour settings.

records RecordsOverrides

Per-table Records API overrides.

partition_count int

Number of partitions for historic cursor-based validation. Default: 5.

Source code in cognite_data_quality/deploy.py
class RawTableConfig(BaseModel):
    """Full configuration for one RAW table validation deployment.

    Attributes:
        table: Reference to the CDF RAW database and table.
        shacl_rules: SHACL rules file reference.
        validation: Validation behaviour settings.
        records: Per-table Records API overrides.
        partition_count: Number of partitions for historic cursor-based
            validation. Default: 5.
    """

    table: RawTableRef
    shacl_rules: ShaclRulesRef
    validation: ValidationSettings = Field(default_factory=ValidationSettings)
    records: RecordsOverrides = Field(default_factory=RecordsOverrides)
    partition_count: int = Field(default=5, description="Number of partitions for batch validation")
    _filename: str | None = None

RawTableRef

Bases: BaseModel

Reference to a CDF RAW table.

Attributes:

Name Type Description
database str

CDF RAW database name.

name str

CDF RAW table name within the database.

Source code in cognite_data_quality/deploy.py
class RawTableRef(BaseModel):
    """Reference to a CDF RAW table.

    Attributes:
        database: CDF RAW database name.
        name: CDF RAW table name within the database.
    """

    database: str
    name: str

RecordsOverrides

Bases: BaseModel

Per-view overrides for Records API coordinates.

All fields are optional; unset fields inherit the global :class:RecordsSettings values.

Attributes:

Name Type Description
rule_set_id str | None

Override the rule set ID written to each record.

rule_set_version str | None

Override the rule set version written to each record.

data_domain_external_id str | None

External ID of the data domain attached to each record.

use_instance_space bool

If True, write records into the same DMS space as the validated instance. Default: False.

records_space str | None

Explicit destination space for records. Only used when use_instance_space is False and overrides the global :attr:RecordsSettings.space.

Source code in cognite_data_quality/deploy.py
class RecordsOverrides(BaseModel):
    """Per-view overrides for Records API coordinates.

    All fields are optional; unset fields inherit the global
    :class:`RecordsSettings` values.

    Attributes:
        rule_set_id: Override the rule set ID written to each record.
        rule_set_version: Override the rule set version written to each record.
        data_domain_external_id: External ID of the data domain attached to
            each record.
        use_instance_space: If ``True``, write records into the same DMS
            space as the validated instance. Default: ``False``.
        records_space: Explicit destination space for records. Only used
            when *use_instance_space* is ``False`` and overrides the global
            :attr:`RecordsSettings.space`.
    """

    rule_set_id: str | None = None
    rule_set_version: str | None = None
    data_domain_external_id: str | None = None
    use_instance_space: bool = False
    records_space: str | None = None
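
A short sketch of the two override styles (placeholder space name; import path assumed to be cognite_data_quality.deploy):

from cognite_data_quality.deploy import RecordsOverrides

# Route this view's records into a dedicated space instead of the global RecordsSettings.space
dedicated = RecordsOverrides(records_space="my_records_space")

# Or co-locate records with the validated instances
co_located = RecordsOverrides(use_instance_space=True)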

RecordsSettings

Bases: BaseModel

Default Records API coordinates used by all deployed workflows.

Attributes:

Name Type Description
space str

CDF DMS space that hosts the Records container. Default: "dataQuality".

container str

External ID of the DMS container that stores validation records. Default: "DataQualityValidationRecord".

stream_id str

External ID of the Records API stream for ingestion. Default: "dq_validation_stream".

Source code in cognite_data_quality/deploy.py
class RecordsSettings(BaseModel):
    """Default Records API coordinates used by all deployed workflows.

    Attributes:
        space: CDF DMS space that hosts the Records container.
            Default: ``"dataQuality"``.
        container: External ID of the DMS container that stores validation
            records. Default: ``"DataQualityValidationRecord"``.
        stream_id: External ID of the Records API stream for ingestion.
            Default: ``"dq_validation_stream"``.
    """

    space: str = Field(default="dataQuality")
    container: str = Field(default="DataQualityValidationRecord")
    stream_id: str = Field(default="dq_validation_stream")

ScheduleConfig

Bases: BaseModel

Cron-based schedule for time series validation workflows.

Attributes:

Name Type Description
cron str

Cron expression controlling when the workflow runs. Default: "0 6 * * *" (daily at 06:00 UTC).

Source code in cognite_data_quality/deploy.py
class ScheduleConfig(BaseModel):
    """Cron-based schedule for time series validation workflows.

    Attributes:
        cron: Cron expression controlling when the workflow runs.
            Default: ``"0 6 * * *"`` (daily at 06:00 UTC).
    """

    cron: str = Field(default="0 6 * * *")

ShaclRulesRef

Bases: BaseModel

Reference to SHACL rules -- either a local file or RuleSet API references.

Attributes:

Name Type Description
file str | None

Filename (relative to the SHACL source directory) of the .ttl rules file. Required when using file-based rules.

external_id str | None

CDF Files external ID under which the rules file is uploaded and later retrieved by the validation function. Required when using file-based rules.

ruleset_references list[dict] | None

List of {"externalId": ..., "version": ...} dicts pointing to CDF RuleSet API versions. When set, the validation handler fetches SHACL rules from the RuleSet API at runtime instead of from CDF Files.

Source code in cognite_data_quality/deploy.py
class ShaclRulesRef(BaseModel):
    """Reference to SHACL rules -- either a local file or RuleSet API references.

    Attributes:
        file: Filename (relative to the SHACL source directory) of the
            ``.ttl`` rules file.  Required when using file-based rules.
        external_id: CDF Files external ID under which the rules file is
            uploaded and later retrieved by the validation function.
            Required when using file-based rules.
        ruleset_references: List of ``{"externalId": ..., "version": ...}``
            dicts pointing to CDF RuleSet API versions.  When set, the
            validation handler fetches SHACL rules from the RuleSet API
            at runtime instead of from CDF Files.
    """

    file: str | None = None
    external_id: str | None = None
    ruleset_references: list[dict] | None = None

    @model_validator(mode="after")
    def _require_file_or_ruleset(self) -> ShaclRulesRef:
        if self.ruleset_references:
            return self
        if not self.file or not self.external_id:
            raise ValueError("Both 'file' and 'external_id' are required when 'ruleset_references' is not set")
        return self
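
A short sketch of the two accepted forms (placeholder file names and IDs; import path assumed to be cognite_data_quality.deploy):

from cognite_data_quality.deploy import ShaclRulesRef

# File-based: a local .ttl file uploaded to CDF Files under the given external ID
file_based = ShaclRulesRef(file="pump_rules.ttl", external_id="dq-shacl-pump-rules")

# RuleSet-based: rules fetched from the RuleSet API at runtime
ruleset_based = ShaclRulesRef(ruleset_references=[{"externalId": "PumpRuleSet", "version": "1.0.0"}])

# Supplying neither form raises a ValueError from the model validator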

ShaclRulesSettings

Bases: BaseModel

Location settings for SHACL .ttl rule files.

Attributes:

Name Type Description
source_dir str

Path (relative to settings.yaml) of the directory containing SHACL Turtle files.

mime_type str

MIME type used when uploading rule files to CDF Files. Default: "text/turtle".

dataset_external_id str | None

External ID of the CDF dataset to associate uploaded rule files with. None means no dataset is set.

Source code in cognite_data_quality/deploy.py
class ShaclRulesSettings(BaseModel):
    """Location settings for SHACL ``.ttl`` rule files.

    Attributes:
        source_dir: Path (relative to ``settings.yaml``) of the directory
            containing SHACL Turtle files.
        mime_type: MIME type used when uploading rule files to CDF Files.
            Default: ``"text/turtle"``.
        dataset_external_id: External ID of the CDF dataset to associate
            uploaded rule files with.  ``None`` means no dataset is set.
    """

    source_dir: str
    mime_type: str = Field(default="text/turtle")
    dataset_external_id: str | None = None

TimeSeriesSettings

Bases: BaseModel

Time series config directory settings.

Source code in cognite_data_quality/deploy.py
class TimeSeriesSettings(BaseModel):
    """Time series config directory settings."""

    config_dir: str

TimeseriesConfig

Bases: BaseModel

Full configuration for one time series validation workflow.

Attributes:

Name Type Description
name str

Unique name for this configuration; used as the workflow external ID suffix and display name.

description str | None

Optional human-readable description stored on the workflow resource.

filter TimeseriesFilter | None

Server-side DMS filter for selecting time series instances. Mutually exclusive with instance_ids.

instance_ids list[TimeseriesInstanceId] | None

Explicit list of time series instances to validate. Mutually exclusive with filter.

shacl_rules ShaclRulesRef

SHACL rules file reference.

datamodel ViewRef

Data model containing the time series view definition.

validation ValidationSettings

Validation behaviour settings.

records RecordsOverrides

Per-config Records API overrides.

schedule ScheduleConfig

Cron schedule for the validation trigger.

backfill dict | None

Optional backfill configuration dict with keys enabled, start_time, end_time, and window_minutes.

Source code in cognite_data_quality/deploy.py
class TimeseriesConfig(BaseModel):
    """Full configuration for one time series validation workflow.

    Attributes:
        name: Unique name for this configuration; used as the workflow
            external ID suffix and display name.
        description: Optional human-readable description stored on the
            workflow resource.
        filter: Server-side DMS filter for selecting time series instances.
            Mutually exclusive with *instance_ids*.
        instance_ids: Explicit list of time series instances to validate.
            Mutually exclusive with *filter*.
        shacl_rules: SHACL rules file reference.
        datamodel: Data model containing the time series view definition.
        validation: Validation behaviour settings.
        records: Per-config Records API overrides.
        schedule: Cron schedule for the validation trigger.
        backfill: Optional backfill configuration dict with keys
            ``enabled``, ``start_time``, ``end_time``, and
            ``window_minutes``.
    """

    name: str
    description: str | None = None
    filter: TimeseriesFilter | None = None
    instance_ids: list[TimeseriesInstanceId] | None = None
    shacl_rules: ShaclRulesRef
    datamodel: ViewRef
    validation: ValidationSettings = Field(default_factory=ValidationSettings)
    records: RecordsOverrides = Field(default_factory=RecordsOverrides)
    schedule: ScheduleConfig = Field(default_factory=ScheduleConfig)
    backfill: dict | None = None
    _filename: str | None = None
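
An illustrative sketch of a filter-based config on the default daily schedule; all identifiers are placeholders and the classes are assumed importable from cognite_data_quality.deploy:

from cognite_data_quality.deploy import (
    ScheduleConfig,
    ShaclRulesRef,
    TimeseriesConfig,
    TimeseriesFilter,
    ViewRef,
)

ts_config = TimeseriesConfig(
    name="pump-sensors",
    filter=TimeseriesFilter(space="my_instance_space", external_id_prefix="pump-"),
    shacl_rules=ShaclRulesRef(file="timeseries_rules.ttl", external_id="dq-shacl-ts-rules"),
    datamodel=ViewRef(space="my_space", external_id="MyDataModel", version="v1"),
    schedule=ScheduleConfig(cron="0 6 * * *"),
)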

TimeseriesFilter

Bases: BaseModel

Server-side filter for selecting time series instances to validate.

Attributes:

Name Type Description
space str | None

Restrict to instances in this CDF space.

external_id_prefix str | None

Restrict to instances whose external ID starts with this prefix.

Source code in cognite_data_quality/deploy.py
class TimeseriesFilter(BaseModel):
    """Server-side filter for selecting time series instances to validate.

    Attributes:
        space: Restrict to instances in this CDF space.
        external_id_prefix: Restrict to instances whose external ID starts
            with this prefix.
    """

    space: str | None = None
    external_id_prefix: str | None = None

TimeseriesInstanceId

Bases: BaseModel

Explicit instance reference for a single time series to validate.

Attributes:

Name Type Description
space str

CDF space containing the instance.

external_id str

External ID of the time series instance.

Source code in cognite_data_quality/deploy.py
class TimeseriesInstanceId(BaseModel):
    """Explicit instance reference for a single time series to validate.

    Attributes:
        space: CDF space containing the instance.
        external_id: External ID of the time series instance.
    """

    space: str
    external_id: str

TriggerSettings

Bases: BaseModel

Settings for data-change triggers on instance validation workflows.

Attributes:

Name Type Description
external_id_prefix str

Prefix used when constructing per-view trigger external IDs.

batch_size int

Maximum number of instances per trigger payload. Default: 10.

batch_timeout int

Seconds to wait before flushing a partial batch. Default: 60.

Source code in cognite_data_quality/deploy.py
class TriggerSettings(BaseModel):
    """Settings for data-change triggers on instance validation workflows.

    Attributes:
        external_id_prefix: Prefix used when constructing per-view trigger
            external IDs.
        batch_size: Maximum number of instances per trigger payload.
            Default: 10.
        batch_timeout: Seconds to wait before flushing a partial batch.
            Default: 60.
    """

    external_id_prefix: str
    batch_size: int = Field(default=10)
    batch_timeout: int = Field(default=60)

ValidationSettings

Bases: BaseModel

Per-view validation behaviour settings.

Attributes:

Name Type Description
auto_load_depth int

Number of levels of referenced instances to auto-load before validation (0-3). Higher values increase coverage but reduce speed. Default: 2.

verbose bool

Enable verbose logging inside the validation function. Default: True.

Source code in cognite_data_quality/deploy.py
class ValidationSettings(BaseModel):
    """Per-view validation behaviour settings.

    Attributes:
        auto_load_depth: Number of levels of referenced instances to
            auto-load before validation (0-3).  Higher values increase
            coverage but reduce speed. Default: 2.
        verbose: Enable verbose logging inside the validation function.
            Default: ``True``.
    """

    auto_load_depth: int = Field(default=2)
    verbose: bool = Field(default=True)

ViewConfig

Bases: BaseModel

Full configuration for one instance-validation view.

Attributes:

Name Type Description
view ViewRef

Reference to the CDF view whose instances are validated.

instance_spaces list[str]

CDF spaces that contain the instances to validate. Accepts a single string (coerced to a list) or a list.

shacl_rules ShaclRulesRef

SHACL rules file reference.

validation ValidationSettings

Validation behaviour settings (depth, verbosity).

records RecordsOverrides

Per-view Records API overrides.

workflow_external_id str | None

Explicit CDF workflow external ID. If None, the ID is derived from settings.workflow.external_id_prefix and view.external_id.

trigger_external_id str | None

Explicit trigger external ID. Auto-derived when None.

partition_count int

Number of partitions for batch historic validation. Default: 10.

partition_field str

DMS property used to range-partition instances (e.g. "lastUpdatedTime"). Default: "lastUpdatedTime".

chunk_size int

Instances validated per function invocation. Default: 200.

sync_batch_size int | None

Overrides :attr:TriggerSettings.batch_size for this view's sync trigger. Falls back to the global value when None.

use_sync_cursor_mode bool

Use cursor-based sync instead of passing instances in the trigger payload. Requires max_concurrent_executions=1. Default: False.

max_concurrent_executions int | None

Maximum simultaneous CDF workflow executions. Default: 10.

Source code in cognite_data_quality/deploy.py
class ViewConfig(BaseModel):
    """Full configuration for one instance-validation view.

    Attributes:
        view: Reference to the CDF view whose instances are validated.
        instance_spaces: CDF spaces that contain the instances to validate.
            Accepts a single string (coerced to a list) or a list.
        shacl_rules: SHACL rules file reference.
        validation: Validation behaviour settings (depth, verbosity).
        records: Per-view Records API overrides.
        workflow_external_id: Explicit CDF workflow external ID.  If ``None``,
            the ID is derived from *settings.workflow.external_id_prefix*
            and *view.external_id*.
        trigger_external_id: Explicit trigger external ID.  Auto-derived
            when ``None``.
        partition_count: Number of partitions for batch historic validation.
            Default: 10.
        partition_field: DMS property used to range-partition instances
            (e.g. ``"lastUpdatedTime"``). Default: ``"lastUpdatedTime"``.
        chunk_size: Instances validated per function invocation. Default: 200.
        sync_batch_size: Overrides :attr:`TriggerSettings.batch_size` for
            this view's sync trigger.  Falls back to the global value when
            ``None``.
        use_sync_cursor_mode: Use cursor-based sync instead of passing
            instances in the trigger payload.  Requires
            ``max_concurrent_executions=1``. Default: ``False``.
        max_concurrent_executions: Maximum simultaneous CDF workflow
            executions. Default: 10.
    """

    view: ViewRef
    instance_spaces: list[str]
    shacl_rules: ShaclRulesRef
    validation: ValidationSettings = Field(default_factory=ValidationSettings)
    records: RecordsOverrides = Field(default_factory=RecordsOverrides)
    dataproducts: list[DataProductRef] = Field(
        default_factory=list,
        description=(
            "DataProducts this view belongs to. Used when config_source='dataproduct' "
            "to declare which DataProduct(s) the view is published into. "
            "A view can belong to multiple DataProducts."
        ),
    )
    workflow_external_id: str | None = None
    trigger_external_id: str | None = None
    partition_count: int = Field(default=10, description="Number of partitions for batch validation")
    partition_field: str = Field(
        default="lastUpdatedTime", description="Field to use for partitioning (e.g., lastUpdatedTime, createdTime)"
    )
    chunk_size: int = Field(default=200, description="Instances per validation chunk")
    sync_batch_size: int | None = Field(
        default=None, description="Sync trigger batch size override; falls back to trigger.batch_size in settings"
    )
    use_sync_cursor_mode: bool = Field(
        default=False,
        description="Use cursor-based sync instead of payload instances. "
        "Trigger fires as a signal (no properties fetched); function "
        "fetches changes via sync cursor. Requires max_concurrent_executions=1.",
    )
    max_concurrent_executions: int | None = Field(
        default=10, description="Maximum number of concurrent workflow executions (set on the CDF workflow)"
    )
    _filename: str | None = None

    @model_validator(mode="before")
    @classmethod
    def normalise_instance_space(cls, data: dict) -> dict:
        """Accept both `instance_space` (str) and `instance_spaces` (list) from YAML."""
        if isinstance(data, dict) and "instance_space" in data and "instance_spaces" not in data:
            data["instance_spaces"] = data.pop("instance_space")
        return data

    @field_validator("instance_spaces", mode="before")
    @classmethod
    def coerce_to_list(cls, v: object) -> list[str]:
        """Coerce a bare string to a single-element list."""
        if isinstance(v, str):
            return [v]
        return v  # type: ignore[return-value]

    @model_validator(mode="after")
    def check_cursor_mode_concurrency(self) -> ViewConfig:
        if self.use_sync_cursor_mode and self.max_concurrent_executions != 1:
            raise ValueError(
                "'use_sync_cursor_mode' is enabled, so 'max_concurrent_executions' must be 1 "
                "to prevent race conditions on the sync cursor."
            )
        return self

coerce_to_list(v) classmethod

Coerce a bare string to a single-element list.

Source code in cognite_data_quality/deploy.py
@field_validator("instance_spaces", mode="before")
@classmethod
def coerce_to_list(cls, v: object) -> list[str]:
    """Coerce a bare string to a single-element list."""
    if isinstance(v, str):
        return [v]
    return v  # type: ignore[return-value]

normalise_instance_space(data) classmethod

Accept both instance_space (str) and instance_spaces (list) from YAML.

Source code in cognite_data_quality/deploy.py
@model_validator(mode="before")
@classmethod
def normalise_instance_space(cls, data: dict) -> dict:
    """Accept both `instance_space` (str) and `instance_spaces` (list) from YAML."""
    if isinstance(data, dict) and "instance_space" in data and "instance_spaces" not in data:
        data["instance_spaces"] = data.pop("instance_space")
    return data
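
A brief sketch of both validators in action (placeholder values; model_validate is the standard Pydantic entry point):

from cognite_data_quality.deploy import ViewConfig

# `instance_space: "my_instance_space"` (a bare string) is accepted and coerced to a list
cfg = ViewConfig.model_validate(
    {
        "view": {"space": "my_space", "external_id": "Pump", "version": "v1"},
        "instance_space": "my_instance_space",
        "shacl_rules": {"file": "pump_rules.ttl", "external_id": "dq-shacl-pump-rules"},
    }
)
assert cfg.instance_spaces == ["my_instance_space"]

# Cursor mode requires max_concurrent_executions=1; with the default of 10 this raises a ValueError:
# ViewConfig.model_validate({**cfg.model_dump(), "use_sync_cursor_mode": True})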

ViewRef

Bases: BaseModel

Reference to a CDF Data Model view.

Attributes:

Name Type Description
space str

CDF space that contains the view.

external_id str

External ID of the view.

version str

Version string of the view.

Source code in cognite_data_quality/deploy.py
class ViewRef(BaseModel):
    """Reference to a CDF Data Model view.

    Attributes:
        space: CDF space that contains the view.
        external_id: External ID of the view.
        version: Version string of the view.
    """

    space: str
    external_id: str
    version: str

WorkflowSettings

Bases: BaseModel

Top-level settings for a CDF Workflow resource.

Attributes:

Name Type Description
external_id_prefix str

Prefix used when constructing per-view workflow external IDs (e.g. "dq-shacl" → "dq-shacl-pump").

version str

Workflow version string. Default: "1".

task WorkflowTaskSettings

Per-task retry/timeout settings.

Source code in cognite_data_quality/deploy.py
class WorkflowSettings(BaseModel):
    """Top-level settings for a CDF Workflow resource.

    Attributes:
        external_id_prefix: Prefix used when constructing per-view workflow
            external IDs (e.g. ``"dq-shacl"`` → ``"dq-shacl-pump"``).
        version: Workflow version string. Default: ``"1"``.
        task: Per-task retry/timeout settings.
    """

    external_id_prefix: str
    version: str = Field(default="1")
    task: WorkflowTaskSettings = Field(default_factory=WorkflowTaskSettings)

WorkflowTaskSettings

Bases: BaseModel

Retry and timeout settings applied to each CDF workflow task.

Attributes:

Name Type Description
retries int

Number of task-level retries (CDF API limit: 0-10). Default: 3.

timeout int

Task timeout in seconds. Default: 600.

on_failure str

Behaviour on unrecoverable failure — "abortWorkflow" or "skipTask". Default: "abortWorkflow".

Source code in cognite_data_quality/deploy.py
class WorkflowTaskSettings(BaseModel):
    """Retry and timeout settings applied to each CDF workflow task.

    Attributes:
        retries: Number of task-level retries (CDF API limit: 0-10). Default: 3.
        timeout: Task timeout in seconds. Default: 600.
        on_failure: Behaviour on unrecoverable failure — ``"abortWorkflow"`` or
            ``"skipTask"``. Default: ``"abortWorkflow"``.
    """

    retries: int = Field(default=3, ge=0, le=10, description="CDF workflow task retries (API limit: 0-10)")
    timeout: int = Field(default=600)
    on_failure: str = Field(default="abortWorkflow")
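
A short sketch combining the two models (placeholder prefix; import path assumed to be cognite_data_quality.deploy):

from cognite_data_quality.deploy import WorkflowSettings, WorkflowTaskSettings

workflow = WorkflowSettings(
    external_id_prefix="dq-shacl",
    task=WorkflowTaskSettings(retries=3, timeout=600, on_failure="abortWorkflow"),
)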

deploy_data_models(*, client, space='dataQuality', containers=None, data_model_id='DataQualityMonitoring', data_model_version='1', dry_run=False)

Deploy data models with auto-generated views from containers.

Creates views from existing DMS containers, making container data accessible through the CDF Data Modeling UI.

Parameters:

Name Type Description Default
client CogniteClient

CogniteClient instance

required
space str

Space containing containers (default: "dataQuality")

'dataQuality'
containers list[str] | None

List of container IDs to create views for (default: ["OrchestrationState", "FunctionValidationState"])

None
data_model_id str

External ID for the data model

'DataQualityMonitoring'
data_model_version str

Version string

'1'
dry_run bool

Preview without deploying

False

Returns:

Type Description
dict[str, object]

dict with deployment results (datamodel, views, status)

Example

from cognite_data_quality import deploy_data_models
from cognite.client import CogniteClient

client = CogniteClient()
result = deploy_data_models(client=client, dry_run=False)
print(f"Created {len(result['views'])} views")

Source code in cognite_data_quality/deploy.py
def deploy_data_models(
    *,
    client: CogniteClient,
    space: str = "dataQuality",
    containers: list[str] | None = None,
    data_model_id: str = "DataQualityMonitoring",
    data_model_version: str = "1",
    dry_run: bool = False,
) -> dict[str, object]:
    """Deploy data models with auto-generated views from containers.

    Creates views from existing DMS containers, making container data
    accessible through the CDF Data Modeling UI.

    Args:
        client: CogniteClient instance
        space: Space containing containers (default: "dataQuality")
        containers: List of container IDs to create views for
                   (default: ["OrchestrationState", "FunctionValidationState"])
        data_model_id: External ID for the data model
        data_model_version: Version string
        dry_run: Preview without deploying

    Returns:
        dict with deployment results (datamodel, views, status)

    Example:
        >>> from cognite_data_quality import deploy_data_models
        >>> from cognite.client import CogniteClient
        >>> client = CogniteClient()
        >>> result = deploy_data_models(client=client, dry_run=False)
        >>> print(f"Created {len(result['views'])} views")
    """
    from cognite.client.data_classes.data_modeling import (
        DataModelApply,
        MappedPropertyApply,
        ViewApply,
    )
    from cognite.client.data_classes.data_modeling.ids import ContainerId, ViewId

    if containers is None:
        containers = ["OrchestrationState", "FunctionValidationState"]

    views = []

    for container_id in containers:
        print(f"  Processing container: {space}/{container_id}")

        # Retrieve container
        try:
            container = client.data_modeling.containers.retrieve((space, container_id))
        except Exception as e:
            print(f"  ERROR: Could not retrieve container: {e}")
            continue

        if not container:
            print(f"  WARNING: Container {space}/{container_id} not found, skipping")
            continue

        # Build view from container
        view_id = f"{container_id}View"
        view_properties = {}

        for prop_name, prop_def in container.properties.items():
            view_properties[prop_name] = MappedPropertyApply(
                container=ContainerId(space=space, external_id=container_id),
                container_property_identifier=prop_name,
                name=prop_def.description.split(".")[0].strip() if prop_def.description else prop_name,
                description=prop_def.description or "",
            )

        view = ViewApply(
            space=space,
            external_id=view_id,
            version=data_model_version,
            name=f"{container_id} View",
            description=f"Auto-generated view for {container_id} container",
            properties=view_properties,
        )

        # Check if view already exists
        try:
            existing_view = client.data_modeling.views.retrieve((space, view_id, data_model_version))
            if existing_view:
                print(f"    ✓ View {view_id} already exists ({len(view_properties)} properties)")
            else:
                print(f"    • Creating view: {view_id} ({len(view_properties)} properties)")
        except Exception:
            print(f"    • Creating view: {view_id} ({len(view_properties)} properties)")

        views.append(view)

    if not views:
        print("  No views to create")
        return {
            "datamodel": None,
            "views": [],
            "status": "skipped",
        }

    if dry_run:
        print("  [DRY RUN] Would create data model and views")
        return {
            "datamodel": {"space": space, "external_id": data_model_id, "version": data_model_version},
            "views": [{"space": v.space, "external_id": v.external_id, "version": v.version} for v in views],
            "status": "dry_run",
        }

    # Apply views
    client.data_modeling.views.apply(views)
    print(f"  Ensured {len(views)} views are deployed")

    # Create data model
    view_ids = [ViewId(space=v.space, external_id=v.external_id, version=v.version) for v in views]
    data_model = DataModelApply(
        space=space,
        external_id=data_model_id,
        version=data_model_version,
        name="Data Quality Monitoring",
        description="Data models for data quality orchestration and state tracking",
        views=view_ids,
    )

    client.data_modeling.data_models.apply(data_model)
    print(f"  Ensured data model: {space}/{data_model_id} v{data_model_version}")

    return {
        "datamodel": {"space": space, "external_id": data_model_id, "version": data_model_version},
        "views": [{"space": v.space, "external_id": v.external_id, "version": v.version} for v in views],
        "status": "deployed",
    }

deploy_functions(*, client, settings, force=False, dry_run=False, debug_save_zip=None, secrets=None)

Deploy the unified SHACL validation function using embedded code.

Uses content hashing to skip redeployment when the function code is unchanged (hash stored in function metadata).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client for CDF API access.

required
settings DeploymentSettings

Deployment settings containing function external ID, runtime, and file list.

required
force bool

Force redeployment even if the function code hash matches the currently deployed function.

False
dry_run bool

Preview changes without deploying.

False
debug_save_zip Path | str | None

Directory path to save the function zip archive for inspection (e.g. "dist/debug-zips").

None
secrets dict[str, str] | None

Function secrets passed at creation time, e.g. {"client-id": "...", "client-secret": "..."}. Falls back to COGNITE_CLIENT_ID / COGNITE_CLIENT_SECRET environment variables if not provided.

None

Returns:

Type Description
dict[str, dict[str, str]]

dict with key "function" whose value is a result dict

dict[str, dict[str, str]]

containing "function" (external ID) and "status"

dict[str, dict[str, str]]

(one of "deployed", "skipped", or "dry_run").

Source code in cognite_data_quality/deploy.py
def deploy_functions(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    force: bool = False,
    dry_run: bool = False,
    debug_save_zip: Path | str | None = None,
    secrets: dict[str, str] | None = None,
) -> dict[str, dict[str, str]]:
    """Deploy the unified SHACL validation function using embedded code.

    Uses content hashing to skip redeployment when the function code is
    unchanged (hash stored in function metadata).

    Args:
        client: Cognite client for CDF API access.
        settings: Deployment settings containing function external ID,
            runtime, and file list.
        force: Force redeployment even if the function code hash matches
            the currently deployed function.
        dry_run: Preview changes without deploying.
        debug_save_zip: Directory path to save the function zip archive
            for inspection (e.g. ``"dist/debug-zips"``).
        secrets: Function secrets passed at creation time, e.g.
            ``{"client-id": "...", "client-secret": "..."}``.
            Falls back to ``COGNITE_CLIENT_ID`` / ``COGNITE_CLIENT_SECRET``
            environment variables if not provided.

    Returns:
        dict with key ``"function"`` whose value is a result dict
        containing ``"function"`` (external ID) and ``"status"``
        (one of ``"deployed"``, ``"skipped"``, or ``"dry_run"``).
    """
    if secrets is None:
        import os

        client_id = os.getenv("COGNITE_CLIENT_ID") or os.getenv("IDP_CLIENT_ID")
        client_secret = os.getenv("COGNITE_CLIENT_SECRET") or os.getenv("IDP_CLIENT_SECRET")
        if client_id and client_secret:
            secrets = {"client-id": client_id, "client-secret": client_secret}

    save_zip_dir = Path(debug_save_zip) if debug_save_zip else None

    result = _deploy_function(
        client,
        settings.function,
        force=force,
        dry_run=dry_run,
        function_code_dataset_external_id=settings.function_code_dataset_external_id,
        debug_save_zip=save_zip_dir,
        secrets=secrets,
    )

    return {"function": result}

deploy_incremental(*, view_config_path, config_env=None, force=False, dry_run=False, client=None)

Deploy function, workflow, and trigger for a single view config.

Loads deployment settings from the repository's scripts/ directory via shared.load_settings and deploys the validation pipeline for one view.

Parameters:

Name Type Description Default
view_config_path Path | str

Path to the view YAML config file.

required
config_env str | None

Environment name (e.g. "cog-ai"). Passed to shared.load_settings as the config_env argument. Defaults to the CONFIG_ENV environment variable when None.

None
force bool

Force redeployment even if content hashes are unchanged.

False
dry_run bool

Preview without making any CDF API calls.

False
client CogniteClient | None

Cognite client. Created from environment variables if None and dry_run is False.

None

Returns:

Type Description
dict[str, object]

dict with keys "function", "workflow", and "dry_run".

Source code in cognite_data_quality/deploy.py
def deploy_incremental(
    *,
    view_config_path: Path | str,
    config_env: str | None = None,
    force: bool = False,
    dry_run: bool = False,
    client: CogniteClient | None = None,
) -> dict[str, object]:
    """Deploy function, workflow, and trigger for a single view config.

    Loads deployment settings from the repository's ``scripts/`` directory
    via ``shared.load_settings`` and deploys the validation pipeline for
    one view.

    Args:
        view_config_path: Path to the view YAML config file.
        config_env: Environment name (e.g. ``"cog-ai"``).  Passed to
            ``shared.load_settings`` as the ``config_env`` argument.
            Defaults to the ``CONFIG_ENV`` environment variable when
            ``None``.
        force: Force redeployment even if content hashes are unchanged.
        dry_run: Preview without making any CDF API calls.
        client: Cognite client.  Created from environment variables if
            ``None`` and *dry_run* is ``False``.

    Returns:
        dict with keys ``"function"``, ``"workflow"``, and ``"dry_run"``.
    """
    import importlib.util
    import sys

    def _load_module(module_name: str, path: Path):
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load module {module_name} from {path}")
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    def _load_view_config(path: Path) -> dict:
        if not path.exists():
            raise FileNotFoundError(f"View config not found: {path}")
        import yaml

        config = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
        config["_filename"] = path.name
        return config

    def _repo_root() -> Path:
        return Path(__file__).resolve().parents[1]

    repo_root = _repo_root()
    scripts_dir = repo_root / "scripts"

    shared = _load_module("dq_shared", scripts_dir / "shared.py")
    deploy_function = _load_module("dq_deploy_function", scripts_dir / "deploy_function.py")
    deploy_workflows = _load_module("dq_deploy_workflows", scripts_dir / "deploy_workflows.py")

    settings = shared.load_settings(repo_root, config_env)
    view_config = _load_view_config(Path(view_config_path))

    if client is None and not dry_run:
        client = shared.create_client()

    function_result = deploy_function.deploy_function_by_settings_key(
        client, repo_root, settings, "function", force=force, dry_run=dry_run
    )
    workflow_result = deploy_workflows.deploy_view(
        client, repo_root, settings, view_config, force=force, dry_run=dry_run
    )

    return {
        "function": function_result,
        "workflow": workflow_result,
        "dry_run": dry_run,
    }

deploy_raw_table_shacl(*, client, settings, tables, shacl_dir, dry_run=False)

Deploy SHACL rules for RAW table validation.

For RAW tables, we only upload SHACL files - no workflows or triggers are created. RAW validation is triggered explicitly via function calls.

Parameters:

Name Type Description Default
client CogniteClient

CogniteClient instance

required
settings DeploymentSettings

Deployment settings with records configuration

required
tables list[RawTableConfig]

List of RAW table configs

required
shacl_dir Path

Directory containing SHACL rule files

required
dry_run bool

Preview without uploading

False

Returns:

Type Description
list[dict[str, object]]

list of deployment results for each table

Source code in cognite_data_quality/deploy.py
def deploy_raw_table_shacl(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    tables: list[RawTableConfig],
    shacl_dir: Path,
    dry_run: bool = False,
) -> list[dict[str, object]]:
    """Deploy SHACL rules for RAW table validation.

    For RAW tables, we only upload SHACL files - no workflows or triggers are created.
    RAW validation is triggered explicitly via function calls.

    Args:
        client: CogniteClient instance
        settings: Deployment settings with records configuration
        tables: List of RAW table configs
        shacl_dir: Directory containing SHACL rule files
        dry_run: Preview without uploading

    Returns:
        list of deployment results for each table
    """
    if not tables:
        return []

    results: list[dict[str, object]] = []

    for table_config in tables:
        db_name = table_config.table.database
        table_name = table_config.table.name
        print(f"\n  RAW table: {db_name}.{table_name}")

        # Upload SHACL rules
        records = settings.records
        overrides = table_config.records
        _upload_shacl_rules(
            client,
            settings,
            shacl_dir,
            table_config.shacl_rules,
            dry_run=dry_run,
            rule_set_id=overrides.rule_set_id if overrides else records.container,
            rule_set_version=overrides.rule_set_version if overrides else None,
        )

        results.append(
            {
                "database": db_name,
                "table": table_name,
                "shacl_file": table_config.shacl_rules.external_id,
                "status": "deployed" if not dry_run else "dry_run",
            }
        )

    return results

deploy_timeseries_workflows(*, client, settings, configs, shacl_dir, force=False, dry_run=False, config_filter=None, skip_unchanged=True)

Deploy scheduled workflows and triggers for time series validation.

Parameters:

Name Type Description Default
client CogniteClient

CogniteClient instance

required
settings DeploymentSettings

Deployment settings

required
configs list[TimeseriesConfig]

List of timeseries configurations

required
shacl_dir Path

Directory containing SHACL rules files

required
force bool

Force redeployment even if config unchanged

False
dry_run bool

Preview without making changes

False
config_filter list[str] | None

Optional list of config names to deploy (deploy all if None)

None
skip_unchanged bool

Skip deployment if hash matches deployed workflow (default: True)

True

Returns:

Type Description
list[dict[str, object]]

List of deployment results with status for each config

Source code in cognite_data_quality/deploy.py
def deploy_timeseries_workflows(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    configs: list[TimeseriesConfig],
    shacl_dir: Path,
    force: bool = False,
    dry_run: bool = False,
    config_filter: list[str] | None = None,
    skip_unchanged: bool = True,
) -> list[dict[str, object]]:
    """Deploy scheduled workflows and triggers for time series validation.

    Args:
        client: CogniteClient instance
        settings: Deployment settings
        configs: List of timeseries configurations
        shacl_dir: Directory containing SHACL rules files
        force: Force redeployment even if config unchanged
        dry_run: Preview without making changes
        config_filter: Optional list of config names to deploy (deploy all if None)
        skip_unchanged: Skip deployment if hash matches deployed workflow (default: True)

    Returns:
        List of deployment results with status for each config
    """
    # Filter configs if specified
    if config_filter:
        configs = [c for c in configs if c.name in config_filter]
        print(f"  Filtered to {len(configs)} timeseries configs")

    # Get dataset ID for workflows (use SHACL rules dataset)
    dataset_id = None
    if settings.shacl_rules.dataset_external_id:
        dataset = client.data_sets.retrieve(external_id=settings.shacl_rules.dataset_external_id)
        if dataset:
            dataset_id = dataset.id

    workflow_settings = settings.timeseries_workflow or settings.workflow
    results: list[dict[str, object]] = []
    for config in configs:
        workflow_external_id = f"{workflow_settings.external_id_prefix}-{config.name}"

        # Compute current config hash for change detection
        shacl_file_path = shacl_dir / config.shacl_rules.file
        shacl_content = shacl_file_path.read_text() if shacl_file_path.exists() else ""
        current_hash = _compute_timeseries_config_hash(config, shacl_content)

        # Check if deployment needed
        if not dry_run:
            deployed_hash = _get_deployed_workflow_hash(client, workflow_external_id)

            if skip_unchanged and not force and current_hash == deployed_hash:
                print(f"  Workflow {workflow_external_id} up to date (hash: {current_hash})")
                results.append(
                    {
                        "name": config.name,
                        "workflow_external_id": workflow_external_id,
                        "trigger_external_id": f"dq-ts-trigger-{config.name}",
                        "status": "skipped",
                        "reason": "unchanged",
                    }
                )
                continue

            if deployed_hash and deployed_hash != current_hash:
                print(f"  Config changed: {deployed_hash} -> {current_hash}")
            elif not deployed_hash:
                print(f"  New workflow (hash: {current_hash})")

        records = settings.records
        overrides = config.records
        _upload_shacl_rules(
            client,
            settings,
            shacl_dir,
            config.shacl_rules,
            dry_run=dry_run,
            rule_set_id=overrides.rule_set_id if overrides else records.container,
            rule_set_version=overrides.rule_set_version if overrides else None,
        )

        task_data: dict[str, object] = {
            "shacl_rules_file_external_id": config.shacl_rules.external_id,
            "datamodel_space": config.datamodel.space,
            "datamodel_external_id": config.datamodel.external_id,
            "datamodel_version": config.datamodel.version,
            "verbose": config.validation.verbose,
            "max_retries": settings.partition_retries,
            "records_config": {
                "stream_id": records.stream_id,
                "rule_set_id": overrides.rule_set_id or f"{config.name}SHACL",
                "rule_set_version": overrides.rule_set_version or "1.0",
                "data_domain_external_id": overrides.data_domain_external_id or "TimeSeries",
                "records_space": records.space,
                "records_container": records.container,
                "use_instance_space": overrides.use_instance_space,
                "record_space": overrides.records_space,
            },
        }
        if config.filter is not None:
            task_data["filter"] = config.filter.model_dump(exclude_none=True)
        if config.instance_ids is not None:
            task_data["instance_ids"] = [item.model_dump() for item in config.instance_ids]
        if config.backfill:
            task_data["backfill"] = config.backfill

        task_data["validation_type"] = "timeseries"

        task = WorkflowTask(
            external_id=f"validate-ts-{config.name}",
            name=f"Validate Time Series: {config.name}",
            description=config.description or "Time series quality validation",
            parameters=FunctionTaskParameters(
                external_id=settings.function.external_id,
                data=task_data,
            ),
            retries=workflow_settings.task.retries,
            timeout=workflow_settings.task.timeout,
            on_failure=workflow_settings.task.on_failure,
        )

        if not dry_run:
            print(f"  Creating/updating workflow: {workflow_external_id}")
            client.workflows.upsert(
                WorkflowUpsert(
                    external_id=workflow_external_id,
                    description=f"Time series quality validation: {config.name} [hash:{current_hash}]",
                    data_set_id=dataset_id,
                )
            )
            workflow_definition = WorkflowDefinitionUpsert(
                tasks=[task], description=f"Time series validation workflow: {config.name}"
            )
            workflow_version = WorkflowVersionUpsert(
                workflow_external_id=workflow_external_id,
                version=workflow_settings.version,
                workflow_definition=workflow_definition,
            )
            client.workflows.versions.upsert(workflow_version)

        trigger_external_id = f"dq-ts-trigger-{config.name}"
        if not dry_run:
            creds = client.config.credentials
            trigger_credentials = ClientCredentials(
                client_id=creds.client_id,
                client_secret=creds.client_secret,
            )
            trigger_rule = WorkflowScheduledTriggerRule(cron_expression=config.schedule.cron)
            trigger = WorkflowTriggerUpsert(
                external_id=trigger_external_id,
                trigger_rule=trigger_rule,
                workflow_external_id=workflow_external_id,
                workflow_version=workflow_settings.version,
                metadata={"description": f"Scheduled trigger for {config.name}", "cron": config.schedule.cron},
            )
            try:
                client.workflows.triggers.delete(external_id=trigger_external_id)
            except Exception:
                pass
            client.workflows.triggers.upsert(trigger, trigger_credentials)

        results.append(
            {
                "name": config.name,
                "workflow_external_id": workflow_external_id,
                "trigger_external_id": trigger_external_id,
                "status": "deployed" if not dry_run else "dry_run",
            }
        )
    return results

deploy_validation_infrastructure(*, client, settings_path, views_dir=None, filter_views_by_shacl_rules=False, timeseries_dir=None, shacl_rules_dir=None, function_external_id=None, function_secrets=None, force=False, force_function=False, force_workflows=False, dry_run=False, debug_save_zip=None, deploy_data_product_sync=False, data_product_sync_cron='0 * * * *')

Deploy functions, workflows, and triggers using embedded function code.

View configs are loaded from views/*.yaml files in the settings directory. When settings.config_source is "dataproduct", the SHACL .ttl files are published to the RuleSet API and view configs are published to the DataProduct API instead of being uploaded as CDF Files. When config_source is "yaml" (default), SHACL and view configs are uploaded as CDF Files as usual.

When filter_views_by_shacl_rules is True, only views referenced via sh:targetClass in their SHACL rules are deployed.

This function also ensures all required DMS containers exist in CDF before deploying.

Parameters:

- client (CogniteClient, required): Cognite client for CDF API access.
- settings_path (Path | str, required): Path to the settings.yaml file for this environment.
- views_dir (Path | str | None, default None): Directory containing view YAML configs. Defaults to settings_path.parent / "views".
- filter_views_by_shacl_rules (bool, default False): If True, restrict deployment to views that appear as sh:targetClass targets in the SHACL rules file.
- timeseries_dir (Path | str | None, default None): Directory containing time series validation YAML configs. Defaults to settings.timeseries.config_dir from settings.yaml.
- shacl_rules_dir (Path | str | None, default None): Directory containing SHACL .ttl rule files. Defaults to settings.shacl_rules.source_dir from settings.yaml.
- function_external_id (str | None, default None): Override for the deployed function external ID. Defaults to "data-quality-validation".
- function_secrets (dict[str, str] | None, default None): Secrets to inject into the function at creation time, e.g. {"client-id": "...", "client-secret": "..."}. If not provided, falls back to the COGNITE_CLIENT_ID / COGNITE_CLIENT_SECRET environment variables.
- force (bool, default False): Force redeployment of both functions and workflows even when content hashes are unchanged.
- force_function (bool, default False): Force redeployment of the function only; workflows are still skipped if their hashes are unchanged.
- force_workflows (bool, default False): Force redeployment of workflows only; the function is still skipped if its hash is unchanged.
- dry_run (bool, default False): Preview all changes without making any CDF API calls.
- debug_save_zip (Path | str | None, default None): Directory path to save each function zip archive for inspection (e.g. "dist/debug-zips").
- deploy_data_product_sync (bool, default False): Deploy a scheduled workflow that discovers all DataProducts with quality rules and auto-deploys validation workflows for them.
- data_product_sync_cron (str, default "0 * * * *"): Cron expression for the data product sync trigger (hourly by default).

Returns:

- dict[str, object]: dict with keys "functions", "workflows", "timeseries_workflows", "raw_tables", and optionally "data_product_sync", each containing a list of per-item deployment result dicts with at minimum "status" ("deployed", "skipped", or "dry_run").

Source code in cognite_data_quality/deploy.py
def deploy_validation_infrastructure(
    *,
    client: CogniteClient,
    settings_path: Path | str,
    views_dir: Path | str | None = None,
    filter_views_by_shacl_rules: bool = False,
    timeseries_dir: Path | str | None = None,
    shacl_rules_dir: Path | str | None = None,
    function_external_id: str | None = None,
    function_secrets: dict[str, str] | None = None,
    force: bool = False,
    force_function: bool = False,
    force_workflows: bool = False,
    dry_run: bool = False,
    debug_save_zip: Path | str | None = None,
    deploy_data_product_sync: bool = False,
    data_product_sync_cron: str = "0 * * * *",
) -> dict[str, object]:
    """Deploy functions, workflows, and triggers using embedded function code.

    View configs are loaded from ``views/*.yaml`` files in the settings
    directory.  When ``settings.config_source`` is ``"dataproduct"``, the
    SHACL ``.ttl`` files are published to the RuleSet API and view configs
    are published to the DataProduct API instead of being uploaded as CDF
    Files.  When ``config_source`` is ``"yaml"`` (default), SHACL and view
    configs are uploaded as CDF Files as usual.

    When *filter_views_by_shacl_rules* is ``True``, only views referenced
    via ``sh:targetClass`` in their SHACL rules are deployed.

    This function also ensures all required DMS containers exist in CDF
    before deploying.

    Args:
        client: Cognite client for CDF API access.
        settings_path: Path to the ``settings.yaml`` file for this environment.
        views_dir: Directory containing view YAML configs.  Defaults to
            ``settings_path.parent / "views"``.
        filter_views_by_shacl_rules: If ``True``, restrict deployment to
            views that appear as ``sh:targetClass`` targets in the SHACL
            rules file.  Default ``False``.
        timeseries_dir: Directory containing time series validation YAML
            configs.  Defaults to ``settings.timeseries.config_dir`` from
            ``settings.yaml``.
        shacl_rules_dir: Directory containing SHACL ``.ttl`` rule files.
            Defaults to ``settings.shacl_rules.source_dir`` from
            ``settings.yaml``.
        function_external_id: Override for the deployed function external ID.
            Defaults to ``"data-quality-validation"``.
        function_secrets: Secrets to inject into the function at creation
            time, e.g. ``{"client-id": "...", "client-secret": "..."}``.
            If not provided, falls back to the ``COGNITE_CLIENT_ID`` /
            ``COGNITE_CLIENT_SECRET`` environment variables.
        force: Force redeployment of both functions and workflows even when
            content hashes are unchanged.
        force_function: Force redeployment of the function only; workflows
            are still skipped if their hashes are unchanged.
        force_workflows: Force redeployment of workflows only; the function
            is still skipped if its hash is unchanged.
        dry_run: Preview all changes without making any CDF API calls.
        debug_save_zip: Directory path to save each function zip archive for
            inspection (e.g. ``"dist/debug-zips"``).
        deploy_data_product_sync: Deploy a scheduled workflow that
            discovers all DataProducts with quality rules and
            auto-deploys validation workflows for them.
        data_product_sync_cron: Cron expression for the data product
            sync trigger.  Default: ``"0 * * * *"`` (hourly).

    Returns:
        dict with keys ``"functions"``, ``"workflows"``,
        ``"timeseries_workflows"``, ``"raw_tables"``, and optionally
        ``"data_product_sync"``, each containing a list of per-item
        deployment result dicts with at minimum ``"status"``
        (``"deployed"``, ``"skipped"``, or ``"dry_run"``).
    """
    settings_path = Path(settings_path)
    settings = load_settings(settings_path)
    base_dir = settings_path.parent

    # Override external_id if provided
    if function_external_id is not None:
        settings.function.external_id = function_external_id

    # Ensure all data quality infrastructure containers exist
    if not dry_run:
        print("Ensuring data quality infrastructure containers...")

        # Ensure space exists once (all containers use the same space)
        from cognite_data_quality._containers import _ensure_space_exists

        space = settings.records.space
        _ensure_space_exists(client, space)

        # Records container (for validation results)
        print(f"  Records: {settings.records.space}/{settings.records.container}")
        _ensure_records_container(client, settings.records.space, settings.records.container)

        # State containers (for orchestration and function state tracking)
        from cognite_data_quality._containers import (
            _ensure_data_product_sync_state_container,
            _ensure_data_quality_settings_container,
            _ensure_function_validation_state_container,
            _ensure_orchestration_state_container,
            _ensure_raw_validation_state_container,
            _upsert_settings_to_dms,
        )

        print(f"  Orchestration: {space}/OrchestrationState")
        _ensure_orchestration_state_container(client, space, "OrchestrationState")

        print(f"  Function State: {space}/FunctionValidationState")
        _ensure_function_validation_state_container(client, space, "FunctionValidationState")

        print(f"  RAW Validation State: {space}/RawValidationState")
        _ensure_raw_validation_state_container(client, space, "RawValidationState")

        print(f"  DataQuality Settings: {space}/DataQualitySettings")
        _ensure_data_quality_settings_container(client, space, "DataQualitySettings")

        print(f"  DataProduct Sync State: {space}/DataProductSyncState")
        _ensure_data_product_sync_state_container(client, space, "DataProductSyncState")

        # Write deployment settings to DMS for runtime access by sync handler
        _upsert_settings_to_dms(client, space, "DataQualitySettings", settings)

        # Deploy monitoring data models and views (for UI access to state containers)
        print("\nDeploying data quality monitoring data model...")
        deploy_data_models(
            client=client,
            space=space,
            containers=[
                "FunctionValidationState",
                "OrchestrationState",
                "RawValidationState",
                "DataQualitySettings",
                "DataProductSyncState",
            ],
            data_model_id="DataQualityMonitoring",
            data_model_version="1",
            dry_run=False,
        )

    # shacl_rules.source_dir is relative to settings file directory
    shacl_dir = (
        _resolve_path(base_dir, shacl_rules_dir)
        if shacl_rules_dir is not None
        else _resolve_path(base_dir, settings.shacl_rules.source_dir)
    )
    if timeseries_dir is None and settings.timeseries is not None:
        timeseries_dir = settings.timeseries.config_dir
    resolved_timeseries_dir = _resolve_path(base_dir, timeseries_dir) if timeseries_dir is not None else None

    resolved_views_dir = _resolve_path(base_dir, views_dir or "views")
    view_configs = load_view_configs(resolved_views_dir)

    if settings.config_source == "dataproduct":
        from collections import defaultdict

        from cognite_data_quality._dataproduct_publisher import (
            publish_shacl_to_ruleset,
            publish_view_config_to_data_product,
        )

        print("\nPublishing SHACL rules to RuleSet API and views to DataProduct API...")

        # Step 1: publish each view's SHACL to its RuleSet; rewrite shacl_rules
        # to use ruleset_references.  A view's rs_version is taken from its
        # records.rule_set_version if set, otherwise falls back to the first
        # DataProduct version it belongs to (or "1.0.0" as a safe default).
        for vc in view_configs:
            # Skip views already using ruleset_references (already migrated).
            if vc.shacl_rules.ruleset_references:
                continue
            if not vc.shacl_rules.file or not vc.shacl_rules.external_id:
                continue
            rs_ext_id = vc.shacl_rules.external_id
            if vc.dataproducts:
                fallback_version = vc.dataproducts[0].version
            else:
                fallback_version = "1.0.0"
            rs_version = vc.records.rule_set_version or fallback_version

            if not dry_run:
                publish_shacl_to_ruleset(
                    client,
                    shacl_path=shacl_dir / vc.shacl_rules.file,
                    ruleset_ext_id=rs_ext_id,
                    version=rs_version,
                    create_ruleset=True,
                    ruleset_name=f"{vc.view.external_id} SHACL Rules",
                )
                print(f"  Published RuleSet: {rs_ext_id}@{rs_version}")
            else:
                print(f"  [dry-run] Would publish RuleSet: {rs_ext_id}@{rs_version}")

            vc.shacl_rules = ShaclRulesRef(ruleset_references=[{"externalId": rs_ext_id, "version": rs_version}])

        # Step 2: group views by (dp_external_id, dp_version) and publish each DP.
        # A view may appear in multiple DataProducts.  Also track the first dp_ref
        # seen for each key to capture schema_space.
        dp_groups: dict[tuple[str, str], list[ViewConfig]] = defaultdict(list)
        dp_refs: dict[tuple[str, str], DataProductRef] = {}
        for vc in view_configs:
            for dp_ref in vc.dataproducts:
                key = (dp_ref.external_id, dp_ref.version)
                dp_groups[key].append(vc)
                dp_refs.setdefault(key, dp_ref)

        if not dp_groups:
            print(
                "  Warning: no views have a 'dataproducts' list — "
                "no DataProducts will be published.  Add 'dataproducts:' "
                "to each view YAML to specify which DataProduct(s) it belongs to."
            )

        for (dp_ext_id, dp_version), dp_views in dp_groups.items():
            ref = dp_refs[(dp_ext_id, dp_version)]
            if not dry_run:
                publish_view_config_to_data_product(
                    client,
                    view_configs=dp_views,
                    dp_ext_id=dp_ext_id,
                    dp_version=dp_version,
                    create_dp=True,
                    schema_space=ref.schema_space,
                    status="published",
                )
                print(f"  Published DataProduct: {dp_ext_id}@{dp_version} ({len(dp_views)} view(s))")
            else:
                print(f"  [dry-run] Would publish DataProduct: {dp_ext_id}@{dp_version} ({len(dp_views)} view(s))")

        effective_shacl_dir = None  # no CDF File uploads in dataproduct mode
    else:
        effective_shacl_dir = shacl_dir

    if filter_views_by_shacl_rules and view_configs:
        target_views: set[str] = set()
        for vc in view_configs:
            if vc.shacl_rules.file and effective_shacl_dir:
                target_views |= extract_target_views_from_shacl(effective_shacl_dir / vc.shacl_rules.file)
        view_configs = [vc for vc in view_configs if vc.view.external_id in target_views]

    timeseries_configs = (
        load_timeseries_configs(resolved_timeseries_dir)
        if resolved_timeseries_dir is not None and resolved_timeseries_dir.exists()
        else []
    )

    # Load RAW table configs from tables/ directory
    resolved_tables_dir = _resolve_path(base_dir, "tables")
    table_configs = load_table_configs(resolved_tables_dir) if resolved_tables_dir.exists() else []

    function_results = deploy_functions(
        client=client,
        settings=settings,
        force=force or force_function,
        dry_run=dry_run,
        debug_save_zip=debug_save_zip,
        secrets=function_secrets,
    )
    if not dry_run:
        _wait_for_functions_ready(client, [settings.function.external_id])
    workflow_results = deploy_instance_workflows(
        client=client,
        settings=settings,
        views=view_configs,
        shacl_dir=effective_shacl_dir,
        force=force or force_workflows,
        dry_run=dry_run,
    )
    timeseries_results = deploy_timeseries_workflows(
        client=client,
        settings=settings,
        configs=timeseries_configs,
        shacl_dir=shacl_dir,
        force=force or force_workflows,
        dry_run=dry_run,
    )

    # Deploy RAW table SHACL rules
    table_results = deploy_raw_table_shacl(
        client=client,
        settings=settings,
        tables=table_configs,
        shacl_dir=shacl_dir,
        dry_run=dry_run,
    )

    # Deploy DataProduct sync workflow (discovers all DPs and auto-deploys validation)
    dp_sync_results: list[dict[str, object]] = []
    if deploy_data_product_sync:
        dp_sync_results = _deploy_data_product_sync_workflow(
            client=client,
            settings=settings,
            cron=data_product_sync_cron,
            dry_run=dry_run,
        )

    return {
        "functions": function_results,
        "workflows": workflow_results,
        "timeseries_workflows": timeseries_results,
        "raw_tables": table_results,
        "data_product_sync": dp_sync_results,
    }

load_settings(path)

Load deployment settings from a YAML file.

Parameters:

- path (Path | str, required): Path to the settings.yaml file.

Returns:

- DeploymentSettings: Parsed DeploymentSettings object.

Source code in cognite_data_quality/deploy.py
def load_settings(path: Path | str) -> DeploymentSettings:
    """Load deployment settings from a YAML file.

    Args:
        path: Path to the ``settings.yaml`` file.

    Returns:
        Parsed :class:`DeploymentSettings` object.
    """
    settings_path = Path(path)
    data = yaml.safe_load(settings_path.read_text(encoding="utf-8")) or {}
    return DeploymentSettings(**data)
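
For illustration, the parsed object exposes the nested settings referenced by the deploy functions above (the path is a placeholder):

from cognite_data_quality import load_settings

settings = load_settings("config/environments/my_env/settings.yaml")  # placeholder path

print(settings.function.external_id)                        # Cognite Function to deploy and invoke
print(settings.records.space, settings.records.container)   # where validation Records are written
print(settings.shacl_rules.source_dir)                      # SHACL directory, relative to the settings file
print(settings.config_source)                               # "yaml" (default) or "dataproduct"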

load_table_configs(path)

Load RAW table validation configs from a directory.

Reads all *.yaml files in path and parses them as RawTableConfig objects. Returns an empty list if the directory does not exist.

Parameters:

- path (Path | str, required): Directory containing RAW table YAML config files.

Returns:

- list[RawTableConfig]: List of RawTableConfig objects, sorted by filename. Empty list if path does not exist.

Source code in cognite_data_quality/deploy.py
def load_table_configs(path: Path | str) -> list[RawTableConfig]:
    """Load RAW table validation configs from a directory.

    Reads all ``*.yaml`` files in *path* and parses them as
    :class:`RawTableConfig` objects.  Returns an empty list if the
    directory does not exist.

    Args:
        path: Directory containing RAW table YAML config files.

    Returns:
        List of :class:`RawTableConfig` objects, sorted by filename.
        Empty list if *path* does not exist.
    """
    config_dir = Path(path)
    if not config_dir.exists():
        return []
    configs: list[RawTableConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = yaml.safe_load(config_file.read_text(encoding="utf-8")) or {}
        data["_filename"] = config_file.name
        configs.append(RawTableConfig(**data))
    return configs
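
A small sketch, importing from the deploy module where the source above lives; the tables/ directory name matches what deploy_validation_infrastructure() resolves next to settings.yaml:

from cognite_data_quality.deploy import load_table_configs

# Returns [] if the directory does not exist, so this is safe to call unconditionally.
table_configs = load_table_configs("config/environments/my_env/tables")
print(f"Loaded {len(table_configs)} RAW table config(s)")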

load_timeseries_configs(path)

Load time series validation configs from a directory.

Reads all *.yaml files in path and parses them as TimeseriesConfig objects.

Parameters:

- path (Path | str, required): Directory containing time series YAML config files.

Returns:

- list[TimeseriesConfig]: List of TimeseriesConfig objects, sorted by filename.

Source code in cognite_data_quality/deploy.py
def load_timeseries_configs(path: Path | str) -> list[TimeseriesConfig]:
    """Load time series validation configs from a directory.

    Reads all ``*.yaml`` files in *path* and parses them as
    :class:`TimeseriesConfig` objects.

    Args:
        path: Directory containing time series YAML config files.

    Returns:
        List of :class:`TimeseriesConfig` objects, sorted by filename.
    """
    config_dir = Path(path)
    configs: list[TimeseriesConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = yaml.safe_load(config_file.read_text(encoding="utf-8")) or {}
        data["_filename"] = config_file.name
        configs.append(TimeseriesConfig(**data))
    return configs

load_view_configs(path)

Load instance validation view configs from a directory.

Reads all *.yaml files in path and parses them as ViewConfig objects.

Parameters:

- path (Path | str, required): Directory containing view YAML config files.

Returns:

- list[ViewConfig]: List of ViewConfig objects, sorted by filename.

Source code in cognite_data_quality/deploy.py
def load_view_configs(path: Path | str) -> list[ViewConfig]:
    """Load instance validation view configs from a directory.

    Reads all ``*.yaml`` files in *path* and parses them as
    :class:`ViewConfig` objects.

    Args:
        path: Directory containing view YAML config files.

    Returns:
        List of :class:`ViewConfig` objects, sorted by filename.
    """
    config_dir = Path(path)
    configs: list[ViewConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = yaml.safe_load(config_file.read_text(encoding="utf-8")) or {}
        data["_filename"] = config_file.name
        configs.append(ViewConfig(**data))
    return configs