Uniqueness (User Guide)
What this is
Use uniqueness to detect duplicate values for one indexed property across a target class.
When to use it
Use uniqueness when a property must be globally unique across all instances (for example order number, tag, or external business key).
Use standard SHACL constraints when you only need per-instance checks.
User mental model
- Input: SHACL declares a uniqueness property.
- Execution: scheduled aggregate scan finds duplicate value groups.
- Output: per-instance failures and/or grouped overflow/pass summaries.
Runtime behavior
How uniqueness works
- Uniqueness is declared in SHACL using dqs:uniquenessConstraint (or dqs:unique).
- Deploy creates a dedicated scheduled workflow: dq-{view}-uniqueness.
- Runtime executes aggregate-first duplicate detection, then expands duplicates to instance-level results.
- Results are written to DataQualityValidationRecord.
- Duplicate failure writes are deduplicated: if a newer or equal prior failure already exists for the same focusNode, a new failure record is not written until the instance changes (lastUpdatedTime).
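The aggregate-first approach described above can be pictured with a small in-memory sketch. It is not the runtime's implementation: the function name find_duplicates, the dict-shaped instances, and the externalId key are illustrative assumptions. The point is only that counting per value comes first, and instance-level expansion happens only for values whose count exceeds one.

```python
from collections import Counter, defaultdict


def find_duplicates(instances, prop):
    """Return {value: [externalId, ...]} for values of `prop` shared by 2+ instances."""
    # Step 1 (aggregate): count instances per property value, skipping nulls.
    counts = Counter(
        inst[prop] for inst in instances if inst.get(prop) is not None
    )
    duplicate_values = {value for value, n in counts.items() if n > 1}

    # Step 2 (expand): collect instances only for values known to be duplicated.
    expanded = defaultdict(list)
    for inst in instances:
        if inst.get(prop) in duplicate_values:
            expanded[inst[prop]].append(inst["externalId"])
    return dict(expanded)


# Hypothetical sample data: two orders share "A100"; null values are ignored.
orders = [
    {"externalId": "wo-1", "workOrderNumber": "A100"},
    {"externalId": "wo-2", "workOrderNumber": "A100"},
    {"externalId": "wo-3", "workOrderNumber": "B200"},
    {"externalId": "wo-4", "workOrderNumber": None},
    {"externalId": "wo-5", "workOrderNumber": None},
]
print(find_duplicates(orders, "workOrderNumber"))
```

In the real runtime the aggregate step runs server-side against the indexed property, which is why the index is a hard requirement.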
Hard requirement: property must be indexed
Uniqueness constraints are only supported on indexed properties in Data Modeling.
Reference: CDF Data Modeling - Indexes
What to expect in each run
Uniqueness emits typed records so apps and users can distinguish normal findings from overflow conditions.
From a user perspective:
- Normal duplicate findings are emitted as validationType = "single_violation" when runtime expands duplicate groups to instance-level records.
- Single-violation expansion is bounded by a run-level guardrail: up to 100 duplicate-value groups are expanded per run.
- Grouped summary records are emitted as validationType = "group_violation" when guardrails or pass conditions apply:
  - groupViolationType = "global_overflow" when more than 100 different values have duplicates in the same run.
  - groupViolationType = "value_overflow" when one duplicate value has more than 100 violating instances.
  - groupViolationType = "pass" when no duplicates are found.
- If you receive an overflow group record, treat it as "more duplicates remain" and rerun uniqueness after cleanup.
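To make the guardrails concrete, here is a rough sketch of how duplicate groups could map to typed records. The two limits of 100 come from this guide. The function name build_records, the record dict shape, the choice to expand groups in sorted order, and the choice to emit only a summary for an oversized value are assumptions made for illustration; the actual runtime may differ.

```python
MAX_GROUPS_PER_RUN = 100       # guardrail from the guide: groups expanded per run
MAX_INSTANCES_PER_VALUE = 100  # guardrail from the guide: instances per value


def build_records(duplicates):
    """Map {value: [focus nodes]} to typed records (hypothetical dict shape)."""
    if not duplicates:
        # No duplicates at all: one grouped pass summary.
        return [{"validationType": "group_violation", "groupViolationType": "pass"}]

    records = []
    values = sorted(duplicates)  # assumption: which groups get expanded first
    for value in values[:MAX_GROUPS_PER_RUN]:
        nodes = duplicates[value]
        if len(nodes) > MAX_INSTANCES_PER_VALUE:
            # Too many instances for one value: per-value overflow summary.
            records.append({
                "validationType": "group_violation",
                "groupViolationType": "value_overflow",
                "value": value,
            })
        else:
            # Normal case: one failing record per focus node.
            records.extend(
                {"validationType": "single_violation", "focusNode": node, "value": value}
                for node in nodes
            )
    if len(values) > MAX_GROUPS_PER_RUN:
        # More duplicated values than the run expands: run-level overflow summary.
        records.append({
            "validationType": "group_violation",
            "groupViolationType": "global_overflow",
        })
    return records
```

Calling build_records({"A100": ["wo-1", "wo-2"]}) yields two single_violation records, while an empty dict yields a single pass summary.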
Minimal happy path
SHACL declaration (minimal)
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix dqs: <http://purl.org/cognite/dqs#> .
@prefix sp: <http://purl.org/cognite/my_space/WorkOrder/> .

sp:WorkOrderUniqueness
    a sh:NodeShape ;
    sh:targetClass sp:WorkOrder ;
    dqs:uniquenessConstraint [
        dqs:property "workOrderNumber" ;
    ] .
Rules:
- dqs:property is required.
- The property must be indexed.
- Grouped null values are ignored by the uniqueness executor.
Scheduling
Uniqueness runs on a dedicated schedule, not on every sync run.
- Default schedule: 0 6 * * * (unless overridden)
- Override in view YAML:
uniqueness_cron: "0 */4 * * *" # every 4 hours
# uniqueness_cron: null # disable uniqueness schedule
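The three scheduling cases (key absent, key set to a cron string, key set to null) can be summarized in a few lines. This is a sketch of the documented behavior only; resolve_uniqueness_cron and the plain-dict view configuration are hypothetical names, not part of the deploy tooling.

```python
DEFAULT_UNIQUENESS_CRON = "0 6 * * *"  # default schedule stated in the guide


def resolve_uniqueness_cron(view_config):
    """Return the cron expression to schedule, or None when uniqueness is disabled."""
    if "uniqueness_cron" not in view_config:
        # Key not present in the view YAML: fall back to the default schedule.
        return DEFAULT_UNIQUENESS_CRON
    # Key present: a string overrides the default, YAML null (None) disables.
    return view_config["uniqueness_cron"]


print(resolve_uniqueness_cron({}))
print(resolve_uniqueness_cron({"uniqueness_cron": "0 */4 * * *"}))
print(resolve_uniqueness_cron({"uniqueness_cron": None}))
```

The distinction that matters is between an absent key and an explicit null: the first keeps the default schedule, the second turns the schedule off.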
Output behavior
For a duplicate value with count N, runtime creates N failing instance-level results (one per focus node), unless suppressed by dedupe.
Dedupe rule for failures:
- If the latest existing failed record for (ruleSetId, ruleSetVersion, focusNode) has record.lastUpdatedTime >= instance.lastUpdatedTime, a new failure record is suppressed.
- If the instance changes later, a new failure record is written again.
This prevents repeated failure spam for unchanged data.
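The dedupe rule reduces to a single timestamp comparison per focus node. A minimal sketch, assuming timestamps are comparable numbers (for example epoch milliseconds) and that the lookup of the latest failed record for (ruleSetId, ruleSetVersion, focusNode) has already happened; should_write_failure is a hypothetical helper name.

```python
def should_write_failure(latest_failure_time, instance_time):
    """Decide whether a new failure record is written for one focus node.

    latest_failure_time: lastUpdatedTime of the latest failed record for
        (ruleSetId, ruleSetVersion, focusNode), or None if there is none.
    instance_time: lastUpdatedTime of the instance itself.
    """
    if latest_failure_time is None:
        return True  # first failure for this focus node: always write
    # Suppress while the record is newer than or equal to the instance.
    return latest_failure_time < instance_time
```

For example, should_write_failure(200, 100) is False (the instance is unchanged since the last failure), while should_write_failure(100, 150) is True (the instance changed afterwards).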
How users detect "more duplicates remain"
From validation records:
- Prefer typed signals over free-text parsing:
  - groupViolationType = "global_overflow" means run-level overflow.
  - groupViolationType = "value_overflow" means per-value overflow.
  - resultMessage can still include human-readable overflow details, but automation should rely on typed fields.
- Fix known duplicates and rerun uniqueness until no overflow records appear and no new violations are reported.
Grouped uniqueness records (heuristic)
Grouped records are emitted by specific runtime guardrail conditions:
- validationType = "group_violation" + groupViolationType = "global_overflow"
  - Trigger: more than 100 different values have duplicates in the same run.
  - Meaning: runtime emits a run-level overflow summary instead of expanding all duplicate-value groups.
- validationType = "group_violation" + groupViolationType = "value_overflow"
  - Trigger: a specific duplicate value has more than 100 violating instances.
  - Meaning: runtime emits a per-value overflow summary for that value instead of writing all violating focus nodes.
- validationType = "group_violation" + groupViolationType = "pass"
  - Trigger: no duplicate-value buckets are found (count <= 1 across returned buckets).
  - Meaning: grouped pass summary that confirms no duplicates were detected in that run.
These are grouped summaries and may omit focusNode / focusNodeInstance.
Use groupViolationType as the primary machine-readable signal for investigation and rerun workflows.
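An automation that follows this advice might look roughly like the sketch below. It assumes validation records have already been fetched and are available as plain dicts carrying validationType and groupViolationType; the function name summarize_run and the returned keys are illustrative, and no free-text field is inspected.

```python
OVERFLOW_TYPES = {"global_overflow", "value_overflow"}


def summarize_run(records):
    """Classify one uniqueness run using typed fields only."""
    # Collect the group types reported by grouped summary records.
    group_types = {
        r.get("groupViolationType")
        for r in records
        if r.get("validationType") == "group_violation"
    }
    # Count instance-level findings.
    violations = sum(
        1 for r in records if r.get("validationType") == "single_violation"
    )
    overflow = bool(group_types & OVERFLOW_TYPES)
    return {
        "violations": violations,
        "overflow": overflow,  # True means more duplicates remain
        "passed": "pass" in group_types and violations == 0 and not overflow,
    }
```

A run with overflow set to True is the signal to fix the reported duplicates and rerun, since the summary records stand in for findings that were not expanded.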
Best practices
- Keep uniqueness semantics in SHACL/RuleSet, not in ad-hoc scripts.
- Build automations around validationType and groupViolationType, not free-text messages.
- Treat overflow records as actionable backlog signals and rerun after cleanup.
Troubleshooting
- Confirm SHACL contains dqs:uniquenessConstraint for the expected target class/property.
- Confirm target property is indexed.
- Confirm uniqueness workflow exists and has an active trigger.
- If recordsPosted is low but duplicates still exist, check recordsSuppressed (dedupe may be working as intended).
- If grouped records include groupViolationType = "global_overflow" or groupViolationType = "value_overflow", fix current duplicates and rerun to continue through backlog.
Pro tip: list all duplicates with a Python script
If you see grouped overflow records (groupViolationType = "global_overflow" or "value_overflow"), you can run a one-off script to inspect all current duplicate values directly.
What this script does:
- Reads Data Product metadata (data contract context).
- Runs one aggregate query grouped by the uniqueness property.
- Keeps only values with count > 1 (duplicate values).
- For each duplicate value, runs instances.list to fetch impacted instances.
- Prints duplicate groups ordered by duplicate count (largest first).
from __future__ import annotations

import tomllib
from pathlib import Path

from cognite.client import ClientConfig, CogniteClient
from cognite.client.credentials import Token
from cognite.client.data_classes import data_modeling as dm
from cognite.client.data_classes.data_modeling import ViewId

from cognite_data_quality._dataproduct_client import DataProductClient
from cognite_data_quality._function_code.handlers.data_product_sync import _pick_latest_released


def load_client_from_toml(toml_path: Path) -> CogniteClient:
    """Build a CogniteClient from the [cognite] section of a TOML file."""
    content = tomllib.loads(toml_path.read_text(encoding="utf-8"))
    cognite_cfg = content["cognite"]
    return CogniteClient(
        ClientConfig(
            client_name="dq-uniqueness-pro-tip",
            project=cognite_cfg["project"],
            base_url=f"https://{cognite_cfg['cdf_cluster']}.cognitedata.com",
            credentials=Token(cognite_cfg["bearer_token"]),
        )
    )


def main() -> None:
    # --- Change these values for your setup ---
    toml_path = Path("test_and_deploy/ai-demo-arn.toml")
    data_product_id = "enterprise-process-industry"
    view_space = "sp_enterprise_process_industry"
    view_external_id = "YourOrgMaintenanceOrder"
    view_version = "v1"
    uniqueness_property = "name"
    # Optional filter by instance spaces from your data contract view definition:
    instance_spaces: list[str] = ["test_data_pumps"]
    # ------------------------------------------

    client = load_client_from_toml(toml_path)

    # 1) Read latest released Data Product version (data contract context)
    versions = DataProductClient.list_versions(client, data_product_id, limit=100)
    latest = _pick_latest_released(versions)
    if not latest:
        raise RuntimeError(f"No released version found for Data Product '{data_product_id}'.")
    print(f"Data Product: {data_product_id}@{latest['version']}")

    view_id = ViewId(view_space, view_external_id, view_version)
    aggregate_filter = (
        {"in": {"property": ["node", "space"], "values": instance_spaces}}
        if instance_spaces
        else None
    )

    # 2) Aggregate duplicates by property value
    buckets = list(
        client.data_modeling.instances.aggregate(
            view=view_id,
            aggregates=[dm.aggregations.Count("externalId")],
            group_by=[uniqueness_property],
            instance_type="node",
            filter=aggregate_filter,
            limit=1000,  # CDF API max per request
        )
        or []
    )

    # 3) Keep only duplicate values (count > 1).
    # The raw value is kept (not converted to str) so the equals filter in
    # step 5 also matches non-string properties.
    duplicate_groups: list[tuple[object, int]] = []
    for bucket in buckets:
        value = (bucket.group or {}).get(uniqueness_property)
        count_obj = (bucket.aggregates or [None])[0]
        count = int(getattr(count_obj, "value", 0) or 0)
        if value is not None and count > 1:
            duplicate_groups.append((value, count))

    # 4) Order groups by duplicate size (largest first)
    duplicate_groups.sort(key=lambda item: item[1], reverse=True)
    print(f"Duplicate value groups found: {len(duplicate_groups)}")

    property_path = [view_space, f"{view_external_id}/{view_version}", uniqueness_property]
    for duplicate_value, duplicate_count in duplicate_groups:
        print("\n" + "=" * 100)
        print(f"Value: {duplicate_value!r} | Count: {duplicate_count}")
        value_filter: dict[str, object] = {
            "equals": {"property": property_path, "value": duplicate_value}
        }
        detail_filter = {"and": [aggregate_filter, value_filter]} if aggregate_filter else value_filter

        # 5) List impacted instances for this duplicate value
        rows = list(
            client.data_modeling.instances.list(
                sources=[view_id],
                instance_type="node",
                filter=detail_filter,
                limit=1000,  # CDF API max per request
            )
            or []
        )
        for row in rows:
            print(f"- {row.space}/{row.external_id} (last_updated_time={row.last_updated_time})")


if __name__ == "__main__":
    main()
Use this as an investigation tool when users need full duplicate visibility before cleanup planning.
Note:
- The examples above use limit=1000, which is the API maximum per request for these calls.
- If a duplicate group contains more than 1000 instances, fetch additional pages (for example by partitioning filters) to retrieve the full set.
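One way to partition, sketched under the assumption that the duplicate instances are spread across several instance spaces, is to issue one list call per space and combine the results. The helper below only builds the filters; partition_filters is a hypothetical name, and the approach helps only if each space holds fewer than 1000 matching instances.

```python
def partition_filters(value_filter, instance_spaces):
    """Build one filter per instance space so each request covers a smaller slice."""
    return [
        {
            "and": [
                value_filter,
                # Restrict this slice to a single instance space.
                {"equals": {"property": ["node", "space"], "value": space}},
            ]
        }
        for space in instance_spaces
    ]


# Hypothetical usage with the same filter shape as the script above.
value_filter = {
    "equals": {
        "property": ["sp_enterprise_process_industry", "YourOrgMaintenanceOrder/v1", "name"],
        "value": "Pump 7",
    }
}
for flt in partition_filters(value_filter, ["test_data_pumps", "test_data_valves"]):
    print(flt["and"][1])
```

Each returned filter can be passed as the filter argument of the per-value list call in step 5, replacing the single combined filter.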