Example: notebook or script
This page shows a minimal end-to-end example you can paste into a notebook cell or a Python script. It covers setup, running validation locally, and reading results.
The example follows the same model as the Usage docs:
- Data Product + RuleSet is the primary rule persistence model.
- YAML is the CDF Toolkit representation of those bindings.
- TTL is supported as a legacy transition path.
1. Install and import
Restart the kernel after install so imports pick up the package.
import cognite_data_quality
from cognite_data_quality import (
    load_cognite_client_from_toml,
    run_validation,
    DataModelConfig,
    RecordsConfig,
)
2. Load credentials
client = load_cognite_client_from_toml("config.toml")
client.config.timeout = 300 # increase for large function deployments
Verify that function handler files are embedded in the installed package:
from cognite_data_quality.deploy import _get_embedded_function_files
embedded = _get_embedded_function_files()
print(f"Embedded function files: {len(embedded)}") # should be > 0
3. Run validation locally (YAML + RuleSet references)
Use run_validation() to validate instances directly from DMS. No Functions or Workflows are required.
result = run_validation(
    client=client,
    rules_path="views/my_view.yaml",
    rules_format="yaml",
    limit=10,
    print_output=True,
    post_to_records=False,
)
print("Conforms:", result.conforms)
print("Instances validated:", result.instance_count)
4. Inspect results
violations = [v for v in result.violations if (v.resultSeverity or "").endswith("Violation")]
warnings = [v for v in result.violations if (v.resultSeverity or "").endswith("Warning")]
print(f"Violations: {len(violations)}, Warnings: {len(warnings)}")
for v in violations[:5]:
    print(v.resultMessage, "—", v.focusNode)
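When a run returns many results, a tally per severity and per message is often easier to scan than the raw list. The following is a minimal sketch, assuming each result exposes the resultSeverity, resultMessage, and focusNode attributes used above. The severity_label and summarize helpers and the SimpleNamespace stand-ins are hypothetical and not part of cognite_data_quality.

```python
from collections import Counter
from types import SimpleNamespace


def severity_label(result):
    """Classify a result by the suffix of its severity string, as the filters above do."""
    severity = result.resultSeverity or ""
    for label in ("Violation", "Warning", "Info"):
        if severity.endswith(label):
            return label
    return "Other"


def summarize(results):
    """Return (count per severity label, count per message)."""
    by_severity = Counter(severity_label(r) for r in results)
    by_message = Counter(r.resultMessage for r in results)
    return by_severity, by_message


# Stand-in objects so the sketch runs without a CDF connection (assumed shape).
sample = [
    SimpleNamespace(resultSeverity="sh:Violation", resultMessage="name is required", focusNode="pump-1"),
    SimpleNamespace(resultSeverity="sh:Violation", resultMessage="name is required", focusNode="pump-2"),
    SimpleNamespace(resultSeverity="sh:Warning", resultMessage="description is short", focusNode="pump-1"),
    SimpleNamespace(resultSeverity=None, resultMessage="unclassified", focusNode="pump-3"),
]

by_severity, by_message = summarize(sample)
print(dict(by_severity))
print(by_message.most_common(3))
```

With a real run, summarize(result.violations) would take the place of the sample list.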
5. Post results to Records (optional)
Set post_to_records=True and provide a RecordsConfig:
result = run_validation(
    client=client,
    rules_path="views/my_view.yaml",
    rules_format="yaml",
    post_to_records=True,
    records_config=RecordsConfig(
        stream_id="dq_validation_stream",
        rule_set_id="MyViewSHACLv1",
        rule_set_version="1.0",
    ),
    limit=10,
)
6. Query Records API
After posting results, query them back:
client.config.headers["cdf-version"] = "alpha"
stream_id = "dq_validation_stream"
space = "dataQuality"
container = "DataQualityValidationRecord"
response = client.post(
    f"/api/v1/projects/{client.config.project}/streams/{stream_id}/records/filter",
    json={
        "filter": {
            "equals": {
                "property": [space, container, "passedValidation"],
                "value": False,
            }
        },
        "limit": 100,
    },
)
for record in response.json().get("items", []):
    props = record.get("properties", {}).get(space, {}).get(container, {})
    print(record["externalId"], "—", props.get("failedConstraints"))
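The request body and the response parsing above can be factored into two small functions, which makes them easy to test without a network call. This is a sketch only: build_failed_filter and failed_constraints are hypothetical helper names, and the sample payload is hand-written to match the shape the loop above reads (items, properties, space, container), not captured from the Records API. The list type used for failedConstraints is likewise an assumption.

```python
def build_failed_filter(space, container, limit=100):
    """Build a request body selecting records where passedValidation is False."""
    return {
        "filter": {
            "equals": {
                "property": [space, container, "passedValidation"],
                "value": False,
            }
        },
        "limit": limit,
    }


def failed_constraints(payload, space, container):
    """Map each record's externalId to its failedConstraints value (None if absent)."""
    out = {}
    for record in payload.get("items", []):
        props = record.get("properties", {}).get(space, {}).get(container, {})
        out[record.get("externalId")] = props.get("failedConstraints")
    return out


space = "dataQuality"
container = "DataQualityValidationRecord"

# Hand-written payload for illustration; real responses may carry more fields.
sample_payload = {
    "items": [
        {
            "externalId": "rec-1",
            "properties": {space: {container: {"failedConstraints": ["minCount"]}}},
        },
        {"externalId": "rec-2", "properties": {}},
    ]
}

body = build_failed_filter(space, container, limit=50)
print(body["limit"])
print(failed_constraints(sample_payload, space, container))
```

Against a live project, body would be passed as the json argument of client.post, and response.json() would replace sample_payload.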
Restore headers after Records calls
The cdf-version: alpha header can interfere with other CDF API calls. Restore client.config.headers when done, or use a context manager that scopes the header change.
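One way to scope the header change is a small context manager that sets the header on entry and restores the previous state on exit, even if the Records call raises. The sketch below assumes client.config.headers behaves like a mutable dict, as the assignment above implies. temporary_header is a hypothetical helper, not part of cognite_data_quality or the Cognite SDK, and a plain dict stands in for the client headers so the example runs on its own.

```python
from contextlib import contextmanager


@contextmanager
def temporary_header(headers, name, value):
    """Set headers[name] to value inside the with block, then restore the prior state."""
    missing = object()  # sentinel: distinguishes "absent" from "present with value None"
    previous = headers.get(name, missing)
    headers[name] = value
    try:
        yield headers
    finally:
        if previous is missing:
            headers.pop(name, None)  # header did not exist before, so remove it
        else:
            headers[name] = previous  # header existed, so put the old value back


# Plain dict standing in for client.config.headers.
headers = {"x-example": "notebook"}

with temporary_header(headers, "cdf-version", "alpha"):
    inside = dict(headers)  # snapshot taken while the header is set

print(inside)
print(headers)
```

With a real client, the with statement would wrap the Records call: with temporary_header(client.config.headers, "cdf-version", "alpha"): followed by the client.post request.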
Next steps
- Deploy: Deploy Functions, Workflows, and Triggers to CDF.
- Run validation: Full parameter reference for run_validation().
- Records output: Filter, aggregate, and interpret Records API output.