
v0.5 Protocol Specifications

Status

TL;DR

v0.5 ships four new plugin kinds at the protocol level. Two are lift-and-shift extractions of existing in-tree responsibilities (workflow_runner, queue) — their reference implementations live as bundled plugins in the default flavor. Two are net-new external integrations (durable_store → DBOS, memory_store → Zep). All four share the same plugin host, JSON-RPC envelope, signing, and lifecycle defined in animus-plugin-protocol — they only add typed RPC surfaces.

| Plugin kind | New protocol crate | First implementation |
|---|---|---|
| workflow_runner | animus-workflow-runner-protocol | animus-workflow-runner-default (Rust, lift) |
| queue | animus-queue-protocol | animus-queue-default (Rust, lift) |
| durable_store | animus-durable-store-protocol | animus-step-durable-dbos (TypeScript, DBOS) |
| memory_store | animus-memory-store-protocol | animus-memory-zep (TypeScript, Zep) |

Common conventions (apply to all four crates)

Each new protocol crate follows the pattern established by animus-subject-protocol and animus-provider-protocol:

rust
//! Protocol types for {kind} plugins.

#![warn(missing_docs)]

use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

/// Plugin-kind constant — must match the value added to
/// `animus-plugin-protocol::PLUGIN_KIND_*`.
pub const KIND: &str = "{kind}";

/// Method constants used as JSON-RPC `method` strings.
pub const METHOD_X: &str = "{kind}/{verb}";

Wire conventions across all four:

  • Newline-delimited JSON-RPC 2.0 over stdio (existing animus-plugin-protocol envelope)
  • Optional fields in params/results use #[serde(default, skip_serializing_if = "Option::is_none")]
  • Timestamps as RFC 3339 strings (String), not chrono::DateTime, so the protocol crates do not depend on chrono
  • Value (serde_json::Value) for opaque/backend-specific payloads
  • No Arc<T>, no trait objects, no closures, no streams in protocol types — everything is Serialize + Deserialize + Send + 'static
  • Method naming standard: all methods use snake_case verbs (e.g., workflow/run_phase, queue/mark_assigned). No hyphens. (Fixes codex P3-2.)
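The naming rule is mechanical enough to lint. The helper below is a hypothetical sketch and is not part of animus-plugin-protocol. A host or a crate test could run it over every METHOD_* constant. It assumes a method is a lowercase namespace followed by one or more snake_case segments separated by slashes, so the out-of-scope workflow/events/poll shape also passes.

```rust
/// Hypothetical lint: true if `method` is a namespace plus one or more
/// snake_case segments, e.g. "workflow/run_phase" or "workflow/events/poll".
pub fn is_valid_method_name(method: &str) -> bool {
    let segments: Vec<&str> = method.split('/').collect();
    segments.len() >= 2 && segments.iter().all(|s| is_snake_case(s))
}

/// snake_case here means: starts with a lowercase ASCII letter, uses only
/// lowercase letters, digits and single underscores, and does not end with '_'.
fn is_snake_case(s: &str) -> bool {
    let bytes = s.as_bytes();
    match (bytes.first(), bytes.last()) {
        (Some(first), Some(last)) if first.is_ascii_lowercase() && *last != b'_' => {}
        _ => return false, // empty segment, leading digit/underscore, trailing '_'
    }
    let mut prev_underscore = false;
    for &b in bytes {
        match b {
            b'a'..=b'z' | b'0'..=b'9' => prev_underscore = false,
            b'_' if !prev_underscore => prev_underscore = true,
            // Hyphens, uppercase letters, doubled underscores, etc.
            _ => return false,
        }
    }
    true
}
```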

Project-scope binding (applies to ALL four kinds):

Every plugin process in v0.5 is bound to exactly one project root at startup time, via an extension to the standard initialize handshake. The host passes the project root as part of an init_extensions map:

jsonc
// JSON-RPC initialize params (extension on existing animus-plugin-protocol)
{
  "protocol_version": "1.1.0",
  "init_extensions": {
    "project_binding": {
      "project_root": "/abs/path/to/project",
      "repo_scope": "<scope id from kernel>"
    }
  }
}

Plugins MUST refuse subsequent RPCs that imply a different project root. The host MUST NOT reuse a single plugin process across projects. This avoids needing a project_root field on every RPC and keeps the protocol concrete. (Fixes codex P1-1.)
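A minimal sketch of the plugin side of this rule follows. It assumes the plugin records the binding once at initialize and checks it before serving any RPC that implies a project root. ProjectBinding, RpcError and from_initialize are hypothetical names. The mismatch codes are the PROJECT_BINDING_MISMATCH values from the workflow_runner and durable_store error tables.

```rust
use std::path::{Path, PathBuf};

/// Minimal JSON-RPC error shape for this sketch (hypothetical).
#[derive(Debug, PartialEq)]
pub struct RpcError {
    pub code: i32,
    pub message: String,
}

/// Binding captured once from `init_extensions.project_binding`.
pub struct ProjectBinding {
    project_root: PathBuf,
    repo_scope: String,
    /// Kind-specific PROJECT_BINDING_MISMATCH code, e.g. -32104
    /// (workflow_runner) or -32205 (durable_store).
    mismatch_code: i32,
}

impl ProjectBinding {
    pub fn from_initialize(project_root: &str, repo_scope: &str, mismatch_code: i32) -> Result<Self, String> {
        let root = PathBuf::from(project_root);
        // The handshake example passes an absolute path; treat anything else as a host bug.
        if !root.is_absolute() {
            return Err(format!("project_root must be absolute, got {project_root:?}"));
        }
        Ok(ProjectBinding { project_root: root, repo_scope: repo_scope.to_string(), mismatch_code })
    }

    /// Called for any RPC whose inputs imply a project root.
    pub fn check(&self, implied_root: &Path) -> Result<(), RpcError> {
        if implied_root == self.project_root.as_path() {
            return Ok(());
        }
        Err(RpcError {
            code: self.mismatch_code,
            message: format!(
                "bound to {} (scope {}), request implies {}",
                self.project_root.display(),
                self.repo_scope,
                implied_root.display()
            ),
        })
    }
}
```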

Backward-compatibility policy:

  • Adding a new plugin kind = animus-plugin-protocol minor version bump (e.g., 1.0.0 → 1.1.0)
  • Adding a method to an existing kind's protocol crate = minor bump on that crate
  • Changing an existing method's params/result = major bump on that crate
  • Plugins declare which protocol crate versions they implement at handshake via InitializeResult::capabilities (see §"Cross-protocol concerns" for the concrete capability schema)
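The sketch below shows how a host could gate a plugin on the versions it declares at handshake. It rests on two assumptions that the spec does not state. First, versions are plain major.minor.patch strings. Second, patch numbers do not affect compatibility.

```rust
/// A `major.minor.patch` protocol version such as "1.1.0".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Version {
    pub major: u64,
    pub minor: u64,
    pub patch: u64,
}

impl Version {
    /// Parses exactly three dot-separated unsigned integers.
    pub fn parse(s: &str) -> Option<Version> {
        let mut parts = s.split('.').map(|p| p.parse::<u64>().ok());
        // Outer `?`: component missing; inner `?`: component not a number.
        let v = Version {
            major: parts.next()??,
            minor: parts.next()??,
            patch: parts.next()??,
        };
        // Reject extra components such as "1.1.0.7".
        if parts.next().is_some() { None } else { Some(v) }
    }
}

/// Host requires `required`; the plugin declares `implemented`.
/// Same major (no breaking change in between) and an equal-or-newer minor
/// (minor bumps only add kinds or methods). Patch is ignored in this sketch.
pub fn is_compatible(required: Version, implemented: Version) -> bool {
    required.major == implemented.major && implemented.minor >= required.minor
}
```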

Status vocabularies:

Where types use bare String status fields (workflow_status, phase_status, queue entry status, durable run status), each protocol crate declares the allowed values as pub const constants in a status submodule. Plugin authors reference these constants instead of inlining strings. (Fixes codex P3-4.)
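Here is a sketch of one such submodule, using the queue values defined later in this spec. The ALL and COMPLETION slices and the is_known / is_completion helpers are hypothetical additions. They let a plugin reject unrecognized values at the RPC boundary instead of storing them.

```rust
/// Status submodule in the style described above (queue vocabulary).
pub mod status {
    pub const PENDING: &str = "pending";
    pub const ASSIGNED: &str = "assigned";
    pub const HELD: &str = "held";
    pub const COMPLETED: &str = "completed";
    pub const FAILED: &str = "failed";
    pub const CANCELLED: &str = "cancelled";

    /// Hypothetical: every value a queue entry status may hold.
    pub const ALL: &[&str] = &[PENDING, ASSIGNED, HELD, COMPLETED, FAILED, CANCELLED];
    /// Hypothetical: the subset accepted by queue/completion.
    pub const COMPLETION: &[&str] = &[COMPLETED, FAILED, CANCELLED];

    /// Exact, case-sensitive match against `ALL`.
    pub fn is_known(value: &str) -> bool {
        ALL.iter().any(|s| *s == value)
    }

    /// Exact, case-sensitive match against `COMPLETION`.
    pub fn is_completion(value: &str) -> bool {
        COMPLETION.iter().any(|s| *s == value)
    }
}
```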


1. animus-workflow-runner-protocol

The workflow runner is the engine that executes Animus workflow YAML — iterating phases, evaluating decision contracts, handling rework loops, applying post-success actions. v0.5 extracts the in-tree workflow-runner-v2 crate into a bundled plugin.

Kind constant

In animus-plugin-protocol/src/lib.rs:

rust
/// Plugin kind for workflow runner plugins.
///
/// Workflow runners execute Animus workflow YAML by orchestrating phases,
/// evaluating decision contracts, and handling rework loops.
pub const PLUGIN_KIND_WORKFLOW_RUNNER: &str = "workflow_runner";

Methods

rust
pub const METHOD_WORKFLOW_EXECUTE: &str = "workflow/execute";
pub const METHOD_WORKFLOW_RUN_PHASE: &str = "workflow/run_phase";

workflow/execute — Full workflow run

rust
use animus_subject_protocol::{SubjectDispatch, SubjectRef};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecuteRequest {
    // `project_root` is bound at initialize-time (see Common Conventions);
    // it is NOT a per-request field.

    /// Existing workflow id to resume, or `None` to start a fresh run.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_id: Option<String>,

    /// Subject envelope. Exactly one of `subject_dispatch` OR
    /// (`subject_ref` + one of [`task_id`, `requirement_id`, `title`+`description`])
    /// must be set. Generic subject backends (Linear, Jira, custom kinds) MUST
    /// use `subject_dispatch`; task and requirement backends MAY use the
    /// convenience fields. (Fixes codex P1-3.)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_dispatch: Option<SubjectDispatch>,

    /// For convenience callers (task/requirement subjects), the subject_ref
    /// identifies what to run when `subject_dispatch` is not set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject_ref: Option<SubjectRef>,

    /// Task id (used only when `subject_dispatch` is None and `subject_ref.kind == "task"`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,

    /// Requirement id (used only when `subject_dispatch` is None and `subject_ref.kind == "requirement"`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub requirement_id: Option<String>,

    /// For custom ad-hoc subjects without an existing subject_ref: human-readable title.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,

    /// For custom ad-hoc subjects: description / prompt seed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Workflow YAML ref (e.g., `"standard"`, `"research-first"`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_ref: Option<String>,

    /// Initial input JSON for workflow variables.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input: Option<Value>,

    /// Workflow scalar variables.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub vars: HashMap<String, String>,

    /// Force a specific model (e.g., `"claude-opus-4-7"`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,

    /// Force a specific tool (e.g., `"claude"`, `"codex"`, `"gemini"`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,

    /// Per-phase timeout in seconds.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase_timeout_secs: Option<u64>,

    /// Single-phase filter: run only this phase id.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase_filter: Option<String>,

    /// Opaque phase routing config (backend-specific).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase_routing: Option<Value>,

    /// Opaque MCP runtime config (backend-specific).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_config: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecuteResult {
    pub workflow_id: String,
    pub workflow_ref: String,
    /// One of: `"completed"`, `"running"`, `"failed"`, `"escalated"`,
    /// `"cancelled"`.
    pub workflow_status: String,
    pub subject_id: String,
    pub execution_cwd: String,
    pub phases_requested: Vec<String>,
    pub phases_completed: usize,
    pub phases_total: usize,
    pub total_duration_secs: u64,
    pub phase_results: Vec<PhaseResultSnapshot>,
    /// Post-success action outcome (push, PR creation, merge, etc.).
    pub post_success: Value,
    /// True iff `workflow_status == "completed"`.
    pub success: bool,
    /// Phase events emitted during execution (replaces in-process callback).
    #[serde(default)]
    pub phase_events: Vec<PhaseEvent>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseResultSnapshot {
    pub phase_id: String,
    /// `"completed"`, `"rework"`, `"closed"`, `"failed"`, `"manual_pending"`, etc.
    pub status: String,
    pub duration_secs: u64,
    pub outcome: Value,
    pub metadata: Value,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_phase_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub close_reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PhaseEvent {
    Started { phase_id: String, attempt: u32, ts: String },
    Decision { phase_id: String, verdict: String, confidence: Option<f32>, ts: String },
    Completed { phase_id: String, status: String, ts: String },
}
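The subject-envelope rule on WorkflowExecuteRequest can be checked before a run starts. The validator below is a hypothetical sketch over a simplified mirror of the request: SubjectDispatch is reduced to a presence flag and SubjectRef to its kind string. It makes three assumptions:

- subject_dispatch excludes every other subject field.
- title and description count only as a pair.
- The ad-hoc title+description path is allowed for any subject_ref kind.

```rust
/// Hypothetical, simplified mirror of the subject fields on WorkflowExecuteRequest.
#[derive(Debug, Default)]
pub struct SubjectFields {
    pub has_subject_dispatch: bool,
    pub subject_ref_kind: Option<String>,
    pub task_id: Option<String>,
    pub requirement_id: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
}

/// Which subject path a valid request selects.
#[derive(Debug, PartialEq)]
pub enum SubjectPath {
    Dispatch,
    Task,
    Requirement,
    AdHoc,
}

pub fn validate_subject(f: &SubjectFields) -> Result<SubjectPath, String> {
    // title and description only count as a pair.
    let ad_hoc = match (&f.title, &f.description) {
        (Some(_), Some(_)) => true,
        (None, None) => false,
        _ => return Err("title and description must be set together".into()),
    };
    let has_task = f.task_id.is_some();
    let has_req = f.requirement_id.is_some();
    let convenience = [has_task, has_req, ad_hoc].iter().filter(|set| **set).count();

    if f.has_subject_dispatch {
        // Generic backends: subject_dispatch stands alone.
        if f.subject_ref_kind.is_some() || convenience > 0 {
            return Err("subject_dispatch must not be combined with other subject fields".into());
        }
        return Ok(SubjectPath::Dispatch);
    }

    let kind = f
        .subject_ref_kind
        .as_deref()
        .ok_or("subject_ref is required when subject_dispatch is absent")?;
    if convenience != 1 {
        return Err("set exactly one of task_id, requirement_id, or title+description".into());
    }
    match (kind, has_task, has_req) {
        ("task", true, _) => Ok(SubjectPath::Task),
        ("requirement", _, true) => Ok(SubjectPath::Requirement),
        (_, false, false) => Ok(SubjectPath::AdHoc),
        _ => Err(format!("convenience field does not match subject_ref.kind {kind:?}")),
    }
}
```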

workflow/run_phase — Single phase execution

Used by the daemon's per-phase scheduler and CLI --phase filters. The runner does not iterate phases here — it runs exactly one phase and returns.

rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPhaseRunRequest {
    // project_root is bound at initialize-time (see Common Conventions).
    // NOT a per-request field.
    pub execution_cwd: String,
    pub workflow_id: String,
    pub workflow_ref: String,
    pub subject_id: String,
    pub subject_title: String,
    pub subject_description: String,
    pub phase_id: String,
    pub phase_attempt: u32,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase_timeout_secs: Option<u64>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model_override: Option<String>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_override: Option<String>,

    /// One of `"minimal"`, `"low"`, `"medium"`, `"high"`, `"critical"`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_complexity: Option<String>,

    /// Rework context from a prior phase's `rework` verdict.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rework_context: Option<String>,

    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub pipeline_vars: HashMap<String, String>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dispatch_input: Option<String>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schedule_input: Option<String>,

    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub phase_routing: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPhaseRunResult {
    /// `"completed"`, `"manual_pending"`, `"failed"`.
    pub phase_status: String,
    pub duration_secs: u64,
    pub outcome: Value,
    pub metadata: Value,
    #[serde(default)]
    pub signals: Vec<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
}

Error codes (workflow_runner)

rust
pub mod error_codes {
    /// Workflow id not found.
    pub const WORKFLOW_NOT_FOUND: i32 = -32101;
    /// Phase id not found within workflow.
    pub const PHASE_NOT_FOUND: i32 = -32102;
    /// Workflow already in a terminal state.
    pub const WORKFLOW_TERMINAL: i32 = -32103;
    /// Project root mismatch (plugin is bound to a different project).
    pub const PROJECT_BINDING_MISMATCH: i32 = -32104;
    /// Manual gate not satisfied; workflow paused.
    pub const MANUAL_GATE_PENDING: i32 = -32105;
    /// Decision contract evaluation failed (parser, validator).
    pub const DECISION_CONTRACT_INVALID: i32 = -32106;
}

Manifest declaration

rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRunnerManifest {
    pub name: String,
    pub version: String,
    pub description: String,
    pub capabilities: WorkflowRunnerCapabilities,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowRunnerCapabilities {
    #[serde(default)]
    pub phase_decision_parsing: bool,
    #[serde(default)]
    pub rework_context_support: bool,
    #[serde(default)]
    pub post_success_actions: bool,
    #[serde(default)]
    pub crash_recovery: bool,
    #[serde(default)]
    pub manual_pause_support: bool,
}

IPC mitigations table

| In-process pattern | Plugin pattern |
|---|---|
| PhaseEventCallback closure | Return Vec<PhaseEvent> in WorkflowExecuteResult::phase_events |
| SharedWorkflowEventEmitter: Arc<dyn …> | Removed; plugin emits events into the result |
| Arc<dyn ServiceHub> param | Removed; plugin loads its own hub from project_root |
| Phase output file I/O | Plugin owns file paths under <project_root>/.animus/ |
| Async event subscriptions (workflow/events channel) | Paginate via cursor on a separate workflow/events/poll method (out of scope for the v0.5 protocol; the daemon polls via the existing control plane) |
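The first row of the table is sketched below. Instead of invoking a callback as each phase progresses, the runner appends PhaseEvent values to a Vec that ends up in WorkflowExecuteResult::phase_events. The enum is a serde-free subset of the protocol type. run_phases is a hypothetical stand-in for the real phase loop.

```rust
/// Subset of the protocol's PhaseEvent enum, without serde, for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum PhaseEvent {
    Started { phase_id: String, attempt: u32, ts: String },
    Completed { phase_id: String, status: String, ts: String },
}

/// Hypothetical runner loop. Where the in-process runner invoked a
/// PhaseEventCallback, the plugin pushes into a Vec that the caller places
/// in WorkflowExecuteResult::phase_events. `now` supplies RFC 3339 timestamps.
pub fn run_phases(phase_ids: &[&str], now: impl Fn() -> String) -> Vec<PhaseEvent> {
    let mut events = Vec::new();
    for id in phase_ids {
        events.push(PhaseEvent::Started { phase_id: id.to_string(), attempt: 1, ts: now() });
        // Phase execution would happen here; this sketch always succeeds.
        events.push(PhaseEvent::Completed {
            phase_id: id.to_string(),
            status: "completed".to_string(),
            ts: now(),
        });
    }
    events
}
```

The trade-off is latency: the daemon sees these events only when the workflow/execute call returns. That is why live subscriptions appear in the last row as out of scope for v0.5.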

Tests to lift

All from crates/workflow-runner-v2/tests/:

  • durability_idempotency_and_markers.rs
  • durability_manual_pending_and_failed_events.rs
  • session_resume.rs
  • notification_log.rs
  • phase_duration_histogram_records_observations.rs

Plus module-level mod tests blocks in:

  • phase_executor::tests::
  • payload_traversal::tests::
  • phase_output::tests::
  • workflow_helpers::tests::

2. animus-queue-protocol

The queue is a per-project FIFO buffer, reorderable via queue/reorder, for SubjectDispatch work items awaiting scheduling. It currently lives in-tree at crates/orchestrator-daemon-runtime/src/queue/. v0.5 extracts it to a bundled plugin.

Kind constant

rust
pub const PLUGIN_KIND_QUEUE: &str = "queue";

Methods

rust
pub const METHOD_QUEUE_ENQUEUE: &str = "queue/enqueue";
pub const METHOD_QUEUE_LIST: &str = "queue/list";
pub const METHOD_QUEUE_LEASE: &str = "queue/lease";
pub const METHOD_QUEUE_STATS: &str = "queue/stats";
pub const METHOD_QUEUE_HOLD: &str = "queue/hold";
pub const METHOD_QUEUE_RELEASE: &str = "queue/release";
pub const METHOD_QUEUE_DROP: &str = "queue/drop";
pub const METHOD_QUEUE_REORDER: &str = "queue/reorder";
pub const METHOD_QUEUE_MARK_ASSIGNED: &str = "queue/mark_assigned";
pub const METHOD_QUEUE_COMPLETION: &str = "queue/completion";

Status vocabulary

rust
pub mod status {
    pub const PENDING: &str = "pending";
    pub const ASSIGNED: &str = "assigned";
    pub const HELD: &str = "held";
    // Completion statuses
    pub const COMPLETED: &str = "completed";
    pub const FAILED: &str = "failed";
    pub const CANCELLED: &str = "cancelled";
}

Types

rust
use animus_subject_protocol::SubjectDispatch;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEnqueueRequest {
    pub subject_dispatch: SubjectDispatch,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEnqueueResponse {
    /// `true` if a new entry was created. `false` if the dispatch was rejected
    /// as a duplicate of an existing pending/assigned entry (idempotent).
    pub enqueued: bool,
    /// Stable entry id assigned by the plugin. Used by all subsequent
    /// mutation calls.
    pub entry_id: String,
    /// Convenience: the subject id from the dispatch envelope.
    pub subject_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueueListRequest {
    /// Filter by status. Values from `status::*`. Empty means all.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub status: Vec<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub limit: Option<usize>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub offset: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueListResponse {
    pub entries: Vec<QueueEntry>,
    pub total: usize,
    pub stats: QueueStats,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEntry {
    /// Stable entry id (unique within the project queue). Mutation calls
    /// target this. (Fixes codex P2-3.)
    pub entry_id: String,
    pub subject_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
    /// Full dispatch envelope — included so the daemon can lease an entry
    /// and start work without a second roundtrip. (Fixes codex P1-2.)
    pub subject_dispatch: SubjectDispatch,
    /// Value from `status::*`.
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_id: Option<String>,
    pub enqueued_at: String, // RFC 3339
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub assigned_at: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub held_at: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueueStats {
    pub total: usize,
    pub pending: usize,
    pub assigned: usize,
    pub held: usize,
}

/// Atomic dispatch path: claim up to `max` pending entries in priority order,
/// returning each with its full `SubjectDispatch`. The plugin transitions each
/// returned entry from Pending → Assigned in a single transaction.
/// (Replaces "list then mark_assigned" with atomic lease per codex P1-2.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLeaseRequest {
    pub max: usize,
    /// Optional daemon-provided workflow ids to attach to leased entries.
    /// If set, length MUST be exactly `max` (the plugin returns an error
    /// otherwise). If `None`, the plugin generates synthetic UUIDs.
    /// (Fixes codex round-2 P2-2.)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_ids: Option<Vec<String>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLeaseResponse {
    pub leased: Vec<QueueEntry>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueHoldRequest {
    pub entry_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueMutationResponse {
    /// `true` if the entry state changed. `false` if the entry was already in
    /// the requested state (idempotent no-op) or was not found (in which case
    /// `not_found` is `true`). (Fixes codex P2-2.)
    pub changed: bool,
    #[serde(default)]
    pub not_found: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueReleaseRequest {
    pub entry_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueDropRequest {
    pub entry_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueReorderRequest {
    /// New order (partial — entries not in this list keep their existing position).
    pub entry_ids: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueReorderResponse {
    pub reordered_count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueMarkAssignedRequest {
    pub entry_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_id: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueCompletionRequest {
    pub entry_id: String,
    /// Value from `status::*` (completed | failed | cancelled).
    pub status: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_ref: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_id: Option<String>,
}

Hold / release / drop / mark_assigned / completion all return QueueMutationResponse. Reorder returns QueueReorderResponse with the count of entries whose position changed.
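The spec defines reorder only as partial: entries not listed keep their position. One reading that satisfies that rule is sketched below, where the listed entries are rearranged, in the requested order, into the slots they already occupy. partial_reorder is a hypothetical helper, and its handling of unknown and duplicate ids is an assumption. The lifted queue_service.rs behavior may differ.

```rust
use std::collections::HashSet;

/// Hypothetical partial reorder. Listed entries are rearranged, in the
/// requested order, into the slots they already occupy; unlisted entries stay
/// put. Unknown ids are ignored and the first occurrence of a duplicate wins.
/// Returns the number of entries whose position changed.
pub fn partial_reorder(queue: &mut [String], entry_ids: &[String]) -> usize {
    let present: HashSet<&String> = queue.iter().collect();
    let mut seen: HashSet<&String> = HashSet::new();
    // Requested order, restricted to ids that are actually queued.
    let requested: Vec<String> = entry_ids
        .iter()
        .filter(|id| present.contains(*id) && seen.insert(*id))
        .cloned()
        .collect();
    let wanted: HashSet<&String> = requested.iter().collect();
    // Slots currently held by the listed entries, in queue order.
    let slots: Vec<usize> = queue
        .iter()
        .enumerate()
        .filter(|(_, id)| wanted.contains(*id))
        .map(|(i, _)| i)
        .collect();
    // Fill those slots in the requested order; count slots whose occupant changed.
    let mut changed = 0;
    for (slot, id) in slots.into_iter().zip(requested) {
        if queue[slot] != id {
            queue[slot] = id;
            changed += 1;
        }
    }
    changed
}
```

For a queue a, b, c, d, e, the request [d, b] yields a, d, c, b, e with a reordered_count of 2.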

Capacity / throttling note

The queue protocol does not carry capacity logic. Capacity is a daemon-side concern (dispatch_headroom = max_tasks_per_tick.min(pool_size - active_agents)). The daemon decides how many items to lease from the queue per tick; the queue plugin just provides ordered access. This keeps the queue protocol minimal and lets capacity policy stay in the kernel without coupling to the queue backend.
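A sketch of the split described above follows. The daemon computes headroom, then asks the queue for at most that many entries via queue/lease. Entry, dispatch_headroom and lease are hypothetical std-only stand-ins. The use of saturating_sub is an added assumption, so headroom bottoms out at zero instead of underflowing if active_agents ever exceeds pool_size. The synthetic workflow ids stand in for the UUIDs a real plugin would generate.

```rust
/// Queue entry reduced to the fields the lease path touches (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub entry_id: String,
    pub status: &'static str, // a `status::*` value
    pub workflow_id: Option<String>,
}

/// Daemon side: how many entries to lease this tick.
pub fn dispatch_headroom(max_tasks_per_tick: usize, pool_size: usize, active_agents: usize) -> usize {
    max_tasks_per_tick.min(pool_size.saturating_sub(active_agents))
}

/// Plugin side: claim up to `max` pending entries in queue order, moving each
/// to assigned. Exclusive `&mut` access stands in for the single transaction.
pub fn lease(queue: &mut [Entry], max: usize, workflow_ids: Option<Vec<String>>) -> Result<Vec<Entry>, String> {
    if let Some(ids) = &workflow_ids {
        if ids.len() != max {
            return Err(format!("workflow_ids has {} ids, expected {}", ids.len(), max));
        }
    }
    let mut supplied = workflow_ids.map(|v| v.into_iter());
    let mut leased = Vec::new();
    for entry in queue.iter_mut().filter(|e| e.status == "pending").take(max) {
        let workflow_id = match supplied.as_mut() {
            Some(ids) => ids.next().expect("length checked above"),
            // A real plugin generates a UUID; a counter keeps the sketch std-only.
            None => format!("synthetic-{}", leased.len() + 1),
        };
        entry.status = "assigned";
        entry.workflow_id = Some(workflow_id);
        leased.push(entry.clone());
    }
    Ok(leased)
}
```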

Error codes

rust
pub mod error_codes {
    pub const QUEUE_ENTRY_NOT_FOUND: i32 = -32001;
    pub const QUEUE_ENTRY_ALREADY_ASSIGNED: i32 = -32002;
    pub const QUEUE_ENTRY_NOT_PENDING: i32 = -32003;
    pub const QUEUE_REORDER_FAILED: i32 = -32004;
    pub const QUEUE_LOCK_ACQUISITION_FAILED: i32 = -32005;
}

Tests to lift

From crates/orchestrator-daemon-runtime/src/queue/queue_service.rs:

  • enqueue_subject_dispatch_is_idempotent_for_same_task_pipeline
  • hold_release_and_reorder_use_subject_ids
  • enqueue_subject_dispatch_accepts_non_task_subjects
  • reorder_subjects_keeps_all_entries_for_same_subject
  • generic_subjects_use_kind_qualified_queue_ids

3. animus-durable-store-protocol

Durable step checkpointing for crash-safe workflow execution. First implementation: animus-step-durable-dbos (TypeScript, DBOS as durable journal — Option A from the DBOS analysis). Future implementations may include Temporal, SQLite-based, or distributed engines.

Kind constant

rust
pub const PLUGIN_KIND_DURABLE_STORE: &str = "durable_store";

Methods

rust
pub const METHOD_DURABLE_BEGIN_WORKFLOW_RUN: &str = "durable/begin_workflow_run";
pub const METHOD_DURABLE_BEGIN_STEP: &str = "durable/begin_step";
pub const METHOD_DURABLE_COMMIT_STEP: &str = "durable/commit_step";
pub const METHOD_DURABLE_ABANDON_STEP: &str = "durable/abandon_step";
pub const METHOD_DURABLE_RECOVER_IN_FLIGHT: &str = "durable/recover_in_flight";
pub const METHOD_DURABLE_QUERY_RUN: &str = "durable/query_run";

Status vocabulary

rust
pub mod step_status {
    /// First time the plugin has seen this idempotency_key. Caller proceeds
    /// to execute the side effect, then calls `commit_step`.
    pub const NEW: &str = "new";
    /// Another caller has an outstanding reservation for this idempotency_key
    /// with an unexpired lease. The current caller MUST NOT execute the side
    /// effect; it should wait, retry, or surface to the daemon for arbitration.
    pub const IN_PROGRESS: &str = "in_progress";
    /// Step was previously committed with an output. `prior_output` is set.
    pub const ALREADY_COMMITTED: &str = "already_committed";
    /// Step was previously committed with an error. `prior_error` is set.
    /// PRIOR_ERROR is TERMINAL for this idempotency_key — the plugin will
    /// never re-execute the side effect for this key. Callers that want to
    /// retry after a permanent error must use a different idempotency_key
    /// (typically by incrementing an attempt counter). `abandon_step` does
    /// NOT clear committed errors. (See DBOS guidance for the retry contract.)
    pub const PRIOR_ERROR: &str = "prior_error";
}

pub mod run_status {
    pub const PENDING: &str = "pending";
    pub const SUCCESS: &str = "success";
    pub const ERROR: &str = "error";
    pub const CANCELLED: &str = "cancelled";
}

Fence semantics (resolves codex P1-4)

The durable store implements a side-effect fence: between begin_step and commit_step, the plugin holds a reservation that prevents concurrent or replayed executions of the same idempotency_key. The contract:

  1. begin_step with a new key returns status = NEW and creates a reservation with a TTL (default 5 minutes; configurable per plugin manifest).
  2. Caller executes the side effect.
  3. Caller calls commit_step with output (success) or error (failure). Either commits the result and clears the reservation.
  4. If the caller crashes between steps 1 and 3, its reservation stays open until the TTL expires. During that window, a fresh begin_step with the same key returns IN_PROGRESS; after expiry, begin_step returns NEW and creates a new reservation.
  5. If the caller explicitly abandons (e.g., decides not to retry), it calls abandon_step to release the reservation immediately.

This does not guarantee exactly-once side effects: the daemon can still crash mid-side-effect, and the step can then run again once the reservation expires. It does give the daemon a deterministic recovery signal and prevents two callers from running the same step concurrently. The "four hard problems" deferral from the DBOS analysis applies: solving end-to-end exactly-once requires deeper integration than Option A. The fence is the minimum that makes Option A useful.
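The five-step contract can be modeled as a small in-memory state machine. This is a single-process sketch, not the DBOS implementation. Fence and its methods are hypothetical, payloads are plain strings, and `now` is injected in seconds so expiry is deterministic. The error codes are the STEP_NOT_FOUND and RESERVATION_EXPIRED values from the durable_store error table.

```rust
use std::collections::HashMap;

pub const STEP_NOT_FOUND: i32 = -32202;
pub const RESERVATION_EXPIRED: i32 = -32203;

/// Committed outcome for an idempotency key (payloads as plain strings here).
#[derive(Debug, Clone, PartialEq)]
pub enum Committed {
    Success(Option<String>),
    Error(String),
}

/// Mirrors `step_status::*`.
#[derive(Debug, PartialEq)]
pub enum BeginStatus {
    New { step_id: String },
    InProgress { expires_at: u64 },
    AlreadyCommitted(Option<String>),
    PriorError(String),
}

struct Reservation {
    step_id: String,
    expires_at: u64,
}

#[derive(Default)]
pub struct Fence {
    active: HashMap<String, Reservation>, // idempotency_key -> live reservation
    commits: HashMap<String, Committed>,  // idempotency_key -> committed outcome
    next_id: u64,
}

impl Fence {
    pub fn begin_step(&mut self, key: &str, ttl_secs: u64, now: u64) -> BeginStatus {
        // Commits are authoritative; an error commit is terminal for the key.
        match self.commits.get(key) {
            Some(Committed::Success(out)) => return BeginStatus::AlreadyCommitted(out.clone()),
            Some(Committed::Error(err)) => return BeginStatus::PriorError(err.clone()),
            None => {}
        }
        if let Some(expires_at) = self.active.get(key).map(|r| r.expires_at) {
            if expires_at > now {
                return BeginStatus::InProgress { expires_at };
            }
            // Expired (e.g. the holder crashed): sweep it, then allow a new reservation.
            self.active.remove(key);
        }
        self.next_id += 1;
        let step_id = format!("step-{}", self.next_id);
        let reservation = Reservation { step_id: step_id.clone(), expires_at: now + ttl_secs };
        self.active.insert(key.to_string(), reservation);
        BeginStatus::New { step_id }
    }

    pub fn commit_step(&mut self, step_id: &str, outcome: Committed, now: u64) -> Result<(), i32> {
        let key = self
            .active
            .iter()
            .find(|(_, r)| r.step_id == step_id)
            .map(|(k, _)| k.clone())
            .ok_or(STEP_NOT_FOUND)?;
        // The reservation is released whether the commit succeeds or fails.
        let reservation = self.active.remove(&key).expect("key found above");
        if reservation.expires_at <= now {
            return Err(RESERVATION_EXPIRED);
        }
        self.commits.insert(key, outcome);
        Ok(())
    }

    /// Releases a live reservation; never touches committed outcomes.
    pub fn abandon_step(&mut self, step_id: &str) -> bool {
        let before = self.active.len();
        self.active.retain(|_, r| r.step_id != step_id);
        self.active.len() < before
    }
}
```

Sweeping an expired reservation inside begin_step mirrors what the DBOS guidance describes for the Postgres reservation table.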

Types

rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginWorkflowRunRequest {
    pub run_id: String,
    pub phase_id: String,
    #[serde(default, skip_serializing_if = "Value::is_null")]
    pub inputs: Value,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginWorkflowRunResponse {
    /// Plugin-issued, monotonically increasing epoch. The plugin persists
    /// the current epoch counter to its durable store; on restart, it MUST
    /// resume from the persisted value, not zero. The daemon does not
    /// store epoch values across restarts; instead it queries
    /// `recover_in_flight` with `since_epoch = 0` after a restart to get
    /// all in-flight runs and resumes from there. The epoch is project-scoped
    /// (plugin process is bound to one project at init). (Fixes codex P2-5.)
    pub epoch: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginStepRequest {
    pub run_id: String,
    pub phase_id: String,
    pub step_name: String,
    /// Caller-supplied idempotency key. The plugin uses this to deduplicate
    /// replays. Daemon convention: `format!("{}:{}:{}", run_id, phase_id, step_name)`.
    pub idempotency_key: String,
    #[serde(default, skip_serializing_if = "Value::is_null")]
    pub payload: Value,
    /// Reservation TTL in seconds. If unset, the plugin uses its configured
    /// default (manifest-declared, typically 300). Caller MAY override per-step
    /// for known-long operations.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reservation_ttl_secs: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginStepResponse {
    pub step_id: String,
    /// Value from `step_status::*`.
    pub status: String,
    /// Present when `status == ALREADY_COMMITTED`. Contains the previously
    /// committed `output` so the caller can short-circuit the side effect.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prior_output: Option<Value>,
    /// Present when `status == PRIOR_ERROR`. Contains the previously
    /// committed error.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub prior_error: Option<StepError>,
    /// Present when `status == IN_PROGRESS`. RFC 3339 timestamp of when
    /// the existing reservation expires.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reservation_expires_at: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitStepRequest {
    pub step_id: String,
    /// Required: `"success"` or `"error"`. Use `commit_outcome::*` constants.
    /// This is the authoritative dedupe signal for replay — `output`/`error`
    /// payloads MAY be null (no-payload commits are valid in both directions),
    /// so the outcome cannot be inferred from payload presence.
    /// (Fixes codex round-5 P1.)
    pub outcome: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<StepError>,
}

pub mod commit_outcome {
    pub const SUCCESS: &str = "success";
    pub const ERROR: &str = "error";
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AbandonStepRequest {
    pub step_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AbandonStepResponse {
    pub ack: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepError {
    pub code: String,
    pub message: String,
    #[serde(default, skip_serializing_if = "Value::is_null")]
    pub details: Value,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitStepResponse {
    pub ack: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoverInFlightRequest {
    pub since_epoch: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoverInFlightResponse {
    pub in_flight: Vec<InFlightRun>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InFlightRun {
    pub run_id: String,
    pub phase_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_committed_step: Option<String>,
    /// Backend-specific replay state (e.g., DBOS workflow status payload).
    #[serde(default, skip_serializing_if = "Value::is_null")]
    pub replay_state: Value,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRunRequest {
    pub run_id: String,
    /// Required: DBOS workflow id is keyed by `(run_id, phase_id)`. Different
    /// phases of the same run are different workflows. (Fixes codex round-2 P2-1.)
    pub phase_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRunResponse {
    pub run_id: String,
    /// Value from `run_status::*`: `"pending"`, `"success"`, `"error"`, `"cancelled"`.
    pub status: String,
    pub steps: Vec<StepRecord>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepRecord {
    pub step_id: String,
    pub step_name: String,
    pub idempotency_key: String,
    pub committed_at: String, // RFC 3339
    /// `"success"` or `"error"` — the authoritative outcome (see
    /// `commit_outcome::*`). Independent of payload nulls.
    pub outcome: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output: Option<Value>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<StepError>,
}
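Because outcome is authoritative, a plugin can validate it before writing a commit. The sketch below is hypothetical. Its rule that payloads must not contradict the outcome (no error on a success commit, no output on an error commit) is an assumption: the spec only requires outcome and allows both payloads to be null.

```rust
/// Parsed `CommitStepRequest::outcome` (values from `commit_outcome::*`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Outcome {
    Success,
    Error,
}

/// `has_output` / `has_error` report whether the optional payloads are present.
pub fn validate_commit(outcome: &str, has_output: bool, has_error: bool) -> Result<Outcome, String> {
    let parsed = match outcome {
        "success" => Outcome::Success,
        "error" => Outcome::Error,
        other => return Err(format!("unknown outcome {other:?}")),
    };
    match (parsed, has_output, has_error) {
        // Assumed pairing rule: payloads must not contradict the outcome.
        (Outcome::Success, _, true) => Err("success commit carries an error payload".to_string()),
        (Outcome::Error, true, _) => Err("error commit carries an output payload".to_string()),
        // No payload at all is valid in both directions; the outcome alone decides.
        _ => Ok(parsed),
    }
}
```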

Error codes (durable_store)

rust
pub mod error_codes {
    /// Run id not found (begin_workflow_run was never called).
    pub const RUN_NOT_FOUND: i32 = -32201;
    /// Step id not found (commit_step / abandon_step on unknown reservation).
    pub const STEP_NOT_FOUND: i32 = -32202;
    /// Reservation expired before commit.
    pub const RESERVATION_EXPIRED: i32 = -32203;
    /// Backend (e.g., DBOS / Postgres) unavailable.
    pub const BACKEND_UNAVAILABLE: i32 = -32204;
    /// Project root mismatch.
    pub const PROJECT_BINDING_MISMATCH: i32 = -32205;
    /// Replay determinism violated (step issued out of order vs. recorded).
    pub const REPLAY_NONDETERMINISM: i32 = -32206;
}

DBOS implementation guidance (Option A)

DBOS's native checkpoint model commits the step result inside @DBOS.step(). The Animus protocol layers a reservation table on top to fence side effects across the IPC boundary. The plugin maintains a separate Postgres table for reservations alongside DBOS's checkpoint tables.

Reservation table schema (plugin-owned, separate from DBOS's tables):

sql
CREATE TABLE animus_step_reservations (
    step_id           TEXT PRIMARY KEY,
    workflow_id       TEXT NOT NULL,   -- DBOS workflowID
    idempotency_key   TEXT NOT NULL,
    reserved_at       TIMESTAMPTZ NOT NULL,
    expires_at        TIMESTAMPTZ NOT NULL,
    -- Lifecycle column drives the partial unique index. Postgres does NOT
    -- allow now() in a partial index predicate, so use an explicit column
    -- set by commit_step / abandon_step / sweeper. (Fixes codex round-4 P1-1.)
    released          BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE UNIQUE INDEX animus_step_reservations_active_key
    ON animus_step_reservations (idempotency_key)
    WHERE released = FALSE;
CREATE INDEX animus_step_reservations_expiry_idx
    ON animus_step_reservations (expires_at)
    WHERE released = FALSE;

Within begin_step, the plugin sweeps expired-but-not-released rows in the same transaction (UPDATE animus_step_reservations SET released = TRUE WHERE expires_at < now() AND released = FALSE) before checking liveness. A background timer in the plugin process (e.g., setInterval, since this plugin is TypeScript) runs the same sweep globally every 30 s.

The plugin also maintains a committed-step table keyed by idempotency_key, populated inside the commit_step transaction. This is the authoritative dedupe source:

sql
CREATE TABLE animus_step_commits (
    idempotency_key   TEXT PRIMARY KEY,
    step_id           TEXT NOT NULL,
    workflow_id       TEXT NOT NULL,
    step_name         TEXT NOT NULL,
    committed_at      TIMESTAMPTZ NOT NULL,
    -- Explicit outcome: a no-payload success and a no-payload error are
    -- distinguishable even when both output and error columns are NULL.
    -- (Fixes codex round-4 P1-3.)
    outcome           TEXT NOT NULL CHECK (outcome IN ('success', 'error')),
    output            JSONB,
    error             JSONB
);

PRIOR_ERROR retry contract (fixes codex round-4 P1-2):

PRIOR_ERROR is terminal for that idempotency_key. Once a step is committed with outcome = 'error', future begin_step calls with the same key continue to return PRIOR_ERROR and never re-execute the side effect. To retry after a permanent error, a caller must use a different idempotency_key, typically by incrementing a retry counter in the key (format!("animus:run:{}:phase:{}:attempt:{}", ...)). The Animus daemon owns this policy; the durable_store plugin does not provide a "reset error" RPC in v0.5.
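The retry contract can be sketched from the daemon side. This is a minimal sketch: `BeginStatus`, `attempt_key`, `next_attempt_key`, and the `max_attempts` cap are hypothetical names, not protocol types. Because PRIOR_ERROR is terminal per key, the only way to retry is to mint a new key with a higher attempt counter.

```rust
/// Simplified view of a `durable/begin_step` status (hypothetical, daemon-side).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BeginStatus {
    New,
    InProgress,
    AlreadyCommitted,
    PriorError,
}

/// Builds the idempotency key for one attempt of a phase, using the
/// attempt-counter key format described above.
fn attempt_key(run_id: &str, phase_id: &str, attempt: u32) -> String {
    format!("animus:run:{}:phase:{}:attempt:{}", run_id, phase_id, attempt)
}

/// Hypothetical daemon policy: only PRIOR_ERROR on the current key triggers a
/// retry, and only while attempts remain. Returns the key for the next
/// `begin_step`, or `None` if the daemon should stop retrying.
fn next_attempt_key(
    run_id: &str,
    phase_id: &str,
    attempt: u32,
    status: BeginStatus,
    max_attempts: u32,
) -> Option<String> {
    match status {
        BeginStatus::PriorError if attempt + 1 < max_attempts => {
            Some(attempt_key(run_id, phase_id, attempt + 1))
        }
        // NEW / IN_PROGRESS / ALREADY_COMMITTED never mint a new key.
        _ => None,
    }
}
```

With `max_attempts = 3`, attempts 0, 1, and 2 are issued. A PRIOR_ERROR on attempt 2 ends the sequence.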

abandon_step only affects active reservations (rows with released = FALSE, which begin_step reports as NEW to the holder and IN_PROGRESS to other callers), never committed errors. It covers the case where the caller decides not to call commit_step at all, typically because of an upstream cancellation before the side effect even started.

Protocol-to-DBOS mapping:

| Protocol RPC | Plugin implementation |
|---|---|
| durable/begin_workflow_run | DBOS.startWorkflow(AnimusJournal, { workflowID: format!("animus:run:{}:phase:{}", run_id, phase_id) }).beginPhase(...). Persist the epoch counter in a single-row table (animus_journal_epoch); increment it and return the new value |
| durable/begin_step | In a single transaction: (1) UPDATE animus_step_reservations SET released = TRUE WHERE expires_at < now() AND released = FALSE, to clear expired live rows. (2) SELECT outcome, output, error FROM animus_step_commits WHERE idempotency_key = $1. If outcome = 'success', return ALREADY_COMMITTED with prior_output; if outcome = 'error', return PRIOR_ERROR with prior_error. (3) SELECT expires_at FROM animus_step_reservations WHERE idempotency_key = $1 AND released = FALSE. If a row exists, return IN_PROGRESS with reservation_expires_at. (4) INSERT a new reservation row (TTL = reservation_ttl_secs, default 300 s) and return NEW with a fresh step_id |
| durable/commit_step | In a single transaction: (a) inside @DBOS.step(), write the step record to DBOS's checkpoint tables; (b) INSERT INTO animus_step_commits (idempotency_key, step_id, workflow_id, step_name, committed_at, outcome, output, error) VALUES (...) ON CONFLICT (idempotency_key) DO NOTHING (safe against double commits); (c) UPDATE animus_step_reservations SET released = TRUE WHERE step_id = $1 |
| durable/abandon_step | UPDATE animus_step_reservations SET released = TRUE WHERE step_id = $1. Affects only active reservations; does NOT clear committed errors |
| durable/recover_in_flight | DBOS.listWorkflows({ status: "PENDING" }) → filter by epoch → for each, JOIN with animus_step_commits to compute last_committed_step |
| durable/query_run | DBOS.listWorkflowSteps(workflowID) plus animus_step_commits records → map to StepRecord[] |
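The begin_step / commit_step / abandon_step logic in the table can be modelled in memory to make the ordering of checks concrete. This is a sketch only. The `Journal` type, the integer-seconds clock, and the `step-N` id format are assumptions for illustration, and payloads are simplified to strings. A real plugin runs each method as one Postgres transaction. Under concurrency, the partial unique index on idempotency_key WHERE released = FALSE is what makes step (4) safe. Here, `&mut self` provides the same exclusivity. RESERVATION_EXPIRED handling is omitted.

```rust
use std::collections::HashMap;

/// Mirrors `error_codes::STEP_NOT_FOUND`.
const STEP_NOT_FOUND: i32 = -32202;

/// `begin_step` result (payloads simplified to strings).
#[derive(Debug, Clone, PartialEq)]
enum Begin {
    New { step_id: String },
    InProgress { reservation_expires_at: u64 },
    AlreadyCommitted { prior_output: Option<String> },
    PriorError { prior_error: Option<String> },
}

#[derive(Debug, Clone, Copy)]
enum Outcome {
    Success,
    Error,
}

/// One row of animus_step_reservations.
struct Reservation {
    step_id: String,
    idempotency_key: String,
    expires_at: u64,
    released: bool,
}

/// One row of animus_step_commits (the map key is idempotency_key).
struct Commit {
    outcome: Outcome,
    output: Option<String>,
    error: Option<String>,
}

#[derive(Default)]
struct Journal {
    reservations: Vec<Reservation>,
    commits: HashMap<String, Commit>,
    next_id: u64,
}

impl Journal {
    fn begin_step(&mut self, key: &str, now: u64, ttl_secs: Option<u64>) -> Begin {
        // (1) Release expired-but-unreleased reservations.
        for r in self.reservations.iter_mut() {
            if !r.released && r.expires_at < now {
                r.released = true;
            }
        }
        // (2) The commit table is the authoritative dedupe source.
        if let Some(c) = self.commits.get(key) {
            return match c.outcome {
                Outcome::Success => Begin::AlreadyCommitted { prior_output: c.output.clone() },
                Outcome::Error => Begin::PriorError { prior_error: c.error.clone() },
            };
        }
        // (3) A live reservation means another caller holds the fence.
        if let Some(r) = self.reservations.iter().find(|r| !r.released && r.idempotency_key == key) {
            return Begin::InProgress { reservation_expires_at: r.expires_at };
        }
        // (4) Reserve with the requested TTL or the 300 s default.
        self.next_id += 1;
        let step_id = format!("step-{}", self.next_id);
        self.reservations.push(Reservation {
            step_id: step_id.clone(),
            idempotency_key: key.to_string(),
            expires_at: now + ttl_secs.unwrap_or(300),
            released: false,
        });
        Begin::New { step_id }
    }

    fn commit_step(&mut self, step_id: &str, outcome: Outcome, output: Option<String>, error: Option<String>) -> Result<(), i32> {
        let r = self.reservations.iter_mut().find(|r| r.step_id == step_id).ok_or(STEP_NOT_FOUND)?;
        // ON CONFLICT (idempotency_key) DO NOTHING: the first commit wins.
        self.commits.entry(r.idempotency_key.clone()).or_insert(Commit { outcome, output, error });
        r.released = true;
        Ok(())
    }

    /// Releases an active reservation. Committed rows are never touched.
    fn abandon_step(&mut self, step_id: &str) -> bool {
        match self.reservations.iter_mut().find(|r| r.step_id == step_id && !r.released) {
            Some(r) => {
                r.released = true;
                true
            }
            None => false,
        }
    }
}
```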

Critical determinism note: Steps within a workflow must be issued in the same order on every invocation. The Animus daemon guarantees this by deriving step_name deterministically from (phase_id, step_ordinal).
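The determinism rule can be sketched as two small functions. Both are assumptions. The spec fixes only the inputs (phase_id, step_ordinal), so the `"{phase_id}:step:{ordinal}"` format is hypothetical. `check_replay` is a hypothetical helper a plugin could use to raise REPLAY_NONDETERMINISM when a replayed invocation issues a different name at an already-recorded ordinal.

```rust
/// Mirrors `error_codes::REPLAY_NONDETERMINISM`.
const REPLAY_NONDETERMINISM: i32 = -32206;

/// Derives a step name purely from (phase_id, step_ordinal), so every
/// invocation of the same phase issues the same sequence of names.
fn step_name(phase_id: &str, step_ordinal: u32) -> String {
    format!("{}:step:{}", phase_id, step_ordinal)
}

/// Compares a newly issued step name against the recorded history.
/// Ordinals past the end of `recorded` are new steps and always allowed.
fn check_replay(recorded: &[String], step_ordinal: u32, issued: &str) -> Result<(), i32> {
    match recorded.get(step_ordinal as usize) {
        Some(prior) if prior != issued => Err(REPLAY_NONDETERMINISM),
        _ => Ok(()),
    }
}
```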

Gotchas (from DBOS research)

  • DBOS.launch() is process-global. One plugin process = one DBOS instance = one Postgres database.
  • recoverPendingWorkflows runs automatically at plugin startup, so the daemon must tolerate side-effect replay before it issues any explicit recover_in_flight call.
  • DBOS step args are serialized to Postgres. Keep payloads small: store large artifacts in Animus's artifact store and pass only refs.
  • DBOS may bind a default admin HTTP port (3001). The plugin must disable it or configure the port explicitly to avoid collisions with other plugins.
  • Node ≥20 is a hard requirement; plugin preflight surfaces a clear error on older versions.

4. animus-memory-store-protocol

Persistent semantic memory across runs, agents, and tasks. First implementation: animus-memory-zep (TypeScript, Zep Cloud).

Kind constant

rust
pub const PLUGIN_KIND_MEMORY_STORE: &str = "memory_store";

Methods

rust
pub const METHOD_MEMORY_PUT: &str = "memory/put";
pub const METHOD_MEMORY_GET: &str = "memory/get";
pub const METHOD_MEMORY_QUERY: &str = "memory/query";
pub const METHOD_MEMORY_LIST_SCOPES: &str = "memory/list_scopes";
pub const METHOD_MEMORY_DELETE_SCOPE: &str = "memory/delete_scope";

Types

rust
/// Memory scope hierarchy. The scope id is derived by the plugin from these
/// fields as a flat namespace. Project-wide memory: project_id only. Per-agent:
/// project_id + agent_id. Per-task: project_id + agent_id + task_id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryScope {
    pub project_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub task_id: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutMemoryRequest {
    pub scope: MemoryScope,
    pub key: String,
    pub value: Value,
    /// Optional hint; backends MAY ignore. Backends MUST declare
    /// `native_ttl` in their capabilities; when false, the TTL is recorded
    /// in metadata only and the caller is responsible for eviction.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ttl_secs: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutMemoryResponse {
    pub ack: bool,
    /// True if the backend has fully indexed the value and `query` is
    /// expected to find it immediately. False when ingestion is async
    /// (e.g., Zep). Callers needing read-after-write semantics must check
    /// this flag. (Fixes codex P2-6.)
    pub indexed_immediately: bool,
    /// Backend-issued identifier for the stored entry. Useful for delete-by-id
    /// or audit. May be empty if the backend doesn't expose stable record ids.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub record_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMemoryRequest {
    pub scope: MemoryScope,
    pub key: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMemoryResponse {
    pub found: bool,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub value: Option<Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMemoryRequest {
    pub scope: MemoryScope,
    pub query: String,
    pub top_k: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMemoryResponse {
    pub results: Vec<MemoryQueryResult>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryQueryResult {
    pub key: String,
    pub value: Value,
    pub score: f32,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ListScopesRequest {
    /// If set, only return scopes under this project. Else return all scopes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project_id: Option<String>,
    /// Cursor-based pagination. Set to `None` for the first page.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
    /// Page size. Backends MAY clamp to their per-backend maximum (declared
    /// in capabilities). Defaults to 100 if unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub page_size: Option<u32>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListScopesResponse {
    pub scopes: Vec<MemoryScope>,
    /// Cursor for the next page, or `None` if exhausted. (Fixes codex P2-7.)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteScopeRequest {
    pub scope: MemoryScope,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteScopeResponse {
    pub ack: bool,
}
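Callers walk memory/list_scopes page by page until next_cursor is None. Below is a minimal daemon-side sketch. It assumes a hypothetical `fetch_page` closure standing in for one JSON-RPC round trip. The helper is generic over the item type, so it works with MemoryScope or with plain graph-id strings as in the usage below.

```rust
/// Collects every item from a cursor-paginated RPC such as `memory/list_scopes`.
/// `fetch_page(cursor)` performs one round trip and returns that page's items
/// plus the `next_cursor` value from the response.
fn list_all<T, F>(mut fetch_page: F) -> Vec<T>
where
    F: FnMut(Option<String>) -> (Vec<T>, Option<String>),
{
    let mut all = Vec::new();
    let mut cursor: Option<String> = None; // None requests the first page
    loop {
        let (page, next_cursor) = fetch_page(cursor);
        all.extend(page);
        cursor = next_cursor;
        if cursor.is_none() {
            return all; // exhausted
        }
    }
}
```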

Error codes (memory_store)

rust
pub mod error_codes {
    /// Scope not found (e.g., delete_scope on unknown id).
    pub const SCOPE_NOT_FOUND: i32 = -32301;
    /// Memory key not found (memory/get when key doesn't exist).
    pub const KEY_NOT_FOUND: i32 = -32302;
    /// Two distinct scopes normalize to the same backend graph id.
    pub const MEMORY_SCOPE_COLLISION: i32 = -32303;
    /// Backend (Zep, etc.) unavailable.
    pub const BACKEND_UNAVAILABLE: i32 = -32304;
    /// Backend rate-limited the request. Caller should back off.
    pub const RATE_LIMITED: i32 = -32305;
    /// `top_k` exceeded backend-declared maximum.
    pub const QUERY_TOP_K_EXCEEDED: i32 = -32306;
    /// Project root mismatch.
    pub const PROJECT_BINDING_MISMATCH: i32 = -32307;
}

Zep implementation guidance

Use standalone Graphs (not Users) for all scope levels. Map scope to graphId.

ID escaping rule (fixes codex P2-8): all project_id, agent_id, and task_id segments are normalized before interpolation:

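rust
use sha2::{Digest, Sha256};
fn normalize(segment: &str) -> String {
    // 1-3. Lowercase; map chars outside [a-z0-9_-] to '-'; collapse runs of '-'.
    let mut out = String::new();
    for c in segment.to_lowercase().chars() {
        let c = if c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' { c } else { '-' };
        if c != '-' || !out.ends_with('-') { out.push(c); }
    }
    // 4-5. Trim leading/trailing '-', then truncate to 64 chars (all ASCII by now).
    let mut out = out.trim_matches('-').to_string();
    out.truncate(64);
    // 6. Empty result: hex-encode the first 16 bytes of sha256(input).
    if !out.is_empty() { return out; }
    Sha256::digest(segment.as_bytes())[..16].iter().map(|b| format!("{b:02x}")).collect()
}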

The plugin records the un-normalized original in scope metadata so reverse lookup is possible. Two scopes whose normalized graph IDs collide trigger a MEMORY_SCOPE_COLLISION error (see error codes).

| Scope | graphId pattern |
|---|---|
| project-wide ({project_id}) | proj_${normalize(project_id)} |
| per-agent ({project_id, agent_id}) | proj_${normalize(project_id)}__agent_${normalize(agent_id)} |
| per-task ({project_id, agent_id, task_id}) | proj_${normalize(project_id)}__agent_${normalize(agent_id)}__task_${normalize(task_id)} |

| Protocol RPC | Zep operation |
|---|---|
| memory/put | graph.create() if not exists (catch 409); then graph.add({ graphId, type: "text", data: JSON.stringify({key, value}), metadata: { key, ttl_s } }) |
| memory/get | graph.search({ graphId, query: key, scope: "episodes", limit: 1 }); return the first hit whose metadata.key matches exactly |
| memory/query | graph.search({ graphId, query, scope: "edges", limit: top_k, reranker: "rrf" }) |
| memory/list_scopes | graph.listAll({ pageSize: page_size ?? 100, cursor }). If project_id is provided, keep only graphs whose id equals proj_${normalize(project_id)} or starts with proj_${normalize(project_id)}__. Use the SAME normalize function as the graphId mapping above, never the raw project_id. A bare prefix match on proj_${normalize(project_id)} would also match other projects (e.g., proj_acmecorp when filtering for acme). (Fixes codex round-2 P2-3.) |
| memory/delete_scope | graph.delete(graphId) |
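The scope-to-graphId mapping, the collision check, and the project filter for memory/list_scopes can be sketched together. Several pieces are assumptions. `normalize` is passed in as a closure, standing in for the normalization function specified above. `ScopeRegistry` is a hypothetical in-memory stand-in for the plugin's scope metadata. A scope with a task_id but no agent_id is rejected, because the hierarchy has no such level. `in_project` matches the project graph itself or ids under `proj_<p>__`, so proj_acme does not also match proj_acmecorp.

```rust
use std::collections::hash_map::{Entry, HashMap};

/// Mirrors `error_codes::MEMORY_SCOPE_COLLISION`.
const MEMORY_SCOPE_COLLISION: i32 = -32303;

#[derive(Debug, Clone, PartialEq, Eq)]
struct MemoryScope {
    project_id: String,
    agent_id: Option<String>,
    task_id: Option<String>,
}

/// Builds the graphId per the scope table; `None` for a task scope without an agent.
fn graph_id(scope: &MemoryScope, normalize: impl Fn(&str) -> String) -> Option<String> {
    let mut id = format!("proj_{}", normalize(scope.project_id.as_str()));
    match (&scope.agent_id, &scope.task_id) {
        (None, None) => {}
        (Some(agent), None) => id.push_str(&format!("__agent_{}", normalize(agent.as_str()))),
        (Some(agent), Some(task)) => id.push_str(&format!(
            "__agent_{}__task_{}",
            normalize(agent.as_str()),
            normalize(task.as_str())
        )),
        (None, Some(_)) => return None,
    }
    Some(id)
}

/// Remembers the un-normalized scope behind each graphId (hypothetical).
#[derive(Default)]
struct ScopeRegistry {
    by_graph_id: HashMap<String, MemoryScope>,
}

impl ScopeRegistry {
    /// Errors if a *different* original scope already owns this graphId.
    fn register(&mut self, graph_id: String, scope: &MemoryScope) -> Result<(), i32> {
        match self.by_graph_id.entry(graph_id) {
            Entry::Occupied(e) if e.get() != scope => Err(MEMORY_SCOPE_COLLISION),
            Entry::Occupied(_) => Ok(()),
            Entry::Vacant(v) => {
                v.insert(scope.clone());
                Ok(())
            }
        }
    }
}

/// True if `graph_id` belongs to the project whose normalized id is `project_norm`.
fn in_project(graph_id: &str, project_norm: &str) -> bool {
    let root = format!("proj_{}", project_norm);
    graph_id == root || graph_id.starts_with(&format!("{}__", root))
}
```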

Gotchas (from Zep research)

  • No native TTL. Backends MUST document this in their manifest capabilities. The ttl_secs field is advisory.
  • No native key-value get. Zep is semantic; memory/get falls back to graph.search() and filtering. Document this in plugin README: callers should use query for fuzzy match and get only for exact-key recall on small scopes.
  • Ingestion is async. A put immediately followed by query may miss. Plugin should document eventual consistency.
  • data must be a string. Plugin JSON-stringifies on the way in, parses on the way out.
  • Metadata cap: 10 keys, scalar values only. The plugin enforces the cap or surfaces an error.
  • Rate limits. Zep Cloud's free tier degrades under load. The plugin surfaces 429s to the daemon (as RATE_LIMITED) so the daemon can back off.
  • Search hard cap of 50 results for non-auto scopes. The plugin clamps top_k and logs a warning when a higher value is requested.
  • userId and graphId are mutually exclusive. Plugin always uses graphId.

Capability declaration

rust
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryStoreCapabilities {
    /// Whether backend honors `ttl_secs`. Zep: false. SQLite-backed: true.
    #[serde(default)]
    pub native_ttl: bool,
    /// Whether `memory/get` is O(1) exact-key. Zep: false (search-based).
    #[serde(default)]
    pub native_key_get: bool,
    /// Whether put → query is immediately consistent. Zep: false.
    #[serde(default)]
    pub strong_consistency: bool,
    /// Max `top_k` allowed.
    #[serde(default)]
    pub max_query_top_k: u32,
}
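Here is a sketch of how a caller might act on these flags. The helpers `effective_top_k`, `can_read_own_write`, and `caller_must_evict` are hypothetical, and serde derives are omitted to keep the example std-only. The spec does not say what max_query_top_k = 0 (the serde default) means. The sketch ASSUMES it means "no declared limit"; that is not protocol. The clamp-and-warn behaviour follows the Zep gotcha above.

```rust
/// Mirrors MemoryStoreCapabilities (serde derives omitted).
#[derive(Debug, Clone, Default)]
struct MemoryStoreCapabilities {
    native_ttl: bool,
    native_key_get: bool,
    strong_consistency: bool,
    max_query_top_k: u32,
}

/// Clamps a requested top_k to the declared maximum. Returns the value to send
/// and whether it was reduced, so the caller can log a warning.
/// ASSUMPTION: max_query_top_k == 0 means "no declared limit".
fn effective_top_k(caps: &MemoryStoreCapabilities, requested: u32) -> (u32, bool) {
    if caps.max_query_top_k == 0 || requested <= caps.max_query_top_k {
        (requested, false)
    } else {
        (caps.max_query_top_k, true)
    }
}

/// Whether a memory/query right after a memory/put can be expected to see the
/// write: the backend is strongly consistent, or this put reported
/// `indexed_immediately`.
fn can_read_own_write(caps: &MemoryStoreCapabilities, indexed_immediately: bool) -> bool {
    caps.strong_consistency || indexed_immediately
}

/// Whether the caller must evict entries itself when it sets `ttl_secs`.
fn caller_must_evict(caps: &MemoryStoreCapabilities) -> bool {
    !caps.native_ttl
}
```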

Cross-protocol concerns

Subject type ownership — canonical home is animus-subject-protocol

SubjectRef and SubjectDispatch are referenced by animus-workflow-runner-protocol (§1), animus-queue-protocol (§2), and the existing animus-subject-protocol. They MUST have exactly one definition. (Fixes codex round-3 P1-3.)

Decision: animus-subject-protocol is the canonical home. Both new protocol crates depend on animus-subject-protocol for these types. There is no parallel definition.

Consequences for the lift work (Wave 2A):

The current crates/protocol/ crate inside ao-cli also defines SubjectRef and SubjectDispatch. As part of v0.5:

  1. animus-subject-protocol v0.5.0 includes the authoritative type definitions, copied from protocol (preserving the existing wire format; this is a re-homing, not a redesign).
  2. protocol inside ao-cli deprecates its local definitions and re-exports from animus-subject-protocol:
    rust
    pub use animus_subject_protocol::{SubjectRef, SubjectDispatch};
  3. Wave 2A lifted plugins (workflow_runner_default, queue_default) reference animus_subject_protocol::SubjectDispatch directly, not the (now-deprecated) re-export. The lifted code's existing protocol::SubjectDispatch references are migrated to animus_subject_protocol::SubjectDispatch during the lift.
  4. The Wave 3 daemon refactor also migrates remaining ao-cli references off protocol::SubjectDispatch onto animus_subject_protocol::SubjectDispatch. Once complete, the deprecated re-export can be removed in a future release.

This is included as a Wave 1 task below.

Plugin-kind constants summary (additions to animus-plugin-protocol)

rust
pub const PLUGIN_KIND_WORKFLOW_RUNNER: &str = "workflow_runner";
pub const PLUGIN_KIND_QUEUE: &str = "queue";
pub const PLUGIN_KIND_DURABLE_STORE: &str = "durable_store";
pub const PLUGIN_KIND_MEMORY_STORE: &str = "memory_store";

animus-plugin-protocol version

Bumps from 1.0.0 → 1.1.0. Three additive changes (fixes codex P1-5):

  1. New plugin-kind constants (above).
  2. InitializeParams.init_extensions map — opaque per-extension blobs the host may pass on initialize. Used by v0.5 to send project_binding (see Common Conventions). Plugins ignore extensions they don't recognize. Forward-compatible.
  3. InitializeResult.capabilities typed shape — currently this field exists but is loosely typed. v0.5 standardizes it to:
jsonc
{
  "protocol_version": "1.1.0",
  "kinds": ["workflow_runner"],          // list of PLUGIN_KIND_* this process implements
  "capabilities": {
    "workflow_runner": {                  // keyed by kind
      "crate_version": "0.1.0",           // the per-kind protocol crate version
      "extra": {                          // backend-specific capability flags (typed by each crate)
        "phase_decision_parsing": true,
        "rework_context_support": true,
        "post_success_actions": true,
        "crash_recovery": true,
        "manual_pause_support": true
      }
    }
  }
}

The daemon enforces:

  • Plugin's declared protocol_version major matches daemon's expected major
  • Plugin advertises at least one kind the daemon needs
  • Plugin's per-kind crate_version major matches the protocol crate the daemon depends on
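The three handshake checks can be sketched as one function. The sketch assumes the KindCapability shape named in the Wave 1 tasks, but models only crate_version; extra and the serde derives are omitted to keep it std-only. The `check_plugin` and `same_major` names and the error strings are hypothetical. `needed` maps each kind the daemon requires to the version of the protocol crate it was built against. Kinds the plugin advertises but the daemon does not need are ignored.

```rust
use std::collections::HashMap;

/// Per-kind entry of InitializeResult.capabilities (only crate_version modelled).
struct KindCapability {
    crate_version: String,
}

/// Parses the MAJOR component of a "MAJOR.MINOR.PATCH" string.
fn major(version: &str) -> Option<u64> {
    version.split('.').next()?.parse().ok()
}

/// True only if both versions parse and share a major component.
fn same_major(a: &str, b: &str) -> bool {
    matches!((major(a), major(b)), (Some(x), Some(y)) if x == y)
}

/// Returns the needed kinds this plugin may serve, or the first failed check.
fn check_plugin(
    protocol_version: &str,
    kinds: &[&str],
    capabilities: &HashMap<String, KindCapability>,
    daemon_protocol_version: &str,
    needed: &HashMap<&str, &str>,
) -> Result<Vec<String>, String> {
    // Check 1: protocol_version major must match the daemon's.
    if !same_major(protocol_version, daemon_protocol_version) {
        return Err(format!("protocol_version {protocol_version} incompatible with {daemon_protocol_version}"));
    }
    let mut accepted = Vec::new();
    for &kind in kinds {
        let Some(&daemon_crate_version) = needed.get(kind) else { continue };
        let cap = capabilities
            .get(kind)
            .ok_or_else(|| format!("kind {kind} advertised without capabilities"))?;
        // Check 3: per-kind crate_version major must match the daemon's crate.
        if !same_major(&cap.crate_version, daemon_crate_version) {
            return Err(format!("{kind}: crate_version {} incompatible with {daemon_crate_version}", cap.crate_version));
        }
        accepted.push(kind.to_string());
    }
    // Check 2: at least one kind the daemon needs.
    if accepted.is_empty() {
        return Err("plugin implements no kind the daemon needs".to_string());
    }
    Ok(accepted)
}
```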

Wave 1 MUST include the typed Capabilities struct in animus-plugin-protocol — this is a concrete, testable code change and is captured in the implementation tasks below.

Default flavor manifest additions

The flavors/ directory at the repo root is a new v0.5 addition (it does not exist in v0.4.x). It is owned by ao-cli (the kernel repo); flavor manifests are checked-in TOML files. The Brief A agent creates the directory and the initial default.toml. (Fixes codex P3-3.)

Update flavors/default.toml:

toml
[workflow_runner]
required = ["launchapp-dev/animus-workflow-runner-default"]

[queue]
required = ["launchapp-dev/animus-queue-default"]

# durable_store and memory_store are OPT-IN, not required for the default
# flavor. Users who want crash-safe execution install the DBOS plugin;
# users who want persistent agent memory install the Zep plugin.
[durable_store]
recommended = ["launchapp-dev/animus-step-durable-dbos"]

[memory_store]
recommended = ["launchapp-dev/animus-memory-zep"]

v0.5 implementation tasks

These are the concrete code-level tasks for the v0.5 swarm. Each Wave 1 agent owns one protocol crate; each Wave 2 agent owns one plugin implementation.

Wave 1 (in animus-staging/animus-protocol workspace, must complete before Wave 2)

  • [ ] Add 4 plugin-kind constants to animus-plugin-protocol/src/lib.rs; bump PROTOCOL_VERSION to "1.1.0"
  • [ ] In animus-subject-protocol: add authoritative SubjectRef and SubjectDispatch definitions (copy from ao-cli's crates/protocol/ preserving wire format). Tag animus-subject-protocol v0.5.0
  • [ ] Extend animus-plugin-protocol InitializeParams with init_extensions: HashMap<String, Value> for the project-binding extension and forward-compat extension points
  • [ ] Extend animus-plugin-protocol InitializeResult with the typed Capabilities struct (one crate_version + per-kind extra blob per kind the plugin implements). Add KindCapability type and Capabilities = HashMap<String, KindCapability>
  • [ ] Create animus-workflow-runner-protocol/ crate (this spec, §1) — includes typed WorkflowRunnerCapabilities for the extra field
  • [ ] Create animus-queue-protocol/ crate (this spec, §2) — includes typed QueueCapabilities (page-size limits, supported status filters)
  • [ ] Create animus-durable-store-protocol/ crate (this spec, §3) — includes typed DurableStoreCapabilities (default reservation TTL, max payload size, etc.)
  • [ ] Create animus-memory-store-protocol/ crate (this spec, §4) — includes typed MemoryStoreCapabilities
  • [ ] Update workspace Cargo.toml to include all four new crates
  • [ ] Update spec.md with the four new kind sections
  • [ ] Tag the protocol workspace v0.5.0

Wave 2A — Rust lift-and-shift (after Wave 1)

  • [ ] animus-workflow-runner-default plugin repo: lift workflow-runner-v2 per kernel-extraction-v0.5.md
  • [ ] animus-queue-default plugin repo: lift queue logic per the extraction map

Wave 2B — TS greenfield (parallel with 2A)

  • [ ] animus-step-durable-dbos plugin repo: implement durable_store via DBOS Option A
  • [ ] animus-memory-zep plugin repo: implement memory_store via Zep Cloud

Wave 3 — Kernel cleanup (after 2A)

  • [ ] Update ao-cli to use the workflow_runner plugin for execution instead of the in-tree crate
  • [ ] Update ao-cli to use the queue plugin for queue ops
  • [ ] Update flavors/default.toml to include the bundled defaults
  • [ ] Add animus flavor CLI subcommand
  • [ ] Update docs/reference/cli/index.md

Known limitations in v0.5 (must be documented in plugin READMEs)

  • Phase event volatility on crash. workflow/execute returns phase_events as a final vector. If the workflow_runner plugin crashes mid-execution, in-memory phase events are lost. The daemon detects unfinished runs via durable/recover_in_flight (when a durable_store is installed) and restarts the workflow, but the original event timeline up to the crash is not recoverable from the runner alone. Plugins SHOULD persist critical decision events (Decision, Completed { status: "manual_pending" }) to disk under <project_root>/.animus/workflow-events/<run_id>.jsonl for post-crash inspection. (Addresses codex P2-1.)
  • memory_store eventual consistency. Zep ingestion is async; put followed immediately by query may miss. Use PutMemoryResponse.indexed_immediately to detect.
  • durable_store fence is not a transaction across the IPC boundary. A daemon crash between the side effect and commit_step leaves the reservation outstanding until TTL; the recovered workflow sees IN_PROGRESS and the caller (daemon) decides retry policy. End-to-end exactly-once is deferred to v0.7+ Option B.
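The recommended crash-inspection log can be a plain append-only JSONL writer. This minimal sketch assumes the runner already has each event serialized as a single-line JSON string; serialization, e.g. via serde_json, is out of scope. The `append_phase_event` name and the single-segment run_id check are hypothetical additions. A successful write already survives a plugin process crash. `sync_data` additionally guards against an OS crash or power loss, at the cost of one fsync per event.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Appends one pre-serialized JSON event to
/// <project_root>/.animus/workflow-events/<run_id>.jsonl and returns the path.
fn append_phase_event(project_root: &Path, run_id: &str, event_json: &str) -> io::Result<PathBuf> {
    // JSONL requires exactly one record per line.
    if event_json.contains('\n') {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "event must be single-line JSON"));
    }
    // Keep run_id from escaping the events directory.
    if run_id.is_empty() || run_id.contains('/') || run_id.contains('\\') {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "run_id must be a single path segment"));
    }
    let dir = project_root.join(".animus").join("workflow-events");
    fs::create_dir_all(&dir)?;
    let path = dir.join(format!("{run_id}.jsonl"));
    let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
    file.write_all(format!("{event_json}\n").as_bytes())?;
    file.sync_data()?; // flush to stable storage before reporting success
    Ok(path)
}
```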

Open questions deferred to v0.6+

  • Phase event real-time streaming: v0.5 returns events at workflow end. A workflow/events/poll cursor method is deferred to v0.6.
  • Distributed queue: animus-queue-default is in-memory + JSON file. Multi-machine coordination requires a future animus-queue-redis or animus-queue-postgres plugin.
  • Workflow runner mode = runner: v0.5 ships journal-mode durable_store + in-process workflow_runner. The Option B "runner mode" (DBOS owns the workflow loop) is deferred to v0.7+ and depends on detached agent execution as a prerequisite capability.
  • Memory_store consistency: v0.5 documents eventual consistency. A future strong-consistency profile (for backends that support it) may extend the capability declaration.

This document is the source of truth for Wave 1 protocol authoring. Wave 2 agents read this spec plus the kernel-extraction map.

Released under the Elastic License 2.0 (ELv2).