Skip to content

Composable Pipelines

The pipeline system is the core of Cognitive Companion. Each rule defines its own graph of pipeline steps executed by the PipelineExecutor. Caregivers and operators connect steps on a visual canvas, branch through output ports, and review live or historical runs in the execution inspector.

Step handlers are self-contained plugins, each in its own file under backend/steps/builtin/, auto-discovered at startup via StepRegistry. The frontend loads available step types dynamically from the GET /api/v1/pipeline/step-types endpoint, so new plugins appear automatically in the step palette and config editor. See Extending the Pipeline for how to add custom step types.

How Pipelines Work

When a rule is triggered, the executor:

  1. Creates a WorkflowExecution record to track progress
  2. Initializes a shared pipeline_data dictionary
  3. Captures an immutable graph snapshot for later inspection
  4. Finds entry nodes and executes enabled steps through PipelineEdge rows
  5. Merges each step's output into pipeline_data for downstream steps
  6. Traverses the output ports returned by each step
  7. Handles waiting, cancellation, completion, and error states
python
@dataclass
class StepResult:
    success: bool = True
    data: dict = field(default_factory=dict)   # Merged into pipeline_data
    should_continue: bool = True
    output_ports: tuple[str, ...] = ("main",)  # Runtime ports to traverse
    wait_until: datetime | None = None         # For wait/resume

Every step receives the full pipeline_data dictionary and a TriggerContext with metadata about what initiated the execution:

python
@dataclass
class TriggerContext:
    trigger_type: str              # "sensor_event", "cron", "manual", "webhook", "occupancy_duration", "telegram", "cts_window", "dementia_signal"
    sensor_id: str | None
    room_name: str | None
    media_paths: list[str]
    media_type: str | None
    webhook_payload: dict | None   # Payload from webhook and Telegram triggers
    occupancy_duration_minutes: float | None  # Set for occupancy_duration triggers

For webhook and telegram triggers, the payload is also available in pipeline_data["trigger_input"]. For occupancy_duration triggers, occupancy_duration_minutes reflects how long the sensor has been continuously occupied at the moment the rule fired.

Telegram trigger payload keys (accessible as {{trigger_input.command}} etc.):

KeyDescription
trigger_input.commandThe matched command, e.g. "/medication"
trigger_input.argsList of words following the command
trigger_input.textRaw message text
trigger_input.chat_idTelegram chat ID as a string
trigger_input.from_userTelegram user object (id, first_name, username)

See Telegram Command Triggers below for configuration details.

The executor also injects a localized system object into pipeline_data using app.timezone from settings.yaml. The namespace covers time, day, date, and friendly composite strings so templates can reference any common format without further conversion:

json
{
  "system": {
    "local_time": "8:42 AM",
    "local_time_24h": "08:42",
    "local_hour_12h": "8",
    "local_hour_24h": "08",
    "local_minute": "42",
    "local_ampm": "AM",
    "local_date": "2026-03-29",
    "local_day_of_week": "Sunday",
    "local_day_of_week_short": "Sun",
    "local_day_of_month": 29,
    "local_day_ordinal": "29th",
    "local_month_name": "March",
    "local_month_name_short": "Mar",
    "local_month_number": 3,
    "local_year": 2026,
    "local_date_long": "March 29th, 2026",
    "local_date_friendly": "Sunday, March 29th",
    "timezone": "America/New_York"
  }
}

This makes local wall-clock values available to prompts and notification templates without requiring each step to compute them independently. The complete list of keys is served by GET /api/v1/pipeline/data-keys and surfaced as autocomplete suggestions in the admin UI template editors.

Graph Authoring

The rule canvas stores two resources: PipelineStep nodes and PipelineEdge connections.

ResourceAPIPurpose
StepsGET/POST/PUT/DELETE /api/v1/rules/{rule_id}/stepsCreate and configure pipeline nodes
PositionsPUT /api/v1/rules/{rule_id}/steps/positionsPersist canvas coordinates
EdgesGET/PUT /api/v1/rules/{rule_id}/edgesRead or atomically replace graph connections
ValidationPOST /api/v1/rules/{rule_id}/validateCheck templates and graph structure

Most step types expose one main output port. The condition step exposes true and false, so the canvas can connect each branch to a different downstream step. A source step can have one outgoing edge per output port.

Execution review uses GET /api/v1/workflows/{execution_id}/detail, which returns the graph snapshot, timeline, skipped nodes, activated output ports, outputs, and errors. Lightweight live lists use GET /api/v1/pipeline/runs.

Cool-Off Behavior

Rule cool-off is driven by EventLog.status == "completed". A workflow that runs successfully but does not perform a terminal action is now recorded as ignored, so analytical or verification-only passes do not consume the rule's cool-off window.

Built-in steps that can explicitly mark a run as cool-off-worthy:

  • notification: trigger_cooloff defaults to true
  • ha_action: trigger_cooloff defaults to true
  • activity_detection: trigger_cooloff defaults to true
  • condition: trigger_cooloff defaults to false, and only applies when the expression evaluates to true

These steps add _cooloff_triggered to pipeline_data, and the executor uses that flag when finalizing the related event log.

Pipeline Step Types

Perception

person_identification

Sends media frames to the person identification service for face recognition. Returns detected identities with confidence scores and bounding boxes. Records sightings and updates person location state.

Config fields:

  • include_annotated_image: return frames with bounding boxes and name labels drawn over faces
  • confidence_threshold: minimum confidence to accept an identification (default from settings)
  • target_persons: optional comma-separated list of person IDs to filter for

Output keys: person_detections (list of detections with identity, confidence, bbox), annotated_images (if enabled)

activity_detection

Record a single activity to the PersonActivity table. All fields support prompt templates, so values can be fixed strings or resolved from any upstream step output or trigger context. Use multiple steps in sequence to record multiple activities.

Config fields:

  • activity_type (required): activity to record. Supports templates (e.g. {{logic_response.activity_type}} or a literal like "bathroom_occupancy"). The UI offers 30+ pre-programmed suggestions (eating, sleeping, medication, bathing, etc.) but any free-form string is accepted.
  • person_id (optional): person to attribute the activity to. Supports templates (e.g. {{person_detections.0.person_id}}). Leave empty to record as unknown person.
  • room_name (optional): room where the activity occurred. Supports templates (e.g. {{room_name}}). Defaults to the trigger room when empty.
  • confidence: confidence score (0-1). Accepts a fixed number or {{template}} syntax (e.g. {{logic_response.confidence}}). Defaults to 0.8.
  • capture_scene_description (bool, default false): when true, saves the upstream vision model output into metadata_json.scene_description alongside the activity record. Creates a complete audit trail linking what was observed to what was recorded.
  • scene_description_key (str, default "vision_response"): which pipeline_data key to read when capture_scene_description is enabled. Override to "llm_response" or any other key when using llm_call with a custom output_key.
  • metadata_extra (str, optional): a JSON string (supports {{template}} syntax) merged into metadata_json alongside any captured scene description. Useful for recording structured reasoning alongside the activity: {"reasoning": "{{logic_response.reasoning}}"}.
  • trigger_cooloff: whether a successful activity record should count toward the rule cool-off window. Defaults to true.

