Skip to content

invoke

cognite_data_quality.invoke

Invoke deployed Cognite Functions from Python.

Use these helpers to call the data quality validation functions from a notebook, script, or any Python runtime. Each function is a thin wrapper around client.functions.call().

call_validate_instances_shacl(client, data, *, wait=True, external_id='data-quality-validation')

Call the instance SHACL validation function.

Convenience wrapper for call_validation(validation_type="instance", ...).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict

Payload dict. Typical keys: instances (dict with items), shacl_rules or shacl_rules_file_external_id, datamodel_space, datamodel_external_id, datamodel_version, auto_load_depth, records_config (stream_id, rule_set_id, etc.).

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict with conforms, violations, instances_validated, etc.

Source code in cognite_data_quality/invoke.py
def call_validate_instances_shacl(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Run instance SHACL validation through the deployed Cognite Function.

    Thin shortcut that forwards to ``call_validation`` with
    ``validation_type="instance"``; all other arguments are passed through
    unchanged.

    Args:
        client: Cognite client instance used to invoke the function.
        data: Payload dict. Typical keys: instances (dict with items), shacl_rules or
            shacl_rules_file_external_id, datamodel_space, datamodel_external_id,
            datamodel_version, auto_load_depth, records_config (stream_id,
            rule_set_id, etc.).
        wait: If True, block until the function returns.
        external_id: External ID of the deployed function
            (default: data-quality-validation).

    Returns:
        Response dict with conforms, violations, instances_validated, etc.
    """
    return call_validation(
        client=client,
        validation_type="instance",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_instances_shacl_partitioned(client, data, *, wait=True, external_id='data-quality-validation')

Call the partitioned instance SHACL validation function (worker).

Convenience wrapper for call_validation(validation_type="partitioned", ...).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict

Payload dict. Same shape as validate-instances-shacl; instances are typically provided by the orchestrator.

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict with status, conforms, violations, instances_validated, etc.

Source code in cognite_data_quality/invoke.py
def call_validate_instances_shacl_partitioned(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Run the partitioned (worker) instance SHACL validation.

    Thin shortcut that forwards to ``call_validation`` with
    ``validation_type="partitioned"``; all other arguments are passed through
    unchanged.

    Args:
        client: Cognite client instance used to invoke the function.
        data: Payload dict. Same shape as validate-instances-shacl; instances are
            typically provided by the orchestrator.
        wait: If True, block until the function returns.
        external_id: External ID of the deployed function
            (default: data-quality-validation).

    Returns:
        Response dict with status, conforms, violations, instances_validated, etc.
    """
    return call_validation(
        client=client,
        validation_type="partitioned",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validate_timeseries_shacl(client, data, *, wait=True, external_id='data-quality-validation')

Call the time series SHACL validation function.

Convenience wrapper for call_validation(validation_type="timeseries", ...).

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict

Payload dict. Typical keys: shacl_rules_file_external_id, datamodel_space, datamodel_external_id, datamodel_version, filter or instance_ids, records_config.

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict with validation results.

Source code in cognite_data_quality/invoke.py
def call_validate_timeseries_shacl(
    client: CogniteClient,
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Run time series SHACL validation through the deployed Cognite Function.

    Thin shortcut that forwards to ``call_validation`` with
    ``validation_type="timeseries"``; all other arguments are passed through
    unchanged.

    Args:
        client: Cognite client instance used to invoke the function.
        data: Payload dict. Typical keys: shacl_rules_file_external_id,
            datamodel_space, datamodel_external_id, datamodel_version,
            filter or instance_ids, records_config.
        wait: If True, block until the function returns.
        external_id: External ID of the deployed function
            (default: data-quality-validation).

    Returns:
        Response dict with validation results.
    """
    return call_validation(
        client=client,
        validation_type="timeseries",
        data=data,
        wait=wait,
        external_id=external_id,
    )

call_validation(client, validation_type, data, *, wait=True, external_id='data-quality-validation')

Call the unified SHACL validation function.

The unified function consolidates all validation types into a single deployment. Use this function for new code. Legacy individual functions are still supported.

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
validation_type Literal['instance', 'partitioned', 'timeseries', 'orchestrator', 'test']

Type of validation to perform:

- "instance": Basic instance validation (real-time)
- "partitioned": Partitioned historic instance validation
- "timeseries": Time series validation (normal/backfill)
- "orchestrator": Historic validation orchestrator
- "test": Test SHACL rules without writing to Records API

required
data dict

Payload dict. Contents depend on validation_type.

required
wait bool

If True, block until the function returns.

True
external_id str

Function external ID (default: data-quality-validation).

'data-quality-validation'

Returns:

Type Description
dict

Response dict from the appropriate handler.

Example

Instance validation

result = call_validation(
    client=client,
    validation_type="instance",
    data={
        "instances": {"items": [...]},
        "shacl_rules_file_external_id": "my_rules",
        "datamodel_space": "my_space",
        "datamodel_external_id": "MyModel",
        "datamodel_version": "v1",
        "records_config": {...}
    }
)

Orchestrator

result = call_validation(
    client=client,
    validation_type="orchestrator",
    data={
        "datamodel_space": "my_space",
        "view_external_id": "MyView",
        "instance_space": "my_instances",
        ...
    }
)

Source code in cognite_data_quality/invoke.py
def call_validation(
    client: CogniteClient,
    validation_type: Literal["instance", "partitioned", "timeseries", "orchestrator", "test"],
    data: dict,
    *,
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Call the unified SHACL validation function.

    The unified function consolidates all validation types into a single deployment.
    Use this function for new code. Legacy individual functions are still supported.

    The payload sent to the function is a copy of ``data`` with a
    ``"validation_type"`` key added. The ``validation_type`` argument always
    wins: if ``data`` already contains a ``"validation_type"`` key, it is
    overridden in the payload. ``data`` itself is never mutated.

    Args:
        client: Cognite client instance.
        validation_type: Type of validation to perform:
            - "instance": Basic instance validation (real-time)
            - "partitioned": Partitioned historic instance validation
            - "timeseries": Time series validation (normal/backfill)
            - "orchestrator": Historic validation orchestrator
            - "test": Test SHACL rules without writing to Records API
        data: Payload dict. Contents depend on validation_type.
        wait: If True, block until the function returns.
        external_id: Function external ID (default: data-quality-validation).

    Returns:
        Response dict from the appropriate handler. An empty dict is returned
        when the call yields no (or an empty) response.
        NOTE(review): with ``wait=False`` the response is presumably not yet
        available when this returns — confirm against the SDK before relying
        on the result in that mode.

    Example:
        # Instance validation
        result = call_validation(
            client=client,
            validation_type="instance",
            data={
                "instances": {"items": [...]},
                "shacl_rules_file_external_id": "my_rules",
                "datamodel_space": "my_space",
                "datamodel_external_id": "MyModel",
                "datamodel_version": "v1",
                "records_config": {...}
            }
        )

        # Orchestrator
        result = call_validation(
            client=client,
            validation_type="orchestrator",
            data={
                "datamodel_space": "my_space",
                "view_external_id": "MyView",
                "instance_space": "my_instances",
                ...
            }
        )
    """
    # The explicit argument is merged last so that a stray "validation_type"
    # key inside ``data`` cannot silently reroute the call to another handler.
    payload = {**data, "validation_type": validation_type}
    call = client.functions.call(external_id=external_id, data=payload, wait=wait)
    # Normalize a missing/empty response to {} so callers always get a dict.
    return call.get_response() or {}

deploy_validation_pipeline(client, data=None, *, settings_path=None, views_dir=None, view_external_id=None, view_config_external_id=None, settings_external_id='data-quality-validation_settings', wait=True, external_id='data-quality-validation')

Deploy and run a full validation pipeline with batch and incremental validation.

This function:

1. Triggers partitioned batch load of all instances (splits data into partitions)
2. Sets up sync trigger for ongoing incremental validation
3. Creates scheduled workflow for monitoring progress
4. Continuously manages orchestration (retries, cleanup)

Convenience wrapper for call_validation(validation_type="orchestrator", ...).

Three usage modes:

Mode 1: Pass data dict directly (legacy): deploy_validation_pipeline(client, data={...}, wait=True)

Mode 2: Load from local YAML configs: deploy_validation_pipeline( client, settings_path="config/environments/cog-sail/settings.yaml", view_external_id="SmallBoat", wait=True )

Mode 3: Load everything from CDF Files (no local files required): deploy_validation_pipeline( client, view_config_external_id="pump_view_config", )

Parameters:

Name Type Description Default
client CogniteClient

Cognite client instance.

required
data dict | None

(Legacy) Payload dict. Typical keys: datamodel_space, datamodel_external_id, datamodel_version, view_space, view_external_id, view_version, instance_space, shacl_file_external_id, partition_count, partition_field, records_space, records_container, stream_id, rule_set_id, rule_set_version. If None, must provide settings_path or view_config_external_id.

None
settings_path str | None

Path to a local settings.yaml file. Optional when using view_config_external_id — settings are then fetched from CDF Files instead.

None
views_dir str | None

Directory containing view YAML configs. Defaults to settings_path parent / "views". Only used in mode 2.

None
view_external_id str | None

Specific view to deploy from local files (mode 2).

None
view_config_external_id str | None

External ID of a view config stored in CDF Files (mode 3). Uploaded during deployment via deploy_instance_workflows(). External ID scheme: {view_external_id}_view_config e.g. Pump_view_config.

None
settings_external_id str

External ID of the settings file stored in CDF Files. Defaults to data-quality-validation_settings (the ID used by deploy_instance_workflows). Only used in mode 3 when settings_path is not provided.

'data-quality-validation_settings'
wait bool

If True, block until the function returns.

True
external_id str

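Function external ID (default: data-quality-validation). Only used when data is passed directly (mode 1); in modes 2 and 3 the function external ID is taken from the loaded settings (function.external_id) instead.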

'data-quality-validation'

Returns:

Type Description
dict

Response dict from the function.

Example

Mode 3: fully from CDF — no local files needed

result = deploy_validation_pipeline( client, view_config_external_id="pump_view_config", )

Mode 3: override settings external ID

result = deploy_validation_pipeline( client, view_config_external_id="pump_view_config", settings_external_id="my-function_settings", )

Mode 2: local YAML files

result = deploy_validation_pipeline( client, settings_path="config/environments/cog-sail/settings.yaml", view_external_id="SmallBoat", )

Mode 1: legacy data dict

result = deploy_validation_pipeline( client, data={"datamodel_space": "sailboat", ...} )

Source code in cognite_data_quality/invoke.py
def deploy_validation_pipeline(
    client: CogniteClient,
    data: dict | None = None,
    *,
    settings_path: str | None = None,
    views_dir: str | None = None,
    view_external_id: str | None = None,
    view_config_external_id: str | None = None,
    settings_external_id: str = "data-quality-validation_settings",
    wait: bool = True,
    external_id: str = "data-quality-validation",
) -> dict:
    """Deploy and run a full validation pipeline (batch plus incremental).

    Builds an orchestrator payload and sends it via
    call_validation(validation_type="orchestrator", ...). The orchestrator:
    1. Triggers partitioned batch load of all instances (splits data into partitions)
    2. Sets up sync trigger for ongoing incremental validation
    3. Creates scheduled workflow for monitoring progress
    4. Continuously manages orchestration (retries, cleanup)

    The payload source is chosen in this order of precedence:

    **Mode 1: ``data`` dict passed directly (legacy):**
        deploy_validation_pipeline(client, data={...}, wait=True)

    **Mode 3: ``view_config_external_id`` given — configs come from CDF Files:**
        deploy_validation_pipeline(
            client,
            view_config_external_id="pump_view_config",
        )

    **Mode 2: ``settings_path`` + ``view_external_id`` — local YAML configs:**
        deploy_validation_pipeline(
            client,
            settings_path="config/environments/cog-sail/settings.yaml",
            view_external_id="SmallBoat",
            wait=True
        )

    Args:
        client: Cognite client instance.
        data: (Legacy) Payload dict. Typical keys: datamodel_space, datamodel_external_id,
            datamodel_version, view_space, view_external_id, view_version,
            instance_space, shacl_file_external_id, partition_count, partition_field,
            records_space, records_container, stream_id, rule_set_id, rule_set_version.
            If None, must provide settings_path or view_config_external_id.
        settings_path: Path to a local settings.yaml file. Required in mode 2. In
            mode 3 it is optional: when given it is loaded locally, otherwise the
            settings are downloaded from CDF Files.
        views_dir: Directory containing view YAML configs (``*.yaml``). Defaults to
            settings_path parent / "views". Only used in mode 2.
        view_external_id: Specific view to deploy from local files (mode 2).
        view_config_external_id: External ID of a view config stored in CDF Files (mode 3).
            Uploaded during deployment via deploy_instance_workflows().
            External ID scheme: ``{view_external_id}_view_config`` e.g. ``Pump_view_config``.
        settings_external_id: External ID of the settings file stored in CDF Files.
            Defaults to ``data-quality-validation_settings`` (the ID used by deploy_instance_workflows).
            Only used in mode 3 when settings_path is not provided.
        wait: If True, block until the function returns.
        external_id: Function external ID (default: data-quality-validation).
            Only honoured in mode 1; in modes 2 and 3 the function external ID
            is read from the loaded settings (``function.external_id``).

    Returns:
        Response dict from the function.

    Raises:
        ValueError: If none of data, view_config_external_id or settings_path is
            given; if settings_path is given without view_external_id (and
            without view_config_external_id); or if no view config in the
            views directory matches view_external_id.

    Example:
        # Mode 3: fully from CDF — no local files needed
        result = deploy_validation_pipeline(
            client,
            view_config_external_id="pump_view_config",
        )

        # Mode 3: override settings external ID
        result = deploy_validation_pipeline(
            client,
            view_config_external_id="pump_view_config",
            settings_external_id="my-function_settings",
        )

        # Mode 2: local YAML files
        result = deploy_validation_pipeline(
            client,
            settings_path="config/environments/cog-sail/settings.yaml",
            view_external_id="SmallBoat",
        )

        # Mode 1: legacy data dict
        result = deploy_validation_pipeline(
            client,
            data={"datamodel_space": "sailboat", ...}
        )
    """
    # --- Mode 1: caller supplied the payload; forward it untouched. ---
    if data is not None:
        return call_validation(client, "orchestrator", data, wait=wait, external_id=external_id)

    # Imported lazily: only the config-driven modes need these.
    from pathlib import Path

    import yaml

    from cognite_data_quality.deploy import DeploymentSettings, ViewConfig, load_settings

    # --- Mode 3: view config (and, unless a local path is given, settings) from CDF Files. ---
    if view_config_external_id is not None:
        if settings_path is not None:
            settings = load_settings(Path(settings_path))
        else:
            settings_bytes = client.files.download_bytes(external_id=settings_external_id)
            settings_doc = yaml.safe_load(settings_bytes.decode("utf-8")) or {}
            settings = DeploymentSettings(**settings_doc)

        view_bytes = client.files.download_bytes(external_id=view_config_external_id)
        view_doc = yaml.safe_load(view_bytes.decode("utf-8")) or {}
        payload = _build_orchestrator_data(settings, ViewConfig(**view_doc))
        # The target function comes from the settings, not from ``external_id``.
        return call_validation(
            client, "orchestrator", payload, wait=wait, external_id=settings.function.external_id
        )

    # --- Mode 2: local settings file plus a views directory. ---
    if settings_path is None:
        raise ValueError("Must provide either 'data' dict, 'view_config_external_id', or 'settings_path'")
    if view_external_id is None:
        raise ValueError("Must provide 'view_external_id' or 'view_config_external_id' when using settings_path mode")

    settings = load_settings(Path(settings_path))
    if views_dir is not None:
        search_dir = views_dir if isinstance(views_dir, Path) else Path(views_dir)
    else:
        search_dir = Path(settings_path).parent / "views"

    # Parse view files one by one and stop at the first whose view external ID matches.
    for yaml_file in search_dir.glob("*.yaml"):
        view_doc = yaml.safe_load(yaml_file.read_text(encoding="utf-8")) or {}
        view_doc["_filename"] = yaml_file.name
        matched_view = ViewConfig(**view_doc)
        if matched_view.view.external_id == view_external_id:
            break
    else:
        # Loop finished without a match: report the file stems that were scanned.
        available = [f.stem for f in search_dir.glob("*.yaml")]
        raise ValueError(
            f"View config for '{view_external_id}' not found in {search_dir}. "
            f"Available views: {available}"
        )

    payload = _build_orchestrator_data(settings, matched_view)
    # The target function comes from the settings, not from ``external_id``.
    return call_validation(
        client, "orchestrator", payload, wait=wait, external_id=settings.function.external_id
    )