Composable Pipelines
The pipeline system is the core of Cognitive Companion. Each rule defines its own graph of pipeline steps executed by the PipelineExecutor. Caregivers and operators connect steps on a visual canvas, branch through output ports, and review live or historical runs in the execution inspector.
Step handlers are self-contained plugins, each in its own file under backend/steps/builtin/, auto-discovered at startup via StepRegistry. The frontend loads available step types dynamically from the GET /api/v1/pipeline/step-types endpoint, so new plugins appear automatically in the step palette and config editor. See Extending the Pipeline for how to add custom step types.
How Pipelines Work
When a rule is triggered, the executor:
- Creates a WorkflowExecution record to track progress
- Initializes a shared pipeline_data dictionary
- Captures an immutable graph snapshot for later inspection
- Finds entry nodes and executes enabled steps through PipelineEdge rows
- Merges each step's output into pipeline_data for downstream steps
- Traverses the output ports returned by each step
- Handles waiting, cancellation, completion, and error states
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class StepResult:
    success: bool = True
    data: dict = field(default_factory=dict)  # Merged into pipeline_data
    should_continue: bool = True
    output_ports: tuple[str, ...] = ("main",)  # Runtime ports to traverse
    wait_until: datetime | None = None  # For wait/resume

Every step receives the full pipeline_data dictionary and a TriggerContext with metadata about what initiated the execution:
@dataclass
class TriggerContext:
    trigger_type: str  # "sensor_event", "cron", "manual", "webhook", "occupancy_duration", "telegram", "cts_window", "dementia_signal"
    sensor_id: str | None
    room_name: str | None
    media_paths: list[str]
    media_type: str | None
    webhook_payload: dict | None  # Payload from webhook and Telegram triggers
    occupancy_duration_minutes: float | None  # Set for occupancy_duration triggers

For webhook and telegram triggers, the payload is also available in pipeline_data["trigger_input"]. For occupancy_duration triggers, occupancy_duration_minutes reflects how long the sensor has been continuously occupied at the moment the rule fired.
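The merge-and-traverse behavior listed under How Pipelines Work can be sketched as follows. This is a minimal illustration, not the actual PipelineExecutor: the `run_graph` function, the handler signature, and the `(step_id, port) -> next step` edge map are assumptions made for the example, and waiting, cancellation, and snapshotting are omitted.

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class StepResult:  # trimmed copy of the dataclass shown above
    success: bool = True
    data: dict = field(default_factory=dict)
    should_continue: bool = True
    output_ports: tuple[str, ...] = ("main",)

# Hypothetical shapes: a handler takes pipeline_data and returns a StepResult;
# edges map (source step id, output port) to the next step id.
Handler = Callable[[dict], StepResult]

def run_graph(entry: str, handlers: dict[str, Handler],
              edges: dict[tuple[str, str], str]) -> tuple[dict, list[str]]:
    pipeline_data: dict = {}
    visited: list[str] = []
    queue = [entry]
    while queue:
        step_id = queue.pop(0)
        result = handlers[step_id](pipeline_data)
        visited.append(step_id)
        pipeline_data.update(result.data)  # merge output for downstream steps
        if not (result.success and result.should_continue):
            break  # this sketch simply stops the whole run
        for port in result.output_ports:  # follow only the activated ports
            next_step = edges.get((step_id, port))
            if next_step is not None:
                queue.append(next_step)
    return pipeline_data, visited

handlers = {
    "check": lambda d: StepResult(data={"hazard": True}, output_ports=("true",)),
    "notify": lambda d: StepResult(data={"notified": d["hazard"]}),
    "log": lambda d: StepResult(data={"logged": True}),
}
edges = {("check", "true"): "notify", ("check", "false"): "log"}
data, visited = run_graph("check", handlers, edges)
```

Because the hypothetical `check` step activates only its `true` port, the `log` step on the `false` branch is never reached.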
Telegram trigger payload keys (accessible as {{trigger_input.command}} etc.):
| Key | Description |
|---|---|
| trigger_input.command | The matched command, e.g. "/medication" |
| trigger_input.args | List of words following the command |
| trigger_input.text | Raw message text |
| trigger_input.chat_id | Telegram chat ID as a string |
| trigger_input.from_user | Telegram user object (id, first_name, username) |
See Telegram Command Triggers below for configuration details.
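Dotted placeholders such as {{trigger_input.command}} can be thought of as path lookups into pipeline_data. The sketch below is illustrative only: `resolve_path` and `render_template` are hypothetical helpers, and the real template engine may differ in syntax handling and in how it treats missing keys (here they render as an empty string).

```python
import re

_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")

def resolve_path(data, path: str):
    """Walk a dotted path; numeric segments index into lists."""
    current = data
    for part in path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        elif isinstance(current, list) and part.isdigit() and int(part) < len(current):
            current = current[int(part)]
        else:
            return None  # unknown key or index out of range
    return current

def render_template(template: str, pipeline_data: dict) -> str:
    def _substitute(match: re.Match) -> str:
        value = resolve_path(pipeline_data, match.group(1))
        return "" if value is None else str(value)
    return _PLACEHOLDER.sub(_substitute, template)

pipeline_data = {
    "trigger_input": {
        "command": "/medication",
        "args": ["morning"],
        "chat_id": "12345",
    }
}
message = render_template(
    "Command {{trigger_input.command}} with first arg {{trigger_input.args.0}}",
    pipeline_data,
)
```

The same lookup style covers list indexing in placeholders such as {{person_detections.0.person_id}}.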
The executor also injects a localized system object into pipeline_data using app.timezone from settings.yaml. The namespace covers time, day, date, and friendly composite strings so templates can reference any common format without further conversion:
{
  "system": {
    "local_time": "8:42 AM",
    "local_time_24h": "08:42",
    "local_hour_12h": "8",
    "local_hour_24h": "08",
    "local_minute": "42",
    "local_ampm": "AM",
    "local_date": "2026-03-29",
    "local_day_of_week": "Sunday",
    "local_day_of_week_short": "Sun",
    "local_day_of_month": 29,
    "local_day_ordinal": "29th",
    "local_month_name": "March",
    "local_month_name_short": "Mar",
    "local_month_number": 3,
    "local_year": 2026,
    "local_date_long": "March 29th, 2026",
    "local_date_friendly": "Sunday, March 29th",
    "timezone": "America/New_York"
  }
}

This makes local wall-clock values available to prompts and notification templates without requiring each step to compute them independently. The complete list of keys is served by GET /api/v1/pipeline/data-keys and surfaced as autocomplete suggestions in the admin UI template editors.
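For readers who want to see how such values could be derived, here is a minimal sketch that formats a subset of the keys. The `build_system_namespace` and `ordinal` helpers are hypothetical, not the executor's actual code; the function expects a datetime that has already been converted to the configured zone (for example with `datetime.now(ZoneInfo(tz_name))` from the standard `zoneinfo` module).

```python
from datetime import datetime

def ordinal(day: int) -> str:
    """Return the day with an English ordinal suffix, e.g. 29 -> '29th'."""
    if 11 <= day % 100 <= 13:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
    return f"{day}{suffix}"

def build_system_namespace(now_local: datetime, tz_name: str) -> dict:
    """Format an already-localized datetime into template-friendly values."""
    hour_12 = now_local.hour % 12 or 12  # 0 and 12 both display as 12
    ampm = "AM" if now_local.hour < 12 else "PM"
    month = f"{now_local:%B}"
    day = ordinal(now_local.day)
    return {
        "local_time": f"{hour_12}:{now_local:%M} {ampm}",
        "local_time_24h": f"{now_local:%H:%M}",
        "local_date": f"{now_local:%Y-%m-%d}",
        "local_day_of_week": f"{now_local:%A}",
        "local_day_ordinal": day,
        "local_month_name": month,
        "local_date_long": f"{month} {day}, {now_local.year}",
        "local_date_friendly": f"{now_local:%A}, {month} {day}",
        "timezone": tz_name,
    }

# Fixed timestamp matching the sample payload above.
system = build_system_namespace(datetime(2026, 3, 29, 8, 42), "America/New_York")
```

Day and month names come from `strftime`, so they follow the process locale; the sketch assumes the default English names.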
Graph Authoring
The rule canvas stores two resources: PipelineStep nodes and PipelineEdge connections.
| Resource | API | Purpose |
|---|---|---|
| Steps | GET/POST/PUT/DELETE /api/v1/rules/{rule_id}/steps | Create and configure pipeline nodes |
| Positions | PUT /api/v1/rules/{rule_id}/steps/positions | Persist canvas coordinates |
| Edges | GET/PUT /api/v1/rules/{rule_id}/edges | Read or atomically replace graph connections |
| Validation | POST /api/v1/rules/{rule_id}/validate | Check templates and graph structure |
Most step types expose one main output port. The condition step exposes true and false, so the canvas can connect each branch to a different downstream step. A source step can have one outgoing edge per output port.
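The one-edge-per-output-port rule can be checked client-side before sending a replacement edge list to PUT /api/v1/rules/{rule_id}/edges. The sketch below assumes a hypothetical edge shape with source_step_id, source_port, and target_step_id fields; the actual request schema may use different names.

```python
from collections import Counter

def find_port_conflicts(edges: list[dict]) -> list[tuple[str, str]]:
    """Return (source_step_id, source_port) pairs used by more than one edge."""
    counts = Counter(
        (edge["source_step_id"], edge.get("source_port", "main")) for edge in edges
    )
    return sorted(pair for pair, uses in counts.items() if uses > 1)

edges = [
    {"source_step_id": "cond_1", "source_port": "true", "target_step_id": "notify_1"},
    {"source_step_id": "cond_1", "source_port": "false", "target_step_id": "log_1"},
    # Second edge leaving the same "true" port: this violates the rule.
    {"source_step_id": "cond_1", "source_port": "true", "target_step_id": "notify_2"},
]
conflicts = find_port_conflicts(edges)
```

An empty result means every output port has at most one outgoing edge.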
Execution review uses GET /api/v1/workflows/{execution_id}/detail, which returns the graph snapshot, timeline, skipped nodes, activated output ports, outputs, and errors. Lightweight live lists use GET /api/v1/pipeline/runs.
Cool-Off Behavior
Rule cool-off is driven by EventLog.status == "completed". A workflow that runs successfully but does not perform a terminal action is recorded as ignored, so analytical or verification-only passes do not consume the rule's cool-off window.
Built-in steps that can explicitly mark a run as cool-off-worthy:
- notification: trigger_cooloff defaults to true
- ha_action: trigger_cooloff defaults to true
- activity_detection: trigger_cooloff defaults to true
- condition: trigger_cooloff defaults to false, and only applies when the expression evaluates to true
These steps add _cooloff_triggered to pipeline_data, and the executor uses that flag when finalizing the related event log.
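The finalization rule can be pictured as a small function. This is a sketch under assumptions: `final_event_status` is a hypothetical name, and the "failed" label for unsuccessful runs is a placeholder not taken from this page. Only the completed/ignored distinction comes from the text above.

```python
def final_event_status(run_succeeded: bool, pipeline_data: dict) -> str:
    """Pick the event log status that decides whether cool-off is consumed."""
    if not run_succeeded:
        return "failed"  # assumed label for unsuccessful runs
    # Only runs flagged by a terminal action count toward the cool-off window.
    return "completed" if pipeline_data.get("_cooloff_triggered") else "ignored"

notified_run = final_event_status(True, {"_cooloff_triggered": True})
analysis_only_run = final_event_status(True, {"vision_response": "nothing unusual"})
```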
Pipeline Step Types
Perception
person_identification
Sends media frames to the person identification service for face recognition. Returns detected identities with confidence scores and bounding boxes. Records sightings and updates person location state.
Config fields:
- include_annotated_image: return frames with bounding boxes and name labels drawn over faces
- confidence_threshold: minimum confidence to accept an identification (default from settings)
- target_persons: optional comma-separated list of person IDs to filter for
Output keys: person_detections (list of detections with identity, confidence, bbox), annotated_images (if enabled)
activity_detection
Record a single activity to the PersonActivity table. All fields support prompt templates, so values can be fixed strings or resolved from any upstream step output or trigger context. Use multiple steps in sequence to record multiple activities.
Config fields:
- activity_type (required): activity to record. Supports templates (e.g. {{logic_response.activity_type}} or a literal like "bathroom_occupancy"). The UI offers 30+ pre-programmed suggestions (eating, sleeping, medication, bathing, etc.) but any free-form string is accepted.
- person_id (optional): person to attribute the activity to. Supports templates (e.g. {{person_detections.0.person_id}}). Leave empty to record as unknown person.
- room_name (optional): room where the activity occurred. Supports templates (e.g. {{room_name}}). Defaults to the trigger room when empty.
- confidence: confidence score (0-1). Accepts a fixed number or {{template}} syntax (e.g. {{logic_response.confidence}}). Defaults to 0.8.
- capture_scene_description (bool, default false): when true, saves the upstream vision model output into metadata_json.scene_description alongside the activity record. Creates a complete audit trail linking what was observed to what was recorded.
- scene_description_key (str, default "vision_response"): which pipeline_data key to read when capture_scene_description is enabled. Override to "llm_response" or any other key when using llm_call with a custom output_key.
- metadata_extra (str, optional): a JSON string (supports {{template}} syntax) merged into metadata_json alongside any captured scene description. Useful for recording structured reasoning alongside the activity: {"reasoning": "{{logic_response.reasoning}}"}.
- trigger_cooloff: whether a successful activity record should count toward the rule cool-off window. Defaults to true.
Output keys: detected_activities (list with one entry, including metadata when populated), optional _cooloff_triggered
Detecting long-duration activities (meals, extended occupancy)
The VLM window in an llm_call step is bounded by the EventAggregator batch (typically 10 s to 1 min). To detect activities that unfold over a longer duration such as a meal, extend the temporal window on the upstream step:
step_type: llm_call
config:
  model_id: cosmos_reason2
  image_source: both
  image_time_filter:
    since_minutes: 30  # pull the last 30 minutes of frames from MinIO
  prompt: "Looking at frames from the last 30 minutes, has the person had lunch?"
  output_key: vision_response

The activity_detection step then records the conclusion of that extended analysis. Enable capture_scene_description: true to save the VLM's full reasoning alongside the record.
Example: meal detection with scene capture
# 1. Pull 30 minutes of kitchen frames, ask the vision model
step_type: llm_call
config:
  model_id: cosmos_reason2
  image_source: both
  additional_room_names: [Kitchen]
  image_time_filter:
    since_minutes: 30
  sort_by_sensor_then_time: true
  prompt: "Review all frames. Has {{person_detections.0.name}} eaten lunch today?"
  output_key: vision_response

# 2. Record the activity with the scene description attached
step_type: activity_detection
config:
  activity_type: meal_lunch
  person_id: "{{person_detections.0.person_id}}"
  confidence: "{{logic_response.confidence}}"
  capture_scene_description: true
  scene_description_key: vision_response
  metadata_extra: '{"reasoning": "{{logic_response.reasoning}}"}'

scene_analysis
Calls the standalone scene-analysis-service for multi-modal analysis of a trigger image. Three inference components run in a single request: YOLO11x object detection, Florence-2-large structured scene description, and CLIP ViT-L/14 image embeddings. A YAML-configured hazard rule engine evaluates detections against named hazard rules and emits alerts for matches.
The step always continues the pipeline. When the scene-analysis-service is unreachable or disabled, all result keys are empty.
Config fields:
- run_detect (bool, default true): run YOLO11x object detection
- run_describe (bool, default true): run Florence-2-large scene description
- run_embed (bool, default false): run CLIP ViT-L/14 embedding (higher latency)
- run_hazards (bool, default true): evaluate YAML hazard rules against detections
- max_images (int, default 1): number of trigger images to send for analysis
Output keys:
| Key | Type | Description |
|---|---|---|
| scene_detections | list | Object detections: label, confidence, bbox [x1,y1,x2,y2], class_id |
| scene_description | str | Structured natural-language caption from Florence-2 |
| scene_embedding | list[float] | 768-dim L2-normalized CLIP embedding vector (empty when run_embed is false) |
| scene_hazards | list | Triggered hazard alerts: name, severity, description, detection |
| scene_detector_available | bool | Whether YOLO was loaded in the service |
| scene_describer_available | bool | Whether Florence-2 was loaded in the service |
| scene_embedder_available | bool | Whether CLIP was loaded in the service |
Example: detect and alert on hazards
# 1. Run full scene analysis on the trigger frame
step_type: scene_analysis
config:
  run_detect: true
  run_describe: true
  run_embed: false
  run_hazards: true
  max_images: 1

# 2. Branch on hazards
step_type: condition
config:
  expression: "scene_hazards.count > 0"

# 3. Notify caregiver with the scene description
step_type: notification
config:
  alert_level: warning
  message_template: "Hazard detected: {scene_description}"
  telegram_template: "Hazard in {{room_name}}: {{scene_description}}"

Combining with person identification
Place person_identification before scene_analysis in the pipeline. The scene_detections output complements face detections by identifying objects in the frame. Use a condition step to branch on both person_detections.count > 0 and scene_hazards.count > 0 for context-aware alerts.
object_trend_analysis
Queries the semantic-memory-service for room-level object trend state: clutter scores, persistent/novel objects, and anomaly severity. Designed to precede a condition step (for rule branching) or an llm_call step (for LLM-enriched reasoning).
Config fields:
- room_ids: list of room IDs to query. Empty = use the trigger room.
- include_snapshots_hours: if > 0, fetch raw hourly snapshots for LLM context.
- severity_threshold: anomalies below this severity are stripped (ok, info, warning, critical; default info).
- output_key: key under which the result map is written (default room_trends).
Output keys:
| Key | Type | Description |
|---|---|---|
| room_trends | dict | Maps room_id to trend result: clutter_score, trend_direction, overall_severity, persistent_objects, novel_objects, anomalies |
| room_trends_any_warning | bool | Whether any room has severity >= warning |
| room_trends_max_severity | str | Highest severity across all rooms |
| room_trends_summary | str | Compact single-line text ready for LLM prompt injection |
Graceful degradation: when the semantic-memory-service is unreachable or returns no data, the step writes empty results and continues.
presence_query
Queries the fused PresenceService for the current location and status of a person. Aggregates data from multiple providers (night anchor, HA bed sensor, CTS location, HA device tracker, stale fallback) via the priority chain in config/presence.yaml. Also fetches recent dementia signals when services.signals is wired.
Config fields:
- person_id: person to query. Leave blank to resolve from pipeline_data (sightings/persons).
- signal_kind: filter recent dementia signals to this kind (e.g. bathroom_dwell_anomaly).
- signal_severity_min: minimum severity for signal match (info, warning, emergency; default info).
- signal_window_minutes: lookback for dementia signals in minutes (default 30).
- output_key: pipeline_data key for the result dict (default presence).
Output keys:
| Key | Type | Description |
|---|---|---|
| {output_key}_status | str | Presence status: present_room, present_home, asleep, away, stale, unknown |
| {output_key}_room_name | str | Current room name (or null) |
| {output_key}_room_id | str | Current room ID (or null) |
| {output_key}_dwell_minutes | float | Minutes in current room |
| {output_key}_confidence | float | Provider confidence (0-1) |
| {output_key}_signals | list | Recent dementia signals matching filters |
| {output_key}_providers | list | Active presence providers that contributed |
| {output_key}_timestamp | str | ISO-8601 snapshot timestamp |
| presence_at_home | bool | Convenience flat key: person is at home |
| presence_asleep | bool | Convenience flat key: person is asleep |
| presence_away | bool | Convenience flat key: person is away |
Graceful degradation: when the PresenceService returns UNKNOWN or STALE, all keys default to empty/null and presence_at_home / presence_asleep / presence_away are all false.
home_state
A thin convenience wrapper around presence_query that emits exactly four boolean flags. Designed for use with condition steps that react to high-level home-state changes.
Config fields:
- person_id: person to query. Leave blank to resolve from pipeline_data.
- output_key: pipeline_data key prefix for the result (default home).
Output keys:
| Key | Type | Description |
|---|---|---|
| {output_key}_at_home | bool | Person is in a known room or asleep (status present_room, present_home, or asleep) |
| {output_key}_asleep | bool | Person is asleep (status asleep) |
| {output_key}_away | bool | Person is away from home (status away) |
| {output_key}_state_unknown | bool | Status is unknown or stale |
These mirror the four boolean outputs of the home_state context filter, so the same logic used in rule filters is available inside a pipeline. When the presence service is unavailable, all four flags are false.
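The mapping from presence status to the four flags in the table above can be sketched as below. `home_state_flags` is a hypothetical helper, and passing `None` for the status is this example's way of modelling an unavailable presence service.

```python
from typing import Optional

def home_state_flags(status: Optional[str], output_key: str = "home") -> dict:
    """Derive the four home_state booleans from a presence status string."""
    return {
        f"{output_key}_at_home": status in {"present_room", "present_home", "asleep"},
        f"{output_key}_asleep": status == "asleep",
        f"{output_key}_away": status == "away",
        f"{output_key}_state_unknown": status in {"unknown", "stale"},
    }

asleep_flags = home_state_flags("asleep")
unavailable_flags = home_state_flags(None)  # service unavailable: all false
```

Note that an asleep person sets both the at_home and asleep flags, which matches the status lists in the table.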
semantic_memory_query
Queries the semantic-memory-service for recent scene observations, object presence, and hazard data in a room. Injects both structured results and a compact LLM-ready summary into pipeline_data.
Config fields:
- room_id: explicit room ID to query. Overrides use_trigger_room.
- use_trigger_room: when true, uses the trigger's room. Default true.
- objects_limit: max object presence records to return (default 10).
- hazards_limit: max hazard observations (default 5).
- observations_limit: max observation search hits (default 20).
- within_minutes: lookback window in minutes (default 60).
- min_confidence: minimum confidence for returned records (default 0.0).
- output_key: pipeline_data key for the result dict (default memory_context).
Output keys:
| Key | Type | Description |
|---|---|---|
| {output_key}.recent_objects | list | Object presence records with label, confidence, first/last seen timestamps |
| {output_key}.recent_hazards | list | Hazard observations with flags, severity, confidence |
| {output_key}.observations | list | Observation search hits with description, embedding distance |
| {output_key}.observations_count | int | Total matching observations |
| {output_key}.summary | str | Compact single-paragraph text ready for LLM prompt injection |
Graceful degradation: when the semantic-memory-service is unreachable or disabled, the output contains empty lists and a "No memory context available." summary.
recamera_media_poll
Fetches recent reCamera images from the MediaCache via the EventAggregator and returns presigned MinIO URLs plus metadata. Snapshot semantics: reads what is currently in cache and returns immediately. To wait for new events to accumulate, place a wait step before this one.
The step pairs well with the Continuous Tracking data path. It lets a rule explicitly collect frames from one or more reCamera sensors or rooms, branch on the image count, then forward the snapshot to a downstream vision or scene-analysis step.
Config fields:
- sensor_ids (list): reCamera sensor IDs to include. Empty means all cameras (subject to room_names filter if set).
- room_names (list): room names to include. Combined with sensor_ids, only sensors in these rooms are returned.
- since_minutes (number, default 5): return images captured within the last N minutes. Images older than the MediaCache retention window are never returned regardless of this setting.
- images_per_sensor (int, default 3): maximum images returned per sensor when sensor_ids are specified.
- sensor_frame_limits (dict): per-sensor overrides for images_per_sensor. Keys are sensor IDs, values are integer frame limits.
- max_images (int, default 10): hard cap on total images returned across all sensors.
- chronological (bool, default true): when true, images within each sensor are sorted oldest-first (better for temporal reasoning). When false, newest-first.
Output keys: images (list of presigned MinIO URLs), count, sensor_ids, room_names, since_minutes, polled_at (ISO-8601 UTC timestamp).
Graceful degradation: when the EventAggregator is not available, the step returns success=False with an empty payload so downstream steps can branch on it.
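To make the interaction between images_per_sensor, sensor_frame_limits, max_images, and chronological concrete, here is a rough sketch. It is not the step's implementation: `select_images` and the record shape (sensor_id, captured_at) are hypothetical, and the choice to keep the newest frames per sensor before ordering them is an assumption.

```python
from collections import defaultdict
from typing import Optional

def select_images(records: list, images_per_sensor: int = 3,
                  sensor_frame_limits: Optional[dict] = None,
                  max_images: int = 10, chronological: bool = True) -> list:
    limits = sensor_frame_limits or {}
    by_sensor = defaultdict(list)
    for record in records:
        by_sensor[record["sensor_id"]].append(record)

    selected = []
    for sensor_id, items in by_sensor.items():
        limit = limits.get(sensor_id, images_per_sensor)  # per-sensor override wins
        # Assumption: keep the newest `limit` frames for each sensor.
        newest = sorted(items, key=lambda r: r["captured_at"], reverse=True)[:limit]
        # Then order them oldest-first (chronological) or newest-first.
        newest.sort(key=lambda r: r["captured_at"], reverse=not chronological)
        selected.extend(newest)
    return selected[:max_images]  # hard cap across all sensors

records = [{"sensor_id": "cam_a", "captured_at": t} for t in range(1, 6)]
records += [{"sensor_id": "cam_b", "captured_at": t} for t in (1, 2)]
picked = select_images(records, sensor_frame_limits={"cam_b": 1})
picked_pairs = [(r["sensor_id"], r["captured_at"]) for r in picked]
```

Integer timestamps stand in for real capture times to keep the example short.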
cts_window_poll
Pulls a window of recent Continuous Tracking System (CTS) frames enriched with detections, identities, room dwells, and optionally scene captions. Designed to be used inside a pipeline triggered by a reCamera sensor event, so the downstream LLM step can reason over high-quality CTS data even when the trigger fired on a lower-resolution reCamera frame.
The output payload shape is symmetric with the cts_window trigger so downstream template expressions are interchangeable between the two paths.
Config fields:
- duration_s (number, required, default 10): total window duration in seconds.
- sample_period_s (number, required, default 1.0): downsample to one frame per this many seconds.
- cameras (list): CTS camera IDs to include. Empty means all CTS cameras. CTS camera IDs are distinct from reCamera sensor IDs.
- rooms (list): room names to include.
- lookback_s (number, default 5): seconds before the trigger time to include in the window.
- lookahead_s (number, default 5): seconds after the trigger time to wait and collect.
- include_scene (bool, default false): run scene analysis on sampled frames.
- include_pose (bool, default false): include pose keypoints. Requires the CTS pose pipeline (TD-005); the option is shown in the UI as disabled until that path is wired.
- max_frames (int, default 30): hard cap on total frames returned.
Output keys: trigger_id, window_start, window_end, cameras, rooms, frames (list of enriched frame dicts: per-frame detections, identities, optional scene_caption), summary (distinct_identities, detection_count, rooms), partial (true when the bucketizer is not yet wired or when the downsampler trimmed frames).
WARNING
The CTS event bucketizer is the upstream feed for this step. When the bucketizer is not wired into the service container, cts_window_poll logs a warning and returns an empty window with partial: true. Downstream steps can branch on partial to handle the degraded path.
Reasoning
llm_call
The unified LLM step. Replaces vision_analysis, logic_reasoning, and translation with a single model-agnostic interface. The model is selected per step from the named registry in settings.yaml, so each step in a pipeline can use a different model without deploying multiple provider types.
Config fields:
- model_id (required): ID of a model entry from llm.models in settings.yaml.
- prompt: prompt text (supports prompt templates).
- special_instructions: text prepended to the prompt before template rendering. Useful for translation style guides or system-level constraints.
- include_context: list of pipeline_data keys to include as context above the prompt. If empty, person_detections and vision_response are auto-included when present.
- image_source: "none" (default), "trigger", "additional", or "both". Image attachment is silently skipped when the selected model does not have the vision capability.
- max_images: hard cap on total images sent to the model (default 5).
- additional_sensor_ids: camera sensor IDs to pull images from. When sort_by_sensor_then_time is enabled, the order of this list determines the grouping order.
- additional_room_names: pull images from all cameras in these rooms (unordered; for ordered multi-sensor analysis, use additional_sensor_ids instead).
- images_per_sensor: maximum images per sensor when sensor-ordered grouping is active (default 3).
- sort_by_sensor_then_time: when true, images are grouped by sensor in additional_sensor_ids order, then sorted oldest-first within each group. This produces a temporally coherent sequence across sensors, enabling inter-frame analysis by vision reasoning models such as Cosmos Reason2.
- image_time_filter: optional object with since_minutes, time_start, time_end to restrict which images are fetched.
- response_format: "text" (default), "json_schema", or "json_free". Controls whether the output is stored as a plain string or parsed as JSON.
- response_schema: natural-language description of the expected JSON format, appended to the prompt.
- response_json_schema: JSON Schema string. When response_format is "json_schema" and the model has guided_decoding: true in settings, the schema is sent as guided_json to the server (vLLM). For other servers, it is injected as a prompt instruction.
- output_key: the pipeline_data key where the result is stored (default "llm_response"). Set to "logic_response", "vision_response", or "translation" for compatibility with downstream steps that reference those keys.
- hallucination_marker: if this string is found in the response, the call is retried up to the model's max_retries setting (Tenacity). Useful for translation models with known failure modes.
Output keys: pipeline_data[output_key] (string when response_format is "text", parsed dict when "json_schema" or "json_free"). When output_key is "logic_response" and the response contains is_notification_needed: false, notification_suppressed: true is also written.
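The output handling described above can be sketched as follows. `store_llm_output` is a hypothetical helper written for illustration; the real step may parse, validate, or recover from malformed JSON differently.

```python
import json

def store_llm_output(raw: str, response_format: str, output_key: str,
                     pipeline_data: dict) -> None:
    """Store a model response as text or parsed JSON, per response_format."""
    if response_format == "text":
        pipeline_data[output_key] = raw
        return
    parsed = json.loads(raw)  # "json_schema" and "json_free" are both parsed
    pipeline_data[output_key] = parsed
    # Suppress downstream notification when the reasoning step declines it.
    if (output_key == "logic_response" and isinstance(parsed, dict)
            and parsed.get("is_notification_needed") is False):
        pipeline_data["notification_suppressed"] = True

pipeline_data: dict = {}
store_llm_output(
    '{"is_notification_needed": false, "user_notification": "", "reasoning": "stove is off"}',
    "json_schema",
    "logic_response",
    pipeline_data,
)
```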
Example: vision reasoning with sensor-ordered frames
step_type: llm_call
config:
  model_id: cosmos_reason2
  prompt: "Analyze the sequence of frames from each camera. Has the person left the stove unattended?"
  image_source: both
  additional_sensor_ids: [kitchen_cam_1, kitchen_cam_2]
  sort_by_sensor_then_time: true
  images_per_sensor: 4
  max_images: 10
  response_format: json_schema
  response_json_schema: |
    {
      "type": "object",
      "properties": {
        "stove_unattended": {"type": "boolean"},
        "reasoning": {"type": "string"},
        "confidence": {"type": "number"}
      },
      "required": ["stove_unattended", "reasoning"]
    }
  output_key: vision_response

Example: reasoning and notification decision
step_type: llm_call
config:
  model_id: gemma4_26b
  prompt: "Based on the analysis, should the caregiver be alerted?"
  include_context: [vision_response, person_detections]
  response_format: json_schema
  response_json_schema: |
    {
      "type": "object",
      "properties": {
        "is_notification_needed": {"type": "boolean"},
        "user_notification": {"type": "string"},
        "alert_level": {"type": "string", "enum": ["emergency","warning","info","reminder"]},
        "reasoning": {"type": "string"}
      },
      "required": ["is_notification_needed", "user_notification", "reasoning"]
    }
  output_key: logic_response

Example: translation with retry
step_type: llm_call
config:
  model_id: gemma4_26b
  special_instructions: "Translate using informal Tamil as spoken in Chennai (Tanglish):"
  prompt: "Translate the following to Tamil:\n\n{{logic_response.user_notification}}"
  response_format: text
  output_key: translation
  hallucination_marker: "சென்னை"

condition
Evaluates a boolean expression against pipeline_data to control execution flow. The step activates the true or false output port, and graph edges decide which downstream step runs next.
Config fields:
- expression: the condition expression to evaluate
- trigger_cooloff: whether a true result should mark the run as cool-off-worthy. Defaults to false.
Output keys: condition.expression, condition.result, condition.branch, optional _cooloff_triggered
See Condition Expressions below.
verification
Query the PersonActivity database to verify whether household members completed, or did not complete, specific activities within a time window. No LLM calls are involved.
Config fields:
- conditions: list of condition objects, each with:
  - activity_type (str, required): the activity type to look for
  - person_id (str, optional): person to check; supports prompt templates (e.g. {{person_detections.0.person_id}}). Leave empty to match any person.
  - room_name (str, optional): room to filter by; supports templates (e.g. {{room_name}}). Leave empty to match any room.
  - completed (bool, default true): whether the activity should have been completed
  - within_minutes (float): relative time window from now
  - window_start / window_end (ISO-8601 timestamp): fixed wall-clock window (alternative to within_minutes). The stored time is re-anchored to today's date in app.timezone before querying.
  - min_confidence (float, default 0.5): minimum confidence threshold for matching records
- match_mode: "all" or "any" (default "all")
- re_notify_if_failed: bool (default false), re-trigger notification on verification failure
- re_notify_delay_minutes: int (default 5), delay before re-notification
Output keys: verification.verified (bool), verification.match_mode, verification.matched_conditions, verification.unmatched_conditions
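The way match_mode and the per-condition completed flag combine can be sketched as below. The database lookup is replaced by a precomputed list of booleans (whether a matching PersonActivity record was found for each condition), and `evaluate_verification` is a hypothetical name; the exact contents of matched_conditions in the real step may differ.

```python
def evaluate_verification(conditions: list, found: list, match_mode: str = "all") -> dict:
    """Combine per-condition lookup results into a verification outcome."""
    matched, unmatched = [], []
    for condition, was_found in zip(conditions, found):
        expected = condition.get("completed", True)
        # A condition is satisfied when the lookup agrees with `completed`.
        bucket = matched if was_found == expected else unmatched
        bucket.append(condition["activity_type"])
    verified = bool(matched) if match_mode == "any" else not unmatched
    return {
        "verified": verified,
        "match_mode": match_mode,
        "matched_conditions": matched,
        "unmatched_conditions": unmatched,
    }

conditions = [
    {"activity_type": "medication"},                     # should have happened
    {"activity_type": "meal_lunch", "completed": False}, # should NOT have happened
]
found = [True, True]  # both activities were found in the window
strict = evaluate_verification(conditions, found, match_mode="all")
lenient = evaluate_verification(conditions, found, match_mode="any")
```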
Action
notification
Dispatches an alert to configured notification channels based on alert level. The NotificationDispatcher can route to pwa_popup_text, Telegram, e-ink, ha_speaker_tts, pwa_realtime_ai, pwa_tts_announcement, and outbound webhook channels.
Config fields:
- alert_level: emergency, warning, info, or reminder
- channels: optional override of which channels to use (completely replaces the defaults from notifications.yaml when specified)
- message_template: the default broadcast text, written as a standard Python format string. Supports {message}, {room}, and any pipeline_data key.
- telegram_template / eink_template / tts_template / webhook_template: channel-specific template overrides for formatting the message to suit a particular medium. webhook_template should render a JSON string when you want to control the outbound payload body directly.
- eink_targets: optional list of sensor IDs for targeted e-ink display rendering
- ha_media_player: optional HA media_player entity ID for targeted TTS playback
- webhook_url: optional per-step override for the outbound webhook destination
- trigger_cooloff: whether a successful notification dispatch should count toward the rule cool-off window. Defaults to true.
Output keys: notification_dispatched (boolean), notification_channels (dict of channel -> success), optional _cooloff_triggered
ha_action
Calls a Home Assistant service. Can turn on lights, lock doors, activate scenes, trigger HA automations, or any other HA service call.
Config fields:
- domain: Home Assistant domain to call (e.g. light, lock, script)
- service: Home Assistant service name within that domain (e.g. turn_on, lock)
- entity_id: the target entity
- data: additional JSON payload to pass to the service
- trigger_cooloff: whether a successful Home Assistant action should count toward the rule cool-off window. Defaults to true.
Output keys: ha_action.domain, ha_action.service, ha_action.entity_id, ha_action.success, optional _cooloff_triggered
activity_session_start
Open a duration-aware activity session for a person. Idempotent: reuses an existing open session of the same type if one already exists. Stores timeout configuration for automatic stale-session cleanup.
Config fields:
- activity_type (required): activity type (e.g. sleep, bathroom, meal_prep). Supports {{template}} syntax.
- person_id: person to attribute this session to. Supports templates (e.g. {{person_detections.0.person_id}}).
- room_name: room where the activity occurs. Supports templates. Defaults to trigger room.
- confidence: detection confidence (0-1). Accepts a fixed number or {{template}} syntax (default 0.85).
- timeout_minutes: maximum session duration in minutes before auto-close. Uses built-in default for the activity type when empty. Supports templates.
- metadata_extra: optional JSON string of extra fields to merge into session metadata. Supports {{template}} syntax.
- output_key: pipeline_data key to write the session result under (default session).
Output keys: pipeline_data[output_key] with session_id, person_id, activity_type, room_name, started_at, timeout_minutes, was_existing
activity_session_end
Close an open duration-aware activity session for a person. Computes duration from open to close. Optionally records a PersonActivity with duration_minutes populated. If no open session exists, logs a warning and continues.
Config fields:
- activity_type (required): activity type to close. Supports {{template}} syntax.
- person_id: person to close the session for. Supports templates.
- write_activity_record: when true, also records a PersonActivity with duration_minutes populated (default true).
- output_key: pipeline_data key to write the closed session result under (default closed_session).
Output keys: pipeline_data[output_key] with session_id, person_id, activity_type, started_at, closed_at, duration_minutes, status, closed_via
daily_report
Generate end-of-day activity reports for one or all household members. Aggregates sleep, meals, medication, bathroom, door events, exercise, and location data into structured DailyReport records with wellness scoring. Designed to be triggered by a cron rule at end of day.
Config fields:
- person_ids: list of person IDs to generate reports for. Empty list means all active household members.
- report_date_offset_days: days offset from today for the report date (0 = today, -1 = yesterday).
- generate_summary_text: when true, generates an LLM prose summary of the day.
- summary_model_id: LLM model ID to use for summary generation (default gemma4_26b).
- notify_on_complete: when true, sends a notification when reports are ready.
- output_key: pipeline_data key to write the report results under (default daily_reports).
Output keys: pipeline_data[output_key] (list of report results with person_id, report_date, report_id, wellness_score)
info_card
Delivers a curated info card to the senior via PWA popup, e-ink display, or both. Loads an approved InfoCard by ID from the knowledge repository, resolves its image slots through the image pipeline, and dispatches via the knowledge delivery service. A voice instruction override allows per-delivery Gemini Live behavioral guardrails.
Config fields:
- info_card_id (required): ID of an approved InfoCard to deliver
- channels: list of delivery channels: pwa, eink, or both (default ["pwa"])
- pwa_dismiss_seconds: auto-dismiss timeout for the PWA popup (default 60)
- eink_expiry_minutes: how long the e-ink image stays active before refresh (default 30)
- voice_instruction: override the default Gemini Live voice instruction for this delivery
- trigger_cooloff: whether successful delivery consumes the rule cool-off (default true)
Output keys: info_card_delivery with info_card_id, layout_id, delivered_channels, delivery_id
Flow
wait
Pauses pipeline execution for a configured duration. The execution state is persisted to the database and the scheduler resumes it automatically via an APScheduler DateTrigger.
Config fields:
minutes: how long to wait before resuming
Output keys: none (the step simply pauses execution)
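As a rough illustration of how a wait step could hand control back to the executor, the sketch below returns a StepResult whose wait_until is set from the minutes field. The StepResult fields mirror the dataclass shown earlier on this page; the run_wait_step function and its validation are assumptions for illustration, not the built-in handler.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class StepResult:
    # Mirrors the StepResult fields documented earlier on this page.
    success: bool = True
    data: dict = field(default_factory=dict)
    should_continue: bool = True
    output_ports: tuple = ("main",)
    wait_until: Optional[datetime] = None


def run_wait_step(config: dict, now: datetime) -> StepResult:
    """Hypothetical handler: ask the executor to pause for config['minutes']."""
    minutes = float(config["minutes"])
    if minutes <= 0:
        raise ValueError("minutes must be positive")
    # No output keys are written; only the resume time is set.
    return StepResult(wait_until=now + timedelta(minutes=minutes))


now = datetime(2026, 3, 29, 12, 0, tzinfo=timezone.utc)
result = run_wait_step({"minutes": 30}, now)
print(result.wait_until.isoformat())
```

In this sketch the scheduler would only need the wait_until timestamp to create its one-shot resume job.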
interactive_prompt
Asks the user a question via popup text and/or voice AI, then pauses pipeline execution until a response is received or timeout occurs. Designed for check-ins, safety confirmations, and escalation workflows where the system needs explicit user input before proceeding.
Config fields:
- voice_prompt_template: voice prompt template with {{variable}} syntax for the Gemini Live voice channel
- popup_message_template: popup message template with {{variable}} syntax for the PWA popup text channel
- auto_escalate: when true, sets auto_escalate_triggered in pipeline_data if the user selects "escalate" or timeout occurs (default false)
- escalate_button_text: text for the escalation button (default "I need help")
- dismiss_button_text: text for the dismiss button (default "I'm okay")
- countdown_seconds: timeout duration in seconds (5-300, default 30)
- timeout_action: action to take when timeout occurs: "escalate" or "dismiss" (default "escalate")
- output_key: key for storing response in pipeline_data (default "interactive_response")
At least one of voice_prompt_template or popup_message_template must be configured. Both channels can be used simultaneously for redundancy.
Output keys: pipeline_data[output_key] with channel ("pwa_popup_text", "pwa_realtime_ai", or "timeout"), action ("escalate" or "dismiss"), timestamp, and raw_response. When auto_escalate is enabled and escalation is triggered, auto_escalate_triggered: true is also written.
Workflow status: The execution status is set to "waiting_for_response" while the prompt is active. When a response arrives or timeout fires, the status transitions to "running" and the pipeline resumes from the next step.
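The output keys described above can be pictured with a small sketch that builds the pipeline_data update for either a received response or a timeout. The resolve_prompt helper, the shape of the response dict, and the None raw_response on timeout are assumptions made for illustration; only the key names and defaults come from the field descriptions.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_prompt(config: dict, response: Optional[dict], now: datetime) -> dict:
    """Hypothetical helper: build the pipeline_data update for a reply or a timeout."""
    output_key = config.get("output_key", "interactive_response")
    if response is None:
        # Nobody answered within countdown_seconds: apply timeout_action.
        record = {
            "channel": "timeout",
            "action": config.get("timeout_action", "escalate"),
            "timestamp": now.isoformat(),
            "raw_response": None,
        }
    else:
        record = {
            "channel": response["channel"],
            "action": response["action"],
            "timestamp": now.isoformat(),
            "raw_response": response.get("raw_response"),
        }
    update = {output_key: record}
    # The flag is only written when auto_escalate is on and escalation happened.
    if config.get("auto_escalate", False) and record["action"] == "escalate":
        update["auto_escalate_triggered"] = True
    return update


now = datetime(2026, 3, 29, 8, 42, tzinfo=timezone.utc)
timed_out = resolve_prompt({"auto_escalate": True}, None, now)
dismissed = resolve_prompt(
    {"auto_escalate": True},
    {"channel": "pwa_popup_text", "action": "dismiss"},
    now,
)
print(timed_out["interactive_response"]["channel"], "auto_escalate_triggered" in dismissed)
```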
Example: bathroom safety check-in
quiz_start
Starts an interactive quiz session via the companion PWA. Loads an approved Quiz by ID from the knowledge repository, creates a QuizSession, and opens the quiz dialog on the senior's PWA. Supports question randomization, per-senior deduplication, configurable session timeout, and voice instruction overrides.
Config fields:
- quiz_id (required): ID of an approved Quiz to deliver
- max_questions: limit the number of questions delivered (default 5)
- randomize_order: randomize question order (default false)
- session_timeout_minutes: max session duration before auto-complete (default 10)
- per_senior_dedupe_hours: skip delivery if the same senior completed this quiz within N hours (default 12)
- voice_instruction: override the default Gemini Live voice instruction for this quiz session
- trigger_cooloff: whether successful delivery consumes the rule cool-off (default true)
Output keys: quiz_session with quiz_id, session_id, question_count, status, questions
Example: daily medication knowledge quiz
A cron rule fires at 10 AM every day and delivers a medication knowledge quiz:
# 1. Check home state to make sure the senior is awake and at home
step_type: home_state
config:
  output_key: home
# 2. Only proceed if the senior is home and awake
step_type: condition
config:
  expression: "home.home_at_home and not home.home_asleep"
# 3. Deliver the medication quiz
step_type: quiz_start
config:
  quiz_id: 3
  max_questions: 5
  randomize_order: false
  session_timeout_minutes: 15
Bathroom safety check-in (interactive_prompt example): a rule with trigger_type: occupancy_duration fires when the bathroom presence sensor has been on for 40 minutes. The pipeline sends a bilingual check-in prompt and waits for a response:
# 1. Translate the check-in message to Tamil
step_type: llm_call
config:
  model_id: gemma4_26b
  special_instructions: "Translate using informal Tamil as spoken in Chennai:"
  prompt: "Are you okay? Do you need help?"
  output_key: translation
# 2. Send interactive prompt via popup and voice
step_type: interactive_prompt
config:
  popup_message_template: "Are you okay?"
  voice_prompt_template: "{{translation}}"
  auto_escalate: true
  escalate_button_text: "I need help"
  dismiss_button_text: "I'm okay"
  countdown_seconds: 30
  timeout_action: escalate
  output_key: interactive_response
# 3. Branch on response
step_type: condition
config:
  expression: "auto_escalate_triggered == true"
  trigger_cooloff: true
# 4. If escalated, notify caregiver
step_type: notification
config:
  alert_level: emergency
  message_template: "Emergency: {{person_detections.0.name}} needs help in bathroom"
  channels: [telegram, pwa_popup_text, ha_speaker_tts]
If the user dismisses the prompt, the pipeline ends without triggering cool-off. If they select "I need help" or timeout occurs, auto_escalate_triggered is set, the condition step marks the run as cool-off-worthy, and the caregiver is alerted.
Response correlation with voice AI
When using the voice_prompt_template channel, the prompt is sent to the Gemini Live voice companion with execution context metadata (execution_id, step_id) appended. The voice agent can then call the respond_to_interactive_prompt MCP tool to record the user's verbal response, which resumes the pipeline just like a popup button click.
Condition Expressions
All template and condition expressions use a unified Lark-based grammar with {{ }} wrapping. The grammar supports paths, JMESPath pipes, comparisons, boolean operators, and built-in functions. Simple path references use a fast regex shortcut; complex expressions are parsed into an AST and evaluated.
Server-side validation catches typos and unknown paths at save time, so invalid expressions never persist.
Path Access
Access nested values in pipeline_data:
{{ steps.scene_1.outputs.count }}
{{ trigger.sensor_id }}
{{ system.local_time }}
JMESPath Pipes
Apply JMESPath filters and transformations:
{{ steps.scene_1.outputs.detections | length(@) }}
{{ steps.scene_1.outputs.detections[?label == 'person'] }}
Comparisons
{{ steps.scene_1.outputs.count > 3 }}
{{ steps.logic_1.outputs.alert_level == "emergency" }}
{{ steps.scene_1.outputs.label != null }}
Operators: ==, !=, >, <, >=, <=
Boolean Operators
{{ steps.scene_1.outputs.count > 0 and steps.logic_1.outputs.is_notification_needed == true }}
{{ not exists(steps.translate_1.outputs) or contains(steps.scene_1.outputs.description, "empty") }}
Built-in Functions
| Function | Description |
|---|---|
| contains(haystack, needle) | Substring or list-membership test (case-sensitive) |
| icontains(haystack, needle) | Case-insensitive substring test |
| length(value) | Length of string, list, or dict |
| lower(value) | Convert to lowercase |
| upper(value) | Convert to uppercase |
| keys(value) | Dict keys as a list |
| values(value) | Dict values as a list |
| exists(path) | True if the path resolves to a non-None value |
Examples
# Notify only if a person was detected and reasoning says to
{{ steps.identify_1.outputs.person_detections | length(@) > 0 and steps.logic_1.outputs.is_notification_needed == true }}
# Skip translation if no notification message exists
{{ exists(steps.logic_1.outputs.notification_message) }}
# Branch based on alert level
{{ steps.logic_1.outputs.alert_level == "emergency" }}
Expression validation runs on every step save. A typo like steps.scene_anaylsis_1.outputs.count is rejected with a suggestion pointing to the correct label.
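One plausible way to produce such a suggestion is fuzzy matching against the labels present in the rule. The sketch below uses Python's difflib for that; the function names, the error wording, and the use of difflib itself are assumptions, since the actual validator is not shown here.

```python
import difflib
from typing import Optional


def suggest_label(unknown: str, known_labels: list) -> Optional[str]:
    """Return the closest known step label, or None if nothing is similar."""
    matches = difflib.get_close_matches(unknown, known_labels, n=1, cutoff=0.6)
    return matches[0] if matches else None


def validate_step_path(path: str, known_labels: list) -> Optional[str]:
    """Return an error message for an unknown steps.<label> reference, else None."""
    parts = path.split(".")
    if len(parts) < 2 or parts[0] != "steps":
        return None  # Only steps.<label>... paths are checked in this sketch.
    label = parts[1]
    if label in known_labels:
        return None
    hint = suggest_label(label, known_labels)
    message = f"unknown step label '{label}'"
    return f"{message}; did you mean '{hint}'?" if hint else message


labels = ["scene_analysis_1", "logic_1", "translate_1"]
error = validate_step_path("steps.scene_anaylsis_1.outputs.count", labels)
print(error)
```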
Prompt Templates
Several step config fields support {{variable}} template syntax: the prompt and special_instructions fields in llm_call; the person_id, activity_type, and room_name fields in activity_detection; and the person_id and room_name fields in verification conditions. At execution time, placeholders are replaced with values from pipeline_data and trigger context.
Syntax
{{key}} -- top-level pipeline_data key
{{key.subkey}} -- nested dict access
{{key.0.name}} -- list index + nested access
{{room_name}} -- trigger context value
{{trigger.sensor_id}} -- explicit trigger namespace
{{system.local_time}} -- localized executor-injected system value
Unresolvable placeholders are left as-is so the LLM still sees the intent.
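The substitution rules above can be approximated in a few lines. This is a minimal sketch, assuming trigger context values have already been merged into the same dictionary as pipeline_data; the render and lookup helpers are illustrative names, not the executor's actual functions.

```python
import re
from typing import Any

_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")
_MISSING = object()


def lookup(data: Any, path: str) -> Any:
    """Walk a dotted path through dicts and lists; return _MISSING on failure."""
    current = data
    for part in path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        elif isinstance(current, list) and part.isdigit() and int(part) < len(current):
            current = current[int(part)]
        else:
            return _MISSING
    return current


def render(template: str, pipeline_data: dict) -> str:
    """Replace {{path}} placeholders; leave unresolvable ones untouched."""
    def substitute(match):
        value = lookup(pipeline_data, match.group(1))
        return match.group(0) if value is _MISSING else str(value)
    return _PLACEHOLDER.sub(substitute, template)


data = {"room_name": "Kitchen", "person_detections": [{"name": "grandma"}]}
text = render(
    "Reminder for {{person_detections.0.name}} in the {{room_name}} at {{system.local_time}}.",
    data,
)
print(text)
```

Because system.local_time is absent from the sample data, that placeholder survives unchanged in the rendered text.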
Available Variables
Any key in pipeline_data is available. The admin UI's template editors load the full reference from GET /api/v1/pipeline/data-keys and surface it as autocomplete when you type {{ in a templated field. Common variables:
| Variable | Source | Example Value |
|---|---|---|
| vision_response | llm_call step (output_key: vision_response) | "A person is standing at the stove" |
| person_detections.0.name | person_identification step | "grandma" |
| person_detections.0.confidence | person_identification step | 0.92 |
| logic_response.user_notification | llm_call step (output_key: logic_response) | "Stove left on" |
| system.local_time | executor-injected system context (12-hour, no leading zero) | "8:42 AM" |
| system.local_time_24h | executor-injected system context (24-hour) | "08:42" |
| system.local_day_of_week | executor-injected system context | "Sunday" |
| system.local_day_ordinal | executor-injected system context (with suffix) | "29th" |
| system.local_date_long | executor-injected system context (friendly long date) | "March 29th, 2026" |
| system.local_date_friendly | executor-injected system context (day plus month) | "Sunday, March 29th" |
| trigger_input.reason | webhook trigger payload | "medication_missed" |
| room_name | trigger context | "Kitchen" |
| sensor_id | trigger context | "kitchen_cam_01" |
Template Examples
Vision prompt:
Look at the person in the {{room_name}}. Are they using the stove safely?
Logic reasoning prompt:
{{vision_response}}
The person identified is {{person_detections.0.name}}.
Determine if they need a reminder about stove safety.
Notification template:
Reminder for {{person_detections.0.name}} at {{system.local_time}} in the {{room_name}}.
Translation source text:
{{logic_response.user_notification}}
Example Pipeline Configurations
Camera Monitoring
The classic detect-analyze-notify chain using the unified llm_call step:
person_identification → llm_call (vision, output_key: vision_response)
→ llm_call (reasoning, output_key: logic_response)
→ llm_call (translation, output_key: translation)
→ notification
Each llm_call step selects its own model_id, so vision reasoning can use Cosmos Reason2 while logic and translation use Gemma 4 on the same GPU node.
Lunch Reminder
Identify who is in the room, record a lunch activity attributed to them, wait, then verify against the database and remind if they haven't eaten:
person_identification → activity_detection (activity_type: "lunch", person_id: {{person_detections.0.person_id}}) → wait (30 min) → verification → notification
Light Monitor
Analyze, decide, notify caregiver, wait for response, then act:
llm_call (vision) → llm_call (reasoning, output_key: logic_response) → notification → wait (5 min) → verification → ha_action
Conditional Alert Escalation
Use conditions to route based on severity:
person_identification → llm_call (vision) → llm_call (reasoning, output_key: logic_response) → condition
├── (true: emergency) → notification [emergency level] → ha_action
└── (false: routine) → notification [info level]
Occupancy Safety Alert
Triggered by the occupancy_duration trigger type when a presence sensor has been on for longer than the configured threshold. The pipeline below sends a multilingual voice prompt asking if the person needs help:
llm_call (translation, output_key: translation) → notification [alert_level: warning, channels: [pwa_popup_text, telegram, pwa_realtime_ai]]
Rule configuration:
- trigger_types: ["occupancy_duration"]
- primary_sensor_id: the presence sensor to watch (e.g. bathroom_sensor_01)
- occupancy_config: {"min_minutes": 40}
- cool_off_minutes: 30, which prevents re-firing until a cool-off-triggering execution ages out
- Context filters: add time_range, person_presence, or room filters as needed
The llm_call step localises the message before the notification step dispatches it. The pwa_realtime_ai channel initiates an interactive voice check-in via Gemini Live; the pwa_popup_text and telegram channels alert the admin console and caregiver simultaneously.
Dementia Signal Trigger
Rules with trigger_types containing "dementia_signal" fire when the CTS subscriber receives a behavioral signal from the tracking orchestrator that passes the per-person alert configuration gate. This allows caregivers to author rules that respond to pacing, sundowning, bathroom dwell anomalies, absence, or other CTS-detected behavioral patterns.
Dispatch path: DementiaSignalSubscriber persists every signal to the database, then checks whether the signal is enabled for that person via HouseholdMember.cts_alert_config. If allowed, it calls PipelineExecutor.fire_event, which queries all rules with trigger_types containing "dementia_signal" and evaluates their dementia_signal context filters against the event payload.
WARNING
dementia_signal triggers require CTS to be enabled (cts.enabled: true in config/settings.yaml). The feature also requires each household member to be enrolled with an appropriate alert profile (Senior, Adult, or Presence only). See Continuous Tracking for configuration details.
Rule settings tab:
- trigger_types: ["dementia_signal"]
- Add a dementia_signal context filter to scope by signal kind, person, severity, or time window:
  - kinds: list of signal kinds to match (pacing, sundowning_index, bathroom_dwell_anomaly, nighttime_movement, stillness_anomaly, absence, room_revisit_rate). Empty matches any kind.
  - person_ids: list of person identity IDs. Empty matches any person.
  - min_severity: numeric threshold where 0.33 = info, 0.66 = warning, 1.0 = emergency.
  - cooldown_minutes: suppress repeated matches for N minutes per (rule, person, kind). Uses DementiaSignal.acknowledged_at from the Alerts UI.
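The matching rules for kinds, person_ids, and min_severity can be sketched as a single predicate. The signal payload keys (kind, person_id, severity) and the function name are assumptions for illustration, and the cooldown_minutes check is deliberately left out because it depends on stored acknowledgement state.

```python
def matches_dementia_filter(filter_config: dict, signal: dict) -> bool:
    """Hypothetical check of one signal against a dementia_signal context filter."""
    kinds = filter_config.get("kinds") or []
    person_ids = filter_config.get("person_ids") or []
    min_severity = filter_config.get("min_severity", 0.0)
    # An empty list means "match anything" for both kinds and person_ids.
    if kinds and signal["kind"] not in kinds:
        return False
    if person_ids and signal["person_id"] not in person_ids:
        return False
    return signal["severity"] >= min_severity


pacing_filter = {"kinds": ["pacing"], "min_severity": 0.66}
warning_signal = {"kind": "pacing", "person_id": "grandma", "severity": 0.7}
info_signal = {"kind": "pacing", "person_id": "grandma", "severity": 0.33}
absence_signal = {"kind": "absence", "person_id": "grandma", "severity": 1.0}

print(
    matches_dementia_filter(pacing_filter, warning_signal),
    matches_dementia_filter(pacing_filter, info_signal),
    matches_dementia_filter(pacing_filter, absence_signal),
)
```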
Example: pacing alert to caregiver Telegram
When grandma has been pacing at warning severity, send an alert to Telegram with current room context:
Rule: trigger_types=["dementia_signal"]
Context filters:
- dementia_signal: kinds=["pacing"], min_severity=0.66, cooldown_minutes=30
Pipeline:
1. presence_query (person_id: grandma, output_key: presence)
2. notification (channels: [telegram, pwa_popup_text],
telegram_template: "Grandma is pacing in {{presence.room_name}}.")
Telegram Command Trigger
Rules with trigger_type: telegram fire when a matching Telegram bot command is received. The TelegramTriggerService polls the Bot API on a short interval (default 5 s, configurable via notifications.telegram.trigger_poll_interval_seconds). The service only starts when a Telegram bot token is configured.
Rule settings tab:
- trigger_types: ["telegram"]
- telegram_trigger_config.command: the command to match, e.g. /medication. Leave empty to match any command.
- telegram_trigger_config.allowed_chat_ids: list of Telegram chat IDs that may fire the rule. Falls back to notifications.telegram.trigger_allowed_chat_ids in settings. An absent or empty whitelist blocks the command (fail-closed).
- telegram_trigger_config.respond_with_ack: send a brief reply confirming the rule was triggered (default true).
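The fail-closed behaviour of the allow-list is easy to get wrong, so here is a small sketch of the intended logic. The helper names are hypothetical, and the sketch assumes the settings fallback applies whenever the rule-level list is absent or empty.

```python
from typing import Optional


def chat_allowed(chat_id: int, rule_ids: Optional[list], global_ids: Optional[list]) -> bool:
    """Fail-closed check: rule-level list first, then the settings fallback."""
    allowed = rule_ids if rule_ids else global_ids
    if not allowed:
        return False  # Absent or empty allow-list blocks the command.
    return chat_id in allowed


def command_matches(configured: str, received: str) -> bool:
    """An empty configured command matches any received command."""
    return configured == "" or configured == received


print(chat_allowed(42, [42], None), chat_allowed(42, None, None))
print(command_matches("", "/medication"), command_matches("/meals", "/medication"))
```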
Dispatch path: identical to webhook triggers. A TriggerContext(trigger_type="telegram") is built and executed by PipelineExecutor. The command payload is available via trigger_input keys.
Example: medication reminder on demand
A caregiver or the senior sends /medication in the family Telegram group. The rule records a medication activity and sends a Tanglish voice confirmation:
Rule: trigger_type=telegram, command=/medication
Pipeline:
activity_detection (activity_type: medication, person_id: {{trigger_input.from_user.id}})
→ llm_call (translation, prompt: "Confirm: medication taken. Translate to Tamil.")
→ notification [channels: telegram, ha_speaker_tts]
The notification step sends the translated confirmation back to Telegram and plays it over the home speaker.
Real-world Examples
The examples below assume a household with one senior (grandma) and a small set of caregivers. Each example names the trigger, every filter, every step, and every notification channel so you can replicate it end-to-end. All filter and step type names match what GET /api/v1/pipeline/filter-types and GET /api/v1/pipeline/step-types return on a current build.
Stove caution when grandma is alone in the kitchen
The goal: when grandma is the only person at home and steps into the kitchen, gently remind her about stove safety. The reminder fires at most once per hour even if she walks in and out.
Rule settings:
- trigger_types: ["sensor_event"]
- primary_sensor_id: kitchen camera
- cool_off_minutes: 60
Filters (all must pass):
| Filter | Config | Negate |
|---|---|---|
| room | room_name: Kitchen | no |
| home_state | person_id: grandma, gate on at_home | no |
| person_presence | person_id: grandma | no |
| person_presence | person_id: caregiver_1 | yes |
| person_presence | person_id: caregiver_2 | yes |
| time_range | 06:00 - 22:00 (avoid waking her at night) | no |
Pipeline:
1. scene_analysis (run_detect=true, run_describe=true, run_hazards=true)
2. condition expression: any_in(scene_detections.*.label, ["stove","oven","cooktop"])
on_true → step 3, on_false → end
3. notification channels: [pwa_tts_announcement, eink]
pwa_tts_announcement_template: "Hi grandma, please be careful around the stove. Turn it off when you're done cooking."
eink_template_id: caution_template
eink_expiry_minutes: 30
The TTS announcement plays from the kitchen PWA tablet; the e-ink display in the kitchen also shows the message. cool_off_minutes: 60 means subsequent stove sightings within an hour record EventLog.status = ignored and do not re-fire.
Confirm grandma had lunch and send images to a caretaker
The goal: between 12:00 and 14:00, observe the kitchen periodically. If grandma appears to be eating, record a meal_lunch activity with the VLM's full reasoning attached, and forward an annotated image to the caretaker on Telegram.
Rule settings:
- trigger_types: ["cron"]
- Cron trigger: */10 12-13 * * * (every 10 minutes from 12:00 to 13:50, managed via CronTrigger model)
- cool_off_minutes: 120 (one lunch confirmation per day is enough)
Filters: none beyond cron.
Pipeline:
1. person_identification include_annotated_image: true, target_persons: ["grandma"]
2. condition expression: person_detections.count > 0
on_true → step 3, on_false → end
3. llm_call (vision) model_id: cosmos_reason2
image_source: both
additional_room_names: ["Kitchen"]
image_time_filter.since_minutes: 30
sort_by_sensor_then_time: true
response_format: json_schema
response_json_schema: {ate_lunch: bool, evidence: str, confidence: float}
output_key: vision_response
4. condition expression: vision_response.ate_lunch == true
on_true → step 5, on_false → end
5. activity_detection activity_type: meal_lunch
person_id: "{{person_detections.0.person_id}}"
confidence: "{{vision_response.confidence}}"
capture_scene_description: true
scene_description_key: vision_response
metadata_extra: '{"reasoning": "{{vision_response.evidence}}"}'
6. notification channels: [telegram]
telegram_template: "Grandma had lunch. {{vision_response.evidence}}"
telegram_attach_images: true # forwards annotated_images from step 1
The activity_detection step writes a row that downstream rules can read via verification. The cool_off_minutes: 120 plus meal_lunch activity record is what stops the next rule (below) from also firing.
Reminder when grandma has not had lunch by 14:00
The goal: at 14:00, check whether a meal_lunch activity was recorded for grandma since 11:00. If not, remind her over the kitchen tablet and e-ink, and notify caregivers.
Rule settings:
- trigger_types: ["cron"]
- Cron trigger: 0 14 * * * (managed via CronTrigger model)
- dependencies: must depend on the lunch-confirmation rule (above) so this rule does not fire on a day where the database write hasn't propagated.
Pipeline:
1. verification conditions:
- activity_type: meal_lunch
person_id: grandma
window_start: "11:00"
window_end: "14:00"
min_confidence: 0.6
On at-least-one match → end (status: ignored).
2. notification channels: [telegram]
telegram_template: "Reminder: grandma hasn't been recorded as having lunch yet today."
3. notification channels: [pwa_tts_announcement, eink]
pwa_tts_announcement_template: "Grandma, it's lunch time. Would you like me to suggest something?"
eink_template_id: lunch_reminder
verification returns success and stops the pipeline (should_continue=false) when the activity exists, so the notifications never fire on a normal day.
Unknown person enters when grandma is alone
The goal: if grandma is the only person home and an unidentified face is detected on the entry camera, send an emergency Telegram alert with an annotated image.
Rule settings:
- trigger_types: ["sensor_event"]
- primary_sensor_id: entry camera
- cool_off_minutes: 5 (we want quick re-firing if the person stays)
Filters:
| Filter | Config | Negate |
|---|---|---|
| home_state | person_id: grandma, gate on at_home | no |
| person_presence | person_id: caregiver_1 | yes |
| person_presence | person_id: caregiver_2 | yes |
Pipeline:
1. person_identification include_annotated_image: true
2. condition expression: any(person_detections, d -> d.identity == "unknown")
on_true → step 3, on_false → end
3. notification channels: [telegram, pwa_popup_text]
alert_level: emergency
telegram_template: "Unknown person at the door while grandma is home alone."
telegram_attach_images: true
This rule has zero LLM calls. It relies entirely on person identification and the home_state + person_presence filters, which makes it cheap enough to run on every entry-camera frame.
Composing reminders with interactive_prompt
The reminder examples above push notifications and stop. To turn a reminder into a two-way exchange, replace the final notification with an interactive_prompt step. It pushes the question through the same channels, persists a pending response, and resumes the workflow with the answer in pipeline_data["interactive_response"] (the default output_key). A downstream condition step can then branch on the recorded action (escalate or dismiss) or on a timeout.
Cron Triggers
Cron schedules are decoupled from rules through a dedicated CronTrigger model with a many-to-many relationship. A rule can have multiple cron schedules, and multiple rules can share the same schedule.
Cron-triggered rules go through RulesEngine like sensor events, so context filters (time_range, day_of_week, person_presence), dependencies, and rate limits all apply. The scheduler creates one APScheduler job per CronTrigger, not per rule.
The admin UI includes a Cron Builder with preset modes: Daily, Weekly, Hourly, Every N Minutes, and Custom. A live "next 5 runs" preview shows when the rule will fire in the operator's timezone. Raw cron expressions are always editable for power users.
The POST /api/v1/pipeline/cron/preview endpoint validates expressions and returns parsed structure plus next-run timestamps.
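To make schedules such as */10 12-13 * * * concrete, the stdlib-only sketch below expands just the minute and hour fields of a five-field expression into daily fire times. It is an illustration of how those two fields combine, not the preview endpoint's implementation; it ignores the day, month, and weekday fields, and both function names are made up for this example.

```python
def expand_field(field: str, low: int, high: int) -> list:
    """Expand one cron field supporting '*', 'a', 'a-b', lists, and '/step' suffixes."""
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # 'a/step' runs from a to the field maximum; a bare 'a' is a single value.
            end = high if step_text else start
        values.update(range(start, end + 1, step))
    return sorted(values)


def daily_fire_times(expression: str) -> list:
    """List HH:MM fire times from the minute and hour fields only."""
    minute, hour, *_ = expression.split()
    return [
        f"{h:02d}:{m:02d}"
        for h in expand_field(hour, 0, 23)
        for m in expand_field(minute, 0, 59)
    ]


times = daily_fire_times("*/10 12-13 * * *")
print(times[0], times[-1], len(times))
```

For the lunch-observation schedule this yields twelve fire times, the first at 12:00 and the last at 13:50.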
Rule Import/Export
Rules can be exported to portable YAML or JSON bundles and imported across installations. Bundles use stable string identifiers (labels, type names, sensor ids) instead of database primary keys, so they survive round-trips between different installs.
- Export: GET /api/v1/rules/{id}/export returns a self-contained RuleBundle document with steps, contexts, dependencies, cron expressions, and external references
- Import preview: POST /api/v1/rules/import/preview validates a bundle and returns an ImportReport with per-step migration status and any warnings, without writing to the database
- Import commit: POST /api/v1/rules/import commits a validated bundle within a single transaction. All-or-nothing: if any step fails validation or any reference is unresolvable, the entire import rolls back
- Migration chains: Each step handler can declare ConfigMigration entries that transform config dicts from older schema versions to the current one. Migrations are pure functions with unit tests
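A migration chain of pure functions might look like the sketch below. The ConfigMigration shape (from_version plus an apply callable), the migrate driver, and the two example migrations are all invented for illustration; the real class and the actual schema history may differ.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ConfigMigration:
    # Hypothetical shape: upgrades a config dict from from_version to from_version + 1.
    from_version: int
    apply: Callable[[dict], dict]


def rename_timeout(config: dict) -> dict:
    """v1 -> v2 (invented): rename 'timeout' to 'countdown_seconds' without mutating input."""
    upgraded = {k: v for k, v in config.items() if k != "timeout"}
    upgraded["countdown_seconds"] = config.get("timeout", 30)
    return upgraded


def add_timeout_action(config: dict) -> dict:
    """v2 -> v3 (invented): add 'timeout_action' with a default, keeping existing values."""
    return {"timeout_action": "escalate", **config}


MIGRATIONS = [ConfigMigration(1, rename_timeout), ConfigMigration(2, add_timeout_action)]


def migrate(config: dict, version: int, migrations: list) -> tuple:
    """Apply migrations in order, starting from the bundle's schema version."""
    for migration in sorted(migrations, key=lambda m: m.from_version):
        if migration.from_version == version:
            config = migration.apply(config)
            version += 1
    return config, version


old = {"timeout": 45}
new, new_version = migrate(old, 1, MIGRATIONS)
print(new, new_version)
```

Keeping each migration pure (returning a new dict) is what makes it straightforward to unit test in isolation.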
Bundles are also accessible to AI agents through the MCP server (get_rule_bundle, import_rule_bundle).
Live Pipeline Activity View
The Process Activity view (/activity/process) provides real-time visibility into pipeline runs and CTS ingest activity. It is backed by PipelineRunService and a dedicated WebSocket channel.
Pipeline run endpoints
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/pipeline/runs | List recent pipeline runs; ?status=active returns only running/waiting executions |
| GET | /api/v1/pipeline/runs/{execution_id} | Single run envelope (PipelineRunEnvelope) |
| GET | /api/v1/pipeline/ingest/activity | Recent reCamera ingest events (IngestActivityEnvelope) |
WebSocket live channel
Connect to GET /ws/pipeline (authenticated via sec-websocket-protocol) for a push feed of PipelineExecutionEvent messages. Each event carries the execution ID, rule name, status transition, current step label, and elapsed milliseconds. The ProcessActivityView uses the useLivePipeline composable to manage connection state and display a live DAG of the running pipeline steps.
Connection-state rendering rules:
- Connecting: show a spinner; do not display a stale DAG as live data.
- Connected: stream events and update the step timeline in real time.
- Disconnected: freeze the last known state and show a reconnection indicator; useLivePipeline applies 3-second exponential backoff.
PipelineRunEnvelope
Each run envelope carries:
| Field | Description |
|---|---|
| execution_id | Unique execution identifier |
| rule_id / rule_name | The rule that triggered the run |
| status | running, waiting, completed, failed, or cancelled |
| triggered_at | ISO-8601 UTC start timestamp |
| steps | List of step summaries with label, type, status, and elapsed_ms |
| trigger_type | How the run was initiated |
Workflow Execution
When a rule's pipeline is triggered, a WorkflowExecution record tracks the full lifecycle:
| Status | Meaning |
|---|---|
| running | Pipeline is actively executing steps |
| waiting | Paused at a wait or interactive_prompt step, will resume at resume_at time |
| completed | All steps finished successfully |
| failed | A step failed or the pipeline timed out |
| cancelled | Manually cancelled via the API or admin UI |
Cancel: Running or waiting executions can be cancelled from the Live Run tab or via POST /api/v1/workflows/{id}/cancel. The executor checks for cancellation between steps (cooperative cancellation) and stops at the next step boundary.
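Cooperative cancellation means a running step is never interrupted; the flag is only consulted between steps. The sketch below illustrates that idea with a threading.Event standing in for whatever cancellation marker the executor actually reads; run_steps and the two sample steps are hypothetical.

```python
import threading
from typing import Callable


def run_steps(steps: list, cancel: threading.Event) -> tuple:
    """Run steps in order, checking the cancel flag only at step boundaries."""
    pipeline_data: dict = {}
    for step in steps:
        if cancel.is_set():
            return "cancelled", pipeline_data
        pipeline_data.update(step(pipeline_data))
    return "completed", pipeline_data


cancel = threading.Event()


def first(data: dict) -> dict:
    cancel.set()  # Simulate a cancel request arriving while this step runs.
    return {"first": "done"}


def second(data: dict) -> dict:
    return {"second": "done"}


status, data = run_steps([first, second], cancel)
print(status, data)
```

Note that the first step still finishes and its output is kept; only the second step is skipped.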
Rerun: Any execution can be rerun from the beginning via POST /api/v1/workflows/{id}/rerun. This copies the original trigger context and re-executes all enabled steps. Useful for testing rule changes against the same trigger.
Execution detail: GET /api/v1/workflows/{id}/detail returns a rich view model with a step-by-step timeline showing labels, icons, categories, elapsed seconds, success/failure status, outputs, and errors. The same component renders both live and historical executions.
Per-step timeout: Each step has a 60-second timeout to prevent stuck LLM calls from hanging the pipeline. If a step times out, it is marked failed and the pipeline continues.
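A per-step timeout of this kind can be sketched with asyncio.wait_for, which cancels the awaited step once the deadline passes. The wrapper name, the result dict shape, and the assumption that steps are coroutines are illustrative; the example uses a very short timeout so it runs quickly.

```python
import asyncio


async def run_with_timeout(step, pipeline_data: dict, timeout_seconds: float = 60.0) -> dict:
    """Run one async step; report failure instead of raising on timeout."""
    try:
        output = await asyncio.wait_for(step(pipeline_data), timeout=timeout_seconds)
        return {"success": True, "data": output}
    except asyncio.TimeoutError:
        return {"success": False, "error": f"step exceeded {timeout_seconds}s"}


async def slow_step(data: dict) -> dict:
    await asyncio.sleep(1.0)  # Stands in for a stuck LLM call.
    return {"answer": "late"}


async def fast_step(data: dict) -> dict:
    return {"answer": "on time"}


async def main() -> list:
    # The slow step is marked failed, and the next step still runs.
    return [
        await run_with_timeout(slow_step, {}, timeout_seconds=0.05),
        await run_with_timeout(fast_step, {}, timeout_seconds=0.05),
    ]


results = asyncio.run(main())
print(results)
```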
All intermediate results are persisted in pipeline_data_json on both the WorkflowExecution and the EventLog for debugging and auditability. Step timings include labels, elapsed seconds, and success/failure status.
Separately, the related EventLog is finalized as either completed or ignored. Only completed events participate in rule cool-off checks.
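The interaction between EventLog status and cool-off can be illustrated as follows. This is a sketch under assumptions: events are plain dicts, and the finished_at key is a made-up field name for the time an event was finalized.

```python
from datetime import datetime, timedelta, timezone


def in_cool_off(event_log: list, now: datetime, cool_off_minutes: int) -> bool:
    """True if a completed event was finalized within the cool-off window."""
    window_start = now - timedelta(minutes=cool_off_minutes)
    return any(
        event["status"] == "completed" and event["finished_at"] >= window_start
        for event in event_log
    )


now = datetime(2026, 3, 29, 13, 0, tzinfo=timezone.utc)
log = [
    # Ignored events never count toward cool-off, however recent they are.
    {"status": "ignored", "finished_at": now - timedelta(minutes=10)},
    {"status": "completed", "finished_at": now - timedelta(minutes=90)},
]
print(in_cool_off(log, now, 60), in_cool_off(log, now, 120))
```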