Output keys: detected_activities (list with one entry, including metadata when populated), optional _cooloff_triggered

Detecting long-duration activities (meals, extended occupancy)

The VLM window in an llm_call step is bounded by the EventAggregator batch (typically 10 s to 1 min). To detect activities that unfold over a longer duration such as a meal, extend the temporal window on the upstream step:

yaml
step_type: llm_call
config:
  model_id: cosmos_reason2
  image_source: both
  image_time_filter:
    since_minutes: 30    # pull the last 30 minutes of frames from MinIO
  prompt: "Looking at frames from the last 30 minutes, has the person had lunch?"
  output_key: vision_response

The activity_detection step then records the conclusion of that extended analysis. Enable capture_scene_description: true to save the VLM's full reasoning alongside the record.

Example: meal detection with scene capture
yaml
# 1. Pull 30 minutes of kitchen frames, ask the vision model
step_type: llm_call
config:
  model_id: cosmos_reason2
  image_source: both
  additional_room_names: [Kitchen]
  image_time_filter:
    since_minutes: 30
  sort_by_sensor_then_time: true
  prompt: "Review all frames. Has {{person_detections.0.name}} eaten lunch today?"
  output_key: vision_response

# 2. Record the activity with the scene description attached
step_type: activity_detection
config:
  activity_type: meal_lunch
  person_id: "{{person_detections.0.person_id}}"
  confidence: "{{logic_response.confidence}}"
  capture_scene_description: true
  scene_description_key: vision_response
  metadata_extra: '{"reasoning": "{{logic_response.reasoning}}"}'

scene_analysis

Calls the standalone scene-analysis-service for multi-modal analysis of a trigger image. Three inference components run in a single request: YOLO11x object detection, Florence-2-large structured scene description, and CLIP ViT-L/14 image embeddings. A YAML-configured hazard rule engine evaluates detections against named hazard rules and emits alerts for matches.

The step always continues the pipeline. When the scene-analysis-service is unreachable or disabled, all result keys are empty.

Config fields:

  • run_detect (bool, default true): run YOLO11x object detection
  • run_describe (bool, default true): run Florence-2-large scene description
  • run_embed (bool, default false): run CLIP ViT-L/14 embedding (higher latency)
  • run_hazards (bool, default true): evaluate YAML hazard rules against detections
  • max_images (int, default 1): number of trigger images to send for analysis

Output keys:

KeyTypeDescription
scene_detectionslistObject detections: label, confidence, bbox [x1,y1,x2,y2], class_id
scene_descriptionstrStructured natural-language caption from Florence-2
scene_embeddinglist[float]768-dim L2-normalized CLIP embedding vector (empty when run_embed is false)
scene_hazardslistTriggered hazard alerts: name, severity, description, detection
scene_detector_availableboolWhether YOLO was loaded in the service
scene_describer_availableboolWhether Florence-2 was loaded in the service
scene_embedder_availableboolWhether CLIP was loaded in the service
Example: detect and alert on hazards
yaml
# 1. Run full scene analysis on the trigger frame
step_type: scene_analysis
config:
  run_detect: true
  run_describe: true
  run_embed: false
  run_hazards: true
  max_images: 1

# 2. Branch on hazards
step_type: condition
config:
  expression: "scene_hazards.count > 0"

# 3. Notify caregiver with the scene description
step_type: notification
config:
  alert_level: warning
  message_template: "Hazard detected: {scene_description}"
  telegram_template: "Hazard in {{room_name}}: {{scene_description}}"

Combining with person identification

Place person_identification before scene_analysis in the pipeline. The scene_detections output complements face detections by identifying objects in the frame. Use a condition step to branch on both person_detections.count > 0 and scene_hazards.count > 0 for context-aware alerts.

object_trend_analysis

Queries the semantic-memory-service for room-level object trend state: clutter scores, persistent/novel objects, and anomaly severity. Designed to precede a condition step (for rule branching) or an llm_call step (for LLM-enriched reasoning).

Config fields:

  • room_ids: list of room IDs to query. Empty = use the trigger room.
  • include_snapshots_hours: if > 0, fetch raw hourly snapshots for LLM context.
  • severity_threshold: anomalies below this severity are stripped (ok, info, warning, critical; default info).
  • output_key: key under which the result map is written (default room_trends).

Output keys:

KeyTypeDescription
room_trendsdictMaps room_id to trend result: clutter_score, trend_direction, overall_severity, persistent_objects, novel_objects, anomalies
room_trends_any_warningboolWhether any room has severity >= warning
room_trends_max_severitystrHighest severity across all rooms
room_trends_summarystrCompact single-line text ready for LLM prompt injection

Graceful degradation: when the semantic-memory-service is unreachable or returns no data, the step writes empty results and continues.

presence_query

Queries the fused PresenceService for the current location and status of a person. Aggregates data from multiple providers (night anchor, HA bed sensor, CTS location, HA device tracker, stale fallback) via the priority chain in config/presence.yaml. Also fetches recent dementia signals when services.signals is wired.

Config fields:

  • person_id: person to query. Leave blank to resolve from pipeline_data (sightings/persons).
  • signal_kind: filter recent dementia signals to this kind (e.g. bathroom_dwell_anomaly).
  • signal_severity_min: minimum severity for signal match (info, warning, emergency; default info).
  • signal_window_minutes: lookback for dementia signals in minutes (default 30).
  • output_key: pipeline_data key for the result dict (default presence).

Output keys:

KeyTypeDescription
{output_key}_statusstrPresence status: present_room, present_home, asleep, away, stale, unknown
{output_key}_room_namestrCurrent room name (or null)
{output_key}_room_idstrCurrent room ID (or null)
{output_key}_dwell_minutesfloatMinutes in current room
{output_key}_confidencefloatProvider confidence (0-1)
{output_key}_signalslistRecent dementia signals matching filters
{output_key}_providerslistActive presence providers that contributed
{output_key}_timestampstrISO-8601 snapshot timestamp
presence_at_homeboolConvenience flat key: person is at home
presence_asleepboolConvenience flat key: person is asleep
presence_awayboolConvenience flat key: person is away

Graceful degradation: when the PresenceService returns UNKNOWN or STALE, all keys default to empty/null and presence_at_home / presence_asleep / presence_away are all false.

home_state

A thin convenience wrapper around presence_query that emits exactly four boolean flags. Designed for use with condition steps that react to high-level home-state changes.

Config fields:

  • person_id: person to query. Leave blank to resolve from pipeline_data.
  • output_key: pipeline_data key prefix for the result (default home).

