Skip to content

Invoke

The cognite_data_quality.invoke module provides helpers for calling already-deployed Cognite Functions from Python. These helpers send a payload to the function running in CDF — they do not run validation locally.

  • call_validation() — unified entry point, dispatches by validation_type
  • call_validate_instances_shacl() — instance validation (type: instance)
  • call_validate_instances_shacl_partitioned() — partitioned worker (type: partitioned)
  • call_validate_timeseries_shacl() — time series validation (type: timeseries)
  • call_validate_aggregate_uniqueness() — aggregate uniqueness (type: aggregate_uniqueness)
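  • call_validate_shacl() — SHACL-centric entrypoint with extension support (type: shacl)
  • call_rule_engine_result_sync() — incremental RuleEngineResult sync handler (type: rule_engine_result_sync)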
  • deploy_validation_pipeline() — deploy and run the full validation pipeline (orchestrator)

See Invoking validation functions for usage examples and payload key descriptions.

cognite_data_quality.invoke

Invoke deployed Cognite Functions from Python.

Use these helpers to call the data quality validation functions from a notebook, script, or any Python runtime. Each function is a thin wrapper around client.functions.call().

call_rule_engine_result_sync(client, data, *, wait=True, external_id='data-quality-validation')

Call the incremental RuleEngineResult sync handler.

Source code in cognite_data_quality/invoke.py
def call_rule_engine_result_sync(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Invoke the incremental RuleEngineResult sync handler of the deployed function.

    Convenience wrapper for call_validation(validation_type="rule_engine_result_sync", ...).

    Args:
        client: Cognite client instance.
        data: Payload dict forwarded to the handler.
        wait: Forwarded unchanged to call_validation().
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Whatever call_validation() returns for this handler.
    """
    return call_validation(
        client=client,
        validation_type="rule_engine_result_sync",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_aggregate_uniqueness(client, data, *, wait=True, external_id='data-quality-validation')

Call the aggregate uniqueness validation handler.

Convenience wrapper for call_validation(validation_type="aggregate_uniqueness", ...).

Source code in cognite_data_quality/invoke.py
def call_validate_aggregate_uniqueness(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Invoke the aggregate uniqueness validation handler of the deployed function.

    Convenience wrapper for call_validation(validation_type="aggregate_uniqueness", ...).

    Args:
        client: Cognite client instance.
        data: Payload dict forwarded to the handler.
        wait: Forwarded unchanged to call_validation().
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Whatever call_validation() returns for this handler.
    """
    return call_validation(
        client=client,
        validation_type="aggregate_uniqueness",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_instances_shacl(client, data, *, wait=True, external_id='data-quality-validation')

Call the instance SHACL validation function.

Convenience wrapper for call_validation(validation_type="instance", ...).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict

Payload dict. Typical keys: instances (dict with items), shacl_rules or shacl_rules_file_external_id, datamodel_space, datamodel_external_id, datamodel_version, auto_load_depth, records_config (stream_id, rule_set_id, etc.).

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict with conforms, violations, instances_validated, etc.

Source code in cognite_data_quality/invoke.py
def call_validate_instances_shacl(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Invoke instance SHACL validation on the deployed function.

    Convenience wrapper for call_validation(validation_type="instance", ...).

    Args:
        client: Cognite client instance.
        data: Payload dict. Typical keys:
            - instances (dict with items)
            - shacl_rules or shacl_rules_file_external_id
            - datamodel_space, datamodel_external_id, datamodel_version
            - auto_load_depth
            - records_config (stream_id, rule_set_id, etc.)
        wait: If True, block until the function returns.
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Response dict with conforms, violations, instances_validated, etc.
    """
    return call_validation(
        client=client,
        validation_type="instance",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_instances_shacl_partitioned(client, data, *, wait=True, external_id='data-quality-validation')

Call the partitioned instance SHACL validation function (worker).

Convenience wrapper for call_validation(validation_type="partitioned", ...).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict

Payload dict. Same shape as validate-instances-shacl; instances are typically provided by the orchestrator.

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict with status, conforms, violations, instances_validated, etc.

Source code in cognite_data_quality/invoke.py
def call_validate_instances_shacl_partitioned(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Invoke the partitioned (worker) instance SHACL validation on the deployed function.

    Convenience wrapper for call_validation(validation_type="partitioned", ...).

    Args:
        client: Cognite client instance.
        data: Payload dict. Same shape as validate-instances-shacl; the instances
            are typically supplied by the orchestrator.
        wait: If True, block until the function returns.
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Response dict with status, conforms, violations, instances_validated, etc.
    """
    return call_validation(
        client=client,
        validation_type="partitioned",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_shacl(client, data, *, wait=True, external_id='data-quality-validation')

Call the SHACL-centric validation entrypoint with optional global-rule extensions.

Convenience wrapper for call_validation(validation_type="shacl", ...).

Source code in cognite_data_quality/invoke.py
def call_validate_shacl(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Invoke the SHACL-centric validation entrypoint (optional global-rule extensions).

    Convenience wrapper for call_validation(validation_type="shacl", ...).

    Args:
        client: Cognite client instance.
        data: Payload dict forwarded to the handler.
        wait: Forwarded unchanged to call_validation().
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Whatever call_validation() returns for this handler.
    """
    return call_validation(
        client=client,
        validation_type="shacl",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_timeseries_shacl(client, data, *, wait=True, external_id='data-quality-validation')

Call the time series SHACL validation function.

Convenience wrapper for call_validation(validation_type="timeseries", ...).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict

Payload dict. Typical keys: shacl_rules_file_external_id, datamodel_space, datamodel_external_id, datamodel_version, filter or instance_ids, records_config.

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict with validation results.

Source code in cognite_data_quality/invoke.py
def call_validate_timeseries_shacl(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Invoke time series SHACL validation on the deployed function.

    Convenience wrapper for call_validation(validation_type="timeseries", ...).

    Args:
        client: Cognite client instance.
        data: Payload dict. Typical keys:
            - shacl_rules_file_external_id
            - datamodel_space, datamodel_external_id, datamodel_version
            - filter or instance_ids
            - records_config
        wait: If True, block until the function returns.
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Response dict with validation results.
    """
    return call_validation(
        client=client,
        validation_type="timeseries",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validation(client, validation_type, data, *, wait=True, external_id='data-quality-validation')

Call the unified SHACL validation function.

The unified function consolidates all validation types into a single deployment. Use this function for new code. Legacy individual functions are still supported.

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
validation_type Literal['instance', 'partitioned', 'timeseries', 'orchestrator', 'test', 'aggregate_uniqueness', 'rule_engine_result_sync', 'shacl']

Type of validation to perform: - "instance": Basic instance validation (real-time) - "partitioned": Partitioned historic instance validation - "timeseries": Time series validation (normal/backfill) - "orchestrator": Historic validation orchestrator - "test": Test SHACL rules without writing to Records API - "aggregate_uniqueness": Aggregate-based uniqueness validation - "rule_engine_result_sync": Incremental RuleEngineResult listener - "shacl": SHACL-centric entrypoint with optional global-rule extensions

required
data dict

Payload dict. Contents depend on validation_type.

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict from the appropriate handler.

Example

Instance validation

result = call_validation( client=client, validation_type="instance", data={ "instances": {"items": [...]}, "shacl_rules_file_external_id": "my_rules", "datamodel_space": "my_space", "datamodel_external_id": "MyModel", "datamodel_version": "v1", "records_config": {...} } )

Orchestrator

result = call_validation( client=client, validation_type="orchestrator", data={ "datamodel_space": "my_space", "view_external_id": "MyView", "instance_space": "my_instances", ... } )

Source code in cognite_data_quality/invoke.py
def call_validation(
    client: CogniteClient,
    validation_type: Literal[
        "instance",
        "partitioned",
        "timeseries",
        "orchestrator",
        "test",
        "aggregate_uniqueness",
        "rule_engine_result_sync",
        "shacl",
    ],
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Call the unified SHACL validation function.

    The unified function consolidates all validation types into a single deployment.
    Use this function for new code. Legacy individual functions are still supported.

    The payload sent to the function is a shallow copy of ``data`` with the
    ``"validation_type"`` key set to the ``validation_type`` argument. The explicit
    argument always wins over a ``"validation_type"`` key already present in
    ``data``; ``data`` itself is not modified.

    Args:
        client: Cognite client instance.
        validation_type: Type of validation to perform:
            - "instance": Basic instance validation (real-time)
            - "partitioned": Partitioned historic instance validation
            - "timeseries": Time series validation (normal/backfill)
            - "orchestrator": Historic validation orchestrator
            - "test": Test SHACL rules without writing to Records API
            - "aggregate_uniqueness": Aggregate-based uniqueness validation
            - "rule_engine_result_sync": Incremental RuleEngineResult listener
            - "shacl": SHACL-centric entrypoint with optional global-rule extensions
        data: Payload dict. Contents depend on validation_type.
        wait: If True, block until the function returns.
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Response dict from the appropriate handler. An empty dict is returned when
        the call's ``get_response()`` yields a falsy value (e.g. ``None``).

    Example:
        # Instance validation
        result = call_validation(
            client=client,
            validation_type="instance",
            data={
                "instances": {"items": [...]},
                "shacl_rules_file_external_id": "my_rules",
                "datamodel_space": "my_space",
                "datamodel_external_id": "MyModel",
                "datamodel_version": "v1",
                "records_config": {...}
            }
        )

        # Orchestrator
        result = call_validation(
            client=client,
            validation_type="orchestrator",
            data={
                "datamodel_space": "my_space",
                "view_external_id": "MyView",
                "instance_space": "my_instances",
                ...
            }
        )
    """
    # The explicit argument goes last so that a stale "validation_type" key inside
    # ``data`` cannot silently redirect the call to a different handler.
    payload = {**data, "validation_type": validation_type}
    call = client.functions.call(external_id=external_id, data=payload, wait=wait)
    # Normalise a missing/empty response to {} so callers always receive a dict.
    return call.get_response() or {}

deploy_validation_pipeline(client, data=None, *, settings_path=None, views_dir=None, view_external_id=None, view_config_external_id=None, settings_external_id='data-quality-validation_settings', data_product_external_id=None, data_product_version=None, data_quality_space='dataQuality', wait=True, external_id='data-quality-validation')

Deploy and run a full validation pipeline with batch and incremental validation.

This function: 1. Triggers partitioned batch load of all instances (splits data into partitions) 2. Sets up sync trigger for ongoing incremental validation 3. Creates scheduled workflow for monitoring progress 4. Continuously manages orchestration (retries, cleanup)

Convenience wrapper for call_validation(validation_type="orchestrator", ...).

Four usage modes:

Mode 1: Pass data dict directly (legacy): deploy_validation_pipeline(client, data={...}, wait=True)

Mode 2: Load from local YAML configs: deploy_validation_pipeline( client, settings_path="config/environments/cog-sail/settings.yaml", view_external_id="SmallBoat", wait=True )

Mode 3: Load everything from CDF Files (no local files required): deploy_validation_pipeline( client, view_config_external_id="pump_view_config", )

Mode 4: DataProduct mode — triggers historic orchestrator per view: deploy_validation_pipeline( client, data_product_external_id="dp_rmdm_dm", view_external_id="Pump", # omit to run all views data_quality_space="sp_data_quality_demo", ) Loads view configs from the DataProduct API and calls the orchestrator for each matching view. The orchestrator runs the historic batch validation and sets up the per-view sync trigger on completion. Infrastructure (data_product_sync schedule, workflow definitions) must be deployed first via deploy_validation_infrastructure().

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict | None

(Legacy) Payload dict. Typical keys: datamodel_space, datamodel_external_id, datamodel_version, view_space, view_external_id, view_version, instance_space, shacl_file_external_id, partition_count, partition_field, records_space, records_container, stream_id, rule_set_id, rule_set_version. If None, must provide settings_path or view_config_external_id.

None
settings_path str | None

Path to a local settings.yaml file. Optional when using view_config_external_id — settings are then fetched from CDF Files instead.

None
views_dir str | None

Directory containing view YAML configs. Defaults to settings_path parent / "views". Only used in mode 2.

None
view_external_id str | list[str] | None

Specific view to deploy from local files (mode 2).

None
view_config_external_id str | None

External ID of a view config stored in CDF Files (mode 3). Uploaded during deployment via deploy_instance_workflows(). External ID scheme: {view_external_id}_view_config e.g. Pump_view_config.

None
settings_external_id str

External ID of the settings file stored in CDF Files. Defaults to data-quality-validation_settings (the ID used by deploy_instance_workflows). Only used in mode 3 when settings_path is not provided.

'data-quality-validation_settings'
data_quality_space str

DMS space where the DataQualitySettings node lives. Must match config_space (or records.space when config_space is unset) in settings.yaml. Default: "dataQuality". Only used in mode 4.

'dataQuality'
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation). Only used in legacy mode (when data dict is provided directly).

'data-quality-validation'

Returns:

Type Description
dict

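Response dict from the function. In DataProduct mode (mode 4), a list with one dict per triggered view (keys: view, call_id, status) is returned instead.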

Example

Mode 4: DataProduct mode — trigger historic validation for one view

result = deploy_validation_pipeline( client, data_product_external_id="dp_rmdm_dm", view_external_id="Pump", # omit for all views data_quality_space="sp_data_quality_demo", )

Mode 3: fully from CDF — no local files needed

result = deploy_validation_pipeline( client, view_config_external_id="pump_view_config", )

Mode 2: local YAML files

result = deploy_validation_pipeline( client, settings_path="config/environments/cog-sail/settings.yaml", view_external_id="SmallBoat", )

Mode 1: legacy data dict

result = deploy_validation_pipeline( client, data={"datamodel_space": "sailboat", ...} )

Source code in cognite_data_quality/invoke.py
def deploy_validation_pipeline(
    client: CogniteClient,
    data: dict | None = None,
    *,
    settings_path: str | None = None,
    views_dir: str | None = None,
    view_external_id: str | list[str] | None = None,
    view_config_external_id: str | None = None,
    settings_external_id: str = "data-quality-validation_settings",
    data_product_external_id: str | None = None,
    data_product_version: str | None = None,
    data_quality_space: str = "dataQuality",
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict | list[dict]:
    """Deploy and run a full validation pipeline with batch and incremental validation.

    This function:
    1. Triggers partitioned batch load of all instances (splits data into partitions)
    2. Sets up sync trigger for ongoing incremental validation
    3. Creates scheduled workflow for monitoring progress
    4. Continuously manages orchestration (retries, cleanup)

    Convenience wrapper for call_validation(validation_type="orchestrator", ...).

    Four usage modes. When several mode-selecting arguments are given, the first
    match in this order wins: ``data`` (mode 1), ``data_product_external_id``
    (mode 4), ``view_config_external_id`` (mode 3), ``settings_path`` (mode 2).

    **Mode 1: Pass data dict directly (legacy):**
        deploy_validation_pipeline(client, data={...}, wait=True)

    **Mode 2: Load from local YAML configs:**
        deploy_validation_pipeline(
            client,
            settings_path="config/environments/cog-sail/settings.yaml",
            view_external_id="SmallBoat",
            wait=True
        )

    **Mode 3: Load everything from CDF Files (no local files required):**
        deploy_validation_pipeline(
            client,
            view_config_external_id="pump_view_config",
        )

    **Mode 4: DataProduct mode — triggers historic orchestrator per view:**
        deploy_validation_pipeline(
            client,
            data_product_external_id="dp_rmdm_dm",
            view_external_id="Pump",          # omit to run all views
            data_quality_space="sp_data_quality_demo",
        )
        Loads view configs from the DataProduct API and calls the orchestrator for
        each matching view. The orchestrator runs the historic batch validation and
        sets up the per-view sync trigger on completion.
        Infrastructure (``data_product_sync`` schedule, workflow definitions) must be
        deployed first via ``deploy_validation_infrastructure()``.

    Args:
        client: Cognite client instance.
        data: (Legacy) Payload dict. Typical keys: datamodel_space, datamodel_external_id,
            datamodel_version, view_space, view_external_id, view_version,
            instance_space, shacl_file_external_id, partition_count, partition_field,
            records_space, records_container, stream_id, rule_set_id, rule_set_version.
            If None, must provide settings_path, view_config_external_id or
            data_product_external_id.
        settings_path: Path to a local settings.yaml file. Optional when using
            view_config_external_id — settings are then fetched from CDF Files instead.
        views_dir: Directory containing view YAML configs. Defaults to settings_path parent / "views".
            Only used in mode 2.
        view_external_id: In mode 2, the single view (a string) to deploy from local
            files. In mode 4, a view external ID or a list of them used to restrict
            which DataProduct views are triggered; omit to trigger all views.
        view_config_external_id: External ID of a view config stored in CDF Files (mode 3).
            Uploaded during deployment via deploy_instance_workflows().
            External ID scheme: ``{view_external_id}_view_config`` e.g. ``Pump_view_config``.
        settings_external_id: External ID of the settings file stored in CDF Files.
            Defaults to ``data-quality-validation_settings`` (the ID used by deploy_instance_workflows).
            Only used in mode 3 when settings_path is not provided.
        data_product_external_id: External ID of the DataProduct whose view configs
            are loaded. Providing it selects mode 4.
        data_product_version: DataProduct version to use in mode 4. When omitted (or
            falsy), the version is resolved with ``pick_latest_released_version()``.
        data_quality_space: DMS space where the ``DataQualitySettings`` node lives.
            Must match ``config_space`` (or ``records.space`` when ``config_space`` is unset)
            in ``settings.yaml``. Default: ``"dataQuality"``. Only used in mode 4.
        wait: If True, block until the function returns. Not used in mode 4, where
            every call is triggered with ``wait=False``.
        external_id: Function external ID (default: data-quality-validation).
            Only used in legacy mode (when ``data`` dict is provided directly); the
            other modes take the function external ID from the loaded settings.

    Returns:
        Modes 1-3: response dict from the function.
        Mode 4: a list with one dict per triggered view, each holding ``"view"``
        (view external ID), ``"call_id"`` and ``"status"`` (always ``"Running"``).

    Raises:
        ValueError: If no mode-selecting argument is given; if mode 2 is used without
            ``view_external_id``, with a non-string ``view_external_id``, or no
            matching view config is found; or, in mode 4, if no published version,
            no views with SHACL rules, or no views matching ``view_external_id``
            are found.

    Example:
        # Mode 4: DataProduct mode — trigger historic validation for one view
        result = deploy_validation_pipeline(
            client,
            data_product_external_id="dp_rmdm_dm",
            view_external_id="Pump",           # omit for all views
            data_quality_space="sp_data_quality_demo",
        )

        # Mode 3: fully from CDF — no local files needed
        result = deploy_validation_pipeline(
            client,
            view_config_external_id="pump_view_config",
        )

        # Mode 2: local YAML files
        result = deploy_validation_pipeline(
            client,
            settings_path="config/environments/cog-sail/settings.yaml",
            view_external_id="SmallBoat",
        )

        # Mode 1: legacy data dict
        result = deploy_validation_pipeline(
            client,
            data={"datamodel_space": "sailboat", ...}
        )
    """
    # Mode 1: data dict provided — legacy mode
    if data is not None:
        return call_validation(client, "orchestrator", data, wait=wait, external_id=external_id)

    # Imports are deferred so the legacy path above does not need them.
    from pathlib import Path

    import yaml

    from cognite_data_quality.deploy import DeploymentSettings, ViewConfig, load_settings

    # Mode 4: DataProduct + DMS — load view configs from DataProduct API, trigger orchestrator
    if data_product_external_id is not None:
        from cognite_data_quality._containers import (
            _load_settings_from_dms,
            build_deployment_settings_from_props,
        )
        from cognite_data_quality._dataproduct_loader import load_views_from_data_product, pick_latest_released_version

        dms_props = _load_settings_from_dms(client, space=data_quality_space)
        settings_obj = build_deployment_settings_from_props(dms_props)

        resolved_version = data_product_version or pick_latest_released_version(client, data_product_external_id)
        if resolved_version is None:
            raise ValueError(f"No published version found for DataProduct '{data_product_external_id}'")

        view_configs = load_views_from_data_product(client, data_product_external_id, resolved_version)

        # Only views that carry SHACL rules can be validated.
        view_configs = [vc for vc in view_configs if vc.shacl_rules is not None]
        if not view_configs:
            raise ValueError(f"No views with SHACL rules found in {data_product_external_id}@{resolved_version}")

        if view_external_id is not None:
            # Accept either a single external ID or a collection of them.
            requested = {view_external_id} if isinstance(view_external_id, str) else set(view_external_id)
            view_configs = [vc for vc in view_configs if vc.view.external_id in requested]
            if not view_configs:
                raise ValueError(
                    f"No views matching {requested} found in {data_product_external_id}@{resolved_version}"
                )

        results = []
        for vc in view_configs:
            orchestrator_data = _build_orchestrator_data(settings_obj, vc)
            print(f"Triggering {vc.view.external_id}...")
            payload = {"validation_type": "orchestrator", **orchestrator_data}
            # Fire-and-forget: each view is triggered without waiting for completion,
            # so the caller gets the call IDs back rather than handler responses.
            fn_call = client.functions.call(
                external_id=settings_obj.function.external_id,
                data=payload,
                wait=False,
            )
            call_id = fn_call.id
            results.append({"view": vc.view.external_id, "call_id": call_id, "status": "Running"})
            print(f"  Triggered {vc.view.external_id}: call_id={call_id}")
        return results

    # Mode 3: fetch both view config and settings from CDF Files — no local files needed
    if view_config_external_id is not None:
        # A local settings file, when given, takes precedence over the one in CDF Files.
        settings_obj = (
            load_settings(Path(settings_path))
            if settings_path is not None
            else DeploymentSettings(
                **yaml.safe_load(client.files.download_bytes(external_id=settings_external_id).decode("utf-8")) or {}
            )
        )
        raw_view = client.files.download_bytes(external_id=view_config_external_id)
        view_config = ViewConfig(**yaml.safe_load(raw_view.decode("utf-8")) or {})
        data = _build_orchestrator_data(settings_obj, view_config)
        return call_validation(client, "orchestrator", data, wait=wait, external_id=settings_obj.function.external_id)

    # Mode 2: scan local YAML files
    if settings_path is None:
        raise ValueError("Must provide either 'data' dict, 'view_config_external_id', or 'settings_path'")

    if view_external_id is None:
        raise ValueError("Must provide 'view_external_id' or 'view_config_external_id' when using settings_path mode")

    # A list can never equal a single view's external ID, so fail with a clear
    # message instead of the misleading "not found" error further below.
    if not isinstance(view_external_id, str):
        raise ValueError(
            "settings_path mode supports a single 'view_external_id' string, "
            f"got {view_external_id!r}"
        )

    settings_obj = load_settings(Path(settings_path))
    base_dir = Path(settings_path).parent
    # Path() accepts both str and Path, so no isinstance check is needed.
    views_dir_obj = base_dir / "views" if views_dir is None else Path(views_dir)

    view_config = None
    for config_file in views_dir_obj.glob("*.yaml"):
        config_data = yaml.safe_load(config_file.read_text(encoding="utf-8")) or {}
        config_data["_filename"] = config_file.name
        candidate = ViewConfig(**config_data)
        if candidate.view.external_id == view_external_id:
            view_config = candidate
            break

    if view_config is None:
        raise ValueError(
            f"View config for '{view_external_id}' not found in {views_dir_obj}. "
            f"Available views: {[f.stem for f in views_dir_obj.glob('*.yaml')]}"
        )

    data = _build_orchestrator_data(settings_obj, view_config)
    return call_validation(client, "orchestrator", data, wait=wait, external_id=settings_obj.function.external_id)