
Deployment API

The cognite_data_quality.deploy module provides a unified API for deploying data quality validation infrastructure to CDF. Function handler code is embedded in the package; no separate function/ directory or function_source_root is required.

Overview

  • deploy_validation_infrastructure() – Ensures that the Records and state containers (OrchestrationState, FunctionValidationState, RuleEngineResult) exist, then deploys the monitoring data model and views, the unified validation function, and the instance and (optionally) time series workflows with their triggers.
  • deploy_validation_pipeline() – Runs the full validation pipeline for a view (historic partitions, sync trigger, monitor schedule). Call after infrastructure is deployed.
  • deploy_incremental() – Deploys function and workflow for a single view config (alternative to full infrastructure deploy).
  • Component helpers: deploy_functions(), deploy_instance_workflows(), deploy_timeseries_workflows(), deploy_data_models().

When the SHACL rules or RuleSet for a view contain dqs:uniquenessConstraint or dqs:unique, deploy_instance_workflows() also creates a scheduled dq-{view}-uniqueness workflow. See Uniqueness.

DataProduct mode (0.4.0+): consolidated workflows

When config_source is dataproduct (note that the DeploymentSettings field itself defaults to "yaml"), deployment uses one instance workflow and one uniqueness workflow per DataProduct, with a DMS trigger per view. View-specific configuration is passed via trigger input and resolved in the workflow task as ${workflow.input.*}.
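
The placeholder mechanism can be illustrated with a small offline sketch. CDF Workflows resolves ${workflow.input.*} references itself; the helper resolve_workflow_input and the key view_external_id below are hypothetical and only show how one shared task definition can serve several views when each trigger supplies its own input.

```python
import re

# Hypothetical helper: the real substitution happens inside CDF Workflows.
_PLACEHOLDER = re.compile(r"\$\{workflow\.input\.([A-Za-z0-9_]+)\}")


def resolve_workflow_input(task_data, trigger_input):
    """Return a copy of task_data with ${workflow.input.<key>} references resolved."""

    def resolve(value):
        if isinstance(value, dict):
            return {key: resolve(item) for key, item in value.items()}
        if isinstance(value, list):
            return [resolve(item) for item in value]
        if isinstance(value, str):
            whole = _PLACEHOLDER.fullmatch(value)
            if whole:
                # A value that is exactly one placeholder keeps the input's type.
                return trigger_input[whole.group(1)]
            # Placeholders embedded in a longer string are interpolated as text.
            return _PLACEHOLDER.sub(lambda m: str(trigger_input[m.group(1)]), value)
        return value

    return resolve(task_data)


# One shared task definition, resolved with the input of two per-view triggers.
task_data = {
    "view": "${workflow.input.view_external_id}",
    "label": "dq-${workflow.input.view_external_id}",
}
pump = resolve_workflow_input(task_data, {"view_external_id": "Pump"})
valve = resolve_workflow_input(task_data, {"view_external_id": "Valve"})
```

The point of the design is that adding a view only adds a trigger with different input; the workflow definition itself stays unchanged.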

Manual cutover from per-view workflows (no auto-migration):

  1. Delete legacy per-view workflows (dq-shacl-{view}, dq-shacl-{view}-uniqueness) and their associated triggers.
  2. Deploy package 0.4.0 (function + settings).
  3. Run deploy_validation_infrastructure() or wait for data_product_sync to deploy consolidated workflows.
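
For step 1, the legacy external IDs can be listed up front. This is a minimal sketch that assumes the {view} part of the pattern is the view external ID substituted verbatim; the helper name and the example views are hypothetical, and the deletion itself (via the Cognite SDK or the CDF UI) is left out.

```python
def legacy_workflow_external_ids(view_external_ids):
    """Build the legacy per-view workflow IDs named in step 1 of the cutover."""
    ids = []
    for view in view_external_ids:
        ids.append(f"dq-shacl-{view}")             # per-view instance workflow
        ids.append(f"dq-shacl-{view}-uniqueness")  # per-view uniqueness workflow
    return ids


# Example views are placeholders; use the views configured in your environment.
to_delete = legacy_workflow_external_ids(["Pump", "Valve"])
for external_id in to_delete:
    print(external_id)
```

Reviewing this list before deleting anything makes it easier to check that the associated triggers are removed as well.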

Consolidated deploy creates DMS triggers in no_backfill mode and does not enqueue historic jobs. Existing static data is only validated if you run the historic orchestrator manually per view.

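Setting workflow.consolidate_per_dataproduct: false is meant to keep legacy per-view workflow deployment. Note, however, that in dataproduct mode DeploymentSettings.model_post_init (see the Module Reference) resets the flag to True, so consolidated workflows are deployed regardless.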

Main Functions

deploy_validation_infrastructure()

Unified deployment of all validation infrastructure (recommended). Uses settings_path and views_dir; function code is taken from the package.

from pathlib import Path
from cognite_data_quality import deploy_validation_infrastructure, load_cognite_client_from_toml

client = load_cognite_client_from_toml("config.toml")

settings_path = Path("config/environments/my_env/settings.yaml")
views_dir = Path("config/environments/my_env/views")

deploy_validation_infrastructure(
    client=client,
    settings_path=settings_path,
    views_dir=views_dir,
    function_secrets={"client-id": "...", "client-secret": "..."},  # optional in general; required when orchestrator triggers are deployed
    force=False,
    dry_run=False,
)
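
A common way to use the dry_run flag is to preview first and apply second. The wrapper below is a sketch under two assumptions that the page does not confirm: that dry_run=True makes no changes in CDF, and that the deploy function returns something worth inspecting. preview_then_deploy and fake_deploy are hypothetical names; fake_deploy stands in for deploy_validation_infrastructure so the example runs offline.

```python
from typing import Any, Callable


def preview_then_deploy(deploy_fn: Callable[..., Any], confirm: Callable[[Any], bool], **kwargs: Any) -> Any:
    """Call deploy_fn with dry_run=True, then with dry_run=False if confirmed."""
    kwargs.pop("dry_run", None)  # the wrapper controls dry_run itself
    preview = deploy_fn(dry_run=True, **kwargs)
    if not confirm(preview):
        return preview
    return deploy_fn(dry_run=False, **kwargs)


# Stand-in for deploy_validation_infrastructure (assumption for illustration).
calls = []


def fake_deploy(*, dry_run: bool, **kwargs: Any) -> dict:
    calls.append(dry_run)
    return {"dry_run": dry_run}


result = preview_then_deploy(fake_deploy, confirm=lambda preview: True, force=False)
```

With the real function, the keyword arguments would be client, settings_path, views_dir and so on, exactly as in the call above.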

deploy_validation_pipeline()

Deploy and run the full validation pipeline for a view (historic data, sync trigger, monitor schedule):

from cognite_data_quality import deploy_validation_pipeline

result = deploy_validation_pipeline(
    client,
    settings_path="config/environments/my_env/settings.yaml",
    view_external_id="MyView",
    wait=True,
)
# result: orchestration_id, partitions_triggered, sync_trigger_external_id, monitor_schedule_name, distribution, ...

deploy_incremental()

Deploy function and workflow for a single view config (takes view config path and optional config_env):

from cognite_data_quality import deploy_incremental

deploy_incremental(
    view_config_path="config/environments/my_env/views/my_view.yaml",
    client=client,
    force=False,
    dry_run=False,
)

Component-Specific Functions

For fine-grained control, you can deploy individual components:

deploy_functions()

Deploy Cognite Functions:

from cognite_data_quality import deploy_functions, load_settings

settings = load_settings("config/environments/my_env/settings.yaml")

deploy_functions(
    client=client,
    settings=settings,
    env_name="my_env",
    force=False,
)

deploy_instance_workflows()

Deploy instance validation workflows:

from cognite_data_quality import deploy_instance_workflows, load_settings, load_view_configs

settings = load_settings("config/environments/my_env/settings.yaml")
view_configs = load_view_configs("config/environments/my_env/views")

deploy_instance_workflows(
    client=client,
    view_configs=view_configs,
    settings=settings,
    env_name="my_env",
    skip_trigger=False,
)

deploy_timeseries_workflows()

Deploy time series validation workflows. Each workflow gets a scheduled trigger (WorkflowScheduledTriggerRule) built from the schedule.cron value in its config:

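from pathlib import Path

from cognite_data_quality import deploy_timeseries_workflows, load_settings, load_timeseries_configs

settings = load_settings("config/environments/my_env/settings.yaml")
ts_configs = load_timeseries_configs("config/environments/my_env/timeseries")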

deploy_timeseries_workflows(
    client=client,
    settings=settings,
    configs=ts_configs,
    shacl_dir=Path("config/environments/my_env/shacl_rules"),  # None in Data Product mode
    dp_version="1.0.0",  # optional; written as [dp:{version}] workflow tag
)

When configs use shacl_rules.ruleset_references, the task payload passes RuleSet refs to the handler (no CDF File upload). rule_set_id / rule_set_version in records_config default to the RuleSet external ID and Data Product semver when driven by Data Product context.

In Data Product mode, publish is idempotent and may reuse an existing version when payload is unchanged. Always treat the returned/resolved RuleSet/DataProduct version as source of truth for downstream references.
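
Why the returned version matters can be shown with a toy registry. This is not the CDF RuleSet or DataProduct API; InMemoryRuleSetRegistry, its publish method, and the hashing strategy are assumptions made only to illustrate "reuse an existing version when the payload is unchanged".

```python
import hashlib
import json


class InMemoryRuleSetRegistry:
    """Toy stand-in for an idempotent, versioned publish endpoint."""

    def __init__(self):
        # Maps (external_id, version) to a hash of the published payload.
        self._versions = {}

    def publish(self, external_id, requested_version, payload):
        digest = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
        # Reuse an existing version of this RuleSet if the payload is identical.
        for (ext_id, version), existing in self._versions.items():
            if ext_id == external_id and existing == digest:
                return version
        self._versions[(external_id, requested_version)] = digest
        return requested_version


registry = InMemoryRuleSetRegistry()
first = registry.publish("pump-rules", "1.0.0", {"shapes": ["minTemp"]})
second = registry.publish("pump-rules", "1.1.0", {"shapes": ["minTemp"]})  # unchanged payload

# Downstream references use the version that was returned, not the one requested.
resolved_ref = {"externalId": "pump-rules", "version": second}
```

Here the second call asks for 1.1.0 but gets 1.0.0 back, so a reference built from the requested version would point at a version that was never created.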

deploy_data_models()

Deploy data models (optional):

from cognite_data_quality import deploy_data_models, DataModelId

data_models = [
    DataModelId(space="my_space", external_id="MyDataModel", version="v1"),
]

deploy_data_models(
    client=client,
    data_models=data_models,
    source_dir="data_models",
)

Configuration Loaders

load_settings()

Load environment settings from YAML:

from cognite_data_quality import load_settings, DeploymentSettings

settings: DeploymentSettings = load_settings("config/environments/my_env/settings.yaml")

load_view_configs()

Load instance validation configurations:

from cognite_data_quality import load_view_configs

configs = load_view_configs("config/environments/my_env/views")
# Returns: dict[str, ViewConfig]

load_timeseries_configs()

Load time series validation configurations:

from cognite_data_quality import load_timeseries_configs

configs = load_timeseries_configs("config/environments/my_env/timeseries")
# Returns: list[TimeseriesConfig] — supports dataproducts: and ruleset_references

load_views_from_cdf()

Load views directly from CDF DMS:

from cognite_data_quality import load_views_from_cdf

views = load_views_from_cdf(
    client=client,
    space="my_space",
    data_model_external_id="MyDataModel",
    version="v1",
)

Data Classes

DeploymentSettings

Environment-specific deployment settings loaded from settings.yaml.

DataModelId

Identifier for a CDF data model:

from cognite_data_quality import DataModelId

dm_id = DataModelId(
    space="my_space",
    external_id="MyDataModel",
    version="v1",
)

Module Reference

cognite_data_quality.deploy

CredentialConfig

Bases: BaseModel

Client credential pair for runtime operations.

Source code in cognite_data_quality/deploy.py
class CredentialConfig(BaseModel):
    """Client credential pair for runtime operations."""

    client_id: str
    client_secret: str

DataProductRef

Bases: BaseModel

Reference to a CDF DataProduct + version that a view belongs to.

Used inside ViewConfig.dataproducts to declare which DataProducts should include this view when config_source: "dataproduct" is active.

Attributes:

  • external_id (str) – External ID of the DataProduct.
  • version (str) – Semantic version string of the DataProduct version to publish. Default: "1.0.0".
  • schema_space (str | None) – DMS space for the DataProduct's schemaSpace field. When omitted, the first view's space is used.
  • datamodel_external_id (str | None) – Deprecated: the DataProducts API no longer uses a dataModel wrapper. Ignored.

Source code in cognite_data_quality/deploy.py
class DataProductRef(BaseModel):
    """Reference to a CDF DataProduct + version that a view belongs to.

    Used inside :attr:`ViewConfig.dataproducts` to declare which DataProducts
    should include this view when ``config_source: "dataproduct"`` is active.

    Attributes:
        external_id: External ID of the DataProduct.
        version: Semantic version string of the DataProduct version to publish.
            Default: ``"1.0.0"``.
        schema_space: DMS space for the DataProduct's ``schemaSpace`` field.
            When omitted, the first view's space is used.
        datamodel_external_id: Deprecated — the DataProducts API no longer
            uses a ``dataModel`` wrapper.  Ignored.
    """

    external_id: str
    version: str = "1.0.0"
    schema_space: str | None = None
    datamodel_external_id: str | None = None  # Deprecated: the DataProducts API no longer uses a dataModel wrapper.

DeploymentSettings

Bases: BaseModel

Top-level deployment settings loaded from settings.yaml.

Attributes:

  • function (FunctionSettings | None) – Cognite Function settings. Auto-populated with defaults if omitted from the YAML file.
  • workflow (WorkflowSettings) – Workflow settings for instance validation workflows.
  • timeseries_workflow (WorkflowSettings | None) – Separate workflow settings for time series workflows. Falls back to workflow when None.
  • trigger (TriggerSettings) – Data-change trigger settings for instance validation.
  • records (RecordsSettings) – Default Records API coordinates (space, container, stream).
  • shacl_rules (ShaclRulesSettings | None) – Location settings for SHACL rule files.
  • timeseries (TimeSeriesSettings | None) – Time series config directory settings. None if no time series validation is configured.
  • rule_engine_result_sync (RuleEngineResultSyncSettings | None) – RuleEngineResult listener config directory settings. None if no listener workflows are configured.
  • partition_retries (int) – Maximum partition retry attempts managed by the orchestrator function (no CDF API limit). Default: 3.
  • function_code_dataset_external_id (str | None) – Dataset for function code zip uploads. Required when CDF enforces dataset-scoped file access.
  • config_source (Literal['yaml', 'dataproduct']) – Where view configs are sourced from. "yaml" (default) reads views/*.yaml and uploads SHACL to CDF Files. "dataproduct" reads the same YAML/TTL files but publishes them to the DataProduct and RuleSet APIs instead. Each view's dataproducts list controls which DataProducts it is included in.
  • external_dataproducts (list[ExternalDataProductRef]) – Pre-existing DataProducts in CDF (created by others) to deploy validation workflows for. No local view configs or TTL files are needed: the deploy function fetches the DataProduct from CDF and deploys workflows directly.
  • config_space (str | None) – DMS space for state and settings containers (DataQualitySettings, OrchestrationState, etc.). When None (default), falls back to records.space.

Source code in cognite_data_quality/deploy.py
class DeploymentSettings(BaseModel):
    """Top-level deployment settings loaded from ``settings.yaml``.

    Attributes:
        function: Cognite Function settings. Auto-populated with defaults
            if omitted from the YAML file.
        workflow: Workflow settings for instance validation workflows.
        timeseries_workflow: Separate workflow settings for time series
            workflows.  Falls back to *workflow* when ``None``.
        trigger: Data-change trigger settings for instance validation.
        records: Default Records API coordinates (space, container, stream).
        shacl_rules: Location settings for SHACL rule files.
        timeseries: Time series config directory settings. ``None`` if no
            time series validation is configured.
        rule_engine_result_sync: RuleEngineResult listener config directory
            settings. ``None`` if no listener workflows are configured.
        partition_retries: Maximum partition retry attempts managed by the
            orchestrator function (no CDF API limit). Default: 3.
        function_code_dataset_external_id: Dataset for function code zip
            uploads.  Required when CDF enforces dataset-scoped file access.
        config_source: Where view configs are sourced from.  ``"yaml"``
            (default) reads ``views/*.yaml`` and uploads SHACL to CDF Files.
            ``"dataproduct"`` reads the same YAML/TTL files but publishes them
            to the DataProduct and RuleSet APIs instead.  Each view's
            ``dataproducts`` list controls which DataProducts it is included in.
        external_dataproducts: Pre-existing DataProducts in CDF (created by
            others) to deploy validation workflows for.  No local view configs
            or TTL files are needed — the deploy function fetches the
            DataProduct from CDF and deploys workflows directly.
        config_space: DMS space for state and settings containers
            (``DataQualitySettings``, ``OrchestrationState``, etc.).
            When ``None`` (default), falls back to ``records.space``.
    """

    function: FunctionSettings | None = None
    workflow: WorkflowSettings
    timeseries_workflow: WorkflowSettings | None = None
    trigger: TriggerSettings
    records: RecordsSettings = Field(default_factory=RecordsSettings)
    shacl_rules: ShaclRulesSettings | None = None
    timeseries: TimeSeriesSettings | None = None
    rule_engine_result_sync: RuleEngineResultSyncSettings | None = None
    partition_retries: int = Field(default=3, description="Orchestrator partition retry limit (no CDF API constraint)")
    function_code_dataset_external_id: str | None = Field(
        default=None,
        description="Dataset for function code uploads. Required if CDF enforces dataset-scoped file access.",
    )
    config_source: Literal["yaml", "dataproduct"] = Field(
        default="yaml",
        description=(
            "Config source: 'yaml' uploads SHACL to CDF Files; 'dataproduct' publishes to RuleSet/DataProduct APIs."
        ),
    )
    external_dataproducts: list[ExternalDataProductRef] = Field(
        default_factory=list,
        description=(
            "Pre-existing DataProducts in CDF to deploy workflows for. "
            "Rules and DataProduct structure must already exist (created externally). "
            "Use version='latest' to auto-pick the newest published version."
        ),
    )
    config_space: str | None = Field(
        default=None,
        description=(
            "DMS space that hosts state and settings containers "
            "(DataQualitySettings, OrchestrationState, FunctionValidationState, etc.). "
            "Defaults to records.space when not set."
        ),
    )
    runtime_credentials: RuntimeCredentialsSettings | None = Field(
        default=None,
        description=(
            "Optional credential groups for runtime resources. "
            "When omitted, deployment client credentials are used as fallback."
        ),
    )
    validation_lock: ValidationLockSettings = Field(default_factory=ValidationLockSettings)

    @property
    def effective_config_space(self) -> str:
        """Space for state/settings containers. Falls back to records.space if config_space is unset."""
        return self.config_space or (self.records.space if self.records else "dataQuality")

    @model_validator(mode="after")
    def require_shacl_rules_in_yaml_mode(self) -> DeploymentSettings:
        if self.config_source == "yaml" and self.shacl_rules is None:
            raise ValueError(
                "'shacl_rules' is required when config_source is 'yaml'. "
                "Add a 'shacl_rules:' section to settings.yaml or set config_source: 'dataproduct'."
            )
        return self

    def model_post_init(self, __context) -> None:
        """Ensure function settings are populated with defaults if not provided."""
        if self.function is None:
            self.function = FunctionSettings()
        if self.config_source == "dataproduct" and not self.workflow.consolidate_per_dataproduct:
            self.workflow.consolidate_per_dataproduct = True

effective_config_space property

Space for state/settings containers. Falls back to records.space if config_space is unset.
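
The fallback can be written as a plain function to make one detail visible: because the property uses `or`, an empty string counts as unset, just like None. This sketch mirrors the expression in the source above; the standalone function and its parameter names are illustrative only.

```python
from typing import Optional


def effective_config_space(config_space: Optional[str], records_space: str = "dataQuality") -> str:
    """Mirror of the property: prefer config_space, else fall back to records.space."""
    # `or` treats both None and "" as "not set".
    return config_space or records_space


default_space = effective_config_space(None)
explicit_space = effective_config_space("dq_config", "dq_records")
empty_space = effective_config_space("", "dq_records")
```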

model_post_init(__context)

Ensure function settings are populated with defaults if not provided.

Source code in cognite_data_quality/deploy.py
def model_post_init(self, __context) -> None:
    """Ensure function settings are populated with defaults if not provided."""
    if self.function is None:
        self.function = FunctionSettings()
    if self.config_source == "dataproduct" and not self.workflow.consolidate_per_dataproduct:
        self.workflow.consolidate_per_dataproduct = True
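
The effect of the second branch is easiest to see in isolation. The sketch below reproduces it with standard-library dataclasses instead of Pydantic; WorkflowFlags and SettingsSketch are hypothetical stand-ins, and the False default for consolidate_per_dataproduct is an assumption, since WorkflowSettings is not shown on this page.

```python
from dataclasses import dataclass, field


@dataclass
class WorkflowFlags:
    # Assumed default; the real WorkflowSettings default is not shown here.
    consolidate_per_dataproduct: bool = False


@dataclass
class SettingsSketch:
    config_source: str = "yaml"
    workflow: WorkflowFlags = field(default_factory=WorkflowFlags)

    def __post_init__(self) -> None:
        # Same condition as in model_post_init: dataproduct mode forces consolidation.
        if self.config_source == "dataproduct" and not self.workflow.consolidate_per_dataproduct:
            self.workflow.consolidate_per_dataproduct = True


yaml_mode = SettingsSketch(config_source="yaml")
dp_mode = SettingsSketch(
    config_source="dataproduct",
    workflow=WorkflowFlags(consolidate_per_dataproduct=False),
)
```

In yaml mode the flag keeps whatever value was configured; in dataproduct mode an explicit False is overwritten with True.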

ExternalDataProductRef

Bases: BaseModel

Reference to a pre-existing DataProduct in CDF to deploy workflows for.

Used inside DeploymentSettings.external_dataproducts to declare DataProducts that are owned by others (already published to the DataProduct and RuleSet APIs). The deploy function fetches the DataProduct from CDF, builds ViewConfig objects from it, and deploys instance validation workflows; no local YAML view configs or TTL files are required.

Attributes:

  • external_id (str) – External ID of the DataProduct.
  • version (str) – Semantic version string, or "latest" to auto-pick the newest published version. Default: "latest".

Source code in cognite_data_quality/deploy.py
class ExternalDataProductRef(BaseModel):
    """Reference to a pre-existing DataProduct in CDF to deploy workflows for.

    Used inside :attr:`DeploymentSettings.external_dataproducts` to declare
    DataProducts that are owned by others (already published to the DataProduct
    and RuleSet APIs).  The deploy function fetches the DataProduct from CDF,
    builds :class:`ViewConfig` objects from it, and deploys instance validation
    workflows — no local YAML view configs or TTL files are required.

    Attributes:
        external_id: External ID of the DataProduct.
        version: Semantic version string, or ``"latest"`` to auto-pick the
            newest published version.  Default: ``"latest"``.
    """

    external_id: str
    version: str = "latest"
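
How "latest" might be resolved can be sketched as follows. The package's actual resolution logic is not shown on this page; this example assumes that "newest" means the highest plain MAJOR.MINOR.PATCH version, and resolve_version is a hypothetical helper.

```python
def resolve_version(requested, published):
    """Return the concrete version for a request that may be 'latest'."""
    if not published:
        raise ValueError("no published versions available")
    if requested != "latest":
        if requested not in published:
            raise ValueError(f"version {requested!r} is not published")
        return requested
    # Compare numerically so that 1.10.0 sorts above 1.9.3.
    return max(published, key=lambda version: tuple(int(part) for part in version.split(".")))


published_versions = ["1.2.0", "1.10.0", "1.9.3"]
latest = resolve_version("latest", published_versions)
pinned = resolve_version("1.2.0", published_versions)
```

Comparing the parts as integers matters: a plain string comparison would rank "1.9.3" above "1.10.0".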

FunctionSettings

Bases: BaseModel

Cognite Function settings.

Attributes:

  • external_id (str) – CDF external ID for the function. Default: "data-quality-validation".
  • name (str) – Display name shown in CDF Console. Default: "Data Quality Validation".
  • description (str) – Short description stored on the function resource.
  • runtime (str) – Python runtime version string (e.g. "py311").
  • source_dir (str) – Local directory name used as the function root inside the zip archive.
  • include_files (list[str]) – List of paths (relative to source_dir) to bundle.

Source code in cognite_data_quality/deploy.py
class FunctionSettings(BaseModel):
    """Cognite Function settings.

    Attributes:
        external_id: CDF external ID for the function. Default: ``"data-quality-validation"``.
        name: Display name shown in CDF Console. Default: ``"Data Quality Validation"``.
        description: Short description stored on the function resource.
        runtime: Python runtime version string (e.g. ``"py311"``).
        source_dir: Local directory name used as the function root inside the zip archive.
        include_files: List of paths (relative to *source_dir*) to bundle.
    """

    external_id: str = Field(default="data-quality-validation")
    name: str = Field(default="Data Quality Validation")
    description: str = Field(
        default="Unified function for all SHACL validation types (instance, partitioned, timeseries, orchestrator)"
    )
    runtime: str = Field(default="py311")
    source_dir: str = Field(default="function")
    include_files: list[str] = Field(
        default_factory=lambda: [
            "handler.py",
            "handlers/__init__.py",
            "handlers/instance_validation.py",
            "handlers/partitioned_validation.py",
            "handlers/raw_validation.py",
            "handlers/timeseries_validation.py",
            "handlers/orchestrator.py",
            "handlers/orchestration_status.py",
            "handlers/test_rule.py",
            "handlers/data_product_sync.py",
            "handlers/historic_queue_manager.py",
            "handlers/aggregate_uniqueness_validation.py",
            "handlers/rule_engine_result_sync.py",
            "handlers/shacl_validation.py",
            "common/__init__.py",
            "common/shacl_utils.py",
            "common/records_api.py",
            "common/rule_engine_records.py",
            "common/rule_engine_result_sync.py",
            "common/cursor_state.py",
            "common/time_window.py",
            "common/timestamp_collection_state.py",
            "common/view_config_state.py",
            "common/timeseries_config_state.py",
            "requirements.txt",
        ]
    )

RawTableConfig

Bases: BaseModel

Full configuration for one RAW table validation deployment.

Attributes:

  • table (RawTableRef) – Reference to the CDF RAW database and table.
  • shacl_rules (ShaclRulesRef) – SHACL rules file reference.
  • validation (ValidationSettings) – Validation behaviour settings.
  • records (RecordsOverrides) – Per-table Records API overrides.
  • partition_count (int) – Number of partitions for historic cursor-based validation. Default: 5.

Source code in cognite_data_quality/deploy.py
class RawTableConfig(BaseModel):
    """Full configuration for one RAW table validation deployment.

    Attributes:
        table: Reference to the CDF RAW database and table.
        shacl_rules: SHACL rules file reference.
        validation: Validation behaviour settings.
        records: Per-table Records API overrides.
        partition_count: Number of partitions for historic cursor-based
            validation. Default: 5.
    """

    table: RawTableRef
    shacl_rules: ShaclRulesRef
    validation: ValidationSettings = Field(default_factory=ValidationSettings)
    records: RecordsOverrides = Field(default_factory=RecordsOverrides)
    partition_count: int = Field(default=5, description="Number of partitions for batch validation")
    _filename: str | None = None

RawTableRef

Bases: BaseModel

Reference to a CDF RAW table.

Attributes:

  • database (str) – CDF RAW database name.
  • name (str) – CDF RAW table name within the database.

Source code in cognite_data_quality/deploy.py
class RawTableRef(BaseModel):
    """Reference to a CDF RAW table.

    Attributes:
        database: CDF RAW database name.
        name: CDF RAW table name within the database.
    """

    database: str
    name: str

RecordsOverrides

Bases: BaseModel

Per-view overrides for Records API coordinates.

All fields are optional; unset fields inherit the global RecordsSettings values.

Attributes:

  • rule_set_id (str | None) – Override the rule set ID written to each record.
  • rule_set_version (str | None) – Override the rule set version written to each record.
  • data_domain_external_id (str | None) – External ID of the data domain attached to each record.
  • use_instance_space (bool) – If True, write records into the same DMS space as the validated instance. Default: False.
  • records_space (str | None) – Explicit destination space for records. Only used when use_instance_space is False; overrides the global RecordsSettings.space.

Source code in cognite_data_quality/deploy.py
class RecordsOverrides(BaseModel):
    """Per-view overrides for Records API coordinates.

    All fields are optional; unset fields inherit the global
    :class:`RecordsSettings` values.

    Attributes:
        rule_set_id: Override the rule set ID written to each record.
        rule_set_version: Override the rule set version written to each record.
        data_domain_external_id: External ID of the data domain attached to
            each record.
        use_instance_space: If ``True``, write records into the same DMS
            space as the validated instance. Default: ``False``.
        records_space: Explicit destination space for records. Only used
            when *use_instance_space* is ``False`` and overrides the global
            :attr:`RecordsSettings.space`.
    """

    rule_set_id: str | None = None
    rule_set_version: str | None = None
    data_domain_external_id: str | None = None
    use_instance_space: bool = False
    records_space: str | None = None
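
The precedence between use_instance_space, records_space, and the global RecordsSettings.space can be sketched as a small function. This follows the attribute descriptions above; the function itself and its parameter names are hypothetical, and the package's internal resolution may differ in detail.

```python
from typing import Optional


def resolve_records_space(
    global_space: str,
    instance_space: str,
    use_instance_space: bool = False,
    records_space: Optional[str] = None,
) -> str:
    """Pick the destination space for a validation record."""
    if use_instance_space:
        # Records are written next to the validated instance.
        return instance_space
    # Otherwise an explicit per-view space wins over the global default.
    return records_space or global_space


inherited = resolve_records_space("dataQuality", "assets")
overridden = resolve_records_space("dataQuality", "assets", records_space="dq_assets")
colocated = resolve_records_space("dataQuality", "assets", use_instance_space=True, records_space="dq_assets")
```

In the last call records_space is ignored, matching the note that it is only used when use_instance_space is False.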

RecordsSettings

Bases: BaseModel

Default Records API coordinates used by all deployed workflows.

The rule engine output container lives in the same space as the validation container; there is intentionally no separate space field for it.

Attributes:

  • space (str) – CDF DMS space that hosts BOTH the validation records container and the rule engine results container. Default: "dataQuality".
  • container (str) – External ID of the DMS container that stores validation records. Default: "DataQualityValidationRecord".
  • stream_id (str) – External ID of the Records API stream for ingestion. Default: "dq_validation_stream".
  • rule_engine_container (str) – External ID of the DMS container that stores SHACL-AF rule engine output records (JSON inference payloads). Default: "RuleEngineResult".
  • rule_engine_stream_id (str | None) – Optional separate Records API stream for rule engine outputs. When None (default), rule engine results are written to the same stream as validation records.

Source code in cognite_data_quality/deploy.py
class RecordsSettings(BaseModel):
    """Default Records API coordinates used by all deployed workflows.

    The rule engine output container co-locates with the validation
    container in *space* — there is intentionally no separate space field.

    Attributes:
        space: CDF DMS space that hosts BOTH the validation records
            container and the rule engine results container.
            Default: ``"dataQuality"``.
        container: External ID of the DMS container that stores validation
            records. Default: ``"DataQualityValidationRecord"``.
        stream_id: External ID of the Records API stream for ingestion.
            Default: ``"dq_validation_stream"``.
        rule_engine_container: External ID of the DMS container that stores
            SHACL-AF rule engine output records (JSON inference payloads).
            Default: ``"RuleEngineResult"``.
        rule_engine_stream_id: Optional separate Records API stream for rule
            engine outputs.  When ``None`` (default), rule engine results are
            written to the same stream as validation records.
    """

    space: str = Field(default="dataQuality")
    container: str = Field(default="DataQualityValidationRecord")
    stream_id: str = Field(default="dq_validation_stream")
    rule_engine_container: str = Field(default="RuleEngineResult")
    rule_engine_stream_id: str | None = Field(default=None)

RuleEngineResultSyncConfig

Bases: BaseModel

Configuration for one RuleEngineResult incremental listener workflow.

Source code in cognite_data_quality/deploy.py
class RuleEngineResultSyncConfig(BaseModel):
    """Configuration for one RuleEngineResult incremental listener workflow."""

    name: str
    description: str | None = None
    listener_id: str | None = None
    stream_id: str | None = None
    source_mode: str = Field(default="sync")
    limit: int = Field(default=1000, ge=1, le=10_000)
    page_limit: int = Field(default=1000, ge=1, le=1000)
    initialize_cursor: str = Field(default="7d-ago")
    include_items: bool = False
    initial_watermark_ms: int | None = None
    records_space: str = Field(default="dataQuality")
    records_container: str = Field(default="RuleEngineResult")
    filter: RuleEngineResultSyncFilter = Field(default_factory=RuleEngineResultSyncFilter)
    schedule: ScheduleConfig = Field(default_factory=ScheduleConfig)
    _filename: str | None = None

RuleEngineResultSyncFilter

Bases: BaseModel

Filter contract for incremental RuleEngineResult consumption.

Source code in cognite_data_quality/deploy.py
class RuleEngineResultSyncFilter(BaseModel):
    """Filter contract for incremental RuleEngineResult consumption."""

    rule_ids: list[str] = Field(default_factory=list)
    rule_set_ids: list[str] = Field(default_factory=list)
    rule_set_versions: list[str] = Field(default_factory=list)
    focus_nodes: list[str] = Field(default_factory=list)
    focus_node_instances: list[str | dict[str, str]] = Field(default_factory=list)
    data_domain_external_ids: list[str] = Field(default_factory=list)

RuleEngineResultSyncSettings

Bases: BaseModel

RuleEngineResult listener config directory settings.

Source code in cognite_data_quality/deploy.py
class RuleEngineResultSyncSettings(BaseModel):
    """RuleEngineResult listener config directory settings."""

    config_dir: str = "rule_engine_result_sync"

RuntimeCredentialsSettings

Bases: BaseModel

Optional credential groups for runtime resources.

Source code in cognite_data_quality/deploy.py
class RuntimeCredentialsSettings(BaseModel):
    """Optional credential groups for runtime resources."""

    functions: CredentialConfig | None = None
    workflows: CredentialConfig | None = None
    schedules: CredentialConfig | None = None

ScheduleConfig

Bases: BaseModel

Cron-based schedule for time series validation workflows.

Attributes:

  • cron (str) – Cron expression controlling when the workflow runs. Default: "0 6 * * *" (daily at 06:00 UTC).

Source code in cognite_data_quality/deploy.py
class ScheduleConfig(BaseModel):
    """Cron-based schedule for time series validation workflows.

    Attributes:
        cron: Cron expression controlling when the workflow runs.
            Default: ``"0 6 * * *"`` (daily at 06:00 UTC).
    """

    cron: str = Field(default="0 6 * * *")

ShaclRulesRef

Bases: BaseModel

Reference to SHACL rules -- either a local file or RuleSet API references.

Attributes:

  • file (str | None) – Filename (relative to the SHACL source directory) of the .ttl rules file. Required when using file-based rules.
  • external_id (str | None) – CDF Files external ID under which the rules file is uploaded and later retrieved by the validation function. Required when using file-based rules.
  • ruleset_references (list[dict] | None) – List of {"externalId": ..., "version": ...} dicts pointing to CDF RuleSet API versions. When set, the validation handler fetches SHACL rules from the RuleSet API at runtime instead of from CDF Files.

Source code in cognite_data_quality/deploy.py
class ShaclRulesRef(BaseModel):
    """Reference to SHACL rules -- either a local file or RuleSet API references.

    Attributes:
        file: Filename (relative to the SHACL source directory) of the
            ``.ttl`` rules file.  Required when using file-based rules.
        external_id: CDF Files external ID under which the rules file is
            uploaded and later retrieved by the validation function.
            Required when using file-based rules.
        ruleset_references: List of ``{"externalId": ..., "version": ...}``
            dicts pointing to CDF RuleSet API versions.  When set, the
            validation handler fetches SHACL rules from the RuleSet API
            at runtime instead of from CDF Files.
    """

    file: str | None = None
    external_id: str | None = None
    ruleset_references: list[dict] | None = None

    @model_validator(mode="after")
    def _require_file_or_ruleset(self) -> ShaclRulesRef:
        if self.ruleset_references:
            return self
        if not self.file or not self.external_id:
            raise ValueError("Both 'file' and 'external_id' are required when 'ruleset_references' is not set")
        return self
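
The validator's decision logic can be exercised without Pydantic. The function below is a standard-library sketch that mirrors _require_file_or_ruleset; check_shacl_rules_ref, its return values, and the example file names are hypothetical and exist only for illustration.

```python
def check_shacl_rules_ref(file=None, external_id=None, ruleset_references=None):
    """Return 'ruleset' or 'file'; raise ValueError for an incomplete reference."""
    if ruleset_references:
        # A non-empty list of RuleSet references is sufficient on its own.
        return "ruleset"
    if not file or not external_id:
        raise ValueError("Both 'file' and 'external_id' are required when 'ruleset_references' is not set")
    return "file"


file_based = check_shacl_rules_ref(file="pump.ttl", external_id="dq-rules-pump")
ruleset_based = check_shacl_rules_ref(
    ruleset_references=[{"externalId": "pump-rules", "version": "1.0.0"}]
)

# An empty list is falsy, so it falls through to the file-based check.
try:
    check_shacl_rules_ref(ruleset_references=[])
    empty_list_rejected = False
except ValueError:
    empty_list_rejected = True
```

Note the last case: ruleset_references=[] does not count as "set", so file and external_id are still required.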

ShaclRulesSettings

Bases: BaseModel

Location settings for SHACL .ttl rule files.

Attributes:

Name Type Description
source_dir str

Path (relative to settings.yaml) of the directory containing SHACL Turtle files.

mime_type str

MIME type used when uploading rule files to CDF Files. Default: "text/turtle".

dataset_external_id str | None

External ID of the CDF dataset to associate uploaded rule files with. None means no dataset is set.

Source code in cognite_data_quality/deploy.py
class ShaclRulesSettings(BaseModel):
    """Location settings for SHACL ``.ttl`` rule files.

    Attributes:
        source_dir: Path (relative to ``settings.yaml``) of the directory
            containing SHACL Turtle files.
        mime_type: MIME type used when uploading rule files to CDF Files.
            Default: ``"text/turtle"``.
        dataset_external_id: External ID of the CDF dataset to associate
            uploaded rule files with.  ``None`` means no dataset is set.
    """

    source_dir: str
    mime_type: str = Field(default="text/turtle")
    dataset_external_id: str | None = None

TimeSeriesSettings

Bases: BaseModel

Time series config directory settings.

Source code in cognite_data_quality/deploy.py
class TimeSeriesSettings(BaseModel):
    """Time series config directory settings."""

    config_dir: str

TimeseriesConfig

Bases: BaseModel

Full configuration for one time series validation workflow.

Attributes:

Name Type Description
name str

Unique name for this configuration; used as the workflow external ID suffix and display name.

description str | None

Optional human-readable description stored on the workflow resource.

filter TimeseriesFilter | None

Server-side DMS filter for selecting time series instances. Mutually exclusive with instance_ids.

instance_ids list[TimeseriesInstanceId] | None

Explicit list of time series instances to validate. Mutually exclusive with filter.

shacl_rules ShaclRulesRef

SHACL rules file reference.

datamodel ViewRef

Data model containing the time series view definition.

validation ValidationSettings

Validation behaviour settings.

records RecordsOverrides

Per-config Records API overrides.

schedule ScheduleConfig

Cron schedule for the validation trigger (WorkflowScheduledTriggerRule, same pattern as other scheduled DQ workflows).

backfill dict | None

Optional backfill configuration dict with keys enabled, start_time, end_time, and window_minutes.

dataproducts list[DataProductRef]

DataProducts this timeseries config belongs to when config_source: "dataproduct" is active.

Source code in cognite_data_quality/deploy.py
class TimeseriesConfig(BaseModel):
    """Full configuration for one time series validation workflow.

    Attributes:
        name: Unique name for this configuration; used as the workflow
            external ID suffix and display name.
        description: Optional human-readable description stored on the
            workflow resource.
        filter: Server-side DMS filter for selecting time series instances.
            Mutually exclusive with *instance_ids*.
        instance_ids: Explicit list of time series instances to validate.
            Mutually exclusive with *filter*.
        shacl_rules: SHACL rules file reference.
        datamodel: Data model containing the time series view definition.
        validation: Validation behaviour settings.
        records: Per-config Records API overrides.
        schedule: Cron schedule for the validation trigger (``WorkflowScheduledTriggerRule``,
            same pattern as other scheduled DQ workflows).
        backfill: Optional backfill configuration dict with keys
            ``enabled``, ``start_time``, ``end_time``, and
            ``window_minutes``.
        dataproducts: DataProducts this timeseries config belongs to when
            ``config_source: "dataproduct"`` is active.
    """

    name: str
    description: str | None = None
    filter: TimeseriesFilter | None = None
    instance_ids: list[TimeseriesInstanceId] | None = None
    shacl_rules: ShaclRulesRef
    datamodel: ViewRef
    validation: ValidationSettings = Field(default_factory=ValidationSettings)
    records: RecordsOverrides = Field(default_factory=RecordsOverrides)
    schedule: ScheduleConfig = Field(default_factory=ScheduleConfig)
    backfill: dict | None = None
    dataproducts: list[DataProductRef] = Field(
        default_factory=list,
        description=(
            "DataProducts this timeseries config belongs to. Used when config_source='dataproduct' "
            "to publish SHACL rules and CogniteTimeSeries view entries into the DataProduct. "
            "A config can belong to multiple DataProducts; deploy writes one TimeseriesConfigState "
            "node per DataProduct."
        ),
    )
    _filename: str | None = None

TimeseriesFilter

Bases: BaseModel

Server-side filter for selecting time series instances to validate.

Attributes:

Name Type Description
space str | None

Restrict to instances in this CDF space.

external_id_prefix str | None

Restrict to instances whose external ID starts with this prefix.

Source code in cognite_data_quality/deploy.py
class TimeseriesFilter(BaseModel):
    """Server-side filter for selecting time series instances to validate.

    Attributes:
        space: Restrict to instances in this CDF space.
        external_id_prefix: Restrict to instances whose external ID starts
            with this prefix.
    """

    space: str | None = None
    external_id_prefix: str | None = None

TimeseriesInstanceId

Bases: BaseModel

Explicit instance reference for a single time series to validate.

Attributes:

Name Type Description
space str

CDF space containing the instance.

external_id str

External ID of the time series instance.

Source code in cognite_data_quality/deploy.py
class TimeseriesInstanceId(BaseModel):
    """Explicit instance reference for a single time series to validate.

    Attributes:
        space: CDF space containing the instance.
        external_id: External ID of the time series instance.
    """

    space: str
    external_id: str

TriggerSettings

Bases: BaseModel

Settings for data-change triggers on instance validation workflows.

Attributes:

Name Type Description
external_id_prefix str

Prefix used when constructing per-view trigger external IDs.

batch_size int

Maximum number of instances per trigger payload. Default: 10.

batch_timeout int

Seconds to wait before flushing a partial batch. Default: 60.

Source code in cognite_data_quality/deploy.py
class TriggerSettings(BaseModel):
    """Settings for data-change triggers on instance validation workflows.

    Attributes:
        external_id_prefix: Prefix used when constructing per-view trigger
            external IDs.
        batch_size: Maximum number of instances per trigger payload.
            Default: 10.
        batch_timeout: Seconds to wait before flushing a partial batch.
            Default: 60.
    """

    external_id_prefix: str = Field(default="dq-shacl-trigger")
    batch_size: int = Field(default=10)
    batch_timeout: int = Field(default=60)
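
The interaction between batch_size and batch_timeout can be modelled with a small simulation. This is only a sketch of the "flush on size or on timeout" semantics described above, under the assumption that the timeout is measured from the first item in a batch; the real batching is done by the CDF trigger service, and `batch_events` is a hypothetical name.

```python
def batch_events(events, batch_size=10, batch_timeout=60):
    """Group (timestamp_seconds, item) pairs into batches.

    A batch is flushed when it reaches batch_size items, or when the next
    event arrives batch_timeout seconds or more after the batch was opened.
    """
    batches = []
    current = []
    opened_at = None
    for ts, item in events:
        # Timeout: flush the partial batch before accepting the new item.
        if current and ts - opened_at >= batch_timeout:
            batches.append(current)
            current = []
        if not current:
            opened_at = ts
        current.append(item)
        # Size limit: flush as soon as the batch is full.
        if len(current) == batch_size:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches


events = [(0, "a"), (1, "b"), (2, "c"), (100, "d")]
print(batch_events(events, batch_size=2, batch_timeout=60))
# [['a', 'b'], ['c'], ['d']]
```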

ValidationLockSettings

Bases: BaseModel

Per-view sync execution lease tuning (escape hatch via enabled=False).

Source code in cognite_data_quality/deploy.py
class ValidationLockSettings(BaseModel):
    """Per-view sync execution lease tuning (escape hatch via enabled=False)."""

    enabled: bool = Field(default=True, description="Escape hatch only; lease is default-on in the handler")
    ttl_seconds: int = Field(default=600)
    heartbeat_seconds: int = Field(default=60)

ValidationSettings

Bases: BaseModel

Per-view validation behaviour settings.

Attributes:

Name Type Description
auto_load_depth int

Number of levels of referenced instances to auto-load before validation (0-3). Higher values increase coverage but reduce speed. Default: 2.

verbose bool

Enable verbose logging inside the validation function. Default: True.

Source code in cognite_data_quality/deploy.py
class ValidationSettings(BaseModel):
    """Per-view validation behaviour settings.

    Attributes:
        auto_load_depth: Number of levels of referenced instances to
            auto-load before validation (0-3).  Higher values increase
            coverage but reduce speed. Default: 2.
        verbose: Enable verbose logging inside the validation function.
            Default: ``True``.
    """

    auto_load_depth: int = Field(default=2)
    verbose: bool = Field(default=True)

ViewConfig

Bases: BaseModel

Full configuration for one instance-validation view.

Attributes:

Name Type Description
view ViewRef

Reference to the CDF view whose instances are validated.

instance_spaces list[str]

CDF spaces that contain the instances to validate. Accepts a single string (coerced to a list) or a list.

shacl_rules ShaclRulesRef | None

SHACL rules file reference.

validation ValidationSettings

Validation behaviour settings (depth, verbosity).

records RecordsOverrides

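Per-view Records API overrides.

dataproducts list[DataProductRef]

DataProducts this view belongs to. Used when config_source='dataproduct' to declare which DataProduct(s) the view is published into. A view can belong to multiple DataProducts.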

workflow_external_id str | None

Explicit CDF workflow external ID. If None, the ID is derived from settings.workflow.external_id_prefix and view.external_id.

trigger_external_id str | None

Explicit trigger external ID. Auto-derived when None.

partition_count int

Number of partitions for batch historic validation. Default: 10.

partition_field str

DMS property used to range-partition instances (e.g. "lastUpdatedTime"). Default: "lastUpdatedTime".

chunk_size int

Instances validated per function invocation. Default: 200.

sync_batch_size int | None

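Overrides TriggerSettings.batch_size for this view's sync trigger. Falls back to the global value when None.

uniqueness_cron str | None

Optional cron override for global uniqueness validation when SHACL does not define dqs:schedule. None relies on SHACL/default scheduling.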

use_sync_cursor_mode bool

Use cursor-based sync instead of passing instances in the trigger payload. Requires max_concurrent_executions=1. Default: True.

max_concurrent_executions int | None

Maximum simultaneous CDF workflow executions. Default: 1.

historic_reprocess_on_update bool

Controls whether this view is auto-enqueued for historic processing when DataProduct/ruleset versions are updated by data_product_sync. Default: True.
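
To illustrate what partition_count and partition_field imply for historic validation, here is a minimal sketch that splits a time range on a numeric property such as lastUpdatedTime into contiguous half-open ranges. How the package actually computes partition boundaries is not shown on this page, so `partition_ranges` and the even-split strategy are assumptions.

```python
def partition_ranges(start_ms: int, end_ms: int, partition_count: int = 10):
    """Split [start_ms, end_ms) into partition_count contiguous half-open ranges."""
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    if end_ms < start_ms:
        raise ValueError("end_ms must not be smaller than start_ms")
    span = end_ms - start_ms
    # Integer arithmetic keeps boundaries exact; the last bound is always end_ms.
    bounds = [start_ms + (span * i) // partition_count for i in range(partition_count + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


print(partition_ranges(0, 100, 4))
# [(0, 25), (25, 50), (50, 75), (75, 100)]
```

Each range could then be processed in chunks of chunk_size instances per function invocation.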

Source code in cognite_data_quality/deploy.py
class ViewConfig(BaseModel):
    """Full configuration for one instance-validation view.

    Attributes:
        view: Reference to the CDF view whose instances are validated.
        instance_spaces: CDF spaces that contain the instances to validate.
            Accepts a single string (coerced to a list) or a list.
        shacl_rules: SHACL rules file reference.
        validation: Validation behaviour settings (depth, verbosity).
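        records: Per-view Records API overrides.
        dataproducts: DataProducts this view belongs to.  Used when
            ``config_source='dataproduct'`` to declare which DataProduct(s)
            the view is published into.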
        workflow_external_id: Explicit CDF workflow external ID.  If ``None``,
            the ID is derived from *settings.workflow.external_id_prefix*
            and *view.external_id*.
        trigger_external_id: Explicit trigger external ID.  Auto-derived
            when ``None``.
        partition_count: Number of partitions for batch historic validation.
            Default: 10.
        partition_field: DMS property used to range-partition instances
            (e.g. ``"lastUpdatedTime"``). Default: ``"lastUpdatedTime"``.
        chunk_size: Instances validated per function invocation. Default: 200.
        sync_batch_size: Overrides :attr:`TriggerSettings.batch_size` for
            this view's sync trigger.  Falls back to the global value when
            ``None``.
        uniqueness_cron: Optional cron override for global uniqueness
            validation when SHACL does not define ``dqs:schedule``.
            ``None`` relies on SHACL/default scheduling.
        use_sync_cursor_mode: Use cursor-based sync instead of passing
            instances in the trigger payload.  Requires
            ``max_concurrent_executions=1``. Default: ``True``.
        max_concurrent_executions: Maximum simultaneous CDF workflow
            executions. Default: 1.
        historic_reprocess_on_update: Controls whether this view is
            auto-enqueued for historic processing when DataProduct/ruleset
            versions are updated by `data_product_sync`. Default: ``True``.
    """

    view: ViewRef
    instance_spaces: list[str] = Field(default_factory=list)
    shacl_rules: ShaclRulesRef | None = None
    validation: ValidationSettings = Field(default_factory=ValidationSettings)
    records: RecordsOverrides = Field(default_factory=RecordsOverrides)
    dataproducts: list[DataProductRef] = Field(
        default_factory=list,
        description=(
            "DataProducts this view belongs to. Used when config_source='dataproduct' "
            "to declare which DataProduct(s) the view is published into. "
            "A view can belong to multiple DataProducts."
        ),
    )
    workflow_external_id: str | None = None
    trigger_external_id: str | None = None
    partition_count: int = Field(default=10, description="Number of partitions for batch validation")
    partition_field: str = Field(
        default="lastUpdatedTime", description="Field to use for partitioning (e.g., lastUpdatedTime, createdTime)"
    )
    chunk_size: int = Field(default=200, description="Instances per validation chunk")
    sync_batch_size: int | None = Field(
        default=None, description="Sync trigger batch size override; falls back to trigger.batch_size in settings"
    )
    uniqueness_cron: str | None = Field(
        default=None,
        description=(
            "Optional cron override for global uniqueness validation when SHACL does not define "
            "dqs:schedule. Set to null to rely on SHACL/default scheduling."
        ),
    )
    use_sync_cursor_mode: bool = Field(
        default=True,
        description="Use cursor-based sync instead of payload instances. "
        "Trigger fires as a signal (no properties fetched); function "
        "fetches changes via sync cursor. Requires max_concurrent_executions=1.",
    )
    max_concurrent_executions: int | None = Field(
        default=1, description="Maximum number of concurrent workflow executions (set on the CDF workflow)"
    )
    historic_reprocess_on_update: bool = Field(
        default=True,
        description=(
            "Whether this view should be automatically reprocessed historically when "
            "DataProduct/ruleset versions are updated."
        ),
    )
    _filename: str | None = None

    @model_validator(mode="before")
    @classmethod
    def normalise_instance_space(cls, data: dict) -> dict:
        """Accept both `instance_space` (str) and `instance_spaces` (list) from YAML."""
        if isinstance(data, dict) and "instance_space" in data and "instance_spaces" not in data:
            data["instance_spaces"] = data.pop("instance_space")
        return data

    @field_validator("instance_spaces", mode="before")
    @classmethod
    def coerce_to_list(cls, v: object) -> list[str]:
        """Coerce a bare string to a single-element list."""
        if isinstance(v, str):
            return [v]
        return v  # type: ignore[return-value]

coerce_to_list(v) classmethod

Coerce a bare string to a single-element list.

Source code in cognite_data_quality/deploy.py
@field_validator("instance_spaces", mode="before")
@classmethod
def coerce_to_list(cls, v: object) -> list[str]:
    """Coerce a bare string to a single-element list."""
    if isinstance(v, str):
        return [v]
    return v  # type: ignore[return-value]

normalise_instance_space(data) classmethod

Accept both instance_space (str) and instance_spaces (list) from YAML.

Source code in cognite_data_quality/deploy.py
@model_validator(mode="before")
@classmethod
def normalise_instance_space(cls, data: dict) -> dict:
    """Accept both `instance_space` (str) and `instance_spaces` (list) from YAML."""
    if isinstance(data, dict) and "instance_space" in data and "instance_spaces" not in data:
        data["instance_spaces"] = data.pop("instance_space")
    return data
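
The combined effect of the two validators on raw YAML input can be sketched without pydantic. `normalise_view_config` below is a hypothetical standalone function that applies the same two steps in order: rename `instance_space` to `instance_spaces`, then wrap a bare string in a list. Unlike the validator above, it copies the input instead of mutating it.

```python
def normalise_view_config(data: dict) -> dict:
    """Apply the instance_space(s) normalisation to a raw config dict."""
    data = dict(data)  # shallow copy so the caller's dict is left untouched
    # Step 1: accept the singular key, but only when the plural key is absent.
    if "instance_space" in data and "instance_spaces" not in data:
        data["instance_spaces"] = data.pop("instance_space")
    # Step 2: coerce a bare string to a single-element list.
    spaces = data.get("instance_spaces", [])
    if isinstance(spaces, str):
        spaces = [spaces]
    data["instance_spaces"] = spaces
    return data


print(normalise_view_config({"instance_space": "assets"}))
# {'instance_spaces': ['assets']}
print(normalise_view_config({"instance_spaces": ["assets", "pumps"]}))
# {'instance_spaces': ['assets', 'pumps']}
```

When both keys are present, the singular `instance_space` key is left in place and `instance_spaces` takes effect.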

ViewRef

Bases: BaseModel

Reference to a CDF Data Model view.

Attributes:

Name Type Description
space str

CDF space that contains the view.

external_id str

External ID of the view.

version str

Version string of the view.

Source code in cognite_data_quality/deploy.py
class ViewRef(BaseModel):
    """Reference to a CDF Data Model view.

    Attributes:
        space: CDF space that contains the view.
        external_id: External ID of the view.
        version: Version string of the view.
    """

    space: str
    external_id: str
    version: str

WorkflowSettings

Bases: BaseModel

Top-level settings for a CDF Workflow resource.

Attributes:

Name Type Description
external_id_prefix str

Prefix used when constructing per-view workflow external IDs (e.g. "dq-shacl" → "dq-shacl-pump").

version str

Workflow version string. Default: "1".

task WorkflowTaskSettings

Per-task retry/timeout settings.

consolidate_per_dataproduct bool

When True, deploy one instance workflow per DataProduct (dataproduct mode only).

Source code in cognite_data_quality/deploy.py
class WorkflowSettings(BaseModel):
    """Top-level settings for a CDF Workflow resource.

    Attributes:
        external_id_prefix: Prefix used when constructing per-view workflow
            external IDs (e.g. ``"dq-shacl"`` → ``"dq-shacl-pump"``).
        version: Workflow version string. Default: ``"1"``.
        task: Per-task retry/timeout settings.
        consolidate_per_dataproduct: When True, deploy one instance workflow per
            DataProduct (dataproduct mode only).
    """

    external_id_prefix: str
    version: str = Field(default="1")
    task: WorkflowTaskSettings = Field(default_factory=WorkflowTaskSettings)
    consolidate_per_dataproduct: bool = Field(
        default=False,
        description="Deploy one shared workflow per DataProduct instead of per view",
    )

WorkflowTaskSettings

Bases: BaseModel

Retry and timeout settings applied to each CDF workflow task.

Attributes:

Name Type Description
retries int

Number of task-level retries (CDF API limit: 0-10). Default: 3.

timeout int

Task timeout in seconds. Default: 600.

on_failure str

Behaviour on unrecoverable failure — "abortWorkflow" or "skipTask". Default: "abortWorkflow".

Source code in cognite_data_quality/deploy.py
class WorkflowTaskSettings(BaseModel):
    """Retry and timeout settings applied to each CDF workflow task.

    Attributes:
        retries: Number of task-level retries (CDF API limit: 0-10). Default: 3.
        timeout: Task timeout in seconds. Default: 600.
        on_failure: Behaviour on unrecoverable failure — ``"abortWorkflow"`` or
            ``"skipTask"``. Default: ``"abortWorkflow"``.
    """

    retries: int = Field(default=3, ge=0, le=10, description="CDF workflow task retries (API limit: 0-10)")
    timeout: int = Field(default=600)
    on_failure: str = Field(default="abortWorkflow")

deploy_data_models(*, client, space='dataQuality', containers=None, data_model_id='DataQualityMonitoring', data_model_version='1', dry_run=False)

Deploy data models with auto-generated views from containers.

Creates views from existing DMS containers, making container data accessible through the CDF Data Modeling UI.

Parameters:

Name Type Description Default
client CogniteClient

CogniteClient instance

required
space str

Space containing containers (default: "dataQuality")

'dataQuality'
containers list[str] | None

List of container IDs to create views for (default: ["OrchestrationState", "FunctionValidationState"])

None
data_model_id str

External ID for the data model

'DataQualityMonitoring'
data_model_version str

Version string

'1'
dry_run bool

Preview without deploying

False

Returns:

Type Description
dict[str, object]

dict with deployment results (datamodel, views, status)

Example

from cognite_data_quality import deploy_data_models
from cognite.client import CogniteClient
client = CogniteClient()
result = deploy_data_models(client=client, dry_run=False)
print(f"Created {len(result['views'])} views")

Source code in cognite_data_quality/deploy.py
def deploy_data_models(
    *,
    client: CogniteClient,
    space: str = "dataQuality",
    containers: list[str] | None = None,
    data_model_id: str = "DataQualityMonitoring",
    data_model_version: str = "1",
    dry_run: bool = False,
) -> dict[str, object]:
    """Deploy data models with auto-generated views from containers.

    Creates views from existing DMS containers, making container data
    accessible through the CDF Data Modeling UI.

    Args:
        client: CogniteClient instance
        space: Space containing containers (default: "dataQuality")
        containers: List of container IDs to create views for
                   (default: ["OrchestrationState", "FunctionValidationState"])
        data_model_id: External ID for the data model
        data_model_version: Version string
        dry_run: Preview without deploying

    Returns:
        dict with deployment results (datamodel, views, status)

    Example:
        >>> from cognite_data_quality import deploy_data_models
        >>> from cognite.client import CogniteClient
        >>> client = CogniteClient()
        >>> result = deploy_data_models(client=client, dry_run=False)
        >>> print(f"Created {len(result['views'])} views")
    """
    ensure_client_name(client)

    from cognite.client.data_classes.data_modeling import (
        DataModelApply,
        MappedPropertyApply,
        ViewApply,
    )
    from cognite.client.data_classes.data_modeling.ids import ContainerId, ViewId

    if containers is None:
        containers = ["OrchestrationState", "FunctionValidationState"]

    views = []

    for container_id in containers:
        print(f"  Processing container: {space}/{container_id}")

        # Retrieve container
        try:
            container = client.data_modeling.containers.retrieve((space, container_id))
        except Exception as e:
            print(f"  ERROR: Could not retrieve container: {e}")
            continue

        if not container:
            print(f"  WARNING: Container {space}/{container_id} not found, skipping")
            continue

        # Build view from container
        view_id = f"{container_id}View"
        view_properties = {}

        for prop_name, prop_def in container.properties.items():
            view_properties[prop_name] = MappedPropertyApply(
                container=ContainerId(space=space, external_id=container_id),
                container_property_identifier=prop_name,
                name=prop_def.description.split(".")[0].strip() if prop_def.description else prop_name,
                description=prop_def.description or "",
            )

        view = ViewApply(
            space=space,
            external_id=view_id,
            version=data_model_version,
            name=f"{container_id} View",
            description=f"Auto-generated view for {container_id} container",
            properties=view_properties,
        )

        # Check if view already exists
        try:
            existing_view = client.data_modeling.views.retrieve((space, view_id, data_model_version))
            if existing_view:
                print(f"    ✓ View {view_id} already exists ({len(view_properties)} properties)")
            else:
                print(f"    • Creating view: {view_id} ({len(view_properties)} properties)")
        except Exception:
            print(f"    • Creating view: {view_id} ({len(view_properties)} properties)")

        views.append(view)

    if not views:
        print("  No views to create")
        return {
            "datamodel": None,
            "views": [],
            "status": "skipped",
        }

    if dry_run:
        print("  [DRY RUN] Would create data model and views")
        return {
            "datamodel": {"space": space, "external_id": data_model_id, "version": data_model_version},
            "views": [{"space": v.space, "external_id": v.external_id, "version": v.version} for v in views],
            "status": "dry_run",
        }

    # Apply views
    client.data_modeling.views.apply(views)
    print(f"  Ensured {len(views)} views are deployed")

    # Create data model
    view_ids = [ViewId(space=v.space, external_id=v.external_id, version=v.version) for v in views]
    data_model = DataModelApply(
        space=space,
        external_id=data_model_id,
        version=data_model_version,
        name="Data Quality Monitoring",
        description="Data models for data quality orchestration and state tracking",
        views=view_ids,
    )

    client.data_modeling.data_models.apply(data_model)
    print(f"  Ensured data model: {space}/{data_model_id} v{data_model_version}")

    return {
        "datamodel": {"space": space, "external_id": data_model_id, "version": data_model_version},
        "views": [{"space": v.space, "external_id": v.external_id, "version": v.version} for v in views],
        "status": "deployed",
    }
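
The returned dict has three possible "status" values ("skipped", "dry_run", "deployed"), and "datamodel" is None only when the status is "skipped". A caller might branch on them as in the sketch below; `describe_data_model_result` is a hypothetical helper, and the sample dict is hand-written to match the shape returned above.

```python
def describe_data_model_result(result: dict) -> str:
    """Summarise a deploy_data_models() result dict in one line."""
    status = result["status"]
    if status == "skipped":
        # No container could be retrieved, so no views were built.
        return "No views were built; check that the containers exist."
    dm = result["datamodel"]
    target = f"{dm['space']}/{dm['external_id']} v{dm['version']}"
    count = len(result["views"])
    if status == "dry_run":
        return f"Would deploy {count} view(s) into {target}"
    return f"Deployed {count} view(s) into {target}"


sample = {
    "datamodel": {"space": "dataQuality", "external_id": "DataQualityMonitoring", "version": "1"},
    "views": [
        {"space": "dataQuality", "external_id": "OrchestrationStateView", "version": "1"},
        {"space": "dataQuality", "external_id": "FunctionValidationStateView", "version": "1"},
    ],
    "status": "dry_run",
}
print(describe_data_model_result(sample))
# Would deploy 2 view(s) into dataQuality/DataQualityMonitoring v1
```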

deploy_functions(*, client, settings, force=False, dry_run=False, debug_save_zip=None, secrets=None)

Deploy the unified SHACL validation function using embedded code.

Uses content hashing to skip redeployment when the function code is unchanged (hash stored in function metadata).
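
The idea behind hash-based skipping can be sketched as follows. This is an illustration under assumptions, not the package's implementation: the hash algorithm (SHA-256 here), the metadata key (`code_hash`), and both function names are hypothetical.

```python
from __future__ import annotations

import hashlib


def content_hash(files: dict[str, bytes]) -> str:
    """Hash file names and contents in a stable (sorted) order."""
    digest = hashlib.sha256()
    for name in sorted(files):
        digest.update(name.encode("utf-8"))
        digest.update(b"\0")  # separator so name/content boundaries are unambiguous
        digest.update(files[name])
        digest.update(b"\0")
    return digest.hexdigest()


def needs_redeploy(files: dict[str, bytes], deployed_metadata: dict[str, str], force: bool = False) -> bool:
    """Redeploy when forced, or when the stored hash differs from the current one."""
    if force:
        return True
    return deployed_metadata.get("code_hash") != content_hash(files)


files = {"handler.py": b"def handle(): ...\n", "requirements.txt": b"pyshacl\n"}
metadata = {"code_hash": content_hash(files)}
print(needs_redeploy(files, metadata))              # False
print(needs_redeploy(files, metadata, force=True))  # True
```

Sorting the file names makes the hash independent of dict ordering, so an unchanged bundle always yields the same digest.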

Parameters:

Name Type Description Default
client CogniteClient

Cognite client for CDF API access.

required
settings DeploymentSettings

Deployment settings containing function external ID, runtime, and file list.

required
force bool

Force redeployment even if the function code hash matches the currently deployed function.

False
dry_run bool

Preview changes without deploying.

False
debug_save_zip Path | str | None

Directory path to save the function zip archive for inspection (e.g. "dist/debug-zips").

None
secrets dict[str, str] | None

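Function secrets passed at creation time, e.g. {"client-id": "...", "client-secret": "..."}. If not provided, secrets are first derived from the client and settings; if that yields nothing, falls back to the COGNITE_CLIENT_ID / COGNITE_CLIENT_SECRET (or IDP_CLIENT_ID / IDP_CLIENT_SECRET) environment variables.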

None

Returns:

Type Description
dict[str, dict[str, str]]

dict with key "function" whose value is a result dict containing "function" (external ID) and "status" (one of "deployed", "skipped", or "dry_run").

Source code in cognite_data_quality/deploy.py
def deploy_functions(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    force: bool = False,
    dry_run: bool = False,
    debug_save_zip: Path | str | None = None,
    secrets: dict[str, str] | None = None,
) -> dict[str, dict[str, str]]:
    """Deploy the unified SHACL validation function using embedded code.

    Uses content hashing to skip redeployment when the function code is
    unchanged (hash stored in function metadata).

    Args:
        client: Cognite client for CDF API access.
        settings: Deployment settings containing function external ID,
            runtime, and file list.
        force: Force redeployment even if the function code hash matches
            the currently deployed function.
        dry_run: Preview changes without deploying.
        debug_save_zip: Directory path to save the function zip archive
            for inspection (e.g. ``"dist/debug-zips"``).
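        secrets: Function secrets passed at creation time, e.g.
            ``{"client-id": "...", "client-secret": "..."}``.
            If not provided, secrets are first derived from the client and
            settings; if that yields nothing, falls back to the
            ``COGNITE_CLIENT_ID`` / ``COGNITE_CLIENT_SECRET`` (or
            ``IDP_CLIENT_ID`` / ``IDP_CLIENT_SECRET``) environment variables.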

    Returns:
        dict with key ``"function"`` whose value is a result dict
        containing ``"function"`` (external ID) and ``"status"``
        (one of ``"deployed"``, ``"skipped"``, or ``"dry_run"``).
    """
    ensure_client_name(client)

    if secrets is None:
        secrets = _build_default_function_secrets(client, settings)
    if secrets is None:
        client_id = os.getenv("COGNITE_CLIENT_ID") or os.getenv("IDP_CLIENT_ID")
        client_secret = os.getenv("COGNITE_CLIENT_SECRET") or os.getenv("IDP_CLIENT_SECRET")
        if client_id and client_secret:
            secrets = {"client-id": client_id, "client-secret": client_secret}

    save_zip_dir = Path(debug_save_zip) if debug_save_zip else None

    result = _deploy_function(
        client,
        settings.function,
        force=force,
        dry_run=dry_run,
        function_code_dataset_external_id=settings.function_code_dataset_external_id,
        debug_save_zip=save_zip_dir,
        secrets=secrets,
        env_vars={"DATA_QUALITY_SPACE": settings.effective_config_space} if settings.effective_config_space else None,
    )

    return {"function": result}
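
The environment-variable fallback in the function above can be isolated into a small testable helper. `secrets_from_env` is a hypothetical name; the lookup order follows the code shown, and it takes a mapping so it can be exercised without touching the real environment.

```python
from __future__ import annotations

from typing import Mapping


def secrets_from_env(env: Mapping[str, str]) -> dict[str, str] | None:
    """Build function secrets from env vars, or return None if incomplete."""
    # COGNITE_* names take precedence; IDP_* names are the fallback.
    client_id = env.get("COGNITE_CLIENT_ID") or env.get("IDP_CLIENT_ID")
    client_secret = env.get("COGNITE_CLIENT_SECRET") or env.get("IDP_CLIENT_SECRET")
    if client_id and client_secret:
        return {"client-id": client_id, "client-secret": client_secret}
    return None


print(secrets_from_env({"IDP_CLIENT_ID": "abc", "IDP_CLIENT_SECRET": "xyz"}))
# {'client-id': 'abc', 'client-secret': 'xyz'}
print(secrets_from_env({"COGNITE_CLIENT_ID": "abc"}))
# None
```

In real use the mapping would be `os.environ`. Both values must be present; a client ID without a secret yields no secrets at all.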

deploy_incremental(*, view_config_path, config_env=None, force=False, dry_run=False, client=None)

Deploy function, workflow, and trigger for a single view config.

Loads deployment settings from the repository's scripts/ directory via shared.load_settings and deploys the validation pipeline for one view.

Parameters:

Name Type Description Default
view_config_path Path | str

Path to the view YAML config file.

required
config_env str | None

Environment name (e.g. "cog-ai"). Passed to shared.load_settings as the config_env argument. Defaults to the CONFIG_ENV environment variable when None.

None
force bool

Force redeployment even if content hashes are unchanged.

False
dry_run bool

Preview without making any CDF API calls.

False
client CogniteClient | None

Cognite client. Created from environment variables if None and dry_run is False.

None

Returns:

Type Description
dict[str, object]

dict with keys "function", "workflow", and "dry_run".

Source code in cognite_data_quality/deploy.py
def deploy_incremental(
    *,
    view_config_path: Path | str,
    config_env: str | None = None,
    force: bool = False,
    dry_run: bool = False,
    client: CogniteClient | None = None,
) -> dict[str, object]:
    """Deploy function, workflow, and trigger for a single view config.

    Loads deployment settings from the repository's ``scripts/`` directory
    via ``shared.load_settings`` and deploys the validation pipeline for
    one view.

    Args:
        view_config_path: Path to the view YAML config file.
        config_env: Environment name (e.g. ``"cog-ai"``).  Passed to
            ``shared.load_settings`` as the ``config_env`` argument.
            Defaults to the ``CONFIG_ENV`` environment variable when
            ``None``.
        force: Force redeployment even if content hashes are unchanged.
        dry_run: Preview without making any CDF API calls.
        client: Cognite client.  Created from environment variables if
            ``None`` and *dry_run* is ``False``.

    Returns:
        dict with keys ``"function"``, ``"workflow"``, and ``"dry_run"``.
    """
    import importlib.util
    import sys

    def _load_module(module_name: str, path: Path):
        spec = importlib.util.spec_from_file_location(module_name, path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load module {module_name} from {path}")
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    def _load_view_config(path: Path) -> dict:
        if not path.exists():
            raise FileNotFoundError(f"View config not found: {path}")

        config = _load_yaml_with_env(path)
        config["_filename"] = path.name
        return config

    def _repo_root() -> Path:
        return Path(__file__).resolve().parents[1]

    repo_root = _repo_root()
    scripts_dir = repo_root / "scripts"

    shared = _load_module("dq_shared", scripts_dir / "shared.py")
    deploy_function = _load_module("dq_deploy_function", scripts_dir / "deploy_function.py")
    deploy_workflows = _load_module("dq_deploy_workflows", scripts_dir / "deploy_workflows.py")

    settings = shared.load_settings(repo_root, config_env)
    view_config = _load_view_config(Path(view_config_path))

    if client is None and not dry_run:
        client = shared.create_client()
    if client is not None:
        ensure_client_name(client)

    function_result = deploy_function.deploy_function_by_settings_key(
        client, repo_root, settings, "function", force=force, dry_run=dry_run
    )
    workflow_result = deploy_workflows.deploy_view(
        client, repo_root, settings, view_config, force=force, dry_run=dry_run
    )

    return {
        "function": function_result,
        "workflow": workflow_result,
        "dry_run": dry_run,
    }

deploy_raw_table_shacl(*, client, settings, tables, shacl_dir, dry_run=False)

Deploy SHACL rules for RAW table validation.

For RAW tables, only the SHACL files are uploaded; no workflows or triggers are created. RAW validation is triggered explicitly via function calls.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `client` | `CogniteClient` | CogniteClient instance | required |
| `settings` | `DeploymentSettings` | Deployment settings with records configuration | required |
| `tables` | `list[RawTableConfig]` | List of RAW table configs | required |
| `shacl_dir` | `Path` | Directory containing SHACL rule files | required |
| `dry_run` | `bool` | Preview without uploading | `False` |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, object]]` | list of deployment results for each table |
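
As the source listing for this function shows, each result is a dict with the keys `"database"`, `"table"`, `"shacl_file"`, and `"status"`, where the status is `"deployed"` or `"dry_run"`. The sketch below is one way to turn that list into readable table names. `deployed_tables` and the sample data are hypothetical and not part of the package.

```python
def deployed_tables(results: list[dict[str, object]]) -> list[str]:
    """Return "database.table" names for entries whose status is "deployed"."""
    return [
        f"{r['database']}.{r['table']}"
        for r in results
        if r.get("status") == "deployed"
    ]


# Hand-written sample in the shape produced by deploy_raw_table_shacl().
sample_results = [
    {"database": "sap", "table": "orders", "shacl_file": "orders-shacl", "status": "deployed"},
    {"database": "sap", "table": "items", "shacl_file": "items-shacl", "status": "deployed"},
]
print(deployed_tables(sample_results))
```

With `dry_run=True` every entry carries the status `"dry_run"`, so the helper returns an empty list.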

Source code in cognite_data_quality/deploy.py
def deploy_raw_table_shacl(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    tables: list[RawTableConfig],
    shacl_dir: Path,
    dry_run: bool = False,
) -> list[dict[str, object]]:
    """Deploy SHACL rules for RAW table validation.

    For RAW tables, we only upload SHACL files - no workflows or triggers are created.
    RAW validation is triggered explicitly via function calls.

    Args:
        client: CogniteClient instance
        settings: Deployment settings with records configuration
        tables: List of RAW table configs
        shacl_dir: Directory containing SHACL rule files
        dry_run: Preview without uploading

    Returns:
        list of deployment results for each table
    """
    ensure_client_name(client)

    if not tables:
        return []

    results: list[dict[str, object]] = []

    for table_config in tables:
        db_name = table_config.table.database
        table_name = table_config.table.name
        print(f"\n  RAW table: {db_name}.{table_name}")

        # Upload SHACL rules
        records = settings.records
        overrides = table_config.records
        _upload_shacl_rules(
            client,
            settings,
            shacl_dir,
            table_config.shacl_rules,
            dry_run=dry_run,
            rule_set_id=overrides.rule_set_id if overrides else records.container,
            rule_set_version=overrides.rule_set_version if overrides else None,
        )

        results.append(
            {
                "database": db_name,
                "table": table_name,
                "shacl_file": table_config.shacl_rules.external_id,
                "status": "deployed" if not dry_run else "dry_run",
            }
        )

    return results

deploy_rule_engine_result_sync_workflows(*, client, settings, configs, force=False, dry_run=False, config_filter=None, secrets=None)

Deploy scheduled workflows for incremental RuleEngineResult listeners.

Source code in cognite_data_quality/deploy.py
def deploy_rule_engine_result_sync_workflows(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    configs: list[RuleEngineResultSyncConfig],
    force: bool = False,
    dry_run: bool = False,
    config_filter: list[str] | None = None,
    secrets: dict[str, str] | None = None,
) -> list[dict[str, object]]:
    """Deploy scheduled workflows for incremental RuleEngineResult listeners."""
    from cognite_data_quality._workflow_deployer import deploy_rule_engine_result_sync_workflows as _deploy_re_sync

    return _deploy_re_sync(
        client=client,
        settings=settings,
        configs=configs,
        force=force,
        dry_run=dry_run,
        config_filter=config_filter,
        secrets=secrets,
    )

deploy_timeseries_workflows(*, client, settings, configs, shacl_dir, force=False, dry_run=False, config_filter=None, skip_unchanged=True, dp_version=None, secrets=None)

Deploy scheduled workflows and triggers for time series validation.

Source code in cognite_data_quality/deploy.py
def deploy_timeseries_workflows(
    *,
    client: CogniteClient,
    settings: DeploymentSettings,
    configs: list[TimeseriesConfig],
    shacl_dir: Path | None,
    force: bool = False,
    dry_run: bool = False,
    config_filter: list[str] | None = None,
    skip_unchanged: bool = True,
    dp_version: str | None = None,
    secrets: dict[str, str] | None = None,
) -> list[dict[str, object]]:
    """Deploy scheduled workflows and triggers for time series validation."""
    from cognite_data_quality._workflow_deployer import deploy_timeseries_workflows as _deploy_ts

    return _deploy_ts(
        client=client,
        settings=settings,
        configs=configs,
        shacl_dir=shacl_dir,
        force=force,
        dry_run=dry_run,
        config_filter=config_filter,
        skip_unchanged=skip_unchanged,
        dp_version=dp_version,
        secrets=secrets,
    )

deploy_validation_infrastructure(*, client, settings_path, views_dir=None, filter_views_by_shacl_rules=False, timeseries_dir=None, rule_engine_result_sync_dir=None, shacl_rules_dir=None, function_external_id=None, function_secrets=None, force=False, force_function=False, force_workflows=False, dry_run=False, debug_save_zip=None, deploy_data_product_sync=False, data_product_sync_cron='0 * * * *', historic_queue_manager_cron='*/5 * * * *')

Deploy functions, workflows, and triggers using embedded function code.

View configs are loaded from views/*.yaml files next to settings.yaml, or from views_dir when it is given. When settings.config_source is "dataproduct", the SHACL .ttl files are published to the RuleSet API and view configs are published to the DataProduct API instead of being uploaded as CDF Files. When config_source is "yaml", SHACL files and view configs are uploaded as CDF Files.
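
In `dataproduct` mode, the source listing for this function chooses the RuleSet version for each view in a fixed order: `records.rule_set_version` if set, otherwise the version of the first DataProduct the view belongs to, otherwise `"1.0.0"`. A minimal sketch of that fallback order follows. `resolve_ruleset_version` and its arguments are illustrative names, not part of the package.

```python
from __future__ import annotations


def resolve_ruleset_version(
    rule_set_version: str | None,
    dataproduct_versions: list[str],
) -> str:
    """Pick a RuleSet version using the fallback order described above."""
    if rule_set_version:
        # An explicit records.rule_set_version always wins.
        return rule_set_version
    if dataproduct_versions:
        # Otherwise use the first DataProduct version the view belongs to.
        return dataproduct_versions[0]
    # Last resort used by the listing.
    return "1.0.0"


print(resolve_ruleset_version(None, ["2.1.0", "3.0.0"]))
```

The version that ends up in the view's `ruleset_references` can still differ from this value: outside dry-run mode, the listing replaces it with the version returned by the publish call when one is returned.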

When filter_views_by_shacl_rules is True, only views referenced via sh:targetClass in their SHACL rules are deployed.
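
To make the effect of `filter_views_by_shacl_rules` concrete, the sketch below pulls `sh:targetClass` targets out of Turtle text with a regular expression. It is a simplification under stated assumptions: the package's own `extract_target_views_from_shacl` may parse the file differently, and treating the class's local name as the view external ID is an assumption made here. `target_class_names` and `SAMPLE_TTL` are hypothetical.

```python
import re

# Matches `sh:targetClass <iri>` or `sh:targetClass prefix:LocalName`.
# Only the first object after the predicate is captured, so comma-separated
# target lists are not handled by this sketch.
_TARGET_CLASS = re.compile(
    r"sh:targetClass\s+(?:<([^>]+)>|([A-Za-z_][\w-]*)?:([\w-]+))"
)


def target_class_names(turtle_text: str) -> set[str]:
    """Return the local names of all sh:targetClass targets in the text."""
    names: set[str] = set()
    for iri, _prefix, local in _TARGET_CLASS.findall(turtle_text):
        if iri:
            # Full IRI: keep the fragment or the last path segment.
            names.add(re.split(r"[#/]", iri)[-1])
        else:
            names.add(local)
    return names


SAMPLE_TTL = """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix ex: <http://example.com/> .

ex:PumpShape a sh:NodeShape ;
    sh:targetClass ex:Pump ;
    sh:property [ sh:path ex:name ; sh:minCount 1 ] .

ex:ValveShape a sh:NodeShape ;
    sh:targetClass <http://example.com/model#Valve> .
"""

view_ids = ["Pump", "Valve", "Tank"]
targets = target_class_names(SAMPLE_TTL)
print([v for v in view_ids if v in targets])
```

Here `Tank` has no matching `sh:targetClass`, so it would be dropped from the deployment when the filter is enabled.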

This function also ensures all required DMS containers exist in CDF before deploying.
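
A dry run is a low-risk way to see what a deployment would do. The sketch below wraps the call using only parameters from the documented signature. The wrapper name `preview_infrastructure`, the `deploy_fn` hook (there so the sketch can be exercised with a stub), and the example settings path are assumptions for illustration.

```python
from pathlib import Path


def preview_infrastructure(client, settings_path, deploy_fn=None):
    """Call deploy_validation_infrastructure in dry-run mode and return its result."""
    if deploy_fn is None:
        # Imported lazily so the sketch can be stubbed without the package installed.
        from cognite_data_quality.deploy import deploy_validation_infrastructure as deploy_fn
    return deploy_fn(
        client=client,
        settings_path=Path(settings_path),
        dry_run=True,
    )


# Usage (needs a configured CogniteClient; the path is a placeholder):
# result = preview_infrastructure(client, "config/dev/settings.yaml")
```

Per the source listing, the container and monitoring data model step is guarded by `if not dry_run`, so a dry run does not create containers.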

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `client` | `CogniteClient` | Cognite client for CDF API access. | required |
| `settings_path` | `Path \| str` | Path to the `settings.yaml` file for this environment. | required |
| `views_dir` | `Path \| str \| None` | Directory containing view YAML configs. Defaults to `settings_path.parent / "views"`. | `None` |
| `filter_views_by_shacl_rules` | `bool` | If `True`, restrict deployment to views that appear as `sh:targetClass` targets in the SHACL rules file. Default `False`. | `False` |
| `timeseries_dir` | `Path \| str \| None` | Directory containing time series validation YAML configs. Defaults to `settings.timeseries.config_dir` from `settings.yaml`. | `None` |
| `rule_engine_result_sync_dir` | `Path \| str \| None` | Directory containing incremental RuleEngineResult listener YAML configs. Defaults to `settings.rule_engine_result_sync.config_dir` from `settings.yaml` when that section is present. | `None` |
| `shacl_rules_dir` | `Path \| str \| None` | Directory containing SHACL `.ttl` rule files. Defaults to `settings.shacl_rules.source_dir` from `settings.yaml`. | `None` |
| `function_external_id` | `str \| None` | Override for the deployed function external ID. Defaults to `"data-quality-validation"`. | `None` |
| `function_secrets` | `dict[str, str] \| None` | Secrets to inject into the function at creation time, e.g. `{"client-id": "...", "client-secret": "..."}`. If not provided, falls back to the `COGNITE_CLIENT_ID` / `COGNITE_CLIENT_SECRET` environment variables. | `None` |
| `force` | `bool` | Force redeployment of both functions and workflows even when content hashes are unchanged. | `False` |
| `force_function` | `bool` | Force redeployment of the function only; workflows are still skipped if their hashes are unchanged. | `False` |
| `force_workflows` | `bool` | Force redeployment of workflows only; the function is still skipped if its hash is unchanged. | `False` |
| `dry_run` | `bool` | Preview all changes without making any CDF API calls. | `False` |
| `debug_save_zip` | `Path \| str \| None` | Directory path to save each function zip archive for inspection (e.g. `"dist/debug-zips"`). | `None` |
| `deploy_data_product_sync` | `bool` | Deploy a scheduled workflow that discovers all DataProducts with quality rules and auto-deploys validation workflows for them. | `False` |
| `data_product_sync_cron` | `str` | Cron expression for the data product sync trigger. Default: `"0 * * * *"` (hourly). | `'0 * * * *'` |
| `historic_queue_manager_cron` | `str` | Cron expression for the historic queue manager trigger. Default: `"*/5 * * * *"` (every 5 minutes). Only deployed when `deploy_data_product_sync` is `True`. | `'*/5 * * * *'` |
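
The three force flags combine in a simple way. In the source listing, the function deploy receives `force or force_function`, and the workflow deploys receive `force or force_workflows`. The sketch below restates that logic; `effective_force` is a hypothetical helper, not part of the package.

```python
def effective_force(
    force: bool = False,
    force_function: bool = False,
    force_workflows: bool = False,
) -> dict[str, bool]:
    """Show which components are redeployed even when content hashes are unchanged."""
    return {
        "function": force or force_function,
        "workflows": force or force_workflows,
    }


print(effective_force(force_function=True))
```

Passing `force=True` therefore makes the two narrower flags redundant.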

Returns:

| Type | Description |
| --- | --- |
| `dict[str, object]` | dict with keys `"functions"`, `"workflows"`, `"timeseries_workflows"`, `"rule_engine_result_sync_workflows"`, `"raw_tables"`, and optionally `"data_product_sync"` and `"historic_queue_manager"`, each containing a list of per-item deployment result dicts with at minimum `"status"` (`"deployed"`, `"skipped"`, or `"dry_run"`). |
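
Because every category in the returned dict holds a list of result dicts with a `"status"` key, a deployment can be summarised with a few lines of code. The sketch below is one possible way to do that. `status_counts` and the sample result are hypothetical, and entries that are not lists of dicts are skipped defensively.

```python
from collections import Counter


def status_counts(result: dict[str, object]) -> dict[str, Counter]:
    """Count "status" values per category of a deployment result dict."""
    counts: dict[str, Counter] = {}
    for category, items in result.items():
        if isinstance(items, list):
            counts[category] = Counter(
                str(item.get("status")) for item in items if isinstance(item, dict)
            )
    return counts


# Hand-written sample in the documented shape.
sample_result = {
    "functions": [{"status": "deployed"}],
    "workflows": [{"status": "skipped"}, {"status": "deployed"}],
    "raw_tables": [],
}
for category, counter in status_counts(sample_result).items():
    print(category, dict(counter))
```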

Source code in cognite_data_quality/deploy.py
def deploy_validation_infrastructure(
    *,
    client: CogniteClient,
    settings_path: Path | str,
    views_dir: Path | str | None = None,
    filter_views_by_shacl_rules: bool = False,
    timeseries_dir: Path | str | None = None,
    rule_engine_result_sync_dir: Path | str | None = None,
    shacl_rules_dir: Path | str | None = None,
    function_external_id: str | None = None,
    function_secrets: dict[str, str] | None = None,
    force: bool = False,
    force_function: bool = False,
    force_workflows: bool = False,
    dry_run: bool = False,
    debug_save_zip: Path | str | None = None,
    deploy_data_product_sync: bool = False,
    data_product_sync_cron: str = "0 * * * *",
    historic_queue_manager_cron: str = "*/5 * * * *",
) -> dict[str, object]:
    """Deploy functions, workflows, and triggers using embedded function code.

    View configs are loaded from ``views/*.yaml`` files in the settings
    directory.  When ``settings.config_source`` is ``"dataproduct"``, the
    SHACL ``.ttl`` files are published to the RuleSet API and view configs
    are published to the DataProduct API instead of being uploaded as CDF
    Files.  When ``config_source`` is ``"yaml"`` (default), SHACL and view
    configs are uploaded as CDF Files as usual.

    When *filter_views_by_shacl_rules* is ``True``, only views referenced
    via ``sh:targetClass`` in their SHACL rules are deployed.

    This function also ensures all required DMS containers exist in CDF
    before deploying.

    Args:
        client: Cognite client for CDF API access.
        settings_path: Path to the ``settings.yaml`` file for this environment.
        views_dir: Directory containing view YAML configs.  Defaults to
            ``settings_path.parent / "views"``.
        filter_views_by_shacl_rules: If ``True``, restrict deployment to
            views that appear as ``sh:targetClass`` targets in the SHACL
            rules file.  Default ``False``.
        timeseries_dir: Directory containing time series validation YAML
            configs.  Defaults to ``settings.timeseries.config_dir`` from
            ``settings.yaml``.
        rule_engine_result_sync_dir: Directory containing incremental
            RuleEngineResult listener YAML configs. Defaults to
            ``settings.rule_engine_result_sync.config_dir`` from
            ``settings.yaml`` when that section is present.
        shacl_rules_dir: Directory containing SHACL ``.ttl`` rule files.
            Defaults to ``settings.shacl_rules.source_dir`` from
            ``settings.yaml``.
        function_external_id: Override for the deployed function external ID.
            Defaults to ``"data-quality-validation"``.
        function_secrets: Secrets to inject into the function at creation
            time, e.g. ``{"client-id": "...", "client-secret": "..."}``.
            If not provided, falls back to the ``COGNITE_CLIENT_ID`` /
            ``COGNITE_CLIENT_SECRET`` environment variables.
        force: Force redeployment of both functions and workflows even when
            content hashes are unchanged.
        force_function: Force redeployment of the function only; workflows
            are still skipped if their hashes are unchanged.
        force_workflows: Force redeployment of workflows only; the function
            is still skipped if its hash is unchanged.
        dry_run: Preview all changes without making any CDF API calls.
        debug_save_zip: Directory path to save each function zip archive for
            inspection (e.g. ``"dist/debug-zips"``).
        deploy_data_product_sync: Deploy a scheduled workflow that
            discovers all DataProducts with quality rules and
            auto-deploys validation workflows for them.
        data_product_sync_cron: Cron expression for the data product
            sync trigger.  Default: ``"0 * * * *"`` (hourly).
        historic_queue_manager_cron: Cron expression for the historic
            queue manager trigger.  Default: ``"*/5 * * * *"`` (every
            5 minutes).  Only deployed when ``deploy_data_product_sync``
            is ``True``.

    Returns:
        dict with keys ``"functions"``, ``"workflows"``,
        ``"timeseries_workflows"``, ``"rule_engine_result_sync_workflows"``, ``"raw_tables"``, and optionally
        ``"data_product_sync"`` and ``"historic_queue_manager"``, each
        containing a list of per-item deployment result dicts with at
        minimum ``"status"``
        (``"deployed"``, ``"skipped"``, or ``"dry_run"``).
    """
    ensure_client_name(client)

    settings_path = Path(settings_path)
    settings = load_settings(settings_path)
    base_dir = settings_path.parent

    # Override external_id if provided
    if function_external_id is not None:
        settings.function.external_id = function_external_id

    # Ensure all data quality infrastructure containers exist
    if not dry_run:
        print("Ensuring data quality infrastructure containers...")

        # Ensure space exists once (all containers use the same space)
        from cognite_data_quality._containers import _ensure_space_exists

        space = settings.effective_config_space
        _ensure_space_exists(client, space)

        # Records container (for validation results)
        print(f"  Records: {settings.records.space}/{settings.records.container}")
        _ensure_records_container(client, settings.records.space, settings.records.container)

        # Rule engine results container (for SHACL-AF construct rule outputs).
        # Co-locates in the same space as the validation records container.
        print(f"  Rule Engine: {settings.records.space}/{settings.records.rule_engine_container}")
        _ensure_rule_engine_container(client, settings.records.space, settings.records.rule_engine_container)

        # State containers (for orchestration and function state tracking)
        from cognite_data_quality._containers import (
            _ensure_data_product_sync_state_container,
            _ensure_data_quality_settings_container,
            _ensure_function_validation_state_container,
            _ensure_historic_job_queue_container,
            _ensure_orchestration_state_container,
            _ensure_raw_validation_state_container,
            _ensure_timeseries_config_state_container,
            _ensure_view_config_state_container,
            _upsert_settings_to_dms,
        )

        print(f"  Orchestration: {space}/OrchestrationState")
        _ensure_orchestration_state_container(client, space, "OrchestrationState")

        print(f"  Function State: {space}/FunctionValidationState")
        _ensure_function_validation_state_container(client, space, "FunctionValidationState")

        print(f"  RAW Validation State: {space}/RawValidationState")
        _ensure_raw_validation_state_container(client, space, "RawValidationState")

        print(f"  DataQuality Settings: {space}/DataQualitySettings")
        _ensure_data_quality_settings_container(client, space, "DataQualitySettings")

        print(f"  DataProduct Sync State: {space}/DataProductSyncState")
        _ensure_data_product_sync_state_container(client, space, "DataProductSyncState")

        print(f"  View Config State: {space}/ViewConfigState")
        _ensure_view_config_state_container(client, space, "ViewConfigState")
        print(f"  Historic Job Queue: {space}/HistoricJobQueue")
        _ensure_historic_job_queue_container(client, space, "HistoricJobQueue")

        print(f"  Timeseries Config State: {space}/TimeseriesConfigState")
        _ensure_timeseries_config_state_container(client, space, "TimeseriesConfigState")

        # Write deployment settings to DMS for runtime access by sync handler
        _upsert_settings_to_dms(client, space, "DataQualitySettings", settings)

        # Deploy monitoring data models and views (for UI access to state containers)
        print("\nDeploying data quality monitoring data model...")
        deploy_data_models(
            client=client,
            space=space,
            containers=[
                "FunctionValidationState",
                "OrchestrationState",
                "RawValidationState",
                "DataQualitySettings",
                "DataProductSyncState",
                "ViewConfigState",
                "HistoricJobQueue",
                "TimeseriesConfigState",
            ],
            data_model_id="DataQualityMonitoring",
            data_model_version="1",
            dry_run=False,
        )

    # shacl_rules.source_dir is relative to settings file directory
    shacl_dir = (
        _resolve_path(base_dir, shacl_rules_dir)
        if shacl_rules_dir is not None
        else _resolve_path(base_dir, settings.shacl_rules.source_dir)
        if settings.shacl_rules is not None
        else base_dir / "shacl_rules"
    )
    if timeseries_dir is None and settings.timeseries is not None:
        timeseries_dir = settings.timeseries.config_dir
    if rule_engine_result_sync_dir is None and settings.rule_engine_result_sync is not None:
        rule_engine_result_sync_dir = settings.rule_engine_result_sync.config_dir
    resolved_timeseries_dir = _resolve_path(base_dir, timeseries_dir) if timeseries_dir is not None else None
    resolved_rule_engine_sync_dir = (
        _resolve_path(base_dir, rule_engine_result_sync_dir) if rule_engine_result_sync_dir is not None else None
    )

    resolved_views_dir = _resolve_path(base_dir, views_dir or "views")
    view_configs = load_view_configs(resolved_views_dir)

    timeseries_configs = (
        load_timeseries_configs(resolved_timeseries_dir)
        if resolved_timeseries_dir is not None and resolved_timeseries_dir.exists()
        else []
    )
    rule_engine_sync_configs = (
        load_rule_engine_result_sync_configs(resolved_rule_engine_sync_dir)
        if resolved_rule_engine_sync_dir is not None and resolved_rule_engine_sync_dir.exists()
        else []
    )

    if settings.config_source == "dataproduct":
        from collections import defaultdict

        from cognite_data_quality._dataproduct_loader import timeseries_view_entry_from_config
        from cognite_data_quality._dataproduct_publisher import (
            normalize_ruleset_external_id,
            publish_shacl_to_ruleset,
            publish_view_config_to_data_product,
        )

        print("\nPublishing SHACL rules to RuleSet API and views to DataProduct API...")

        # Step 1: publish each view's SHACL to its RuleSet; rewrite shacl_rules
        # to use ruleset_references.  A view's rs_version is taken from its
        # records.rule_set_version if set, otherwise falls back to the first
        # DataProduct version it belongs to (or "1.0.0" as a safe default).
        for vc in view_configs:
            # Skip views with no local rules (rules come from the DataProduct API).
            if vc.shacl_rules is None:
                continue
            # Skip views already using ruleset_references (already migrated).
            if vc.shacl_rules.ruleset_references:
                continue
            if not vc.shacl_rules.file or not vc.shacl_rules.external_id:
                continue
            rs_ext_id = normalize_ruleset_external_id(vc.shacl_rules.external_id)
            if vc.dataproducts:
                fallback_version = vc.dataproducts[0].version
            else:
                fallback_version = "1.0.0"
            rs_version = vc.records.rule_set_version or fallback_version

            if not dry_run:
                rs_result = publish_shacl_to_ruleset(
                    client,
                    shacl_path=shacl_dir / vc.shacl_rules.file,
                    ruleset_ext_id=rs_ext_id,
                    version=rs_version,
                    create_ruleset=True,
                    ruleset_name=f"{vc.view.external_id} SHACL Rules",
                )
                resolved_rs_version = str(rs_result.get("version", rs_version))
                print(
                    _format_ruleset_publish_message(
                        ruleset_ext_id=rs_ext_id,
                        requested_version=rs_version,
                        resolved_version=resolved_rs_version,
                    )
                )
                rs_version = resolved_rs_version
            else:
                print(f"  [dry-run] Would publish RuleSet: {rs_ext_id}@{rs_version}")

            vc.shacl_rules = ShaclRulesRef(ruleset_references=[{"externalId": rs_ext_id, "version": rs_version}])

        # Step 1b: publish timeseries SHACL to RuleSet API
        for tc in timeseries_configs:
            if tc.shacl_rules.ruleset_references:
                continue
            if not tc.shacl_rules.file or not tc.shacl_rules.external_id:
                continue
            rs_ext_id = normalize_ruleset_external_id(tc.shacl_rules.external_id)
            if tc.dataproducts:
                fallback_version = tc.dataproducts[0].version
            else:
                fallback_version = "1.0.0"
            rs_version = tc.records.rule_set_version or fallback_version

            if not dry_run:
                rs_result = publish_shacl_to_ruleset(
                    client,
                    shacl_path=shacl_dir / tc.shacl_rules.file,
                    ruleset_ext_id=rs_ext_id,
                    version=rs_version,
                    create_ruleset=True,
                    ruleset_name=f"{tc.name} Time Series SHACL Rules",
                )
                resolved_rs_version = str(rs_result.get("version", rs_version))
                print(
                    _format_ruleset_publish_message(
                        ruleset_ext_id=rs_ext_id,
                        requested_version=rs_version,
                        resolved_version=resolved_rs_version,
                        kind="timeseries RuleSet",
                    )
                )
                rs_version = resolved_rs_version
            else:
                print(f"  [dry-run] Would publish timeseries RuleSet: {rs_ext_id}@{rs_version}")

            tc.shacl_rules = ShaclRulesRef(ruleset_references=[{"externalId": rs_ext_id, "version": rs_version}])
            tc.records.rule_set_id = rs_ext_id
            tc.records.rule_set_version = rs_version
            if tc.dataproducts and not tc.records.data_domain_external_id:
                tc.records.data_domain_external_id = tc.dataproducts[0].external_id

        # Step 2: group views by (dp_external_id, dp_version) and publish each DP.
        # A view may appear in multiple DataProducts.  Also track the first dp_ref
        # seen for each key to capture schema_space.
        dp_groups: dict[tuple[str, str], list[ViewConfig]] = defaultdict(list)
        dp_ts_groups: dict[tuple[str, str], list[TimeseriesConfig]] = defaultdict(list)
        dp_refs: dict[tuple[str, str], DataProductRef] = {}
        for vc in view_configs:
            for dp_ref in vc.dataproducts:
                key = (dp_ref.external_id, dp_ref.version)
                dp_groups[key].append(vc)
                dp_refs.setdefault(key, dp_ref)
        for tc in timeseries_configs:
            for dp_ref in tc.dataproducts:
                key = (dp_ref.external_id, dp_ref.version)
                dp_ts_groups[key].append(tc)
                dp_refs.setdefault(key, dp_ref)

        if not dp_groups and not dp_ts_groups:
            print(
                "  Warning: no views or timeseries configs have a 'dataproducts' list — "
                "no DataProducts will be published."
            )

        all_dp_keys = set(dp_groups.keys()) | set(dp_ts_groups.keys())
        for dp_key in all_dp_keys:
            dp_ext_id, dp_version = dp_key
            dp_views = dp_groups.get(dp_key, [])
            dp_ts = dp_ts_groups.get(dp_key, [])
            ref = dp_refs[dp_key]
            extra_views = [timeseries_view_entry_from_config(tc) for tc in dp_ts]
            if not dry_run:
                dp_result = publish_view_config_to_data_product(
                    client,
                    view_configs=dp_views,
                    dp_ext_id=dp_ext_id,
                    dp_version=dp_version,
                    create_dp=True,
                    schema_space=ref.schema_space,
                    status="published",
                    extra_views=extra_views or None,
                    timeseries_configs=dp_ts or None,
                )
                resolved_version = dp_result.get("version", dp_version)
                print(
                    _format_dataproduct_publish_message(
                        dp_ext_id=dp_ext_id,
                        requested_version=dp_version,
                        resolved_version=str(resolved_version),
                        view_count=len(dp_views),
                        timeseries_count=len(dp_ts),
                    )
                )
            else:
                print(
                    f"  [dry-run] Would publish DataProduct: {dp_ext_id}@{dp_version} "
                    f"({len(dp_views)} view(s), {len(dp_ts)} timeseries config(s))"
                )

        effective_shacl_dir = None  # no CDF File uploads in dataproduct mode
    else:
        effective_shacl_dir = shacl_dir

    if filter_views_by_shacl_rules and view_configs:
        target_views: set[str] = set()
        for vc in view_configs:
            if vc.shacl_rules and vc.shacl_rules.file and effective_shacl_dir:
                target_views |= extract_target_views_from_shacl(effective_shacl_dir / vc.shacl_rules.file)
        view_configs = [vc for vc in view_configs if vc.view.external_id in target_views]

    # Load RAW table configs from tables/ directory
    resolved_tables_dir = _resolve_path(base_dir, "tables")
    table_configs = load_table_configs(resolved_tables_dir) if resolved_tables_dir.exists() else []

    function_results = deploy_functions(
        client=client,
        settings=settings,
        force=force or force_function,
        dry_run=dry_run,
        debug_save_zip=debug_save_zip,
        secrets=function_secrets,
    )
    if not dry_run:
        _wait_for_functions_ready(client, [settings.function.external_id])

    # Persist per-view settings to DMS so data_product_sync can use them
    # instead of the DataProduct API defaults.
    if not dry_run and view_configs:
        from cognite_data_quality._containers import _upsert_view_config_state

        space = settings.effective_config_space
        print("\nPersisting view config settings to DMS...")
        for vc in view_configs:
            _upsert_view_config_state(client, space, "ViewConfigState", vc)

    if not dry_run and timeseries_configs:
        from cognite_data_quality._containers import _upsert_timeseries_config_state

        space = settings.records.space
        print("\nPersisting timeseries config settings to DMS...")
        for tc in timeseries_configs:
            dp_ids = [dp.external_id for dp in tc.dataproducts] if tc.dataproducts else [tc.name]
            for dp_id in dp_ids:
                _upsert_timeseries_config_state(
                    client, space, "TimeseriesConfigState", tc, data_product_external_id=dp_id
                )

    # In dataproduct mode, workflow deployment is handled by the data_product_sync
    # scheduled function, which detects DataProduct version changes and deploys
    # workflows with the correct rule_set_id from the DataProduct API.
    workflow_results: list[dict[str, object]] = []
    if settings.config_source != "dataproduct":
        workflow_results = deploy_instance_workflows(
            client=client,
            settings=settings,
            views=view_configs,
            shacl_dir=effective_shacl_dir,
            force=force or force_workflows,
            dry_run=dry_run,
        )

    # Deploy workflows for externally pre-defined DataProducts (no local TTL/YAML needed).
    ext_results: list[dict[str, object]] = []
    if settings.external_dataproducts:
        print("\nDeploying workflows for external DataProducts...")
        ext_results = _deploy_external_dataproduct_workflows(
            client=client,
            settings=settings,
            refs=settings.external_dataproducts,
            local_view_configs=view_configs,
            force=force or force_workflows,
            dry_run=dry_run,
        )
    timeseries_results: list[dict[str, object]] = []
    if timeseries_configs:
        for tc in timeseries_configs:
            tc_dp_version = (
                tc.records.rule_set_version
                if tc.shacl_rules.ruleset_references
                else (tc.dataproducts[0].version if tc.dataproducts else None)
            )
            single_result = deploy_timeseries_workflows(
                client=client,
                settings=settings,
                configs=[tc],
                shacl_dir=effective_shacl_dir if settings.config_source == "dataproduct" else shacl_dir,
                force=force or force_workflows,
                dry_run=dry_run,
                dp_version=tc_dp_version,
                secrets=function_secrets,
            )
            timeseries_results.extend(single_result)
    rule_engine_sync_results = deploy_rule_engine_result_sync_workflows(
        client=client,
        settings=settings,
        configs=rule_engine_sync_configs,
        force=force or force_workflows,
        dry_run=dry_run,
        secrets=function_secrets,
    )

    # Deploy RAW table SHACL rules
    table_results = deploy_raw_table_shacl(
        client=client,
        settings=settings,
        tables=table_configs,
        shacl_dir=shacl_dir,
        dry_run=dry_run,
    )

    # Deploy DataProduct sync workflow (discovers all DPs and auto-deploys validation)
    dp_sync_results: list[dict[str, object]] = []
    hq_manager_results: list[dict[str, object]] = []
    if deploy_data_product_sync:
        dp_sync_results = _deploy_data_product_sync_workflow(
            client=client,
            settings=settings,
            cron=data_product_sync_cron,
            dry_run=dry_run,
        )
        hq_manager_results = _deploy_historic_queue_manager_workflow(
            client=client,
            settings=settings,
            cron=historic_queue_manager_cron,
            dry_run=dry_run,
        )

    return {
        "functions": function_results,
        "workflows": workflow_results,
        "timeseries_workflows": timeseries_results,
        "rule_engine_result_sync_workflows": rule_engine_sync_results,
        "raw_tables": table_results,
        "data_product_sync": dp_sync_results,
        "external_dataproduct_workflows": ext_results,
        "historic_queue_manager": hq_manager_results,
    }
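
The returned dictionary groups results by component. As a minimal sketch, assuming each value is a list with one entry per deployed resource, a caller could tally entries per component as shown below. The helper `summarize_results`, the sample data, and the `external_id` keys inside the entries are hypothetical; only the top-level keys are taken from the source above.

```python
from collections.abc import Mapping, Sequence


def summarize_results(results: Mapping[str, Sequence[object]]) -> dict[str, int]:
    """Count result entries per component key, skipping empty components."""
    return {key: len(entries) for key, entries in results.items() if entries}


# Hypothetical return value; only the top-level keys come from the source.
sample_results = {
    "functions": [{"external_id": "fn-example"}],
    "workflows": [],
    "timeseries_workflows": [{"external_id": "wf-a"}, {"external_id": "wf-b"}],
    "rule_engine_result_sync_workflows": [],
    "raw_tables": [],
    "data_product_sync": [],
    "external_dataproduct_workflows": [],
    "historic_queue_manager": [],
}

summary = summarize_results(sample_results)
for component, count in summary.items():
    print(f"{component}: {count} result(s)")
```

With config_source set to dataproduct, the source above skips deploy_instance_workflows(), so "workflows" stays an empty list and is left out of a tally like this one.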

load_rule_engine_result_sync_configs(path)

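Load RuleEngineResult sync listener configs from a directory. Reads all *.yaml files in path, sorted by filename, and parses each one as a RuleEngineResultSyncConfig object. Returns an empty list if the directory does not exist.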

Source code in cognite_data_quality/deploy.py
def load_rule_engine_result_sync_configs(path: Path | str) -> list[RuleEngineResultSyncConfig]:
    """Load RuleEngineResult sync listener configs from a directory."""
    config_dir = Path(path)
    if not config_dir.exists():
        return []
    configs: list[RuleEngineResultSyncConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = _load_yaml_with_env(config_file)
        data["_filename"] = config_file.name
        configs.append(RuleEngineResultSyncConfig(**data))
    return configs
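
The directory-loading pattern used here (return an empty list for a missing directory, otherwise read *.yaml files in sorted order and record each filename) can be tried out with the standard library alone. This is a sketch under stated assumptions: `StubConfig` and `load_stub_configs` are hypothetical stand-ins, and the file contents are kept as raw text instead of being parsed by the package's `_load_yaml_with_env` helper.

```python
from __future__ import annotations

import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class StubConfig:
    """Hypothetical stand-in for a config model; not part of the package."""

    filename: str
    text: str


def load_stub_configs(path: Path | str) -> list[StubConfig]:
    """Mirror the loader pattern: missing directory -> [], files in sorted order."""
    config_dir = Path(path)
    if not config_dir.exists():
        return []
    return [
        StubConfig(filename=f.name, text=f.read_text(encoding="utf-8"))
        for f in sorted(config_dir.glob("*.yaml"))
    ]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Written out of order on purpose to show that sorting decides the result order.
    (root / "b_listener.yaml").write_text("name: b\n", encoding="utf-8")
    (root / "a_listener.yaml").write_text("name: a\n", encoding="utf-8")
    (root / "notes.txt").write_text("ignored\n", encoding="utf-8")

    loaded_names = [cfg.filename for cfg in load_stub_configs(root)]
    missing_result = load_stub_configs(root / "does_not_exist")

print(loaded_names)
print(missing_result)
```

Because the glob pattern is "*.yaml", files with other extensions (including .yml) are not picked up.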

load_settings(path)

Load deployment settings from a YAML file.

Parameters:

path (Path | str, required): Path to the settings.yaml file.

Returns:

DeploymentSettings: Parsed DeploymentSettings object.

Source code in cognite_data_quality/deploy.py
def load_settings(path: Path | str) -> DeploymentSettings:
    """Load deployment settings from a YAML file.

    Args:
        path: Path to the ``settings.yaml`` file.

    Returns:
        Parsed :class:`DeploymentSettings` object.
    """
    settings_path = Path(path)
    data = _load_yaml_with_env(settings_path)
    return DeploymentSettings(**data)

load_table_configs(path)

Load RAW table validation configs from a directory.

Reads all *.yaml files in path and parses them as RawTableConfig objects. Returns an empty list if the directory does not exist.

Parameters:

path (Path | str, required): Directory containing RAW table YAML config files.

Returns:

list[RawTableConfig]: List of RawTableConfig objects, sorted by filename. Empty list if path does not exist.

Source code in cognite_data_quality/deploy.py
def load_table_configs(path: Path | str) -> list[RawTableConfig]:
    """Load RAW table validation configs from a directory.

    Reads all ``*.yaml`` files in *path* and parses them as
    :class:`RawTableConfig` objects.  Returns an empty list if the
    directory does not exist.

    Args:
        path: Directory containing RAW table YAML config files.

    Returns:
        List of :class:`RawTableConfig` objects, sorted by filename.
        Empty list if *path* does not exist.
    """
    config_dir = Path(path)
    if not config_dir.exists():
        return []
    configs: list[RawTableConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = _load_yaml_with_env(config_file)
        data["_filename"] = config_file.name
        configs.append(RawTableConfig(**data))
    return configs

load_timeseries_configs(path)

Load time series validation configs from a directory.

Reads all *.yaml files in path and parses them as TimeseriesConfig objects.

Parameters:

path (Path | str, required): Directory containing time series YAML config files.

Returns:

list[TimeseriesConfig]: List of TimeseriesConfig objects, sorted by filename.

Source code in cognite_data_quality/deploy.py
def load_timeseries_configs(path: Path | str) -> list[TimeseriesConfig]:
    """Load time series validation configs from a directory.

    Reads all ``*.yaml`` files in *path* and parses them as
    :class:`TimeseriesConfig` objects.

    Args:
        path: Directory containing time series YAML config files.

    Returns:
        List of :class:`TimeseriesConfig` objects, sorted by filename.
    """
    config_dir = Path(path)
    configs: list[TimeseriesConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = _load_yaml_with_env(config_file)
        data["_filename"] = config_file.name
        configs.append(TimeseriesConfig(**data))
    return configs

load_view_configs(path)

Load instance validation view configs from a directory.

Reads all *.yaml files in path and parses them as ViewConfig objects.

Parameters:

path (Path | str, required): Directory containing view YAML config files.

Returns:

list[ViewConfig]: List of ViewConfig objects, sorted by filename.

Source code in cognite_data_quality/deploy.py
def load_view_configs(path: Path | str) -> list[ViewConfig]:
    """Load instance validation view configs from a directory.

    Reads all ``*.yaml`` files in *path* and parses them as
    :class:`ViewConfig` objects.

    Args:
        path: Directory containing view YAML config files.

    Returns:
        List of :class:`ViewConfig` objects, sorted by filename.
    """
    config_dir = Path(path)
    configs: list[ViewConfig] = []
    for config_file in sorted(config_dir.glob("*.yaml")):
        data = _load_yaml_with_env(config_file)
        data["_filename"] = config_file.name
        configs.append(ViewConfig(**data))
    return configs
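
Unlike load_table_configs, this loader has no explicit existence check. On current CPython versions, globbing a directory that does not exist yields no matches rather than raising, so the loop body never runs and the function would still return an empty list. The snippet below is a small standalone check of that pathlib behaviour; the directory name "views" is only an illustration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    missing_dir = Path(tmp) / "views"  # deliberately never created
    existed = missing_dir.exists()
    # Same call shape as in the loader: sorted(config_dir.glob("*.yaml"))
    matches = sorted(missing_dir.glob("*.yaml"))

print(existed, matches)
```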

validate_per_view_sync_cursor_concurrency(view_config)

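Enforce max_concurrent_executions=1 for YAML per-view sync-cursor deployments. Raises ValueError when use_sync_cursor_mode is enabled and max_concurrent_executions is not 1; the limit prevents race conditions on the sync cursor.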

Source code in cognite_data_quality/deploy.py
def validate_per_view_sync_cursor_concurrency(view_config: ViewConfig) -> None:
    """Enforce max_concurrent_executions=1 for YAML per-view sync-cursor deploy."""
    if view_config.use_sync_cursor_mode and view_config.max_concurrent_executions != 1:
        raise ValueError(
            "'use_sync_cursor_mode' is enabled, so 'max_concurrent_executions' must be 1 "
            "to prevent race conditions on the sync cursor."
        )
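
To see which combinations pass the check, the condition can be exercised against a small stand-in object. This is a sketch, not the package's own test: `ViewConfigStub`, `check_sync_cursor_concurrency`, and `is_valid` are hypothetical names, and the stub carries only the two attributes that the source function reads.

```python
from dataclasses import dataclass


@dataclass
class ViewConfigStub:
    """Hypothetical stand-in exposing only the two fields the check reads."""

    use_sync_cursor_mode: bool = False
    max_concurrent_executions: int = 1


def check_sync_cursor_concurrency(view_config: ViewConfigStub) -> None:
    """Same condition as the source function, applied to the stub."""
    if view_config.use_sync_cursor_mode and view_config.max_concurrent_executions != 1:
        raise ValueError(
            "'use_sync_cursor_mode' is enabled, so 'max_concurrent_executions' must be 1 "
            "to prevent race conditions on the sync cursor."
        )


def is_valid(view_config: ViewConfigStub) -> bool:
    """Return True when the check passes, False when it raises ValueError."""
    try:
        check_sync_cursor_concurrency(view_config)
    except ValueError:
        return False
    return True


outcomes = {
    "cursor mode, concurrency 1": is_valid(ViewConfigStub(True, 1)),
    "cursor mode, concurrency 4": is_valid(ViewConfigStub(True, 4)),
    "no cursor mode, concurrency 4": is_valid(ViewConfigStub(False, 4)),
}
for case, ok in outcomes.items():
    print(f"{case}: {'accepted' if ok else 'rejected'}")
```

Only the combination of sync-cursor mode with a concurrency other than 1 is rejected; configs that do not use sync-cursor mode are not constrained by this check.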