Output keys:

KeyTypeDescription
{output_key}_at_homeboolPerson is in a known room or asleep (status present_room, present_home, or asleep)
{output_key}_asleepboolPerson is asleep (status asleep)
{output_key}_awayboolPerson is away from home (status away)
{output_key}_state_unknownboolStatus is unknown or stale

These mirror the four boolean outputs of the home_state context filter, so the same logic used in rule filters is available inside a pipeline. When the presence service is unavailable, all four flags are false.

semantic_memory_query

Queries the semantic-memory-service for recent scene observations, object presence, and hazard data in a room. Injects both structured results and a compact LLM-ready summary into pipeline_data.

Config fields:

  • room_id: explicit room ID to query. Overrides use_trigger_room.
  • use_trigger_room: when true, uses the trigger's room. Default true.
  • objects_limit: max object presence records to return (default 10).
  • hazards_limit: max hazard observations (default 5).
  • observations_limit: max observation search hits (default 20).
  • within_minutes: lookback window in minutes (default 60).
  • min_confidence: minimum confidence for returned records (default 0.0).
  • output_key: pipeline_data key for the result dict (default memory_context).

Output keys:

KeyTypeDescription
{output_key}.recent_objectslistObject presence records with label, confidence, first/last seen timestamps
{output_key}.recent_hazardslistHazard observations with flags, severity, confidence
{output_key}.observationslistObservation search hits with description, embedding distance
{output_key}.observations_countintTotal matching observations
{output_key}.summarystrCompact single-paragraph text ready for LLM prompt injection

Graceful degradation: when the semantic-memory-service is unreachable or disabled, the output contains empty lists and a "No memory context available." summary.

recamera_media_poll

Fetches recent reCamera images from the MediaCache via the EventAggregator and returns presigned MinIO URLs plus metadata. Snapshot semantics: reads what is currently in cache and returns immediately. To wait for new events to accumulate, place a wait step before this one.

The step pairs well with the Continuous Tracking data path. It lets a rule explicitly collect frames from one or more reCamera sensors or rooms, branch on the image count, then forward the snapshot to a downstream vision or scene-analysis step.

Config fields:

  • sensor_ids (list): reCamera sensor IDs to include. Empty means all cameras (subject to room_names filter if set).
  • room_names (list): room names to include. Combined with sensor_ids, only sensors in these rooms are returned.
  • since_minutes (number, default 5): return images captured within the last N minutes. Images older than the MediaCache retention window are never returned regardless of this setting.
  • images_per_sensor (int, default 3): maximum images returned per sensor when sensor_ids are specified.
  • sensor_frame_limits (dict): per-sensor overrides for images_per_sensor. Keys are sensor IDs, values are integer frame limits.
  • max_images (int, default 10): hard cap on total images returned across all sensors.
  • chronological (bool, default true): when true, images within each sensor are sorted oldest-first (better for temporal reasoning). When false, newest-first.

Output keys: images (list of presigned MinIO URLs), count, sensor_ids, room_names, since_minutes, polled_at (ISO-8601 UTC timestamp).

Graceful degradation: when the EventAggregator is not available, the step returns success=False with an empty payload so downstream steps can branch on it.

cts_window_poll

Pulls a window of recent Continuous Tracking System (CTS) frames enriched with detections, identities, room dwells, and optionally scene captions. Designed to be used inside a pipeline triggered by a reCamera sensor event, so the downstream LLM step can reason over high-quality CTS data even when the trigger fired on a lower-resolution reCamera frame.

The output payload shape is symmetric with the cts_window trigger so downstream template expressions are interchangeable between the two paths.

Config fields:

  • duration_s (number, required, default 10): total window duration in seconds.
  • sample_period_s (number, required, default 1.0): downsample to one frame per this many seconds.
  • cameras (list): CTS camera IDs to include. Empty means all CTS cameras. CTS camera IDs are distinct from reCamera sensor IDs.
  • rooms (list): room names to include.
  • lookback_s (number, default 5): seconds before the trigger time to include in the window.
  • lookahead_s (number, default 5): seconds after the trigger time to wait and collect.
  • include_scene (bool, default false): run scene analysis on sampled frames.
  • include_pose (bool, default false): include pose keypoints. Requires the CTS pose pipeline (TD-005); the option is shown in the UI as disabled until that path is wired.
  • max_frames (int, default 30): hard cap on total frames returned.

Output keys: trigger_id, window_start, window_end, cameras, rooms, frames (list of enriched frame dicts: per-frame detections, identities, optional scene_caption), summary (distinct_identities, detection_count, rooms), partial (true when the bucketizer is not yet wired or when the downsampler trimmed frames).

WARNING

The CTS event bucketizer is the upstream feed for this step. When the bucketizer is not wired into the service container, cts_window_poll logs a warning and returns an empty window with partial: true. Downstream steps can branch on partial to handle the degraded path.

Reasoning

llm_call

The unified LLM step. Replaces vision_analysis, logic_reasoning, and translation with a single model-agnostic interface. The model is selected per step from the named registry in settings.yaml, so each step in a pipeline can use a different model without deploying multiple provider types.

Config fields:

  • model_id (required): ID of a model entry from llm.models in settings.yaml.
  • prompt: prompt text (supports prompt templates).
  • special_instructions: text prepended to the prompt before template rendering. Useful for translation style guides or system-level constraints.
  • include_context: list of pipeline_data keys to include as context above the prompt. If empty, person_detections and vision_response are auto-included when present.
  • image_source: "none" (default), "trigger", "additional", or "both". Image attachment is silently skipped when the selected model does not have the vision capability.
  • max_images: hard cap on total images sent to the model (default 5).
  • additional_sensor_ids: camera sensor IDs to pull images from. When sort_by_sensor_then_time is enabled, the order of this list determines the grouping order.
  • additional_room_names: pull images from all cameras in these rooms (unordered; for ordered multi-sensor analysis, use additional_sensor_ids instead).
  • images_per_sensor: maximum images per sensor when sensor-ordered grouping is active (default 3).
  • sort_by_sensor_then_time: when true, images are grouped by sensor in additional_sensor_ids order, then sorted oldest-first within each group. This produces a temporally coherent sequence across sensors, enabling inter-frame analysis by vision reasoning models such as Cosmos Reason2.
  • image_time_filter: optional object with since_minutes, time_start, time_end to restrict which images are fetched.
  • response_format: "text" (default), "json_schema", or "json_free". Controls whether the output is stored as a plain string or parsed as JSON.
  • response_schema: natural-language description of the expected JSON format, appended to the prompt.
  • response_json_schema: JSON Schema string. When response_format is "json_schema" and the model has guided_decoding: true in settings, the schema is sent as guided_json to the server (vLLM). For other servers, it is injected as a prompt instruction.
  • output_key: the pipeline_data key where the result is stored (default "llm_response"). Set to "logic_response", "vision_response", or "translation" for compatibility with downstream steps that reference those keys.
  • hallucination_marker: if this string is found in the response, the call is retried up to the model's max_retries setting (Tenacity). Useful for translation models with known failure modes.

