
Chained Conditional Logic User Guide

What this is

This guide explains how to build cross-run chaining with conditional logic:

  • Rule A writes RuleEngineResult records.
  • A listener reads only new Rule A records.
  • Rule B/C runs downstream and writes new RuleEngineResult records.

This is the production pattern for "if this happened in a previous run, then create new derived data now".


When to use it

Use chained conditional logic when downstream processing must react to newly produced RuleEngineResult records across runs.

Use single-pass sh:order chaining (described in rule_engine.md) instead when all logic can execute within one validation run.


What this solves

Use this when one rule stage should react to outputs from another stage across separate runs:

  • detect new upstream result events
  • process only changed/new records (not full scans)
  • keep lineage from upstream to downstream (causedBy)
  • run deterministic pipelines on schedule

Mental model

Think in three steps:

  1. Produce: upstream conditional logic writes RuleEngineResult.
  2. Listen: incremental listener reads only new upstream records.
  3. Materialize: downstream stage writes new RuleEngineResult.
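The three steps can be simulated in memory to show how a watermark keeps the listener incremental. This is a sketch only: the Result class, the external ID scheme and the strict "newer than the watermark" comparison are illustrative assumptions, not the handler's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Result:
    """In-memory stand-in for a RuleEngineResult record (fields are illustrative)."""
    external_id: str
    rule_id: str
    focus_node: str
    last_updated_ms: int
    caused_by: list = field(default_factory=list)


def produce(store, focus_nodes, now_ms):
    """Step 1, Produce: upstream Rule A writes one result per matching focus node."""
    for node in focus_nodes:
        store.append(Result(f"rule-a:{node}", "RuleA", node, now_ms))


def listen(store, watermark_ms):
    """Step 2, Listen: return RuleA records updated after the watermark, plus the next watermark."""
    new = [r for r in store if r.rule_id == "RuleA" and r.last_updated_ms > watermark_ms]
    next_watermark = max((r.last_updated_ms for r in new), default=watermark_ms)
    return new, next_watermark


def materialize(store, upstream, now_ms):
    """Step 3, Materialize: downstream Rule B writes derived records with lineage."""
    for r in upstream:
        store.append(Result(f"rule-b:{r.focus_node}", "RuleB", r.focus_node, now_ms, [r.external_id]))


store = []
produce(store, ["wo-1", "wo-2"], now_ms=1_000)
batch, wm = listen(store, watermark_ms=0)   # run 1: both RuleA records are new
materialize(store, batch, now_ms=2_000)
batch2, wm2 = listen(store, wm)             # run 2: nothing new from RuleA
```

In this toy version the second listen call returns nothing: the RuleB records do not match the RuleA filter, and no RuleA record is newer than the saved watermark.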

In this package, step 2 is the rule_engine_result_sync handler.


Runtime contract

rule_engine_result_sync reads changed records from a stream and persists checkpoint state in FunctionValidationState.

  • source modes:
      • sync (default): Records sync endpoint + cursor
      • filter: latest-changed filter + watermark
  • filter-mode bootstrap:
      • filter requests always include lastUpdatedTime
      • on the first run with empty state, the bootstrap watermark is bounded (default: about 6 days of lookback) unless initial_watermark_ms is provided
      • this avoids unbounded-interval failures in environments with strict filter windows
  • first run in sync mode:
      • uses initialize_cursor (default: 7d-ago) when no saved cursor exists
  • response payload:
      • always includes record_external_ids
      • includes full items only when include_items: true (use this for probes; scheduled workflows default to false)
  • container scoping:
      • requests include sources for the target container
      • the filter includes hasData for that container
  • pagination:
      • page_limit is at most 1000 per call
      • max_records lets one run process multiple pages
  • checkpoint safety:
      • listener workflows are deployed with max_concurrent_executions = 1
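The bootstrap rules above can be expressed as two small functions. This is a sketch under assumptions: the saved-state key watermark_ms is hypothetical (the real FunctionValidationState layout is not shown here), the 6-day default mirrors the "about 6 days" stated above, and the "7d-ago" parser only illustrates what the default cursor means; the sync endpoint may accept that string directly.

```python
import re
import time

DAY_MS = 24 * 60 * 60 * 1000
DEFAULT_FILTER_LOOKBACK_MS = 6 * DAY_MS  # mirrors the documented ~6 day default


def bootstrap_watermark_ms(saved_state, initial_watermark_ms=None, now_ms=None):
    """Filter mode: choose the lastUpdatedTime lower bound for this run."""
    if saved_state and "watermark_ms" in saved_state:  # hypothetical state key
        return saved_state["watermark_ms"]            # resume from the checkpoint
    if initial_watermark_ms is not None:
        return initial_watermark_ms                   # explicit first-run override
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - DEFAULT_FILTER_LOOKBACK_MS        # bounded first-run lookback


def parse_relative_cursor(spec, now_ms):
    """Sync mode: turn an initialize_cursor such as '7d-ago' into epoch milliseconds."""
    m = re.fullmatch(r"(\d+)([smhd])-ago", spec)
    if not m:
        raise ValueError(f"unsupported initialize_cursor: {spec!r}")
    unit_ms = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": DAY_MS}[m.group(2)]
    return now_ms - int(m.group(1)) * unit_ms
```

The order of checks matters: initial_watermark_ms only applies when there is no saved state, which matches "on the first run with empty state" in the contract.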

Filter model (AND semantics)

All configured filters are combined with logical AND:

  • rule_ids
  • rule_set_ids
  • rule_set_versions
  • focus_nodes
  • focus_node_instances
  • data_domain_external_ids
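The AND semantics can be illustrated with a small client-side predicate. Two points are assumptions rather than documented behavior: values inside one list are treated as alternatives (any-of), and the record property names in FILTER_TO_PROPERTY are hypothetical. The real handler pushes these filters to the server rather than evaluating them locally.

```python
# Hypothetical mapping from listener filter keys to stored record properties.
FILTER_TO_PROPERTY = {
    "rule_ids": "ruleId",
    "rule_set_ids": "ruleSetId",
    "rule_set_versions": "ruleSetVersion",
    "focus_nodes": "focusNode",
    "focus_node_instances": "focusNodeInstance",
    "data_domain_external_ids": "dataDomainExternalId",
}


def matches(record, flt):
    """AND across configured keys; any-of within one key's list (assumed)."""
    for key, allowed in flt.items():
        if not allowed:
            continue  # an empty list adds no constraint (assumed)
        if record.get(FILTER_TO_PROPERTY[key]) not in allowed:
            return False
    return True


# "only RuleA outputs in DataProduct X"
rule_a_in_dp_x = {"rule_ids": ["RuleA"], "data_domain_external_ids": ["dp_x"]}
```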

This allows precise routing, for example: "only RuleA outputs in DataProduct X for specific focus nodes".

In Data Product mode, align filters with the fields that upstream producers write:

  • data_domain_external_ids → DataProduct external_id
  • rule_set_ids → view externalId for instance validation, or RuleSet externalId for timeseries validation
  • rule_set_versions → DataProduct semver
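A helper that derives the filter from a Data Product keeps those three alignments in one place. This is a sketch: DataProductRef and filter_for_data_product are hypothetical names, not part of this package.

```python
from dataclasses import dataclass, field


@dataclass
class DataProductRef:
    """Hypothetical description of a Data Product, for illustration only."""
    external_id: str
    semver: str
    view_external_ids: list = field(default_factory=list)


def filter_for_data_product(dp, rule_ids, timeseries_rule_set=None):
    """Build a listener filter that matches what Data Product producers write."""
    if timeseries_rule_set:
        rule_set_ids = [timeseries_rule_set]      # timeseries validation: RuleSet externalId
    else:
        rule_set_ids = list(dp.view_external_ids)  # instance validation: view externalId
    return {
        "rule_ids": list(rule_ids),
        "rule_set_ids": rule_set_ids,
        "rule_set_versions": [dp.semver],
        "data_domain_external_ids": [dp.external_id],
    }
```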

End-to-end chaining pattern

Step 1 — Upstream writes results

Run the conditional logic for Rule A (authored in SHACL-AF) and post its results to RuleEngineResult.

Step 2 — Listener consumes new Rule A outputs

Invoke the listener with:

{
  "validation_type": "rule_engine_result_sync",
  "listener_id": "chain-rule-a",
  "stream_id": "dq_validation_stream",
  "source_mode": "filter",
  "initial_watermark_ms": 1710000000000,
  "records_space": "dataQuality",
  "records_container": "RuleEngineResult",
  "filter": {
    "rule_ids": ["RuleA"]
  },
  "page_limit": 1000,
  "max_records": 5000
}
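The interplay of page_limit and max_records in this payload can be sketched as a paging loop. fetch_page is a hypothetical callable standing in for one stream request; the returned cursor is what a real listener would persist as its checkpoint.

```python
def consume(fetch_page, page_limit=1000, max_records=5000):
    """Read pages until the source is drained or max_records is reached.

    fetch_page(cursor, limit) -> (items, next_cursor) is hypothetical;
    next_cursor is None once no more data is available.
    """
    if not 1 <= page_limit <= 1000:
        raise ValueError("page_limit must be between 1 and 1000")
    items, cursor = [], None
    while len(items) < max_records:
        limit = min(page_limit, max_records - len(items))  # never overshoot max_records
        page, cursor = fetch_page(cursor, limit)
        items.extend(page)
        if not page or cursor is None:
            break
    return items, cursor


def make_source(n):
    """Fake in-memory source with n records, paged by integer offset."""
    data = list(range(n))

    def fetch(cursor, limit):
        start = cursor or 0
        page = data[start:start + limit]
        nxt = start + len(page)
        return page, (nxt if nxt < n else None)

    return fetch
```

With 2500 pending records, max_records=5000 drains everything in three pages, while max_records=1500 stops early and leaves the cursor at 1500 for the next run.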

Step 3 — Downstream writes new results

Use the consumed item(s) to drive downstream logic and write Rule B/C outputs to the same container. causedBy is not required by the container, but set it on every downstream record (see Best practices):

  • causedBy -> upstream record references
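A downstream record with lineage might be assembled as below. The property names follow the dqs: terms used in the rulesets in this guide, but the exact container property names and the hash-based external ID are assumptions. Keying the ID on the sorted upstream IDs means re-processing the same upstream batch rewrites the same record instead of creating a duplicate.

```python
import hashlib


def downstream_record(rule_id, focus_node, upstream_ids, result_value):
    """Build a downstream RuleEngineResult payload with causedBy lineage (shape assumed)."""
    upstream_ids = sorted(set(upstream_ids))  # stable order gives a stable ID
    key = "|".join([rule_id, focus_node, *upstream_ids])
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]
    return {
        "externalId": f"{rule_id}:{digest}",
        "ruleId": rule_id,
        "focusNode": focus_node,
        "resultType": "Inference",
        "resultValue": result_value,
        "causedBy": upstream_ids,
    }
```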

Concrete ruleset examples (proven cascade pattern)

The live cascade demo in test_and_deploy/test_rule_engine_result_sync_cascade_from_recent.py validates this pattern:

  • listener filters on one upstream ruleId
  • downstream record writes a new ruleId
  • downstream record sets causedBy to upstream record external IDs

Use the same pattern directly in SHACL-AF rulesets.

Example A - upstream rule that produces source records

@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix dqs: <http://purl.org/cognite/dqs#> .
@prefix ex: <https://example.com/ns#> .

ex:RuleAStageShape
    a sh:NodeShape ;
    sh:targetClass ex:WorkOrder ;
    sh:rule [
        a sh:SPARQLRule ;
        sh:order 10 ;
        dqs:ruleId "RuleA" ;
        sh:construct """
            CONSTRUCT {
              ?result a dqs:RuleEngineResult ;
                      dqs:focusNode ?wo ;
                      dqs:ruleId "RuleA" ;
                      dqs:resultType "Inference" ;
                      dqs:resultValue "MatchedRuleA" .
            }
            WHERE {
              ?wo a ex:WorkOrder ;
                  ex:priority "High" .
              BIND(IRI(CONCAT("urn:rule-result:rule-a:", ENCODE_FOR_URI(STR(?wo)))) AS ?result)
            }
        """ ;
    ] .

This is the stage your listener targets with filter.rule_ids: ["RuleA"].
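When code outside the ruleset needs to predict a Rule A result IRI (for example, to check whether a result already exists), the BIND expression can be mirrored in Python. This sketch relies on urllib.parse.quote with safe="", which percent-encodes everything except RFC 3986 unreserved characters, the same set SPARQL's ENCODE_FOR_URI leaves untouched.

```python
from urllib.parse import quote


def rule_result_iri(prefix, focus_node_iri):
    """Mirror BIND(IRI(CONCAT(prefix, ENCODE_FOR_URI(STR(?wo))))) from the rule above."""
    # safe="" leaves only letters, digits, '-', '.', '_' and '~' unescaped.
    return prefix + quote(focus_node_iri, safe="")


iri = rule_result_iri("urn:rule-result:rule-a:", "https://example.com/ns#wo1")
# iri == "urn:rule-result:rule-a:https%3A%2F%2Fexample.com%2Fns%23wo1"
```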

Example B - downstream rule that creates cascaded output with lineage

@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix dqs: <http://purl.org/cognite/dqs#> .
@prefix ex: <https://example.com/ns#> .

ex:RuleCascadeRecentShape
    a sh:NodeShape ;
    sh:targetClass ex:WorkOrder ;
    sh:rule [
        a sh:SPARQLRule ;
        sh:order 30 ;
        dqs:ruleId "RuleCascadeRecent" ;
        dqs:dependsOn "RuleA" ;
        sh:construct """
            CONSTRUCT {
              ?result a dqs:RuleEngineResult ;
                      dqs:focusNode ?wo ;
                      dqs:ruleId "RuleCascadeRecent" ;
                      dqs:resultType "Inference" ;
                      dqs:resultValue "CascadedFromRecent" ;
                      dqs:causedBy ?upstreamResult .
            }
            WHERE {
              ?upstreamResult a dqs:RuleEngineResult ;
                              dqs:focusNode ?wo ;
                              dqs:ruleId "RuleA" .
              ?wo a ex:WorkOrder .
              BIND(IRI(CONCAT("urn:rule-result:rule-cascade-recent:", ENCODE_FOR_URI(STR(?wo)))) AS ?result)
            }
        """ ;
    ] .

Notes:

  • Keep dqs:ruleId stable (RuleA, RuleCascadeRecent) so listener filters remain valid.
  • Keep result IRIs deterministic to avoid duplicate records.
  • Always set dqs:causedBy on downstream records for explainable chaining.
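The three notes can be turned into an automated check over records read back from the container. lint_results is a hypothetical helper, and the record keys (externalId, ruleId, causedBy) are assumed names.

```python
def lint_results(records, upstream_rule_ids, known_rule_ids):
    """Report unknown ruleIds, duplicate external IDs and downstream records without lineage."""
    problems, seen = [], set()
    for r in records:
        rid, ext = r.get("ruleId"), r.get("externalId")
        if rid not in known_rule_ids:
            problems.append(f"{ext}: unexpected ruleId {rid!r}")
        if ext in seen:
            problems.append(f"{ext}: duplicate externalId")
        seen.add(ext)
        if rid not in upstream_rule_ids and not r.get("causedBy"):
            problems.append(f"{ext}: downstream record without causedBy")
    return problems
```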

Listener config that matches the rules above

name: chain_rule_a_listener
description: "Consume RuleA records and feed downstream cascade"
listener_id: chain-rule-a
source_mode: filter
stream_id: dq_validation_stream
records_space: dataQuality
records_container: RuleEngineResult
initial_watermark_ms: 1710000000000
page_limit: 1000
max_records: 2000
filter:
  rule_ids: ["RuleA"]
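Before deploying, the same config can be checked against the runtime contract. The sketch below works on the config as a Python dict (loading the YAML is left out to stay dependency-free); the required-key list and the page_limit default of 1000 are assumptions.

```python
def validate_listener_config(cfg):
    """Return a list of problems in one listener config dict (checks are a sketch)."""
    problems = []
    for key in ("name", "listener_id", "stream_id", "records_space", "records_container"):
        if not cfg.get(key):
            problems.append(f"missing {key}")
    mode = cfg.get("source_mode", "sync")  # sync is the documented default
    if mode not in ("sync", "filter"):
        problems.append(f"unknown source_mode {mode!r}")
    page_limit = cfg.get("page_limit", 1000)  # default value assumed
    if not 1 <= page_limit <= 1000:
        problems.append("page_limit must be between 1 and 1000")
    flt = cfg.get("filter") or {}
    if not (flt.get("rule_ids") or flt.get("rule_set_ids")):
        problems.append("filter has no rule_ids or rule_set_ids (risk of accidental fan-in)")
    return problems


chain_rule_a_listener = {
    "name": "chain_rule_a_listener",
    "listener_id": "chain-rule-a",
    "source_mode": "filter",
    "stream_id": "dq_validation_stream",
    "records_space": "dataQuality",
    "records_container": "RuleEngineResult",
    "initial_watermark_ms": 1710000000000,
    "page_limit": 1000,
    "max_records": 2000,
    "filter": {"rule_ids": ["RuleA"]},
}
```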

This listener config has the same shape as the one used in the live cascade script:

  • filter scoped by rule_ids
  • bounded bootstrap watermark in filter mode
  • downstream records reference upstream record external IDs via causedBy

Working example from test_and_deploy

Use:

  • test_and_deploy/test_rule_engine_result_sync_chaining.py

What it validates:

  1. writes record #1 (RuleA) and record #2 (RuleB) to RuleEngineResult
  2. runs rule_engine_result_sync filtered on RuleA
  3. asserts listener consumed #1 and did not consume #2
  4. writes record #3 (RuleC) with causedBy -> record #1
  5. verifies record #3 exists in RuleEngineResult
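The five steps can be replayed against an in-memory list to make the expected assertions explicit. This is not the test script itself: the list stands in for the RuleEngineResult container, and the record keys are illustrative.

```python
def run_chaining_scenario():
    """Replay the chaining test steps against an in-memory stand-in for the container."""
    store = []
    # 1. write record #1 (RuleA) and record #2 (RuleB)
    store.append({"externalId": "rec-1", "ruleId": "RuleA", "causedBy": []})
    store.append({"externalId": "rec-2", "ruleId": "RuleB", "causedBy": []})
    # 2. listener filtered on RuleA
    consumed = [r["externalId"] for r in store if r["ruleId"] in {"RuleA"}]
    # 3. #1 consumed, #2 not
    assert consumed == ["rec-1"]
    # 4. write record #3 (RuleC) with causedBy -> record #1
    store.append({"externalId": "rec-3", "ruleId": "RuleC", "causedBy": list(consumed)})
    # 5. record #3 exists and carries lineage
    rec3 = next(r for r in store if r["externalId"] == "rec-3")
    assert rec3["causedBy"] == ["rec-1"]
    return consumed, store
```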

Run:

uv run python test_and_deploy/test_rule_engine_result_sync_chaining.py --stream-id <rule_engine_stream>

If --stream-id is omitted, it is resolved from DataQualitySettings (records_rule_engine_stream_id / records_stream_id).
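The fallback can be pictured as a short resolution chain. This sketch assumes records_rule_engine_stream_id takes precedence over records_stream_id (the order in which they are listed) and models DataQualitySettings as a plain dict; resolve_stream_id is a hypothetical name.

```python
import argparse


def resolve_stream_id(cli_stream_id, settings):
    """--stream-id wins; otherwise fall back to settings fields in the assumed order."""
    for candidate in (
        cli_stream_id,
        settings.get("records_rule_engine_stream_id"),
        settings.get("records_stream_id"),
    ):
        if candidate:
            return candidate
    raise ValueError("no stream id: pass --stream-id or set it in DataQualitySettings")


parser = argparse.ArgumentParser()
parser.add_argument("--stream-id", default=None)
args = parser.parse_args([])  # simulate running without --stream-id
stream_id = resolve_stream_id(args.stream_id, {"records_stream_id": "dq_validation_stream"})
```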

Related script for upstream inference generation:

  • test_and_deploy/test_uniqueness_handler_live.py (produces RuleEngineResult outputs from SHACL execution path)

Deployment pattern

Deploy listener workflows via rule_engine_result_sync_dir in deploy_validation_infrastructure(...).

Each YAML config creates one scheduled workflow and a trigger that calls validation_type: "rule_engine_result_sync".
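The one-config-to-one-workflow mapping can be sketched as a planning step over parsed configs. The plan shape, the workflow external ID pattern and the 15-minute cron schedule are all hypothetical; deploy_validation_infrastructure(...) performs the real deployment.

```python
def plan_listener_workflows(configs, schedule="*/15 * * * *"):
    """One scheduled workflow plus trigger per listener config (plan shape is hypothetical)."""
    plans = []
    for cfg in configs:
        # name/description describe the workflow; the rest becomes handler input.
        function_input = {k: v for k, v in cfg.items() if k not in ("name", "description")}
        function_input["validation_type"] = "rule_engine_result_sync"
        plans.append({
            "workflow_external_id": f"rule_engine_result_sync__{cfg['name']}",
            "max_concurrent_executions": 1,  # protects the listener's single checkpoint
            "trigger": {"schedule": schedule, "input": function_input},
        })
    return plans
```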

See the Deploy guide.


Relationship to the Conditional Logic guide

Use Conditional Logic (Rule Engine) for:

  • SHACL-AF authoring
  • intra-pass rule ordering (sh:order, dqs:dependsOn)

Use this guide for:

  • cross-run chaining on new RuleEngineResult records
  • incremental listener behavior and checkpointing

Troubleshooting

  • No records consumed in sync mode:
      • check that the listener has a valid cursor or initialize_cursor
      • confirm the stream and container are correct
      • confirm filter values match stored property values
  • Filter mode fails with interval errors:
      • ensure the bootstrap watermark is bounded (for example now-6d / now-7d)
      • pass initial_watermark_ms explicitly when needed
  • Records from the wrong rule consumed:
      • verify filter.rule_ids and rule_set_ids
      • confirm the pipeline writes the expected ruleId values
  • Duplicate processing concerns:
      • keep one listener per checkpoint (listener_id)
      • keep workflow concurrency at 1 for each listener
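The two duplicate-processing checks can be run as a preflight over planned workflows. This sketch assumes each plan is a dict with workflow, listener_id and max_concurrent_executions keys; that shape is illustrative, not the package's.

```python
from collections import Counter


def checkpoint_conflicts(workflow_plans):
    """Flag listener_ids shared by several workflows and workflows allowing >1 concurrent run."""
    counts = Counter(p["listener_id"] for p in workflow_plans)
    issues = [f"listener_id {lid!r} used by {n} workflows" for lid, n in counts.items() if n > 1]
    issues += [
        f"{p['workflow']}: max_concurrent_executions={p['max_concurrent_executions']}"
        for p in workflow_plans
        if p["max_concurrent_executions"] != 1
    ]
    return issues
```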

Best practices

  • Keep listener filters tight (rule_ids, rule_set_ids, version/domain filters) to avoid accidental fan-in.
  • Keep first-run bootstrap bounded in filter mode.
  • Always set lineage (causedBy) on downstream records for explainability.
