Uniqueness (User Guide)

What this is

Use uniqueness to detect duplicate values for one indexed property across a target class.

When to use it

Use uniqueness when a property must be globally unique across all instances (for example order number, tag, or external business key).

Use standard SHACL constraints when you only need per-instance checks.

User mental model

  • Input: SHACL declares a uniqueness property.
  • Execution: scheduled aggregate scan finds duplicate value groups.
  • Output: per-instance failures and/or grouped overflow/pass summaries.

Runtime behavior

How uniqueness works

  • Uniqueness is declared in SHACL using dqs:uniquenessConstraint (or dqs:unique).
  • Deploy creates a dedicated scheduled workflow: dq-{view}-uniqueness.
  • Runtime executes aggregate-first duplicate detection, then expands duplicates to instance-level results.
  • Results are written to DataQualityValidationRecord.
  • Failure writes are deduplicated: if a failure record already exists for the same focusNode and is at least as recent as the instance's lastUpdatedTime, no new failure record is written until the instance changes.

Hard requirement: property must be indexed

Uniqueness constraints are only supported on indexed properties in Data Modeling.

Reference: CDF Data Modeling - Indexes

What to expect in each run

Uniqueness emits typed records so apps and users can distinguish normal findings from overflow conditions.

From a user perspective:

  • Normal duplicate findings are emitted as validationType = "single_violation" when runtime expands duplicate groups to instance-level records.
  • Single-violation expansion is bounded by a run-level guardrail: up to 100 duplicate-value groups are expanded per run.
  • Grouped summary records are emitted as validationType = "group_violation" when guardrails or pass conditions apply:
      • groupViolationType = "global_overflow" when more than 100 different values have duplicates in the same run.
      • groupViolationType = "value_overflow" when one duplicate value has more than 100 violating instances.
      • groupViolationType = "pass" when no duplicates are found.
  • If you receive an overflow group record, treat it as "more duplicates remain" and rerun uniqueness after cleanup.
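
As a minimal sketch of how a consumer might interpret one run, the function below summarizes records using only the typed fields named above. It assumes each record is available as a plain dict keyed by validationType and groupViolationType; the function name summarize_run and the shape of its return value are hypothetical and not part of the product.

```python
from collections import Counter

# Group types that signal "more duplicates remain" (taken from the list above).
OVERFLOW_TYPES = ("global_overflow", "value_overflow")


def summarize_run(records: list) -> dict:
    """Summarize uniqueness records from one run using typed fields only."""
    counts = Counter()
    for record in records:
        validation_type = record.get("validationType")
        if validation_type == "single_violation":
            counts["single_violation"] += 1
        elif validation_type == "group_violation":
            counts[record.get("groupViolationType")] += 1

    overflow = any(counts[group_type] > 0 for group_type in OVERFLOW_TYPES)
    violations = counts["single_violation"]
    return {
        "violations": violations,
        "more_duplicates_remain": overflow,
        # A run counts as clean only if a pass summary exists and nothing else fired.
        "passed": counts["pass"] > 0 and violations == 0 and not overflow,
    }


# Hypothetical records, reduced to the two typed fields.
example_records = [
    {"validationType": "single_violation"},
    {"validationType": "single_violation"},
    {"validationType": "group_violation", "groupViolationType": "value_overflow"},
]
print(summarize_run(example_records))
```

If more_duplicates_remain is true, the run did not report every duplicate, so a rerun after cleanup is needed.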

Minimal happy path

SHACL declaration (minimal)

@prefix sh:  <http://www.w3.org/ns/shacl#> .
@prefix dqs: <http://purl.org/cognite/dqs#> .
@prefix sp:  <http://purl.org/cognite/my_space/WorkOrder/> .

sp:WorkOrderUniqueness
    a sh:NodeShape ;
    sh:targetClass sp:WorkOrder ;
    dqs:uniquenessConstraint [
        dqs:property "workOrderNumber" ;
    ] .

Rules:

  • dqs:property is required.
  • The property must be indexed.
  • Null values are ignored: the uniqueness executor skips the group of instances whose property value is null.

Scheduling

Uniqueness runs on a dedicated schedule, not on every sync run.

  • Default schedule: 0 6 * * * (unless overridden)
  • Override in view YAML:
uniqueness_cron: "0 */4 * * *"   # every 4 hours
# uniqueness_cron: null           # disable uniqueness schedule
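
The three cases above (key absent, key set to a cron string, key set to null) can be sketched as a small resolver. This is only an illustration of the documented behavior, assuming the view YAML has already been parsed into a dict; resolve_uniqueness_cron is a hypothetical helper, not the deploy tool's actual code.

```python
from typing import Optional

DEFAULT_UNIQUENESS_CRON = "0 6 * * *"  # default schedule stated above
_MISSING = object()  # sentinel to tell "key absent" apart from an explicit null


def resolve_uniqueness_cron(view_config: dict) -> Optional[str]:
    """Return the effective cron expression, or None if the schedule is disabled."""
    value = view_config.get("uniqueness_cron", _MISSING)
    if value is _MISSING:
        return DEFAULT_UNIQUENESS_CRON  # no override: use the default
    return value  # a cron string overrides; None (YAML null) disables


print(resolve_uniqueness_cron({}))
print(resolve_uniqueness_cron({"uniqueness_cron": "0 */4 * * *"}))
print(resolve_uniqueness_cron({"uniqueness_cron": None}))
```

The sentinel matters because a missing key and an explicit null mean different things here: the first keeps the default, the second turns the schedule off.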

Output behavior

For a duplicate value with count N, runtime creates N failing instance-level results (one per focus node), unless suppressed by dedupe.

Dedupe rule for failures:

  • If the latest existing failed record for (ruleSetId, ruleSetVersion, focusNode) has record.lastUpdatedTime >= instance.lastUpdatedTime, the new failure record is suppressed.
  • If the instance changes afterwards (its lastUpdatedTime moves past the record's), a new failure record is written.

This prevents repeated failure spam for unchanged data.
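
The dedupe rule reduces to a single timestamp comparison. The sketch below assumes both timestamps are integers on the same scale (for example epoch milliseconds) and that the latest failed record for the (ruleSetId, ruleSetVersion, focusNode) key has already been looked up; should_write_failure is a hypothetical name used for illustration.

```python
from typing import Optional


def should_write_failure(
    latest_failed_record_time: Optional[int],
    instance_last_updated_time: int,
) -> bool:
    """Decide whether a new failure record should be written for a focus node.

    latest_failed_record_time is None when no prior failure exists for the key.
    """
    if latest_failed_record_time is None:
        return True  # nothing to dedupe against
    # Suppress when the existing record is newer than or equal to the instance.
    return latest_failed_record_time < instance_last_updated_time


print(should_write_failure(None, 1_700_000_000_000))               # no prior failure
print(should_write_failure(1_700_000_000_500, 1_700_000_000_000))  # unchanged instance
print(should_write_failure(1_700_000_000_500, 1_700_000_001_000))  # instance changed
```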

How users detect "more duplicates remain"

From validation records:

  • Prefer typed signals over free-text parsing:
      • groupViolationType = "global_overflow" means run-level overflow.
      • groupViolationType = "value_overflow" means per-value overflow.
  • resultMessage can still include human-readable overflow details, but automation should rely on typed fields.
  • Fix known duplicates and rerun uniqueness until no overflow records appear and no new violations are reported.

Grouped uniqueness records (heuristic)

Grouped records are emitted by specific runtime guardrail conditions:

  • validationType = "group_violation" + groupViolationType = "global_overflow"
      • Trigger: more than 100 different values have duplicates in the same run.
      • Meaning: runtime emits a run-level overflow summary instead of expanding all duplicate-value groups.
  • validationType = "group_violation" + groupViolationType = "value_overflow"
      • Trigger: a specific duplicate value has more than 100 violating instances.
      • Meaning: runtime emits a per-value overflow summary for that value instead of writing all violating focus nodes.
  • validationType = "group_violation" + groupViolationType = "pass"
      • Trigger: no duplicate-value buckets are found (count <= 1 across returned buckets).
      • Meaning: grouped pass summary that confirms no duplicates were detected in that run.

These are grouped summaries and may omit focusNode / focusNodeInstance. Use groupViolationType as the primary machine-readable signal for investigation and rerun workflows.
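
To make the guardrail conditions concrete, the following sketch models which kind of record each duplicate bucket would lead to. It is a simplified illustration, not the runtime's implementation: it assumes buckets arrive as a value-to-count mapping, that the first 100 duplicate groups in iteration order are the ones expanded (the actual selection order is not documented here), and that an oversized value yields only its summary. The names plan_uniqueness_records, MAX_EXPANDED_GROUPS and MAX_INSTANCES_PER_VALUE are hypothetical.

```python
from typing import Optional

MAX_EXPANDED_GROUPS = 100      # run-level guardrail described above
MAX_INSTANCES_PER_VALUE = 100  # per-value guardrail described above


def plan_uniqueness_records(bucket_counts: dict) -> list:
    """Return (record_kind, value) pairs for one run.

    record_kind is "single_violation" (expand the value to instance-level
    records), "value_overflow", "global_overflow", or "pass".
    """
    duplicates = {value: count for value, count in bucket_counts.items() if count > 1}
    if not duplicates:
        return [("pass", None)]

    plan: list = []
    for index, (value, count) in enumerate(duplicates.items()):
        if index >= MAX_EXPANDED_GROUPS:
            # More than 100 duplicate values: summarize the rest at run level.
            plan.append(("global_overflow", None))
            break
        if count > MAX_INSTANCES_PER_VALUE:
            plan.append(("value_overflow", value))
        else:
            plan.append(("single_violation", value))
    return plan


print(plan_uniqueness_records({"WO-1": 1, "WO-2": 1}))
print(plan_uniqueness_records({"WO-1": 2, "WO-2": 150, "WO-3": 1}))
```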

Best practices

  • Keep uniqueness semantics in SHACL/RuleSet, not in ad-hoc scripts.
  • Build automations around validationType and groupViolationType, not free-text messages.
  • Treat overflow records as actionable backlog signals and rerun after cleanup.

Troubleshooting

  • Confirm SHACL contains dqs:uniquenessConstraint for the expected target class/property.
  • Confirm target property is indexed.
  • Confirm uniqueness workflow exists and has an active trigger.
  • If recordsPosted is low but duplicates still exist, check recordsSuppressed (dedupe may be working as intended).
  • If grouped records include groupViolationType = "global_overflow" or groupViolationType = "value_overflow", fix current duplicates and rerun to continue through backlog.

Pro tip: list all duplicates with a Python script

If you see grouped overflow records (groupViolationType = "global_overflow" or "value_overflow"), you can run a one-off script to inspect all current duplicate values directly.

What this script does:

  1. Reads Data Product metadata (data contract context).
  2. Runs one aggregate query grouped by the uniqueness property.
  3. Keeps only values with count > 1 (duplicate values).
  4. Sorts the duplicate groups by duplicate count (largest first).
  5. For each duplicate value, runs instances.list and prints the impacted instances.

from __future__ import annotations

import tomllib
from pathlib import Path

from cognite.client import ClientConfig, CogniteClient
from cognite.client import data_modeling as dm
from cognite.client.credentials import Token
from cognite.client.data_classes.data_modeling import ViewId

from cognite_data_quality._dataproduct_client import DataProductClient
from cognite_data_quality._function_code.handlers.data_product_sync import _pick_latest_released


def load_client_from_toml(toml_path: Path) -> CogniteClient:
    content = tomllib.loads(toml_path.read_text(encoding="utf-8"))
    cognite_cfg = content["cognite"]
    return CogniteClient(
        ClientConfig(
            client_name="dq-uniqueness-pro-tip",
            project=cognite_cfg["project"],
            base_url=f"https://{cognite_cfg['cdf_cluster']}.cognitedata.com",
            credentials=Token(cognite_cfg["bearer_token"]),
        )
    )


def main() -> None:
    # --- Change these values for your setup ---
    toml_path = Path("test_and_deploy/ai-demo-arn.toml")
    data_product_id = "enterprise-process-industry"
    view_space = "sp_enterprise_process_industry"
    view_external_id = "YourOrgMaintenanceOrder"
    view_version = "v1"
    uniqueness_property = "name"
    # Optional filter by instance spaces from your data contract view definition:
    instance_spaces: list[str] = ["test_data_pumps"]
    # ------------------------------------------

    client = load_client_from_toml(toml_path)

    # 1) Read latest released Data Product version (data contract context)
    versions = DataProductClient.list_versions(client, data_product_id, limit=100)
    latest = _pick_latest_released(versions)
    if not latest:
        raise RuntimeError(f"No released version found for Data Product '{data_product_id}'.")
    print(f"Data Product: {data_product_id}@{latest['version']}")

    view_id = ViewId(view_space, view_external_id, view_version)
    aggregate_filter = (
        {"in": {"property": ["node", "space"], "values": instance_spaces}}
        if instance_spaces
        else None
    )

    # 2) Aggregate duplicates by property value
    buckets = list(
        client.data_modeling.instances.aggregate(
            view=view_id,
            aggregates=[dm.aggregations.Count("externalId")],
            group_by=[uniqueness_property],
            instance_type="node",
            filter=aggregate_filter,
            limit=1000,  # CDF API max per request
        )
        or []
    )

    # 3) Keep only duplicate values (count > 1). The raw value is kept (not cast
    #    to str) so the equals filter below also matches non-string properties.
    duplicate_groups: list[tuple[object, int]] = []
    for bucket in buckets:
        value = (bucket.group or {}).get(uniqueness_property)
        count_obj = (bucket.aggregates or [None])[0]
        count = int(getattr(count_obj, "value", 0) or 0)
        if value is not None and count > 1:
            duplicate_groups.append((value, count))

    # 4) Order groups by duplicate size (largest first)
    duplicate_groups.sort(key=lambda item: item[1], reverse=True)
    print(f"Duplicate value groups found: {len(duplicate_groups)}")

    property_path = [view_space, f"{view_external_id}/{view_version}", uniqueness_property]

    for duplicate_value, duplicate_count in duplicate_groups:
        print("\n" + "=" * 100)
        print(f"Value: {duplicate_value!r}  |  Count: {duplicate_count}")

        value_filter: dict[str, object] = {
            "equals": {"property": property_path, "value": duplicate_value}
        }
        detail_filter = {"and": [aggregate_filter, value_filter]} if aggregate_filter else value_filter

        # 5) List impacted instances for this duplicate value
        rows = list(
            client.data_modeling.instances.list(
                sources=[view_id],
                instance_type="node",
                filter=detail_filter,
                limit=1000,  # CDF API max per request
            )
            or []
        )

        for row in rows:
            print(f"- {row.space}/{row.external_id} (last_updated_time={row.last_updated_time})")


if __name__ == "__main__":
    main()

Use this as an investigation tool when users need full duplicate visibility before cleanup planning.

Note:

  • The examples above use limit=1000, which is the API maximum per request for these calls.
  • If a duplicate group contains more than 1000 instances, fetch additional pages (for example by partitioning filters) to retrieve the full set.
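
One way to partition, sketched here under assumptions, is to split a single duplicate value's query into one filter per instance space and run instances.list once per filter. The filter dicts follow the same shape as the script above; partition_filters_by_space is a hypothetical helper, and a single space can still hold more than 1000 matches, in which case a finer partition key is needed.

```python
def partition_filters_by_space(value_filter: dict, instance_spaces: list) -> list:
    """Build one filter per instance space, each combined with the value filter."""
    return [
        {
            "and": [
                {"equals": {"property": ["node", "space"], "value": space}},
                value_filter,
            ]
        }
        for space in instance_spaces
    ]


# Hypothetical property path and value, mirroring the names used in the script.
value_filter = {
    "equals": {
        "property": ["sp_enterprise_process_industry", "YourOrgMaintenanceOrder/v1", "name"],
        "value": "Pump A",
    }
}
for partition in partition_filters_by_space(value_filter, ["space_a", "space_b"]):
    print(partition)
```

Each resulting dict can be passed as the filter argument in step 5 of the script in place of detail_filter.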