Output keys: pipeline_data[output_key] (string when response_format is "text", parsed dict when "json_schema" or "json_free"). When output_key is "logic_response" and the response contains is_notification_needed: false, notification_suppressed: true is also written.

Example: vision reasoning with sensor-ordered frames
yaml
step_type: llm_call
config:
  model_id: cosmos_reason2
  prompt: "Analyze the sequence of frames from each camera. Has the person left the stove unattended?"
  image_source: both
  additional_sensor_ids: [kitchen_cam_1, kitchen_cam_2]
  sort_by_sensor_then_time: true
  images_per_sensor: 4
  max_images: 10
  response_format: json_schema
  response_json_schema: |
    {
      "type": "object",
      "properties": {
        "stove_unattended": {"type": "boolean"},
        "reasoning": {"type": "string"},
        "confidence": {"type": "number"}
      },
      "required": ["stove_unattended", "reasoning"]
    }
  output_key: vision_response
Example: reasoning and notification decision
yaml
step_type: llm_call
config:
  model_id: gemma4_26b
  prompt: "Based on the analysis, should the caregiver be alerted?"
  include_context: [vision_response, person_detections]
  response_format: json_schema
  response_json_schema: |
    {
      "type": "object",
      "properties": {
        "is_notification_needed": {"type": "boolean"},
        "user_notification": {"type": "string"},
        "alert_level": {"type": "string", "enum": ["emergency","warning","info","reminder"]},
        "reasoning": {"type": "string"}
      },
      "required": ["is_notification_needed", "user_notification", "reasoning"]
    }
  output_key: logic_response
Example: translation with retry
yaml
step_type: llm_call
config:
  model_id: gemma4_26b
  special_instructions: "Translate using informal Tamil as spoken in Chennai (Tanglish):"
  prompt: "Translate the following to Tamil:\n\n{{logic_response.user_notification}}"
  response_format: text
  output_key: translation
  hallucination_marker: "சென்னை"

condition

Evaluates a boolean expression against pipeline_data to control execution flow. The step activates the true or false output port, and graph edges decide which downstream step runs next.

Config fields:

  • expression: the condition expression to evaluate
  • trigger_cooloff: whether a true result should mark the run as cool-off-worthy. Defaults to false.

Output keys: condition.expression, condition.result, condition.branch, optional _cooloff_triggered

See Condition Expressions below.

verification

Query the PersonActivity database to verify whether household members completed, or did not complete, specific activities within a time window. No LLM calls are involved.

Config fields:

  • conditions: list of condition objects, each with:
    • activity_type (str, required): the activity type to look for
    • person_id (str, optional): person to check; supports prompt templates (e.g. {{person_detections.0.person_id}}). Leave empty to match any person.
    • room_name (str, optional): room to filter by; supports templates (e.g. {{room_name}}). Leave empty to match any room.
    • completed (bool, default true): whether the activity should have been completed
    • within_minutes (float): relative time window from now
    • window_start / window_end (ISO-8601 timestamp): fixed wall-clock window (alternative to within_minutes). The stored time is re-anchored to today's date in app.timezone before querying.
    • min_confidence (float, default 0.5): minimum confidence threshold for matching records
  • match_mode: "all" or "any" (default "all")
  • re_notify_if_failed: bool (default false), re-trigger notification on verification failure
  • re_notify_delay_minutes: int (default 5), delay before re-notification

Output keys: verification.verified (bool), verification.match_mode, verification.matched_conditions, verification.unmatched_conditions

Action

notification

Dispatches an alert to configured notification channels based on alert level. The NotificationDispatcher can route to pwa_popup_text, Telegram, e-ink, ha_speaker_tts, pwa_realtime_ai, pwa_tts_announcement, and outbound webhook channels.

Config fields:

  • alert_level: emergency, warning, info, or reminder
  • channels: optional override of which channels to use (completely replaces the defaults from notifications.yaml when specified)
  • message_template: standard Python format string with {message}, {room}, and any pipeline_data key representing the default broadcast text.
  • telegram_template / eink_template / tts_template / webhook_template: channel-specific template overrides allowing you to format messages optimally for a specific medium. webhook_template should render a JSON string when you want to control the outbound payload body directly.
  • eink_targets: optional list of sensor IDs for targeted e-ink display rendering
  • ha_media_player: optional HA media_player entity ID for targeted TTS playback
  • webhook_url: optional per-step override for the outbound webhook destination
  • trigger_cooloff: whether a successful notification dispatch should count toward the rule cool-off window. Defaults to true.

Output keys: notification_dispatched (boolean), notification_channels (dict of channel -> success), optional _cooloff_triggered

ha_action

Calls a Home Assistant service. Can turn on lights, lock doors, activate scenes, trigger HA automations, or any other HA service call.

Config fields:

  • domain: Home Assistant domain to call (e.g. light, lock, script)
  • service: Home Assistant service name within that domain (e.g. turn_on, lock)
  • entity_id: the target entity
  • data: additional JSON payload to pass to the service
  • trigger_cooloff: whether a successful Home Assistant action should count toward the rule cool-off window. Defaults to true.

Output keys: ha_action.domain, ha_action.service, ha_action.entity_id, ha_action.success, optional _cooloff_triggered

activity_session_start

Open a duration-aware activity session for a person. Idempotent: reuses an existing open session of the same type if one already exists. Stores timeout configuration for automatic stale-session cleanup.

Config fields:

  • activity_type (required): activity type (e.g. sleep, bathroom, meal_prep). Supports {{template}} syntax.
  • person_id: person to attribute this session to. Supports templates (e.g. {{person_detections.0.person_id}}).
  • room_name: room where the activity occurs. Supports templates. Defaults to trigger room.
  • confidence: detection confidence (0-1). Accepts a fixed number or {{template}} syntax (default 0.85).
  • timeout_minutes: maximum session duration in minutes before auto-close. Uses built-in default for the activity type when empty. Supports templates.
  • metadata_extra: optional JSON string of extra fields to merge into session metadata. Supports {{template}} syntax.
  • output_key: pipeline_data key to write the session result under (default session).

Output keys: pipeline_data[output_key] with session_id, person_id, activity_type, room_name, started_at, timeout_minutes, was_existing

activity_session_end

Close an open duration-aware activity session for a person. Computes duration from open to close. Optionally records a PersonActivity with duration_minutes populated. If no open session exists, logs a warning and continues.

Config fields:

  • activity_type (required): activity type to close. Supports {{template}} syntax.
  • person_id: person to close the session for. Supports templates.
  • write_activity_record: when true, also records a PersonActivity with duration_minutes populated (default true).
  • output_key: pipeline_data key to write the closed session result under (default closed_session).

