Chained Conditional Logic User Guide
What this is
This guide explains how to build cross-run chaining with conditional logic:
- Rule A writes RuleEngineResult records.
- A listener reads only new Rule A records.
- Rule B/C runs downstream and writes new RuleEngineResult records.
This is the production pattern for "if this happened in a previous run, then create new derived data now".
When to use it
Use chained conditional logic when downstream processing must react to newly produced RuleEngineResult records across runs.
Use single-pass sh:order chaining (see rule_engine.md) instead when all logic can execute in one validation run.
What this solves
Use this when one rule stage should react to outputs from another stage across separate runs:
- detect new upstream result events
- process only changed/new records (not full scans)
- keep lineage from upstream to downstream (causedBy)
- run deterministic pipelines on schedule
Mental model
Think in three steps:
- Produce: upstream conditional logic writes RuleEngineResult.
- Listen: an incremental listener reads only new upstream records.
- Materialize: the downstream stage writes new RuleEngineResult records.
In this package, step 2 is the rule_engine_result_sync handler.
Runtime contract
rule_engine_result_sync reads changed records from a stream and persists checkpoint state in FunctionValidationState.
- source modes:
  - sync (default): Records sync endpoint + cursor
  - filter: latest-changed filter + watermark
- filter-mode bootstrap:
  - filter requests always include lastUpdatedTime
  - on first run with empty state, the listener bootstrap watermark is bounded (default ~6 days lookback) unless initial_watermark_ms is provided
  - this avoids unbounded interval failures in environments with strict filter windows
- first run in sync mode:
  - uses initialize_cursor (default: 7d-ago) when no saved cursor exists
- response payload:
  - always includes record_external_ids
  - includes full items only when include_items: true (use for probes; scheduled workflows default to false)
- container scoping:
  - requests include sources for the target container
  - filter includes hasData for that container
- pagination:
  - page_limit: max 1000 per call
  - max_records lets one run process multiple pages
- checkpoint safety:
  - listener workflows are deployed with max_concurrent_executions = 1
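The bootstrap, pagination, and checkpoint rules above can be sketched as an in-memory simulation. This is a minimal sketch, not the handler's implementation: it assumes records are dicts with externalId and lastUpdatedTime, ListenerState is a hypothetical stand-in for FunctionValidationState, and the (lastUpdatedTime, externalId) tuple cursor is our own tie-breaking choice so that records sharing a timestamp are not skipped when max_records cuts a run short.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

DAY_MS = 24 * 60 * 60 * 1000
DEFAULT_LOOKBACK_MS = 6 * DAY_MS  # assumption: mirrors the "~6 days" bootstrap default
MAX_PAGE_LIMIT = 1000             # page_limit is capped at 1000 per call

Cursor = Tuple[int, str]  # (lastUpdatedTime, externalId) of the last consumed record


@dataclass
class ListenerState:
    """Hypothetical checkpoint, standing in for FunctionValidationState."""
    cursor: Optional[Cursor] = None


def start_cursor(state: ListenerState, now_ms: int,
                 initial_watermark_ms: Optional[int] = None) -> Cursor:
    """A saved checkpoint wins; otherwise bootstrap from a bounded watermark."""
    if state.cursor is not None:
        return state.cursor
    if initial_watermark_ms is not None:
        watermark = initial_watermark_ms
    else:
        watermark = now_ms - DEFAULT_LOOKBACK_MS
    # "" sorts before any real externalId, so records stamped exactly at the
    # watermark are still included on the first run.
    return (watermark, "")


def fetch_page(records: list, after: Cursor, limit: int) -> list:
    """Simulates one filter call: records changed after the cursor, oldest first."""
    newer = [r for r in records if (r["lastUpdatedTime"], r["externalId"]) > after]
    newer.sort(key=lambda r: (r["lastUpdatedTime"], r["externalId"]))
    return newer[:limit]


def run_listener(records: list, state: ListenerState, now_ms: int,
                 page_limit: int = 1000, max_records: int = 5000,
                 initial_watermark_ms: Optional[int] = None) -> dict:
    if not 1 <= page_limit <= MAX_PAGE_LIMIT:
        raise ValueError(f"page_limit must be between 1 and {MAX_PAGE_LIMIT}")
    cursor = start_cursor(state, now_ms, initial_watermark_ms)
    consumed = []
    # max_records lets a single run walk several pages.
    while len(consumed) < max_records:
        page = fetch_page(records, cursor, min(page_limit, max_records - len(consumed)))
        if not page:
            break
        consumed.extend(page)
        last = page[-1]
        cursor = (last["lastUpdatedTime"], last["externalId"])
    state.cursor = cursor  # persist the checkpoint once the run has finished
    # Mirrors the contract: record_external_ids is always returned.
    return {"record_external_ids": [r["externalId"] for r in consumed]}
```

The sketch also shows why concurrency is pinned to 1: two runs sharing one ListenerState would both start from the same cursor and consume the same records.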
Filter model (AND semantics)
All configured filters are combined with logical AND:
- rule_ids
- rule_set_ids
- rule_set_versions
- focus_nodes
- focus_node_instances
- data_domain_external_ids
This allows precise routing, for example: "only RuleA outputs in DataProduct X for specific focus nodes".
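A minimal sketch of the AND semantics, evaluated client-side against a record dict. Two points are assumptions: only ruleId is named as a stored property in this guide, so the other property names in FILTER_TO_PROPERTY are hypothetical, and treating the values inside one filter list as alternatives ("value is in the list") is our reading rather than a documented rule.

```python
# Hypothetical mapping from listener filter keys to RuleEngineResult property
# names; only ruleId is named in this guide, the others are assumptions.
FILTER_TO_PROPERTY = {
    "rule_ids": "ruleId",
    "rule_set_ids": "ruleSetId",
    "rule_set_versions": "ruleSetVersion",
    "focus_nodes": "focusNode",
    "focus_node_instances": "focusNodeInstance",
    "data_domain_external_ids": "dataDomainExternalId",
}


def matches(record: dict, filters: dict) -> bool:
    """AND across filter keys; a record passes a key if its value is in the list."""
    for key, allowed in filters.items():
        if key not in FILTER_TO_PROPERTY:
            raise ValueError(f"unknown filter key: {key}")
        if not allowed:
            continue  # assumption: an empty list adds no constraint
        if record.get(FILTER_TO_PROPERTY[key]) not in allowed:
            return False
    return True


record = {"ruleId": "RuleA", "dataDomainExternalId": "DataProductX", "focusNode": "wo-1"}
print(matches(record, {"rule_ids": ["RuleA"], "data_domain_external_ids": ["DataProductX"]}))  # True
print(matches(record, {"rule_ids": ["RuleA"], "data_domain_external_ids": ["DataProductY"]}))  # False
```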
In Data Product mode, align filters with fields upstream producers write:
- data_domain_external_ids → DataProduct external_id
- rule_set_ids → view externalId for instance validation, or RuleSet externalId for timeseries validation
- rule_set_versions → DataProduct semver
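The alignment rules above can be captured in a small helper that derives a listener filter from Data Product metadata. This is a sketch under assumptions: DataProduct is a hypothetical minimal type, not a class from this package, and the semver check is a simplified pattern rather than the full SemVer grammar.

```python
import re
from dataclasses import dataclass

# Simplified semver pattern (MAJOR.MINOR.PATCH with optional suffix); an assumption.
SEMVER = re.compile(r"\d+\.\d+\.\d+(?:[-+][0-9A-Za-z.+-]+)?")


@dataclass(frozen=True)
class DataProduct:
    """Hypothetical minimal view of a Data Product."""
    external_id: str
    version: str  # semver, e.g. "1.2.0"


def data_product_filter(product: DataProduct, rule_ids, *,
                        view_external_ids=(), rule_set_external_ids=()) -> dict:
    """Builds a listener filter aligned with what Data Product producers write.

    Pass view_external_ids for instance validation, or rule_set_external_ids
    for timeseries validation; exactly one of the two.
    """
    if bool(view_external_ids) == bool(rule_set_external_ids):
        raise ValueError("pass exactly one of view_external_ids or rule_set_external_ids")
    if not SEMVER.fullmatch(product.version):
        raise ValueError(f"not a semver version: {product.version!r}")
    return {
        "rule_ids": list(rule_ids),
        "data_domain_external_ids": [product.external_id],
        "rule_set_ids": list(view_external_ids or rule_set_external_ids),
        "rule_set_versions": [product.version],
    }
```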
End-to-end chaining pattern
Step 1 — Upstream writes results
Run conditional logic (Rule A) from SHACL-AF and post to RuleEngineResult.
Step 2 — Listener consumes new Rule A outputs
Invoke:
```json
{
  "validation_type": "rule_engine_result_sync",
  "listener_id": "chain-rule-a",
  "stream_id": "dq_validation_stream",
  "source_mode": "filter",
  "initial_watermark_ms": 1710000000000,
  "records_space": "dataQuality",
  "records_container": "RuleEngineResult",
  "filter": {
    "rule_ids": ["RuleA"]
  },
  "page_limit": 1000,
  "max_records": 5000
}
```
Step 3 — Downstream writes new results
Use the consumed item(s) to drive downstream logic and write Rule B/C outputs to the same container, optionally setting:
- causedBy -> upstream record references
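A minimal sketch of Step 3: turn consumed upstream records into downstream records that carry lineage. It assumes you have the upstream items as dicts (for example via include_items: true, or by looking up the returned record_external_ids), and the property names mirror the dqs: terms in the SHACL examples rather than a confirmed container schema. The self-loop guard matters because downstream output lands in the same container the listener reads.

```python
def cascade(upstream_items: list, downstream_rule_id: str, result_value: str) -> list:
    """Builds one downstream record per consumed upstream record."""
    records = []
    for item in upstream_items:
        if item["ruleId"] == downstream_rule_id:
            # Guard against a stage consuming its own output in a loop.
            raise ValueError(f"{item['externalId']} was written by {downstream_rule_id} itself")
        records.append({
            # Deterministic ID: re-processing the same upstream record
            # overwrites instead of duplicating (assumes upsert-by-ID writes).
            "externalId": f"{downstream_rule_id}:{item['externalId']}",
            "ruleId": downstream_rule_id,
            "focusNode": item["focusNode"],
            "resultType": "Inference",
            "resultValue": result_value,
            "causedBy": [item["externalId"]],  # lineage back to the upstream record
        })
    return records
```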
Concrete ruleset examples (proven cascade pattern)
The live cascade demo in test_and_deploy/test_rule_engine_result_sync_cascade_from_recent.py validates this pattern:
- listener filters on one upstream ruleId
- downstream record writes a new ruleId
- downstream record sets causedBy to upstream record external IDs
Use the same pattern directly in SHACL-AF rulesets.
Example A - upstream rule that produces source records
```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix dqs: <http://purl.org/cognite/dqs#> .
@prefix ex: <https://example.com/ns#> .

ex:RuleAStageShape
    a sh:NodeShape ;
    sh:targetClass ex:WorkOrder ;
    sh:rule [
        a sh:SPARQLRule ;
        sh:order 10 ;
        dqs:ruleId "RuleA" ;
        sh:construct """
            CONSTRUCT {
                ?result a dqs:RuleEngineResult ;
                    dqs:focusNode ?wo ;
                    dqs:ruleId "RuleA" ;
                    dqs:resultType "Inference" ;
                    dqs:resultValue "MatchedRuleA" .
            }
            WHERE {
                ?wo a ex:WorkOrder ;
                    ex:priority "High" .
                BIND(IRI(CONCAT("urn:rule-result:rule-a:", ENCODE_FOR_URI(STR(?wo)))) AS ?result)
            }
        """ ;
    ] .
```
This is the stage your listener targets with filter.rule_ids: ["RuleA"].
Example B - downstream rule that creates cascaded output with lineage
```turtle
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix dqs: <http://purl.org/cognite/dqs#> .
@prefix ex: <https://example.com/ns#> .

ex:RuleCascadeRecentShape
    a sh:NodeShape ;
    sh:targetClass ex:WorkOrder ;
    sh:rule [
        a sh:SPARQLRule ;
        sh:order 30 ;
        dqs:ruleId "RuleCascadeRecent" ;
        dqs:dependsOn "RuleA" ;
        sh:construct """
            CONSTRUCT {
                ?result a dqs:RuleEngineResult ;
                    dqs:focusNode ?wo ;
                    dqs:ruleId "RuleCascadeRecent" ;
                    dqs:resultType "Inference" ;
                    dqs:resultValue "CascadedFromRecent" ;
                    dqs:causedBy ?upstreamResult .
            }
            WHERE {
                ?upstreamResult a dqs:RuleEngineResult ;
                    dqs:focusNode ?wo ;
                    dqs:ruleId "RuleA" .
                ?wo a ex:WorkOrder .
                BIND(IRI(CONCAT("urn:rule-result:rule-cascade-recent:", ENCODE_FOR_URI(STR(?wo)))) AS ?result)
            }
        """ ;
    ] .
```
Notes:
- Keep dqs:ruleId stable (RuleA, RuleCascadeRecent) so listener filters remain valid.
- Keep result IRIs deterministic to avoid duplicate records.
- Always set dqs:causedBy on downstream records for explainable chaining.
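To see why deterministic result IRIs prevent duplicates, the Example B cascade can be mirrored in Python. The sketch assumes the writer upserts by result IRI (a dict keyed by IRI stands in for the container); urllib.parse.quote with safe="" percent-encodes everything except the RFC 3986 unreserved characters, which is how SPARQL ENCODE_FOR_URI treats UTF-8 input.

```python
from urllib.parse import quote

RULE_A_PREFIX = "urn:rule-result:rule-a:"
CASCADE_PREFIX = "urn:rule-result:rule-cascade-recent:"


def result_iri(prefix: str, focus_node: str) -> str:
    # quote(..., safe="") keeps only A-Z a-z 0-9 - _ . ~ unescaped,
    # matching SPARQL ENCODE_FOR_URI on UTF-8 input.
    return prefix + quote(focus_node, safe="")


def run_cascade(store: dict, work_orders_with_rule_a: list) -> dict:
    """Upserts one RuleCascadeRecent result per work order, keyed by IRI."""
    for wo in work_orders_with_rule_a:
        store[result_iri(CASCADE_PREFIX, wo)] = {
            "ruleId": "RuleCascadeRecent",
            "focusNode": wo,
            "causedBy": result_iri(RULE_A_PREFIX, wo),  # IRI of the RuleA result
        }
    return store


store = {}
run_cascade(store, ["https://example.com/ns#WO-1"])
run_cascade(store, ["https://example.com/ns#WO-1"])  # a re-run rewrites the same key
print(len(store))  # 1
```

If the IRI included a run timestamp or a random UUID instead, every run would add a new record for the same work order.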
Listener config that matches the rules above
```yaml
name: chain_rule_a_listener
description: "Consume RuleA records and feed downstream cascade"
listener_id: chain-rule-a
source_mode: filter
stream_id: dq_validation_stream
records_space: dataQuality
records_container: RuleEngineResult
initial_watermark_ms: 1710000000000
page_limit: 1000
max_records: 2000
filter:
  rule_ids: ["RuleA"]
```
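Before deploying, a quick sanity check on a listener config can catch the mistakes listed under Troubleshooting. This is a hypothetical pre-flight helper, not part of the package: the config is a Python dict carrying the same values as the YAML above (parsing YAML would need a third-party library), and the set of required keys is an assumption.

```python
MAX_PAGE_LIMIT = 1000
FILTER_KEYS = {
    "rule_ids", "rule_set_ids", "rule_set_versions",
    "focus_nodes", "focus_node_instances", "data_domain_external_ids",
}
# Assumption: which keys count as required is our choice for this sketch.
REQUIRED_KEYS = ("listener_id", "stream_id", "records_space", "records_container")


def validate_listener_config(cfg: dict) -> list:
    """Returns human-readable problems; an empty list means the config looks sane."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not cfg.get(key)]
    mode = cfg.get("source_mode", "sync")  # sync is the documented default
    if mode not in ("sync", "filter"):
        problems.append(f"unknown source_mode {mode!r}")
    page_limit = cfg.get("page_limit")
    if page_limit is not None and not 1 <= page_limit <= MAX_PAGE_LIMIT:
        problems.append(f"page_limit {page_limit} outside 1..{MAX_PAGE_LIMIT}")
    filters = cfg.get("filter") or {}
    for key in sorted(set(filters) - FILTER_KEYS):
        problems.append(f"unknown filter key {key!r}")
    if not any(filters.get(key) for key in FILTER_KEYS):
        problems.append("no filter values set: listener would consume every record")
    return problems


config = {  # same values as the YAML listener config
    "name": "chain_rule_a_listener",
    "listener_id": "chain-rule-a",
    "source_mode": "filter",
    "stream_id": "dq_validation_stream",
    "records_space": "dataQuality",
    "records_container": "RuleEngineResult",
    "initial_watermark_ms": 1710000000000,
    "page_limit": 1000,
    "max_records": 2000,
    "filter": {"rule_ids": ["RuleA"]},
}
print(validate_listener_config(config))  # []
```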
This listener contract has the same shape as the one used in the live cascade script:
- filter scoped by rule_ids
- bounded bootstrap watermark in filter mode
- downstream records reference upstream record external IDs via causedBy
Working example from test_and_deploy
Use:
test_and_deploy/test_rule_engine_result_sync_chaining.py
What it validates:
- writes record #1 (RuleA) and record #2 (RuleB) to RuleEngineResult
- runs rule_engine_result_sync filtered on RuleA
- asserts the listener consumed #1 and did not consume #2
- writes record #3 (RuleC) with causedBy -> record #1
- verifies record #3 exists in RuleEngineResult
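The checks in that script can be mirrored offline to make the expected behavior concrete. This sketch does not call CDF: a list stands in for the RuleEngineResult container, and a set of already-consumed IDs is a simplified stand-in for the real cursor/watermark checkpoint.

```python
def listen(container: list, rule_ids: set, checkpoint: set) -> list:
    """Returns externalIds of unseen records whose ruleId is in rule_ids."""
    new = [r["externalId"] for r in container
           if r["ruleId"] in rule_ids and r["externalId"] not in checkpoint]
    checkpoint.update(new)  # simplified checkpoint: the set of consumed IDs
    return new


def simulate_chaining() -> list:
    container, checkpoint = [], set()
    container.append({"externalId": "rec-1", "ruleId": "RuleA"})
    container.append({"externalId": "rec-2", "ruleId": "RuleB"})

    consumed = listen(container, {"RuleA"}, checkpoint)
    assert consumed == ["rec-1"], "listener must consume #1 but not #2"

    container.append({"externalId": "rec-3", "ruleId": "RuleC", "causedBy": consumed})
    assert any(r["externalId"] == "rec-3" and r["causedBy"] == ["rec-1"] for r in container)

    # A second run finds nothing new: rec-3 is RuleC, not RuleA.
    assert listen(container, {"RuleA"}, checkpoint) == []
    return container


print(len(simulate_chaining()))  # 3
```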
Run:
```bash
uv run python test_and_deploy/test_rule_engine_result_sync_chaining.py --stream-id <rule_engine_stream>
```
If omitted, --stream-id is resolved from DataQualitySettings (records_rule_engine_stream_id / records_stream_id).
Related script for upstream inference generation:
- test_and_deploy/test_uniqueness_handler_live.py (produces RuleEngineResult outputs from the SHACL execution path)
Deployment pattern
Deploy listener workflows via rule_engine_result_sync_dir in deploy_validation_infrastructure(...).
Each YAML config creates one scheduled workflow plus a trigger that calls validation_type: "rule_engine_result_sync".
See Deploy.
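The one-config-to-one-workflow mapping can be sketched as a planning step. Everything here is hypothetical: ListenerWorkflow, the external ID naming, and the default cron schedule are illustrative choices, not what deploy_validation_infrastructure actually produces, and loading the YAML files from rule_engine_result_sync_dir is omitted (configs arrive as dicts). The duplicate check reflects the rule of one listener per checkpoint.

```python
from dataclasses import dataclass, field


@dataclass
class ListenerWorkflow:
    """Hypothetical description of one scheduled workflow + trigger."""
    external_id: str
    cron: str
    max_concurrent_executions: int
    trigger_input: dict = field(default_factory=dict)


def plan_workflows(configs: list, cron: str = "*/15 * * * *") -> list:
    """One workflow per listener config; cron default is an assumption."""
    workflows, seen = [], set()
    for cfg in configs:
        listener_id = cfg["listener_id"]
        if listener_id in seen:
            # Two workflows on one listener_id would share a checkpoint.
            raise ValueError(f"duplicate listener_id {listener_id!r}")
        seen.add(listener_id)
        payload = {k: v for k, v in cfg.items() if k not in ("name", "description")}
        payload["validation_type"] = "rule_engine_result_sync"
        workflows.append(ListenerWorkflow(
            external_id=f"rule_engine_result_sync-{listener_id}",
            cron=cron,
            max_concurrent_executions=1,  # checkpoint safety, as the guide requires
            trigger_input=payload,
        ))
    return workflows
```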
Relationship to Conditional Logic guide
Use Conditional Logic (Rule Engine) for:
- SHACL-AF authoring
- intra-pass rule ordering (sh:order, dqs:dependsOn)
Use this guide for:
- cross-run chaining on new RuleEngineResult records
- incremental listener behavior and checkpointing
Troubleshooting
- No records consumed in sync mode:
  - check that the listener has a valid cursor or initialize_cursor
  - confirm the stream and container are correct
  - confirm filter values match stored property values
- Filter mode fails with interval errors:
  - ensure the bootstrap watermark is bounded (for example now-6d / now-7d)
  - pass initial_watermark_ms explicitly when needed
- Records from the wrong rule are consumed:
  - verify filter.rule_ids and rule_set_ids
  - confirm the pipeline writes the expected ruleId values
- Duplicate processing concerns:
  - keep one listener per checkpoint (listener_id)
  - keep workflow concurrency at 1 for each listener
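When a listener consumes nothing, it helps to check which filter key excluded a known record. This is a hypothetical debugging helper; as in any client-side check, the property names other than ruleId are assumptions about the RuleEngineResult schema.

```python
# Hypothetical filter-key to property-name mapping (only ruleId is confirmed here).
FILTER_TO_PROPERTY = {
    "rule_ids": "ruleId",
    "rule_set_ids": "ruleSetId",
    "rule_set_versions": "ruleSetVersion",
    "focus_nodes": "focusNode",
    "focus_node_instances": "focusNodeInstance",
    "data_domain_external_ids": "dataDomainExternalId",
}


def explain_mismatch(record: dict, filters: dict) -> list:
    """Lists every filter key that would exclude the record (AND semantics)."""
    reasons = []
    for key, allowed in filters.items():
        if not allowed:
            continue
        prop = FILTER_TO_PROPERTY.get(key)
        if prop is None:
            reasons.append(f"{key}: unknown filter key")
            continue
        value = record.get(prop)
        if value not in allowed:
            reasons.append(f"{key}: record {prop}={value!r} not in {list(allowed)!r}")
    return reasons


# A casing mismatch is a typical "filter values don't match stored values" case.
print(explain_mismatch({"ruleId": "ruleA"}, {"rule_ids": ["RuleA"]}))
```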
Best practices
- Keep listener filters tight (rule_ids, rule_set_ids, version/domain filters) to avoid accidental fan-in.
- Keep the first-run bootstrap bounded in filter mode.
- Always set lineage (causedBy) on downstream records for explainability.