Output keys: pipeline_data[output_key] with session_id, person_id, activity_type, started_at, closed_at, duration_minutes, status, closed_via

daily_report

Generate end-of-day activity reports for one or all household members. Aggregates sleep, meals, medication, bathroom, door events, exercise, and location data into structured DailyReport records with wellness scoring. Designed to be triggered by a cron rule at end of day.

Config fields:

  • person_ids: list of person IDs to generate reports for. Empty list means all active household members.
  • report_date_offset_days: days offset from today for the report date (0 = today, -1 = yesterday).
  • generate_summary_text: when true, generates an LLM prose summary of the day.
  • summary_model_id: LLM model ID to use for summary generation (default gemma4_26b).
  • notify_on_complete: when true, sends a notification when reports are ready.
  • output_key: pipeline_data key to write the report results under (default daily_reports).

Output keys: pipeline_data[output_key] (list of report results with person_id, report_date, report_id, wellness_score)

info_card

Delivers a curated info card to the senior via PWA popup, e-ink display, or both. Loads an approved InfoCard by ID from the knowledge repository, resolves its image slots through the image pipeline, and dispatches via the knowledge delivery service. A voice instruction override allows per-delivery Gemini Live behavioral guardrails.

Config fields:

  • info_card_id (required): ID of an approved InfoCard to deliver
  • channels: list of delivery channels: pwa, eink, or both (default ["pwa"])
  • pwa_dismiss_seconds: auto-dismiss timeout for the PWA popup (default 60)
  • eink_expiry_minutes: how long the e-ink image stays active before refresh (default 30)
  • voice_instruction: override the default Gemini Live voice instruction for this delivery
  • trigger_cooloff: whether successful delivery consumes the rule cool-off (default true)

Output keys: info_card_delivery with info_card_id, layout_id, delivered_channels, delivery_id

Flow

wait

Pauses pipeline execution for a configured duration. The execution state is persisted to the database and the scheduler resumes it automatically via an APScheduler DateTrigger.

Config fields:

  • minutes: how long to wait before resuming

Output keys: none (the step simply pauses execution)

interactive_prompt

Asks the user a question via popup text and/or voice AI, then pauses pipeline execution until a response is received or timeout occurs. Designed for check-ins, safety confirmations, and escalation workflows where the system needs explicit user input before proceeding.

Config fields:

  • voice_prompt_template: voice prompt template with {{variable}} syntax for the Gemini Live voice channel
  • popup_message_template: popup message template with {{variable}} syntax for the PWA popup text channel
  • auto_escalate: when true, sets auto_escalate_triggered in pipeline_data if the user selects "escalate" or timeout occurs (default false)
  • escalate_button_text: text for the escalation button (default "I need help")
  • dismiss_button_text: text for the dismiss button (default "I'm okay")
  • countdown_seconds: timeout duration in seconds (5-300, default 30)
  • timeout_action: action to take when timeout occurs: "escalate" or "dismiss" (default "escalate")
  • output_key: key for storing response in pipeline_data (default "interactive_response")

At least one of voice_prompt_template or popup_message_template must be configured. Both channels can be used simultaneously for redundancy.

Output keys: pipeline_data[output_key] with channel ("pwa_popup_text", "pwa_realtime_ai", or "timeout"), action ("escalate" or "dismiss"), timestamp, and raw_response. When auto_escalate is enabled and escalation is triggered, auto_escalate_triggered: true is also written.

Workflow status: The execution status is set to "waiting_for_response" while the prompt is active. When a response arrives or timeout fires, the status transitions to "running" and the pipeline resumes from the next step.

Example: bathroom safety check-in

quiz_start

Starts an interactive quiz session via the companion PWA. Loads an approved Quiz by ID from the knowledge repository, creates a QuizSession, and opens the quiz dialog on the senior's PWA. Supports question randomization, per-senior deduplication, configurable session timeout, and voice instruction overrides.

Config fields:

  • quiz_id (required): ID of an approved Quiz to deliver
  • max_questions: limit the number of questions delivered (default 5)
  • randomize_order: randomize question order (default false)
  • session_timeout_minutes: max session duration before auto-complete (default 10)
  • per_senior_dedupe_hours: skip delivery if the same senior completed this quiz within N hours (default 12)
  • voice_instruction: override the default Gemini Live voice instruction for this quiz session
  • trigger_cooloff: whether successful delivery consumes the rule cool-off (default true)

Output keys: quiz_session with quiz_id, session_id, question_count, status, questions

Example: daily medication knowledge quiz

A cron rule fires at 10 AM every day and delivers a medication knowledge quiz:

yaml
# 1. Check home state to make sure the senior is awake and at home
step_type: home_state
config:
  output_key: home

# 2. Only proceed if the senior is home and awake
step_type: condition
config:
  expression: "home.home_at_home and not home.home_asleep"

# 3. Deliver the medication quiz
step_type: quiz_start
config:
  quiz_id: 3
  max_questions: 5
  randomize_order: false
  session_timeout_minutes: 15

A rule with trigger_type: occupancy_duration fires when the bathroom presence sensor has been on for 40 minutes. The pipeline sends a bilingual check-in prompt and waits for a response:

yaml
# 1. Translate the check-in message to Tamil
step_type: llm_call
config:
  model_id: gemma4_26b
  special_instructions: "Translate using informal Tamil as spoken in Chennai:"
  prompt: "Are you okay? Do you need help?"
  output_key: translation

# 2. Send interactive prompt via popup and voice
step_type: interactive_prompt
config:
  popup_message_template: "Are you okay?"
  voice_prompt_template: "{{translation}}"
  auto_escalate: true
  escalate_button_text: "I need help"
  dismiss_button_text: "I'm okay"
  countdown_seconds: 30
  timeout_action: escalate
  output_key: interactive_response

# 3. Branch on response
step_type: condition
config:
  expression: "auto_escalate_triggered == true"
  trigger_cooloff: true

# 4. If escalated, notify caregiver
step_type: notification
config:
  alert_level: emergency
  message_template: "Emergency: {{person_detections.0.name}} needs help in bathroom"
  channels: [telegram, pwa_popup_text, ha_speaker_tts]

If the user dismisses the prompt, the pipeline ends without triggering cool-off. If they select "I need help" or timeout occurs, auto_escalate_triggered is set, the condition step marks the run as cool-off-worthy, and the caregiver is alerted.

:::

Response correlation with voice AI

When using the voice_prompt_template channel, the prompt is sent to the Gemini Live voice companion with execution context metadata (execution_id, step_id) appended. The voice agent can then call the respond_to_interactive_prompt MCP tool to record the user's verbal response, which resumes the pipeline just like a popup button click.

Condition Expressions

All template and condition expressions use a unified Lark-based grammar with wrapping. The grammar supports paths, JMESPath pipes, comparisons, boolean operators, and built-in functions. Simple path references use a fast regex shortcut; complex expressions are parsed into an AST and evaluated.

Server-side validation catches typos and unknown paths at save time, so invalid expressions never persist.

Path Access

Access nested values in pipeline_data:

text
{{ steps.scene_1.outputs.count }}
{{ trigger.sensor_id }}
{{ system.local_time }}

JMESPath Pipes

Apply JMESPath filters and transformations:

text
{{ steps.scene_1.outputs.detections | length(@) }}
{{ steps.scene_1.outputs.detections[?label == 'person'] }}

Comparisons

text
{{ steps.scene_1.outputs.count > 3 }}
{{ steps.logic_1.outputs.alert_level == "emergency" }}
{{ steps.scene_1.outputs.label != null }}

Operators: ==, !=, >, <, >=, <=

Boolean Operators

text
{{ steps.scene_1.outputs.count > 0 and steps.logic_1.outputs.is_notification_needed == true }}
{{ not exists(steps.translate_1.outputs) or contains(steps.scene_1.outputs.description, "empty") }}

Built-in Functions

FunctionDescription
contains(haystack, needle)Substring or list-membership test (case-sensitive)
icontains(haystack, needle)Case-insensitive substring test
length(value)Length of string, list, or dict
lower(value)Convert to lowercase
upper(value)Convert to uppercase
keys(value)Dict keys as a list
values(value)Dict values as a list
exists(path)True if the path resolves to a non-None value

Examples

text
# Notify only if a person was detected and reasoning says to
{{ steps.identify_1.outputs.person_detections | length(@) > 0 and steps.logic_1.outputs.is_notification_needed == true }}

# Skip translation if no notification message exists
{{ exists(steps.logic_1.outputs.notification_message) }}

# Branch based on alert level
{{ steps.logic_1.outputs.alert_level == "emergency" }}

Expression validation runs on every step save. A typo like steps.scene_anaylsis_1.outputs.count is rejected with a suggestion pointing to the correct label.

Prompt Templates

Several step config fields support {{variable}} template syntax: the prompt and special_instructions fields in llm_call; the person_id, activity_type, and room_name fields in activity_detection; and the person_id and room_name fields in verification conditions. At execution time, placeholders are replaced with values from pipeline_data and trigger context.

Syntax

text
{{key}}              -- top-level pipeline_data key
{{key.subkey}}       -- nested dict access
{{key.0.name}}       -- list index + nested access
{{room_name}}        -- trigger context value
{{trigger.sensor_id}} -- explicit trigger namespace
{{system.local_time}} -- localized executor-injected system value

Unresolvable placeholders are left as-is so the LLM still sees the intent.

Available Variables

Any key in pipeline_data is available. The admin UI's template editors load the full reference from GET /api/v1/pipeline/data-keys and surface it as autocomplete when you type {{ in a templated field. Common variables:

VariableSourceExample Value
vision_responsellm_call step (output_key: vision_response)"A person is standing at the stove"
person_detections.0.nameperson_identification step"grandma"
person_detections.0.confidenceperson_identification step0.92
logic_response.user_notificationllm_call step (output_key: logic_response)"Stove left on"
system.local_timeexecutor-injected system context (12-hour, no leading zero)"8:42 AM"
system.local_time_24hexecutor-injected system context (24-hour)"08:42"
system.local_day_of_weekexecutor-injected system context"Sunday"
system.local_day_ordinalexecutor-injected system context (with suffix)"29th"
system.local_date_longexecutor-injected system context (friendly long date)"March 29th, 2026"
system.local_date_friendlyexecutor-injected system context (day plus month)"Sunday, March 29th"
trigger_input.reasonwebhook trigger payload"medication_missed"
room_nametrigger context"Kitchen"
sensor_idtrigger context"kitchen_cam_01"

Template Examples

Vision prompt:

text
Look at the person in the {{room_name}}. Are they using the stove safely?

Logic reasoning prompt:

text
{{vision_response}}

The person identified is {{person_detections.0.name}}.
Determine if they need a reminder about stove safety.

Notification template:

text
Reminder for {{person_detections.0.name}} at {{system.local_time}} in the {{room_name}}.

Translation source text:

text
{{logic_response.user_notification}}

Example Pipeline Configurations

Camera Monitoring

The classic detect-analyze-notify chain using the unified llm_call step:

text
person_identification → llm_call (vision, output_key: vision_response)
  → llm_call (reasoning, output_key: logic_response)
  → llm_call (translation, output_key: translation)
  → notification

Each llm_call step selects its own model_id, so vision reasoning can use Cosmos Reason2 while logic and translation use Gemma 4 on the same GPU node.

Lunch Reminder

Identify who is in the room, record a lunch activity attributed to them, wait, then verify against the database and remind if they haven't eaten:

text
person_identification → activity_detection (activity_type: "lunch", person_id: {{person_detections.0.person_id}}) → wait (30 min) → verification → notification

Light Monitor

Analyze, decide, notify caregiver, wait for response, then act:

text
llm_call (vision) → llm_call (reasoning, output_key: logic_response) → notification → wait (5 min) → verification → ha_action

Conditional Alert Escalation

Use conditions to route based on severity:

text
person_identification → llm_call (vision) → llm_call (reasoning, output_key: logic_response) → condition
  ├── (true: emergency) → notification [emergency level] → ha_action
  └── (false: routine)  → notification [info level]

Occupancy Safety Alert

Triggered by the occupancy_duration trigger type when a presence sensor has been on for longer than the configured threshold. The pipeline below sends a multilingual voice prompt asking if the person needs help:

text
llm_call (translation, output_key: translation) → notification [alert_level: warning, channels: [pwa_popup_text, telegram, pwa_realtime_ai]]

Rule configuration:

  • trigger_types: ["occupancy_duration"]
  • primary_sensor_id: the presence sensor to watch (e.g. bathroom_sensor_01)
  • occupancy_config: {"min_minutes": 40}
  • cool_off_minutes: 30, which prevents re-firing until a cool-off-triggering execution ages out
  • Context filters: add time_range, person_presence, or room filters as needed

The llm_call step localises the message before the notification step dispatches it. The pwa_realtime_ai channel initiates an interactive voice check-in via Gemini Live; the pwa_popup_text and telegram channels alert the admin console and caregiver simultaneously.

Dementia Signal Trigger

Rules with trigger_types containing "dementia_signal" fire when the CTS subscriber receives a behavioral signal from the tracking orchestrator that passes the per-person alert configuration gate. This allows caregivers to author rules that respond to pacing, sundowning, bathroom dwell anomalies, absence, or other CTS-detected behavioral patterns.

Dispatch path: DementiaSignalSubscriber persists every signal to the database, then checks whether the signal is enabled for that person via HouseholdMember.cts_alert_config. If allowed, it calls PipelineExecutor.fire_event, which queries all rules with trigger_types containing "dementia_signal" and evaluates their dementia_signal context filters against the event payload.

WARNING

dementia_signal triggers require CTS to be enabled (cts.enabled: true in config/settings.yaml). The feature also requires each household member to be enrolled with an appropriate alert profile (Senior, Adult, or Presence only). See Continuous Tracking for configuration details.

Rule settings tab:

  • trigger_types: ["dementia_signal"]
  • Add a dementia_signal context filter to scope by signal kind, person, severity, or time window:
    • kinds: list of signal kinds to match (pacing, sundowning_index, bathroom_dwell_anomaly, nighttime_movement, stillness_anomaly, absence, room_revisit_rate). Empty matches any kind.
    • person_ids: list of person identity IDs. Empty matches any person.
    • min_severity: numeric threshold where 0.33 = info, 0.66 = warning, 1.0 = emergency.
    • cooldown_minutes: suppress repeated matches for N minutes per (rule, person, kind). Uses DementiaSignal.acknowledged_at from the Alerts UI.
Example: pacing alert to caregiver Telegram

When grandma has been pacing at warning severity, send an alert to Telegram with current room context:

text
Rule: trigger_types=["dementia_signal"]
Context filters:
  - dementia_signal: kinds=["pacing"], min_severity=0.66, cooldown_minutes=30
Pipeline:
  1. presence_query (person_id: grandma, output_key: presence)
  2. notification   (channels: [telegram, pwa_popup_text],
                     telegram_template: "Grandma is pacing in {{presence.room_name}}.")

Telegram Command Trigger

Rules with trigger_type: telegram fire when a matching Telegram bot command is received. The TelegramTriggerService polls the Bot API on a short interval (default 5 s, configurable via notifications.telegram.trigger_poll_interval_seconds). The service only starts when a Telegram bot token is configured.

Rule settings tab:

  • trigger_types: ["telegram"]
  • telegram_trigger_config.command: the command to match, e.g. /medication. Leave empty to match any command.
  • telegram_trigger_config.allowed_chat_ids: list of Telegram chat IDs that may fire the rule. Falls back to notifications.telegram.trigger_allowed_chat_ids in settings. An absent or empty whitelist blocks the command (fail-closed).
  • telegram_trigger_config.respond_with_ack: send a brief reply confirming the rule was triggered (default true).

Dispatch path: identical to webhook triggers. A TriggerContext(trigger_type="telegram") is built and executed by PipelineExecutor. The command payload is available via trigger_input keys.

Example: medication reminder on demand

A caregiver or the senior sends /medication in the family Telegram group. The rule records a medication activity and sends a Tanglish voice confirmation:

text
Rule: trigger_type=telegram, command=/medication
Pipeline:
  activity_detection (activity_type: medication, person_id: {{trigger_input.from_user.id}})
  → llm_call (translation, prompt: "Confirm: medication taken. Translate to Tamil.")
  → notification [channels: telegram, ha_speaker_tts]

The notification step sends the translated confirmation back to Telegram and plays it over the home speaker.

Real-world Examples

The examples below assume a household with one senior (grandma) and a small set of caregivers. Each example names the trigger, every filter, every step, and every notification channel so you can replicate it end-to-end. All filter and step type names match what GET /api/v1/pipeline/filter-types and GET /api/v1/pipeline/step-types return on a current build.

Stove caution when grandma is alone in the kitchen

The goal: when grandma is the only person at home and steps into the kitchen, gently remind her about stove safety. The reminder fires at most once per hour even if she walks in and out.

Rule settings:

  • trigger_types: ["sensor_event"]
  • primary_sensor_id: kitchen camera
  • cool_off_minutes: 60

Filters (all must pass):

FilterConfigNegate
roomroom_name: Kitchenno
home_stateperson_id: grandma, gate on at_homeno
person_presenceperson_id: grandmano
person_presenceperson_id: caregiver_1yes
person_presenceperson_id: caregiver_2yes
time_range06:00 - 22:00 (avoid waking her at night)no

Pipeline:

text
1. scene_analysis  (run_detect=true, run_describe=true, run_hazards=true)
2. condition       expression: any_in(scene_detections.*.label, ["stove","oven","cooktop"])
                   on_true → step 3, on_false → end
3. notification    channels: [pwa_tts_announcement, eink]
                   pwa_tts_announcement_template: "Hi grandma, please be careful around the stove. Turn it off when you're done cooking."
                   eink_template_id: caution_template
                   eink_expiry_minutes: 30

The TTS announcement plays from the kitchen PWA tablet; the e-ink display in the kitchen also shows the message. cool_off_minutes: 60 means subsequent stove sightings within an hour record EventLog.status = ignored and do not re-fire.

Confirm grandma had lunch and send images to a caretaker

The goal: between 12:00 and 14:00, observe the kitchen periodically. If grandma appears to be eating, record a meal_lunch activity with the VLM's full reasoning attached, and forward an annotated image to the caretaker on Telegram.

Rule settings:

  • trigger_types: ["cron"]
  • Cron trigger: */10 12-13 * * * (every 10 minutes from 12:00 to 13:50, managed via CronTrigger model)
  • cool_off_minutes: 120 (one lunch confirmation per day is enough)

Filters: none beyond cron.

Pipeline:

text
1. person_identification  include_annotated_image: true, target_persons: ["grandma"]
2. condition              expression: person_detections.count > 0
                          on_true → step 3, on_false → end
3. llm_call (vision)      model_id: cosmos_reason2
                          image_source: both
                          additional_room_names: ["Kitchen"]
                          image_time_filter.since_minutes: 30
                          sort_by_sensor_then_time: true
                          response_format: json_schema
                          response_json_schema: {ate_lunch: bool, evidence: str, confidence: float}
                          output_key: vision_response
4. condition              expression: vision_response.ate_lunch == true
                          on_true → step 5, on_false → end
5. activity_detection     activity_type: meal_lunch
                          person_id: "{{person_detections.0.person_id}}"
                          confidence: "{{vision_response.confidence}}"
                          capture_scene_description: true
                          scene_description_key: vision_response
                          metadata_extra: '{"reasoning": "{{vision_response.evidence}}"}'
6. notification           channels: [telegram]
                          telegram_template: "Grandma had lunch. {{vision_response.evidence}}"
                          telegram_attach_images: true   # forwards annotated_images from step 1

The activity_detection step writes a row that downstream rules can read via verification. The cool_off_minutes: 120 plus meal_lunch activity record is what stops the next rule (below) from also firing.

Reminder when grandma has not had lunch by 14:00

The goal: at 14:00, check whether a meal_lunch activity was recorded for grandma since 11:00. If not, remind her over the kitchen tablet and e-ink, and notify caregivers.

Rule settings:

  • trigger_types: ["cron"]
  • Cron trigger: 0 14 * * * (managed via CronTrigger model)
  • dependencies: must depend on the lunch-confirmation rule (above) so this rule does not fire on a day where the database write hasn't propagated.

Pipeline:

text
1. verification           conditions:
                            - activity_type: meal_lunch
                              person_id: grandma
                              window_start: "11:00"
                              window_end:   "14:00"
                              min_confidence: 0.6
                          On at-least-one match → end (status: ignored).
2. notification           channels: [telegram]
                          telegram_template: "Reminder: grandma hasn't been recorded as having lunch yet today."
3. notification           channels: [pwa_tts_announcement, eink]
                          pwa_tts_announcement_template: "Grandma, it's lunch time. Would you like me to suggest something?"
                          eink_template_id: lunch_reminder

verification returns success and stops the pipeline (should_continue=false) when the activity exists, so the notifications never fire on a normal day.

Unknown person enters when grandma is alone

The goal: if grandma is the only person home and an unidentified face is detected on the entry camera, send an emergency Telegram alert with an annotated image.

Rule settings:

  • trigger_types: ["sensor_event"]
  • primary_sensor_id: entry camera
  • cool_off_minutes: 5 (we want quick re-firing if the person stays)

Filters:

FilterConfigNegate
home_stateperson_id: grandma, gate on at_homeno
person_presenceperson_id: caregiver_1yes
person_presenceperson_id: caregiver_2yes

Pipeline:

text
1. person_identification  include_annotated_image: true
2. condition              expression: any(person_detections, d -> d.identity == "unknown")
                          on_true → step 3, on_false → end
3. notification           channels: [telegram, pwa_popup_text]
                          alert_level: emergency
                          telegram_template: "Unknown person at the door while grandma is home alone."
                          telegram_attach_images: true

This rule has zero LLM calls. It relies entirely on person identification and the home_state + person_presence filters, which makes it cheap enough to run on every entry-camera frame.

Composing reminders with interactive_prompt

The reminder examples above push notifications and stop. To turn a reminder into a two-way exchange, replace the final notification with an interactive_prompt step. It pushes the question through the same channels, persists a pending response, and resumes the workflow with the answer in pipeline_data["interactive_response"]. A downstream condition step can then branch on yes / no / timeout.

Cron Triggers

Cron schedules are decoupled from rules through a dedicated CronTrigger model with a many-to-many relationship. A rule can have multiple cron schedules, and multiple rules can share the same schedule.

Cron-triggered rules go through RulesEngine like sensor events, so context filters (time_range, day_of_week, person_presence), dependencies, and rate limits all apply. The scheduler creates one APScheduler job per CronTrigger, not per rule.

The admin UI includes a Cron Builder with preset modes: Daily, Weekly, Hourly, Every N Minutes, and Custom. A live "next 5 runs" preview shows when the rule will fire in the operator's timezone. Raw cron expressions are always editable for power users.

The POST /api/v1/pipeline/cron/preview endpoint validates expressions and returns parsed structure plus next-run timestamps.

Rule Import/Export

Rules can be exported to portable YAML or JSON bundles and imported across installations. Bundles use stable string identifiers (labels, type names, sensor ids) instead of database primary keys, so they survive round-trips between different installs.

  • Export: GET /api/v1/rules/{id}/export returns a self-contained RuleBundle document with steps, contexts, dependencies, cron expressions, and external references
  • Import preview: POST /api/v1/rules/import/preview validates a bundle and returns an ImportReport with per-step migration status and any warnings, without writing to the database
  • Import commit: POST /api/v1/rules/import commits a validated bundle within a single transaction. All-or-nothing: if any step fails validation or any reference is unresolvable, the entire import rolls back
  • Migration chains: Each step handler can declare ConfigMigration entries that transform config dicts from older schema versions to the current one. Migrations are pure functions with unit tests

Bundles are also accessible to AI agents through the MCP server (get_rule_bundle, import_rule_bundle).

Live Pipeline Activity View

The Process Activity view (/activity/process) provides real-time visibility into pipeline runs and CTS ingest activity. It is backed by PipelineRunService and a dedicated WebSocket channel.

Pipeline run endpoints

MethodPathDescription
GET/api/v1/pipeline/runsList recent pipeline runs; ?status=active returns only running/waiting executions
GET/api/v1/pipeline/runs/{execution_id}Single run envelope (PipelineRunEnvelope)
GET/api/v1/pipeline/ingest/activityRecent reCamera ingest events (IngestActivityEnvelope)

WebSocket live channel

Connect to GET /ws/pipeline (authenticated via sec-websocket-protocol) for a push feed of PipelineExecutionEvent messages. Each event carries the execution ID, rule name, status transition, current step label, and elapsed milliseconds. The ProcessActivityView uses the useLivePipeline composable to manage connection state and display a live DAG of the running pipeline steps.

Connection-state rendering rules:

  • Connecting: show a spinner; do not display a stale DAG as live data.
  • Connected: stream events and update the step timeline in real time.
  • Disconnected: freeze the last known state and show a reconnection indicator; useLivePipeline applies 3-second exponential backoff.

PipelineRunEnvelope

Each run envelope carries:

FieldDescription
execution_idUnique execution identifier
rule_id / rule_nameThe rule that triggered the run
statusrunning, waiting, completed, failed, or cancelled
triggered_atISO-8601 UTC start timestamp
stepsList of step summaries with label, type, status, and elapsed_ms
trigger_typeHow the run was initiated

Workflow Execution

When a rule's pipeline is triggered, a WorkflowExecution record tracks the full lifecycle:

StatusMeaning
runningPipeline is actively executing steps
waitingPaused at a wait or interactive_prompt step, will resume at resume_at time
completedAll steps finished successfully
failedA step failed or the pipeline timed out
cancelledManually cancelled via the API or admin UI

Cancel: Running or waiting executions can be cancelled from the Live Run tab or via POST /api/v1/workflows/{id}/cancel. The executor checks for cancellation between steps (cooperative cancellation) and stops at the next step boundary.

Rerun: Any execution can be rerun from the beginning via POST /api/v1/workflows/{id}/rerun. This copies the original trigger context and re-executes all enabled steps. Useful for testing rule changes against the same trigger.

Execution detail: GET /api/v1/workflows/{id}/detail returns a rich view model with a step-by-step timeline showing labels, icons, categories, elapsed seconds, success/failure status, outputs, and errors. The same component renders both live and historical executions.

Per-step timeout: Each step has a 60-second timeout to prevent stuck LLM calls from hanging the pipeline. If a step times out, it is marked failed and the pipeline continues.

All intermediate results are persisted in pipeline_data_json on both the WorkflowExecution and the EventLog for debugging and auditability. Step timings include labels, elapsed seconds, and success/failure status.

Separately, the related EventLog is finalized as either completed or ignored. Only completed events participate in rule cool-off checks.

Released under the AGPL-3.0 License.