v0.5 Protocol Specifications
Status
- Version: v0.5 protocol commitment
- Type: Multi-crate design specification
- Source of truth for: Wave 1 protocol-crate authoring agents and Wave 2 implementation agents
- Builds on: kernel-and-flavors.md, plugin-pack-kernel.md, plugin-system.md, animus-vs-dbos-transact.md
- Companion: kernel-extraction-v0.5.md (lift-and-shift map for the Rust reference implementations)
TL;DR
v0.5 ships four new plugin kinds at the protocol level. Two are lift-and-shift extractions of existing in-tree responsibilities (workflow_runner, queue) — their reference implementations live as bundled plugins in the default flavor. Two are net-new external integrations (durable_store → DBOS, memory_store → Zep). All four share the same plugin host, JSON-RPC envelope, signing, and lifecycle defined in animus-plugin-protocol — they only add typed RPC surfaces.
| Plugin kind | New protocol crate | First implementation |
|---|---|---|
| workflow_runner | animus-workflow-runner-protocol | animus-workflow-runner-default (Rust, lift) |
| queue | animus-queue-protocol | animus-queue-default (Rust, lift) |
| durable_store | animus-durable-store-protocol | animus-step-durable-dbos (TypeScript, DBOS) |
| memory_store | animus-memory-store-protocol | animus-memory-zep (TypeScript, Zep) |
Common conventions (apply to all four crates)
Each new protocol crate follows the pattern established by animus-subject-protocol and animus-provider-protocol:
//! Protocol types for {kind} plugins.
#![warn(missing_docs)]
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Plugin-kind constant — must match the value added to
/// `animus-plugin-protocol::PLUGIN_KIND_*`.
pub const KIND: &str = "{kind}";
/// Method constants used as JSON-RPC `method` strings.
pub const METHOD_X: &str = "{kind}/{verb}";
Wire conventions across all four:
- Newline-delimited JSON-RPC 2.0 over stdio (existing animus-plugin-protocol envelope)
- All params/results use serde with skip_serializing_if = "Option::is_none" on optional fields
- Timestamps as RFC 3339 strings (String), not chrono::DateTime, to keep protocol crate dep-free of chrono
- Value (serde_json::Value) for opaque/backend-specific payloads
- No Arc<T>, no trait objects, no closures, no streams in protocol types; everything is Serialize + Deserialize + Send + 'static
- Method naming standard: all methods use snake_case verbs (e.g., workflow/run_phase, queue/mark_assigned). No hyphens. (Fixes codex P3-2.)
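The naming rule can be checked mechanically. The following is a minimal sketch, assuming a two-segment `{kind}/{verb}` shape; `is_valid_method_name` and `is_snake_case` are hypothetical helpers, not part of any protocol crate.

```rust
/// Returns true if `method` has the `{kind}/{verb}` shape with snake_case
/// segments: lowercase ASCII letters, digits, and underscores only.
/// Hypothetical helper for illustration; nested paths are rejected.
pub fn is_valid_method_name(method: &str) -> bool {
    let Some((kind, verb)) = method.split_once('/') else {
        return false;
    };
    is_snake_case(kind) && is_snake_case(verb)
}

fn is_snake_case(segment: &str) -> bool {
    let mut chars = segment.chars();
    // A segment must start with a lowercase letter.
    match chars.next() {
        Some(c) if c.is_ascii_lowercase() => {}
        _ => return false,
    }
    chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}
```

A check like this could run in a unit test over every METHOD_* constant, so a hyphenated name is caught before it reaches the wire.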
Project-scope binding (applies to ALL four kinds):
Every plugin process in v0.5 is bound to exactly one project root at startup time, via an extension to the standard initialize handshake. The host passes the project root as part of an init_extensions map:
// JSON-RPC initialize params (extension on existing animus-plugin-protocol)
{
"protocol_version": "1.1.0",
"init_extensions": {
"project_binding": {
"project_root": "/abs/path/to/project",
"repo_scope": "<scope id from kernel>"
}
}
}
Plugins MUST refuse subsequent RPCs that imply a different project root. The host MUST NOT reuse a single plugin process across projects. This avoids needing a project_root field on every RPC and keeps the protocol concrete. (Fixes codex P1-1.)
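One way a plugin might enforce the binding is sketched below. `ProjectBinding`, `check`, and `normalize` are hypothetical names, and how a plugin derives the root an RPC "implies" is left open here. The error code is the workflow_runner PROJECT_BINDING_MISMATCH value from this spec.

```rust
/// Error code from the workflow_runner `error_codes` module in this spec.
pub const PROJECT_BINDING_MISMATCH: i32 = -32104;

/// Hypothetical per-process state captured from
/// `init_extensions.project_binding.project_root` at initialize time.
pub struct ProjectBinding {
    project_root: String,
}

impl ProjectBinding {
    pub fn new(project_root: &str) -> Self {
        Self { project_root: normalize(project_root) }
    }

    /// `implied_root` is whatever project root an incoming RPC implies;
    /// `None` means the RPC carries no such hint and is accepted.
    pub fn check(&self, implied_root: Option<&str>) -> Result<(), i32> {
        match implied_root {
            Some(root) if normalize(root) != self.project_root => {
                Err(PROJECT_BINDING_MISMATCH)
            }
            _ => Ok(()),
        }
    }
}

/// Strips trailing slashes so "/p" and "/p/" compare equal (assumption:
/// no symlink or case normalization is attempted in this sketch).
fn normalize(path: &str) -> String {
    let trimmed = path.trim_end_matches('/');
    if trimmed.is_empty() { "/".to_string() } else { trimmed.to_string() }
}
```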
Backward-compatibility policy:
- Adding a new plugin kind = animus-plugin-protocol minor version bump (e.g., 1.0.0 → 1.1.0)
- Adding a method to an existing kind's protocol crate = minor bump on that crate
- Changing an existing method's params/result = major bump on that crate
- Plugins declare which protocol crate versions they implement at handshake via InitializeResult::capabilities (see §"Cross-protocol concerns" for the concrete capability schema)
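The bump rules above can be written as a small function. This is a sketch only: the `Change` enum and `bump` function are hypothetical, and resetting the lower components to zero on a bump is the usual semantic-versioning convention, assumed here rather than stated by this spec.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Change {
    /// New plugin kind added to animus-plugin-protocol.
    NewPluginKind,
    /// New method added to an existing kind's protocol crate.
    NewMethod,
    /// Params or result of an existing method changed.
    ChangedMethodShape,
}

/// Applies the bump policy to a (major, minor, patch) triple.
pub fn bump(version: (u32, u32, u32), change: Change) -> (u32, u32, u32) {
    let (major, minor, _patch) = version;
    match change {
        // Additive changes are minor bumps.
        Change::NewPluginKind | Change::NewMethod => (major, minor + 1, 0),
        // Shape changes to an existing method are breaking.
        Change::ChangedMethodShape => (major + 1, 0, 0),
    }
}
```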
Status vocabularies:
Where types use bare String status fields (workflow_status, phase_status, queue entry status, durable run status), each protocol crate declares the allowed values as pub const constants in a status submodule. Plugin authors reference these constants instead of inlining strings. (Fixes codex P3-4.)
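As an illustration of the pattern, a status submodule for the workflow runner crate might look like the sketch below. The values are the ones listed for WorkflowExecuteResult::workflow_status later in this document; the `ALL` array and `is_known` helper are assumptions added for the example.

```rust
/// Hypothetical `status` submodule for the workflow runner protocol crate.
pub mod status {
    pub const COMPLETED: &str = "completed";
    pub const RUNNING: &str = "running";
    pub const FAILED: &str = "failed";
    pub const ESCALATED: &str = "escalated";
    pub const CANCELLED: &str = "cancelled";

    /// All allowed values, handy for validating strings read off the wire.
    pub const ALL: [&str; 5] = [COMPLETED, RUNNING, FAILED, ESCALATED, CANCELLED];

    /// Returns true if `value` is one of the declared status strings.
    pub fn is_known(value: &str) -> bool {
        ALL.contains(&value)
    }
}
```

Plugin code would then write `status::COMPLETED` rather than the literal `"completed"`, so a typo becomes a compile error instead of a silent mismatch.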
1. animus-workflow-runner-protocol
The workflow runner is the engine that executes Animus workflow YAML — iterating phases, evaluating decision contracts, handling rework loops, applying post-success actions. v0.5 extracts the in-tree workflow-runner-v2 crate into a bundled plugin.
Kind constant
In animus-plugin-protocol/src/lib.rs:
/// Plugin kind for workflow runner plugins.
///
/// Workflow runners execute Animus workflow YAML by orchestrating phases,
/// evaluating decision contracts, and handling rework loops.
pub const PLUGIN_KIND_WORKFLOW_RUNNER: &str = "workflow_runner";
Methods
pub const METHOD_WORKFLOW_EXECUTE: &str = "workflow/execute";
pub const METHOD_WORKFLOW_RUN_PHASE: &str = "workflow/run_phase";
workflow/execute: Full workflow run
use animus_subject_protocol::{SubjectDispatch, SubjectRef};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecuteRequest {
// `project_root` is bound at initialize-time (see Common Conventions);
// it is NOT a per-request field.
/// Existing workflow id to resume, or `None` to start a fresh run.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_id: Option<String>,
/// Subject envelope. Exactly one of `subject_dispatch` OR
/// (`subject_ref` + one of [`task_id`, `requirement_id`, `title`+`description`])
/// must be set. Generic subject backends (Linear, Jira, custom kinds) MUST
/// use `subject_dispatch`; task and requirement backends MAY use the
/// convenience fields. (Fixes codex P1-3.)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subject_dispatch: Option<SubjectDispatch>,
/// For convenience callers (task/requirement subjects), the subject_ref
/// identifies what to run when `subject_dispatch` is not set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subject_ref: Option<SubjectRef>,
/// Task id (used only when `subject_dispatch` is None and `subject_ref.kind == "task"`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
/// Requirement id (used only when `subject_dispatch` is None and `subject_ref.kind == "requirement"`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub requirement_id: Option<String>,
/// For custom ad-hoc subjects without an existing subject_ref: human-readable title.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
/// For custom ad-hoc subjects: description / prompt seed.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Workflow YAML ref (e.g., `"standard"`, `"research-first"`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_ref: Option<String>,
/// Initial input JSON for workflow variables.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input: Option<Value>,
/// Workflow scalar variables.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub vars: HashMap<String, String>,
/// Force a specific model (e.g., `"claude-opus-4-7"`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
/// Force a specific tool (e.g., `"claude"`, `"codex"`, `"gemini"`).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool: Option<String>,
/// Per-phase timeout in seconds.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase_timeout_secs: Option<u64>,
/// Single-phase filter: run only this phase id.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase_filter: Option<String>,
/// Opaque phase routing config (backend-specific).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase_routing: Option<Value>,
/// Opaque MCP runtime config (backend-specific).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mcp_config: Option<Value>,
}
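The subject envelope rule on WorkflowExecuteRequest can be validated before a run starts. The sketch below is one reading of that rule, with assumptions: "one of" is treated as "exactly one", `subject_dispatch` and `subject_ref` are reduced to presence flags, and `SubjectEnvelope` and `validate_subject` are hypothetical names.

```rust
/// Simplified stand-in for `WorkflowExecuteRequest`; only field presence
/// matters for this check.
#[derive(Default)]
pub struct SubjectEnvelope {
    pub has_subject_dispatch: bool,
    pub has_subject_ref: bool,
    pub task_id: Option<String>,
    pub requirement_id: Option<String>,
    pub title: Option<String>,
    pub description: Option<String>,
}

pub fn validate_subject(env: &SubjectEnvelope) -> Result<(), &'static str> {
    // The three convenience forms that may accompany `subject_ref`.
    let convenience_forms = [
        env.task_id.is_some(),
        env.requirement_id.is_some(),
        env.title.is_some() && env.description.is_some(),
    ];
    let convenience_count = convenience_forms.iter().filter(|set| **set).count();
    let any_convenience_field = env.has_subject_ref
        || env.task_id.is_some()
        || env.requirement_id.is_some()
        || env.title.is_some()
        || env.description.is_some();

    if env.has_subject_dispatch {
        if any_convenience_field {
            return Err("subject_dispatch excludes the convenience fields");
        }
        return Ok(());
    }
    if !env.has_subject_ref {
        return Err("need subject_dispatch or subject_ref");
    }
    if convenience_count != 1 {
        return Err("subject_ref needs exactly one convenience form");
    }
    Ok(())
}
```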
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowExecuteResult {
pub workflow_id: String,
pub workflow_ref: String,
/// One of: `"completed"`, `"running"`, `"failed"`, `"escalated"`,
/// `"cancelled"`.
pub workflow_status: String,
pub subject_id: String,
pub execution_cwd: String,
pub phases_requested: Vec<String>,
pub phases_completed: usize,
pub phases_total: usize,
pub total_duration_secs: u64,
pub phase_results: Vec<PhaseResultSnapshot>,
/// Post-success action outcome (push, PR creation, merge, etc.).
pub post_success: Value,
/// True iff `workflow_status == "completed"`.
pub success: bool,
/// Phase events emitted during execution (replaces in-process callback).
#[serde(default)]
pub phase_events: Vec<PhaseEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseResultSnapshot {
pub phase_id: String,
/// `"completed"`, `"rework"`, `"closed"`, `"failed"`, `"manual_pending"`, etc.
pub status: String,
pub duration_secs: u64,
pub outcome: Value,
pub metadata: Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_phase_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub close_reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PhaseEvent {
Started { phase_id: String, attempt: u32, ts: String },
Decision { phase_id: String, verdict: String, confidence: Option<f32>, ts: String },
Completed { phase_id: String, status: String, ts: String },
}
workflow/run_phase: Single phase execution
Used by the daemon's per-phase scheduler and CLI --phase filters. The runner does not iterate phases here — it runs exactly one phase and returns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPhaseRunRequest {
// project_root is bound at initialize-time (see Common Conventions).
// NOT a per-request field.
pub execution_cwd: String,
pub workflow_id: String,
pub workflow_ref: String,
pub subject_id: String,
pub subject_title: String,
pub subject_description: String,
pub phase_id: String,
pub phase_attempt: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase_timeout_secs: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model_override: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool_override: Option<String>,
/// One of `"minimal"`, `"low"`, `"medium"`, `"high"`, `"critical"`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_complexity: Option<String>,
/// Rework context from a prior phase's `rework` verdict.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rework_context: Option<String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub pipeline_vars: HashMap<String, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dispatch_input: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schedule_input: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub phase_routing: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowPhaseRunResult {
/// `"completed"`, `"manual_pending"`, `"failed"`.
pub phase_status: String,
pub duration_secs: u64,
pub outcome: Value,
pub metadata: Value,
#[serde(default)]
pub signals: Vec<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tool: Option<String>,
}
Error codes (workflow_runner)
pub mod error_codes {
/// Workflow id not found.
pub const WORKFLOW_NOT_FOUND: i32 = -32101;
/// Phase id not found within workflow.
pub const PHASE_NOT_FOUND: i32 = -32102;
/// Workflow already in a terminal state.
pub const WORKFLOW_TERMINAL: i32 = -32103;
/// Project root mismatch (plugin is bound to a different project).
pub const PROJECT_BINDING_MISMATCH: i32 = -32104;
/// Manual gate not satisfied; workflow paused.
pub const MANUAL_GATE_PENDING: i32 = -32105;
/// Decision contract evaluation failed (parser, validator).
pub const DECISION_CONTRACT_INVALID: i32 = -32106;
}
Manifest declaration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRunnerManifest {
pub name: String,
pub version: String,
pub description: String,
pub capabilities: WorkflowRunnerCapabilities,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowRunnerCapabilities {
#[serde(default)]
pub phase_decision_parsing: bool,
#[serde(default)]
pub rework_context_support: bool,
#[serde(default)]
pub post_success_actions: bool,
#[serde(default)]
pub crash_recovery: bool,
#[serde(default)]
pub manual_pause_support: bool,
}
IPC mitigations table
| In-process pattern | Plugin pattern |
|---|---|
| PhaseEventCallback closure | Return Vec<PhaseEvent> in WorkflowExecuteResult::phase_events |
| SharedWorkflowEventEmitter: Arc<dyn …> | Removed; plugin emits events into the result |
| Arc<dyn ServiceHub> param | Removed; plugin loads its own hub from project_root |
| Phase output file I/O | Plugin owns file paths under <project_root>/.animus/ |
| Async event subscriptions (workflow/events channel) | Paginate via cursor on a separate workflow/events/poll method (out of scope for v0.5 protocol; daemon polls via existing control plane) |
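The first row of the table, replacing a callback with events returned in the result, can be sketched as follows. This is a reduced illustration: the `PhaseEvent` enum here omits the Decision variant, and `run_phases` with its `run_one` and `now` parameters is hypothetical.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum PhaseEvent {
    Started { phase_id: String, attempt: u32, ts: String },
    Completed { phase_id: String, status: String, ts: String },
}

/// Runs each phase with `run_one` and records events in order instead of
/// invoking a callback. `now` supplies RFC 3339 timestamps so the sketch
/// has no clock dependency.
pub fn run_phases(
    phase_ids: &[&str],
    mut run_one: impl FnMut(&str) -> String,
    mut now: impl FnMut() -> String,
) -> Vec<PhaseEvent> {
    let mut events = Vec::new();
    for &id in phase_ids {
        events.push(PhaseEvent::Started { phase_id: id.to_string(), attempt: 1, ts: now() });
        let status = run_one(id);
        events.push(PhaseEvent::Completed { phase_id: id.to_string(), status, ts: now() });
    }
    // The caller places this vector in WorkflowExecuteResult::phase_events.
    events
}
```

Because the events travel in the response, the host sees them only after the call returns; live progress needs the polling path noted in the last table row.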
Tests to lift
All from crates/workflow-runner-v2/tests/:
- durability_idempotency_and_markers.rs
- durability_manual_pending_and_failed_events.rs
- session_resume.rs
- notification_log.rs
- phase_duration_histogram_records_observations.rs
Plus module-level mod tests blocks in:
- phase_executor::tests::
- payload_traversal::tests::
- phase_output::tests::
- workflow_helpers::tests::
2. animus-queue-protocol
The queue is a per-project FIFO buffer for SubjectDispatch work items awaiting scheduling. Currently in-tree at crates/orchestrator-daemon-runtime/src/queue/. v0.5 extracts to a bundled plugin.
Kind constant
pub const PLUGIN_KIND_QUEUE: &str = "queue";
Methods
pub const METHOD_QUEUE_ENQUEUE: &str = "queue/enqueue";
pub const METHOD_QUEUE_LIST: &str = "queue/list";
pub const METHOD_QUEUE_LEASE: &str = "queue/lease";
pub const METHOD_QUEUE_STATS: &str = "queue/stats";
pub const METHOD_QUEUE_HOLD: &str = "queue/hold";
pub const METHOD_QUEUE_RELEASE: &str = "queue/release";
pub const METHOD_QUEUE_DROP: &str = "queue/drop";
pub const METHOD_QUEUE_REORDER: &str = "queue/reorder";
pub const METHOD_QUEUE_MARK_ASSIGNED: &str = "queue/mark_assigned";
pub const METHOD_QUEUE_COMPLETION: &str = "queue/completion";
Status vocabulary
pub mod status {
pub const PENDING: &str = "pending";
pub const ASSIGNED: &str = "assigned";
pub const HELD: &str = "held";
// Completion statuses
pub const COMPLETED: &str = "completed";
pub const FAILED: &str = "failed";
pub const CANCELLED: &str = "cancelled";
}
Types
use animus_subject_protocol::SubjectDispatch;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEnqueueRequest {
pub subject_dispatch: SubjectDispatch,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEnqueueResponse {
/// `true` if a new entry was created. `false` if the dispatch was rejected
/// as a duplicate of an existing pending/assigned entry (idempotent).
pub enqueued: bool,
/// Stable entry id assigned by the plugin. Used by all subsequent
/// mutation calls.
pub entry_id: String,
/// Convenience: the subject id from the dispatch envelope.
pub subject_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueueListRequest {
/// Filter by status. Values from `status::*`. Empty means all.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub status: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub limit: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub offset: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueListResponse {
pub entries: Vec<QueueEntry>,
pub total: usize,
pub stats: QueueStats,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueEntry {
/// Stable entry id (unique within the project queue). Mutation calls
/// target this. (Fixes codex P2-3.)
pub entry_id: String,
pub subject_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
/// Full dispatch envelope — included so the daemon can lease an entry
/// and start work without a second roundtrip. (Fixes codex P1-2.)
pub subject_dispatch: SubjectDispatch,
/// Value from `status::*`.
pub status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_id: Option<String>,
pub enqueued_at: String, // RFC 3339
#[serde(default, skip_serializing_if = "Option::is_none")]
pub assigned_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub held_at: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct QueueStats {
pub total: usize,
pub pending: usize,
pub assigned: usize,
pub held: usize,
}
/// Atomic dispatch path: claim up to `max` pending entries in priority order,
/// returning each with its full `SubjectDispatch`. The plugin transitions each
/// returned entry from Pending → Assigned in a single transaction.
/// (Replaces "list then mark_assigned" with atomic lease per codex P1-2.)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLeaseRequest {
pub max: usize,
/// Optional daemon-provided workflow ids to attach to leased entries.
/// If set, length MUST be exactly `max` (the plugin returns an error
/// otherwise). If `None`, the plugin generates synthetic UUIDs.
/// (Fixes codex round-2 P2-2.)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_ids: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLeaseResponse {
pub leased: Vec<QueueEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueHoldRequest {
pub entry_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueMutationResponse {
/// `true` if the entry state changed. `false` if the entry was already in
/// the requested state (idempotent no-op) or was not found (in which case
/// `not_found` is `true`). (Fixes codex P2-2.)
pub changed: bool,
#[serde(default)]
pub not_found: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueReleaseRequest {
pub entry_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueDropRequest {
pub entry_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueReorderRequest {
/// New order (partial — entries not in this list keep their existing position).
pub entry_ids: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueReorderResponse {
pub reordered_count: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueMarkAssignedRequest {
pub entry_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueCompletionRequest {
pub entry_id: String,
/// Value from `status::*` (completed | failed | cancelled).
pub status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_ref: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workflow_id: Option<String>,
}
Hold / release / drop / mark_assigned / completion all return QueueMutationResponse. Reorder returns QueueReorderResponse with the count of entries whose position changed.
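The three outcomes encoded by QueueMutationResponse (changed, idempotent no-op, not found) are illustrated below for a hold call. The in-memory `Queue` and the two-state `Status` enum are simplifications assumed for the sketch, not the reference implementation.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Held,
}

#[derive(Debug, PartialEq, Eq)]
pub struct QueueMutationResponse {
    pub changed: bool,
    pub not_found: bool,
}

/// In-memory stand-in for a queue backend: entry_id -> status.
pub struct Queue {
    pub entries: HashMap<String, Status>,
}

impl Queue {
    pub fn hold(&mut self, entry_id: &str) -> QueueMutationResponse {
        match self.entries.get_mut(entry_id) {
            // Unknown entry: nothing changed, and the caller is told why.
            None => QueueMutationResponse { changed: false, not_found: true },
            // Already held: idempotent no-op.
            Some(Status::Held) => QueueMutationResponse { changed: false, not_found: false },
            Some(status) => {
                *status = Status::Held;
                QueueMutationResponse { changed: true, not_found: false }
            }
        }
    }
}
```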
Capacity / throttling note
The queue protocol does not carry capacity logic. Capacity is a daemon-side concern (dispatch_headroom = max_tasks_per_tick.min(pool_size - active_agents)). The daemon decides how many items to lease from the queue per tick; the queue plugin just provides ordered access. This keeps the queue protocol minimal and lets capacity policy stay in the kernel without coupling to the queue backend.
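The split of responsibilities can be sketched as two functions: the daemon computes headroom, and the plugin leases up to that many entries. `Entry`, `lease`, and the use of `saturating_sub` are assumptions of this sketch; a real backend would perform the Pending → Assigned transition in one transaction.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Status {
    Pending,
    Assigned,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    pub entry_id: String,
    pub status: Status,
}

/// Daemon side: how many entries to lease this tick. `saturating_sub`
/// guards against `active_agents` briefly exceeding `pool_size`.
pub fn dispatch_headroom(max_tasks_per_tick: usize, pool_size: usize, active_agents: usize) -> usize {
    max_tasks_per_tick.min(pool_size.saturating_sub(active_agents))
}

/// Plugin side: claim up to `max` pending entries in queue order and flip
/// them to Assigned, returning copies of the claimed entries.
pub fn lease(queue: &mut [Entry], max: usize) -> Vec<Entry> {
    let mut leased = Vec::new();
    for entry in queue.iter_mut().filter(|e| e.status == Status::Pending).take(max) {
        entry.status = Status::Assigned;
        leased.push(entry.clone());
    }
    leased
}
```

In this arrangement the daemon would call `lease` with the result of `dispatch_headroom`, so the queue backend never needs to know pool sizes.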
Error codes
pub mod error_codes {
pub const QUEUE_ENTRY_NOT_FOUND: i32 = -32001;
pub const QUEUE_ENTRY_ALREADY_ASSIGNED: i32 = -32002;
pub const QUEUE_ENTRY_NOT_PENDING: i32 = -32003;
pub const QUEUE_REORDER_FAILED: i32 = -32004;
pub const QUEUE_LOCK_ACQUISITION_FAILED: i32 = -32005;
}
Tests to lift
From crates/orchestrator-daemon-runtime/src/queue/queue_service.rs:
- enqueue_subject_dispatch_is_idempotent_for_same_task_pipeline
- hold_release_and_reorder_use_subject_ids
- enqueue_subject_dispatch_accepts_non_task_subjects
- reorder_subjects_keeps_all_entries_for_same_subject
- generic_subjects_use_kind_qualified_queue_ids
3. animus-durable-store-protocol
Durable step checkpointing for crash-safe workflow execution. First implementation: animus-step-durable-dbos (TypeScript, DBOS as durable journal — Option A from the DBOS analysis). Future implementations may include Temporal, SQLite-based, or distributed engines.
Kind constant
pub const PLUGIN_KIND_DURABLE_STORE: &str = "durable_store";
Methods
pub const METHOD_DURABLE_BEGIN_WORKFLOW_RUN: &str = "durable/begin_workflow_run";
pub const METHOD_DURABLE_BEGIN_STEP: &str = "durable/begin_step";
pub const METHOD_DURABLE_COMMIT_STEP: &str = "durable/commit_step";
pub const METHOD_DURABLE_ABANDON_STEP: &str = "durable/abandon_step";
pub const METHOD_DURABLE_RECOVER_IN_FLIGHT: &str = "durable/recover_in_flight";
pub const METHOD_DURABLE_QUERY_RUN: &str = "durable/query_run";
Status vocabulary
pub mod step_status {
/// First time the plugin has seen this idempotency_key. Caller proceeds
/// to execute the side effect, then calls `commit_step`.
pub const NEW: &str = "new";
/// Another caller has an outstanding reservation for this idempotency_key
/// with an unexpired lease. The current caller MUST NOT execute the side
/// effect; it should wait, retry, or surface to the daemon for arbitration.
pub const IN_PROGRESS: &str = "in_progress";
/// Step was previously committed with an output. `prior_output` is set.
pub const ALREADY_COMMITTED: &str = "already_committed";
/// Step was previously committed with an error. `prior_error` is set.
/// PRIOR_ERROR is TERMINAL for this idempotency_key — the plugin will
/// never re-execute the side effect for this key. Callers that want to
/// retry after a permanent error must use a different idempotency_key
/// (typically by incrementing an attempt counter). `abandon_step` does
/// NOT clear committed errors. (See DBOS guidance for the retry contract.)
pub const PRIOR_ERROR: &str = "prior_error";
}
pub mod run_status {
pub const PENDING: &str = "pending";
pub const SUCCESS: &str = "success";
pub const ERROR: &str = "error";
pub const CANCELLED: &str = "cancelled";
}
Fence semantics (resolves codex P1-4)
The durable store implements a side-effect fence: between begin_step and commit_step, the plugin holds a reservation that prevents concurrent or replay executions of the same idempotency_key. The contract:
1. begin_step with a new key returns status = NEW and creates a reservation with a TTL (default 5 minutes; configurable per plugin manifest).
2. Caller executes the side effect.
3. Caller calls commit_step with output (success) or error (failure). Either commits the result and clears the reservation.
4. If the caller crashes between step 1 and step 3, the reservation expires after TTL. On recovery, a fresh begin_step with the same key returns IN_PROGRESS until the TTL expires, then transitions to allowing a new reservation.
5. If the caller explicitly abandons (e.g., decides not to retry), it calls abandon_step to release the reservation immediately.
This does not provide exactly-once side effects (the daemon can still crash mid-side-effect, and the step may then run again once the reservation expires), but it gives the daemon a deterministic recovery signal and prevents two callers from running the same step concurrently. The "four hard problems" deferral from the DBOS analysis applies: solving end-to-end exactly-once requires deeper integration than Option A. The fence is the minimum that makes Option A useful.
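The fence contract can be exercised with a small in-memory model. The sketch below makes several simplifying assumptions: it keys commit and abandon by idempotency_key rather than step_id, passes time in as plain seconds, and uses hypothetical names (`Fence`, `BeginStatus`). It is not the DBOS-backed implementation.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BeginStatus {
    New,
    InProgress { expires_at: u64 },
    AlreadyCommitted { prior_output: String },
    PriorError { prior_error: String },
}

/// In-memory stand-in for the reservation and commit records.
#[derive(Default)]
pub struct Fence {
    reservations: HashMap<String, u64>, // key -> expires_at (seconds)
    commits: HashMap<String, Result<String, String>>, // key -> outcome
}

impl Fence {
    pub fn begin_step(&mut self, key: &str, now: u64, ttl_secs: u64) -> BeginStatus {
        // Committed results take precedence over any reservation.
        match self.commits.get(key) {
            Some(Ok(out)) => return BeginStatus::AlreadyCommitted { prior_output: out.clone() },
            Some(Err(err)) => return BeginStatus::PriorError { prior_error: err.clone() },
            None => {}
        }
        // An unexpired reservation blocks the caller.
        if let Some(&expires_at) = self.reservations.get(key) {
            if expires_at >= now {
                return BeginStatus::InProgress { expires_at };
            }
        }
        // No live reservation: create (or replace an expired) one.
        self.reservations.insert(key.to_string(), now + ttl_secs);
        BeginStatus::New
    }

    /// The first commit for a key is kept; later ones are ignored.
    pub fn commit_step(&mut self, key: &str, outcome: Result<String, String>) {
        self.commits.entry(key.to_string()).or_insert(outcome);
        self.reservations.remove(key);
    }

    /// Releases an active reservation; committed results are untouched.
    pub fn abandon_step(&mut self, key: &str) {
        self.reservations.remove(key);
    }
}
```

Passing `now` explicitly makes the TTL path testable without sleeping: a second `begin_step` inside the TTL sees InProgress, and one after it gets a fresh reservation.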
Types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginWorkflowRunRequest {
pub run_id: String,
pub phase_id: String,
#[serde(default, skip_serializing_if = "Value::is_null")]
pub inputs: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginWorkflowRunResponse {
/// Plugin-issued, monotonically increasing epoch. The plugin persists
/// the current epoch counter to its durable store; on restart, it MUST
/// resume from the persisted value, not zero. The daemon does not
/// store epoch values across restarts; instead it queries
/// `recover_in_flight` with `since_epoch = 0` after a restart to get
/// all in-flight runs and resumes from there. The epoch is project-scoped
/// (plugin process is bound to one project at init). (Fixes codex P2-5.)
pub epoch: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginStepRequest {
pub run_id: String,
pub phase_id: String,
pub step_name: String,
/// Caller-supplied idempotency key. The plugin uses this to deduplicate
/// replays. Daemon convention: `format!("{}:{}:{}", run_id, phase_id, step_name)`.
pub idempotency_key: String,
#[serde(default, skip_serializing_if = "Value::is_null")]
pub payload: Value,
/// Reservation TTL in seconds. If unset, the plugin uses its configured
/// default (manifest-declared, typically 300). Caller MAY override per-step
/// for known-long operations.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reservation_ttl_secs: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeginStepResponse {
pub step_id: String,
/// Value from `step_status::*`.
pub status: String,
/// Present when `status == ALREADY_COMMITTED`. Contains the previously
/// committed `output` so the caller can short-circuit the side effect.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prior_output: Option<Value>,
/// Present when `status == PRIOR_ERROR`. Contains the previously
/// committed error.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prior_error: Option<StepError>,
/// Present when `status == IN_PROGRESS`. RFC 3339 timestamp of when
/// the existing reservation expires.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reservation_expires_at: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitStepRequest {
pub step_id: String,
/// Required: `"success"` or `"error"`. Use `commit_outcome::*` constants.
/// This is the authoritative dedupe signal for replay — `output`/`error`
/// payloads MAY be null (no-payload commits are valid in both directions),
/// so the outcome cannot be inferred from payload presence.
/// (Fixes codex round-5 P1.)
pub outcome: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<StepError>,
}
pub mod commit_outcome {
pub const SUCCESS: &str = "success";
pub const ERROR: &str = "error";
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AbandonStepRequest {
pub step_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AbandonStepResponse {
pub ack: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepError {
pub code: String,
pub message: String,
#[serde(default, skip_serializing_if = "Value::is_null")]
pub details: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitStepResponse {
pub ack: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoverInFlightRequest {
pub since_epoch: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoverInFlightResponse {
pub in_flight: Vec<InFlightRun>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InFlightRun {
pub run_id: String,
pub phase_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_committed_step: Option<String>,
/// Backend-specific replay state (e.g., DBOS workflow status payload).
#[serde(default, skip_serializing_if = "Value::is_null")]
pub replay_state: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRunRequest {
pub run_id: String,
/// Required: DBOS workflow id is keyed by `(run_id, phase_id)`. Different
/// phases of the same run are different workflows. (Fixes codex round-2 P2-1.)
pub phase_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRunResponse {
pub run_id: String,
/// `"pending"`, `"success"`, `"error"`, `"cancelled"`.
pub status: String,
pub steps: Vec<StepRecord>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepRecord {
pub step_id: String,
pub step_name: String,
pub idempotency_key: String,
pub committed_at: String, // RFC 3339
/// `"success"` or `"error"` — the authoritative outcome (see
/// `commit_outcome::*`). Independent of payload nulls.
pub outcome: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<StepError>,
}
Error codes (durable_store)
pub mod error_codes {
/// Run id not found (begin_workflow_run was never called).
pub const RUN_NOT_FOUND: i32 = -32201;
/// Step id not found (commit_step / abandon_step on unknown reservation).
pub const STEP_NOT_FOUND: i32 = -32202;
/// Reservation expired before commit.
pub const RESERVATION_EXPIRED: i32 = -32203;
/// Backend (e.g., DBOS / Postgres) unavailable.
pub const BACKEND_UNAVAILABLE: i32 = -32204;
/// Project root mismatch.
pub const PROJECT_BINDING_MISMATCH: i32 = -32205;
/// Replay determinism violated (step issued out of order vs. recorded).
pub const REPLAY_NONDETERMINISM: i32 = -32206;
}
DBOS implementation guidance (Option A)
DBOS's native checkpoint model commits the step result inside @DBOS.step(). The Animus protocol layers a reservation table on top to fence side effects across the IPC boundary. The plugin maintains a separate Postgres table for reservations alongside DBOS's checkpoint tables.
Reservation table schema (plugin-owned, separate from DBOS's tables):
CREATE TABLE animus_step_reservations (
step_id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL, -- DBOS workflowID
idempotency_key TEXT NOT NULL,
reserved_at TIMESTAMPTZ NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
-- Lifecycle column drives the partial unique index. Postgres does NOT
-- allow now() in a partial index predicate, so use an explicit column
-- set by commit_step / abandon_step / sweeper. (Fixes codex round-4 P1-1.)
released BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE UNIQUE INDEX animus_step_reservations_active_key
ON animus_step_reservations (idempotency_key)
WHERE released = FALSE;
CREATE INDEX animus_step_reservations_expiry_idx
ON animus_step_reservations (expires_at)
WHERE released = FALSE;
Within begin_step, the plugin sweeps expired-but-not-released rows in the same transaction (UPDATE animus_step_reservations SET released = TRUE WHERE expires_at < now() AND released = FALSE) before checking liveness. A background timer task in the plugin process does the same job globally every 30 seconds.
The plugin also maintains a committed-step table keyed by idempotency_key, populated inside the commit_step transaction. This is the authoritative dedupe source:
CREATE TABLE animus_step_commits (
idempotency_key TEXT PRIMARY KEY,
step_id TEXT NOT NULL,
workflow_id TEXT NOT NULL,
step_name TEXT NOT NULL,
committed_at TIMESTAMPTZ NOT NULL,
-- Explicit outcome: a no-payload success and a no-payload error are
-- distinguishable even when both output and error columns are NULL.
-- (Fixes codex round-4 P1-3.)
outcome TEXT NOT NULL CHECK (outcome IN ('success', 'error')),
output JSONB,
error JSONB
);
PRIOR_ERROR retry contract (fixes codex round-4 P1-2):
PRIOR_ERROR is terminal for that idempotency_key. Once a step is committed with outcome = 'error', future begin_step calls with the same key will continue to return PRIOR_ERROR and will never re-execute the side effect. If a caller wants to retry after a permanent error, it must use a different idempotency_key (typically by incrementing a retry counter in the key: format!("animus:run:{}:phase:{}:attempt:{}", ...)). The Animus daemon owns this policy; the durable_store plugin does not provide a "reset error" RPC in v0.5.
abandon_step only affects active reservations (status NEW or IN_PROGRESS), not committed errors. It is for the "caller decided not to call commit_step at all" case — typically because of an upstream cancellation before the side effect even started.
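The retry rule can be sketched as a small key builder on the daemon side. This is a minimal illustration, not the daemon's actual code: the function names are hypothetical, and numbering attempts from 1 is an assumption. Only the key layout comes from the format string quoted in the contract.

```rust
/// Builds the idempotency key for one attempt of a phase.
/// The layout follows the format string quoted in the retry contract;
/// the function name and 1-based attempt numbering are assumptions.
pub fn step_idempotency_key(run_id: &str, phase_id: &str, attempt: u32) -> String {
    format!("animus:run:{}:phase:{}:attempt:{}", run_id, phase_id, attempt)
}

/// Returns the key to use after `failed_attempt` came back as PRIOR_ERROR.
/// The old key stays terminal; only the attempt counter changes.
pub fn next_attempt_key(run_id: &str, phase_id: &str, failed_attempt: u32) -> String {
    step_idempotency_key(run_id, phase_id, failed_attempt + 1)
}
```

Because the previous key is never reused, the committed error row for it remains in animus_step_commits as an audit record while the retry proceeds under a fresh key.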
Protocol-to-DBOS mapping:
| Protocol RPC | Plugin implementation |
|---|---|
| durable/begin_workflow_run | DBOS.startWorkflow(AnimusJournal, { workflowID: format!("animus:run:{}:phase:{}", run_id, phase_id) }).beginPhase(...). Persist epoch counter in a single-row table (animus_journal_epoch); increment and return |
| durable/begin_step | In a single transaction: (1) UPDATE animus_step_reservations SET released = TRUE WHERE expires_at < now() AND released = FALSE to clear expired live rows. (2) SELECT FROM animus_step_commits WHERE idempotency_key = $1. If outcome = 'success': return ALREADY_COMMITTED with prior_output. If outcome = 'error': return PRIOR_ERROR with prior_error. (3) SELECT FROM animus_step_reservations WHERE idempotency_key = $1 AND released = FALSE. If exists: return IN_PROGRESS with reservation_expires_at. (4) INSERT a new reservation row (TTL = reservation_ttl_secs or default 300s); return NEW with fresh step_id |
| durable/commit_step | In a single transaction: (a) inside @DBOS.step(), write the step record to DBOS's checkpoint tables; (b) INSERT INTO animus_step_commits (idempotency_key, step_id, workflow_id, step_name, committed_at, outcome, output, error) ON CONFLICT (idempotency_key) DO NOTHING (idempotent double-commit safe); (c) UPDATE animus_step_reservations SET released = TRUE WHERE step_id = $1 |
| durable/abandon_step | UPDATE animus_step_reservations SET released = TRUE WHERE step_id = $1. Affects only active reservations; does NOT clear committed errors |
| durable/recover_in_flight | DBOS.listWorkflows({ status: "PENDING" }) → filter epoch → for each, JOIN with animus_step_commits to compute last_committed_step |
| durable/query_run | DBOS.listWorkflowSteps(workflowID) plus animus_step_commits records → map to StepRecord[] |
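The four-step begin_step decision in the table can be modeled in memory to make the ordering explicit. The sketch below is an illustration under stated assumptions, not the plugin's implementation: the real plugin is TypeScript over Postgres, the `Store` type and its fields are hypothetical stand-ins for the two tables, timestamps are plain seconds, and step ids are a simple counter.

```rust
use std::collections::HashMap;

/// Result of `begin_step`, mirroring the four statuses in the mapping table.
#[derive(Debug, PartialEq)]
pub enum BeginStep {
    New { step_id: String },
    InProgress { reservation_expires_at: u64 },
    AlreadyCommitted { prior_output: String },
    PriorError { prior_error: String },
}

/// Committed outcome (stands in for a row of `animus_step_commits`).
pub enum Commit {
    Success(String),
    Error(String),
}

/// Reservation (stands in for a row of `animus_step_reservations`).
pub struct Reservation {
    pub step_id: String,
    pub expires_at: u64,
    pub released: bool,
}

/// Hypothetical in-memory stand-in for the two Postgres tables.
#[derive(Default)]
pub struct Store {
    pub commits: HashMap<String, Commit>,
    pub reservations: Vec<(String, Reservation)>, // (idempotency_key, row)
    next_id: u64,
}

impl Store {
    pub fn begin_step(&mut self, key: &str, now: u64, ttl_secs: u64) -> BeginStep {
        // (1) Sweep: release reservations whose TTL has passed.
        for (_, r) in self.reservations.iter_mut() {
            if r.expires_at < now && !r.released {
                r.released = true;
            }
        }
        // (2) Committed steps are authoritative and checked first.
        match self.commits.get(key) {
            Some(Commit::Success(out)) => {
                return BeginStep::AlreadyCommitted { prior_output: out.clone() }
            }
            Some(Commit::Error(err)) => {
                return BeginStep::PriorError { prior_error: err.clone() }
            }
            None => {}
        }
        // (3) A live reservation means another caller holds the step.
        if let Some((_, r)) = self
            .reservations
            .iter()
            .find(|(k, r)| k.as_str() == key && !r.released)
        {
            return BeginStep::InProgress { reservation_expires_at: r.expires_at };
        }
        // (4) Otherwise reserve the step for this caller.
        self.next_id += 1;
        let step_id = format!("step-{}", self.next_id);
        self.reservations.push((
            key.to_string(),
            Reservation { step_id: step_id.clone(), expires_at: now + ttl_secs, released: false },
        ));
        BeginStep::New { step_id }
    }

    /// Records the outcome once (like ON CONFLICT DO NOTHING) and releases the reservation.
    pub fn commit_step(&mut self, step_id: &str, outcome: Commit) {
        if let Some((key, r)) = self.reservations.iter_mut().find(|(_, r)| r.step_id == step_id) {
            r.released = true;
            self.commits.entry(key.clone()).or_insert(outcome);
        }
    }
}
```

Checking commits before reservations is what makes a committed error terminal: even if a stale reservation row exists for the key, the caller still sees PRIOR_ERROR rather than IN_PROGRESS.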
Critical determinism note: Steps within a workflow must be issued in the same order on every invocation. The Animus daemon guarantees this by deriving step_name deterministically from (phase_id, step_ordinal).
Gotchas (from DBOS research)
- DBOS.launch() is process-global. One plugin process = one DBOS instance = one Postgres database.
- recoverPendingWorkflows runs at plugin startup automatically, so the daemon must tolerate side-effect replay before any explicit recover_in_flight call.
- DBOS step args are serialized to Postgres; keep payloads small and stream large artifacts via Animus's artifact store with refs only.
- DBOS may bind a default admin HTTP port (3001). Plugin must disable or configure explicitly to avoid collisions with other plugins.
- Node ≥20 hard requirement. Plugin preflight surfaces clear error if older.
4. animus-memory-store-protocol
Persistent semantic memory across runs, agents, and tasks. First implementation: animus-memory-zep (TypeScript, Zep Cloud).
Kind constant
pub const PLUGIN_KIND_MEMORY_STORE: &str = "memory_store";
Methods
pub const METHOD_MEMORY_PUT: &str = "memory/put";
pub const METHOD_MEMORY_GET: &str = "memory/get";
pub const METHOD_MEMORY_QUERY: &str = "memory/query";
pub const METHOD_MEMORY_LIST_SCOPES: &str = "memory/list_scopes";
pub const METHOD_MEMORY_DELETE_SCOPE: &str = "memory/delete_scope";
Types
/// Memory scope hierarchy. The scope id is derived by the plugin from these
/// fields as a flat namespace. Project-wide memory: project_id only. Per-agent:
/// project_id + agent_id. Per-task: project_id + agent_id + task_id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryScope {
pub project_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutMemoryRequest {
pub scope: MemoryScope,
pub key: String,
pub value: Value,
/// Optional hint; backends MAY ignore. Backends MUST declare
/// `native_ttl` in their capabilities; when false, the TTL is recorded
/// in metadata only and the caller is responsible for eviction.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ttl_secs: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PutMemoryResponse {
pub ack: bool,
/// True if the backend has fully indexed the value and `query` is
/// expected to find it immediately. False when ingestion is async
/// (e.g., Zep). Callers needing read-after-write semantics must
/// check this flag before relying on `query`. (Fixes codex P2-6.)
pub indexed_immediately: bool,
/// Backend-issued identifier for the stored entry. Useful for delete-by-id
/// or audit. May be empty if the backend doesn't expose stable record ids.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub record_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMemoryRequest {
pub scope: MemoryScope,
pub key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetMemoryResponse {
pub found: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMemoryRequest {
pub scope: MemoryScope,
pub query: String,
pub top_k: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryMemoryResponse {
pub results: Vec<MemoryQueryResult>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryQueryResult {
pub key: String,
pub value: Value,
pub score: f32,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ListScopesRequest {
/// If set, only return scopes under this project. Else return all scopes.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub project_id: Option<String>,
/// Cursor-based pagination. Set to `None` for the first page.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cursor: Option<String>,
/// Page size. Backends MAY clamp to their per-backend maximum (declared
/// in capabilities). Defaults to 100 if unset.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub page_size: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListScopesResponse {
pub scopes: Vec<MemoryScope>,
/// Cursor for the next page, or `None` if exhausted. (Fixes codex P2-7.)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_cursor: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteScopeRequest {
pub scope: MemoryScope,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteScopeResponse {
pub ack: bool,
}
Error codes (memory_store)
pub mod error_codes {
/// Scope not found (e.g., delete_scope on unknown id).
pub const SCOPE_NOT_FOUND: i32 = -32301;
/// Memory key not found (memory/get when key doesn't exist).
pub const KEY_NOT_FOUND: i32 = -32302;
/// Two distinct scopes normalize to the same backend graph id.
pub const MEMORY_SCOPE_COLLISION: i32 = -32303;
/// Backend (Zep, etc.) unavailable.
pub const BACKEND_UNAVAILABLE: i32 = -32304;
/// Backend rate-limited the request. Caller should back off.
pub const RATE_LIMITED: i32 = -32305;
/// `top_k` exceeded backend-declared maximum.
pub const QUERY_TOP_K_EXCEEDED: i32 = -32306;
/// Project root mismatch.
pub const PROJECT_BINDING_MISMATCH: i32 = -32307;
}
Zep implementation guidance
Use standalone Graphs (not Users) for all scope levels. Map scope to graphId.
ID escaping rule (fixes codex P2-8): all project_id, agent_id, and task_id segments are normalized before interpolation:
fn normalize(segment: &str) -> String {
// 1. Lowercase
// 2. Replace any character not in [a-z0-9_-] with '-'
// 3. Collapse repeated '-' into one
// 4. Trim leading/trailing '-'
// 5. Truncate to 64 chars
// 6. If result is empty after normalization, replace with hex of sha256(input)[0..16]
}
The plugin records the un-normalized original in scope metadata so reverse lookup is possible. Two scopes whose normalized graph IDs collide trigger a MEMORY_SCOPE_COLLISION error (see error codes).
| Scope | graphId pattern |
|---|---|
| project-wide ({project_id}) | proj_${normalize(project_id)} |
| per-agent ({project_id, agent_id}) | proj_${normalize(project_id)}__agent_${normalize(agent_id)} |
| per-task ({project_id, agent_id, task_id}) | proj_${normalize(project_id)}__agent_${normalize(agent_id)}__task_${normalize(task_id)} |
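The normalization rule and the graphId patterns can be sketched together using only the standard library. This is an illustration under assumptions, not the plugin's code (the Zep plugin is TypeScript): `normalize_segment` and `graph_id` are hypothetical names, lowercasing is ASCII-only, and step 6 (the SHA-256 fallback) is not implemented because `std` has no SHA-256. The sketch returns `None` at the point where that fallback would apply.

```rust
/// Steps 1-5 of the normalization rule. Returns `None` when nothing
/// survives, which is where the step-6 SHA-256 fallback would apply
/// (omitted here: it needs a hashing crate outside `std`).
pub fn normalize_segment(segment: &str) -> Option<String> {
    let mut out = String::new();
    for ch in segment.chars() {
        // Step 1: lowercase (ASCII-only is an assumption of this sketch).
        let c = ch.to_ascii_lowercase();
        // Step 2: anything outside [a-z0-9_-] becomes '-'.
        let c = if c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '-' {
            c
        } else {
            '-'
        };
        // Step 3: collapse runs of '-' into one.
        if c == '-' && out.ends_with('-') {
            continue;
        }
        out.push(c);
    }
    // Step 4: trim leading and trailing '-'.
    let trimmed = out.trim_matches('-');
    // Step 5: truncate to 64 chars (the string is pure ASCII at this point).
    let truncated: String = trimmed.chars().take(64).collect();
    if truncated.is_empty() {
        None
    } else {
        Some(truncated)
    }
}

/// Builds the graph id for a scope following the pattern table.
/// Returns `None` for a task without an agent, or when a segment
/// would need the hash fallback.
pub fn graph_id(project_id: &str, agent_id: Option<&str>, task_id: Option<&str>) -> Option<String> {
    let mut id = format!("proj_{}", normalize_segment(project_id)?);
    match (agent_id, task_id) {
        (None, None) => {}
        (Some(a), None) => {
            id.push_str(&format!("__agent_{}", normalize_segment(a)?));
        }
        (Some(a), Some(t)) => {
            id.push_str(&format!("__agent_{}", normalize_segment(a)?));
            id.push_str(&format!("__task_{}", normalize_segment(t)?));
        }
        // The scope hierarchy defines per-task memory as project + agent + task.
        (None, Some(_)) => return None,
    }
    Some(id)
}
```

Distinct inputs such as "Foo Bar" and "foo-bar" normalize to the same segment, which is the situation MEMORY_SCOPE_COLLISION is meant to report.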
| Protocol RPC | Zep operation |
|---|---|
| memory/put | graph.create() if not exists (catch 409); then graph.add({ graphId, type: "text", data: JSON.stringify({key, value}), metadata: { key, ttl_s } }) |
| memory/get | graph.search({ graphId, query: key, scope: "episodes", limit: 1 }); return the first hit whose metadata.key matches exactly |
| memory/query | graph.search({ graphId, query, scope: "edges", limit: top_k, reranker: "rrf" }) |
| memory/list_scopes | graph.listAll({ pageSize: page_size ?? 100, cursor }) filtered by id prefix proj_${normalize(project_id)} if provided (use the SAME normalize function from the graphId mapping above, never raw project_id). (Fixes codex round-2 P2-3.) |
| memory/delete_scope | graph.delete(graphId) |
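On the caller side, memory/list_scopes is consumed by following next_cursor until it is absent. The sketch below shows that loop against a hypothetical in-memory backend. `Page`, `list_all_scopes`, and `fake_backend` are illustrative names, scopes are reduced to graph id strings, and the cursor being a decimal offset is an assumption of the fake; real cursors are opaque.

```rust
/// One page of results, mirroring `ListScopesResponse` with scopes
/// reduced to graph id strings for brevity.
pub struct Page {
    pub scopes: Vec<String>,
    pub next_cursor: Option<String>,
}

/// Calls `fetch(cursor)` until `next_cursor` is `None` and returns every scope.
/// The first call passes `None`, as `ListScopesRequest.cursor` requires.
pub fn list_all_scopes<F>(mut fetch: F) -> Vec<String>
where
    F: FnMut(Option<&str>) -> Page,
{
    let mut all = Vec::new();
    let mut cursor: Option<String> = None;
    loop {
        let page = fetch(cursor.as_deref());
        all.extend(page.scopes);
        match page.next_cursor {
            Some(next) => cursor = Some(next),
            None => break,
        }
    }
    all
}

/// Hypothetical backend: filters ids by prefix and pages through them.
/// The cursor here is a decimal offset, purely for illustration.
pub fn fake_backend(ids: &[&str], prefix: &str, page_size: usize, cursor: Option<&str>) -> Page {
    let matching: Vec<&str> = ids.iter().copied().filter(|id| id.starts_with(prefix)).collect();
    // An unparseable or out-of-range cursor is clamped instead of panicking.
    let start = cursor
        .and_then(|c| c.parse::<usize>().ok())
        .unwrap_or(0)
        .min(matching.len());
    // Guard against a zero page size, which would never advance.
    let end = (start + page_size.max(1)).min(matching.len());
    Page {
        scopes: matching[start..end].iter().map(|s| s.to_string()).collect(),
        next_cursor: if end < matching.len() { Some(end.to_string()) } else { None },
    }
}
```

Callers should treat the cursor as an opaque token and pass it back unchanged; only the backend interprets it.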
Gotchas (from Zep research)
- No native TTL. Backends MUST document this in their manifest capabilities. The ttl_secs field is advisory.
- No native key-value get. Zep is semantic; memory/get falls back to graph.search() and filtering. Document this in plugin README: callers should use query for fuzzy match and get only for exact-key recall on small scopes.
- Ingestion is async. A put immediately followed by query may miss. Plugin should document eventual consistency.
- data must be a string. Plugin JSON-stringifies on the way in, parses on the way out.
- Metadata cap: 10 keys, scalar values only. Plugin enforces or surfaces error.
- Rate limits. Zep Cloud free tier degrades under load. Plugin surfaces 429s up to the daemon for backoff.
- Search hard cap 50 for non-auto scopes. Plugin clamps top_k and warns if higher requested.
- userId and graphId are mutually exclusive. Plugin always uses graphId.
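Two of these gotchas reduce to small pure functions: clamping top_k to the search cap, and emulating exact-key get by filtering search hits on their metadata key. The sketch below illustrates both. The names `ZEP_SEARCH_CAP`, `clamp_top_k`, `Hit`, and `exact_get` are hypothetical, and `Hit` keeps only the two fields needed here rather than Zep's actual response shape.

```rust
/// Search cap for non-auto scopes, taken from the gotcha list above.
pub const ZEP_SEARCH_CAP: u32 = 50;

/// Clamps `requested` to `cap`. The boolean is true when clamping
/// happened, so the caller can log a warning.
pub fn clamp_top_k(requested: u32, cap: u32) -> (u32, bool) {
    if requested > cap {
        (cap, true)
    } else {
        (requested, false)
    }
}

/// A search hit reduced to the fields `memory/get` needs (illustrative shape).
pub struct Hit {
    pub metadata_key: String,
    pub data: String,
}

/// Emulates exact-key `memory/get` on top of a semantic search:
/// returns the first hit whose metadata key equals `key` exactly.
pub fn exact_get<'a>(hits: &'a [Hit], key: &str) -> Option<&'a str> {
    hits.iter()
        .find(|h| h.metadata_key == key)
        .map(|h| h.data.as_str())
}
```

The exact-match filter matters because a semantic search for "user" can rank a hit stored under "user.name" first; returning that hit unfiltered would hand back the wrong value.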
Capability declaration
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryStoreCapabilities {
/// Whether backend honors `ttl_secs`. Zep: false. SQLite-backed: true.
#[serde(default)]
pub native_ttl: bool,
/// Whether `memory/get` is O(1) exact-key. Zep: false (search-based).
#[serde(default)]
pub native_key_get: bool,
/// Whether put → query is immediately consistent. Zep: false.
#[serde(default)]
pub strong_consistency: bool,
/// Max `top_k` allowed.
#[serde(default)]
pub max_query_top_k: u32,
}
Cross-protocol concerns
Subject type ownership — canonical home is animus-subject-protocol
SubjectRef and SubjectDispatch are referenced by animus-workflow-runner-protocol (§1), animus-queue-protocol (§2), and the existing animus-subject-protocol. They MUST have exactly one definition. (Fixes codex round-3 P1-3.)
Decision: animus-subject-protocol is the canonical home. Both new protocol crates depend on animus-subject-protocol for these types. There is no parallel definition.
Consequences for the lift work (Wave 2A):
The current crates/protocol/ crate inside ao-cli also defines SubjectRef and SubjectDispatch. As part of v0.5:
- animus-subject-protocol v0.5.0 includes the authoritative type definitions, copied from protocol (preserving the existing wire format; this is a re-homing, not a redesign).
- protocol inside ao-cli deprecates its local definitions and re-exports from animus-subject-protocol:
  pub use animus_subject_protocol::{SubjectRef, SubjectDispatch};
- Wave 2A lifted plugins (workflow_runner_default, queue_default) reference animus_subject_protocol::SubjectDispatch directly, not the (now-deprecated) re-export. The lifted code's existing protocol::SubjectDispatch references are migrated to animus_subject_protocol::SubjectDispatch during the lift.
- The Wave 3 daemon refactor also migrates remaining ao-cli references off protocol::SubjectDispatch onto animus_subject_protocol::SubjectDispatch. Once complete, the deprecated re-export can be removed in a future release.
This is included as a Wave 1 task below.
Plugin-kind constants summary (additions to animus-plugin-protocol)
pub const PLUGIN_KIND_WORKFLOW_RUNNER: &str = "workflow_runner";
pub const PLUGIN_KIND_QUEUE: &str = "queue";
pub const PLUGIN_KIND_DURABLE_STORE: &str = "durable_store";
pub const PLUGIN_KIND_MEMORY_STORE: &str = "memory_store";
animus-plugin-protocol version
Bumps from 1.0.0 → 1.1.0. Three additive changes (fixes codex P1-5):
- New plugin-kind constants (above).
- InitializeParams.init_extensions map: opaque per-extension blobs the host may pass on initialize. Used by v0.5 to send project_binding (see Common Conventions). Plugins ignore extensions they don't recognize. Forward-compatible.
- InitializeResult.capabilities typed shape: currently this field exists but is loosely typed. v0.5 standardizes it to:
{
"protocol_version": "1.1.0",
"kinds": ["workflow_runner"], // list of PLUGIN_KIND_* this process implements
"capabilities": {
"workflow_runner": { // keyed by kind
"crate_version": "0.1.0", // the per-kind protocol crate version
"extra": { // backend-specific capability flags (typed by each crate)
"phase_decision_parsing": true,
"rework_context_support": true,
"post_success_actions": true,
"crash_recovery": true,
"manual_pause_support": true
}
}
}
}
The daemon enforces:
- Plugin's declared protocol_version major matches daemon's expected major
- Plugin advertises at least one kind the daemon needs
- Plugin's per-kind crate_version major matches the protocol crate the daemon depends on
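The three checks can be sketched as one function over plain version strings. This is a minimal illustration, not the daemon's code: `Rejection` and `check_plugin` are hypothetical names, the capability maps are reduced to kind → crate_version, and versions are assumed to be plain MAJOR.MINOR.PATCH strings with no pre-release suffix handling.

```rust
use std::collections::HashMap;

/// Why a plugin was refused at initialize time (illustrative).
#[derive(Debug, PartialEq)]
pub enum Rejection {
    ProtocolMajorMismatch,
    NoNeededKind,
    CrateMajorMismatch(String),
    Unparseable(String),
}

/// Extracts the major component of a `MAJOR.MINOR.PATCH` string.
fn major(version: &str) -> Result<u64, Rejection> {
    version
        .split('.')
        .next()
        .and_then(|m| m.parse::<u64>().ok())
        .ok_or_else(|| Rejection::Unparseable(version.to_string()))
}

/// `plugin_kinds`: kind → crate_version declared by the plugin.
/// `needed`: kind → protocol crate version the daemon depends on.
/// Returns the sorted list of kinds the daemon can use from this plugin.
pub fn check_plugin(
    daemon_protocol: &str,
    plugin_protocol: &str,
    plugin_kinds: &HashMap<String, String>,
    needed: &HashMap<String, String>,
) -> Result<Vec<String>, Rejection> {
    // Check 1: protocol_version majors must agree.
    if major(daemon_protocol)? != major(plugin_protocol)? {
        return Err(Rejection::ProtocolMajorMismatch);
    }
    // Check 2: the plugin must offer at least one kind the daemon needs.
    let mut usable: Vec<String> = plugin_kinds
        .keys()
        .filter(|k| needed.contains_key(*k))
        .cloned()
        .collect();
    usable.sort(); // deterministic order for reporting
    if usable.is_empty() {
        return Err(Rejection::NoNeededKind);
    }
    // Check 3: per-kind crate_version majors must agree.
    for kind in &usable {
        if major(&plugin_kinds[kind])? != major(&needed[kind])? {
            return Err(Rejection::CrateMajorMismatch(kind.clone()));
        }
    }
    Ok(usable)
}
```

Comparing only the major component means a plugin built against protocol 1.0.0 is still accepted by a 1.1.0 daemon, which is consistent with the changes being additive.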
Wave 1 MUST include the typed Capabilities struct in animus-plugin-protocol — this is a concrete, testable code change and is captured in the implementation tasks below.
Default flavor manifest additions
The flavors/ directory at the repo root is a new v0.5 addition (it does not exist in v0.4.x). It is owned by ao-cli (the kernel repo); flavor manifests are checked-in TOML files. The Brief A agent creates the directory and the initial default.toml. (Fixes codex P3-3.)
Update flavors/default.toml:
[workflow_runner]
required = ["launchapp-dev/animus-workflow-runner-default"]
[queue]
required = ["launchapp-dev/animus-queue-default"]
# durable_store and memory_store are OPT-IN, not required for the default
# flavor. Users who want crash-safe execution install the DBOS plugin;
# users who want persistent agent memory install the Zep plugin.
[durable_store]
recommended = ["launchapp-dev/animus-step-durable-dbos"]
[memory_store]
recommended = ["launchapp-dev/animus-memory-zep"]
v0.5 implementation tasks
These are the concrete code-level tasks for the v0.5 swarm. Each Wave 1 agent owns one protocol crate; each Wave 2 agent owns one plugin implementation.
Wave 1 (in animus-staging/animus-protocol workspace, must complete before Wave 2)
- [ ] Add 4 plugin-kind constants to animus-plugin-protocol/src/lib.rs; bump PROTOCOL_VERSION to "1.1.0"
- [ ] In animus-subject-protocol: add authoritative SubjectRef and SubjectDispatch definitions (copy from ao-cli's crates/protocol/ preserving wire format). Tag animus-subject-protocol v0.5.0
- [ ] Extend animus-plugin-protocol InitializeParams with init_extensions: HashMap<String, Value> for the project-binding extension and forward-compat extension points
- [ ] Extend animus-plugin-protocol InitializeResult with the typed Capabilities struct (one crate_version + per-kind extra blob per kind the plugin implements). Add KindCapability type and Capabilities = HashMap<String, KindCapability>
- [ ] Create animus-workflow-runner-protocol/ crate (this spec, §1), including typed WorkflowRunnerCapabilities for the extra field
- [ ] Create animus-queue-protocol/ crate (this spec, §2), including typed QueueCapabilities (page-size limits, supported status filters)
- [ ] Create animus-durable-store-protocol/ crate (this spec, §3), including typed DurableStoreCapabilities (default reservation TTL, max payload size, etc.)
- [ ] Create animus-memory-store-protocol/ crate (this spec, §4), including typed MemoryStoreCapabilities
- [ ] Update workspace Cargo.toml to include all four new crates
- [ ] Update spec.md with the four new kind sections
- [ ] Tag the protocol workspace v0.5.0
Wave 2A — Rust lift-and-shift (after Wave 1)
- [ ] animus-workflow-runner-default plugin repo: lift workflow-runner-v2 per kernel-extraction-v0.5.md
- [ ] animus-queue-default plugin repo: lift queue logic per the extraction map
Wave 2B — TS greenfield (parallel with 2A)
- [ ] animus-step-durable-dbos plugin repo: implement durable_store via DBOS Option A
- [ ] animus-memory-zep plugin repo: implement memory_store via Zep Cloud
Wave 3 — Kernel cleanup (after 2A)
- [ ] Update ao-cli to use the workflow_runner plugin for execution instead of the in-tree crate
- [ ] Update ao-cli to use the queue plugin for queue ops
- [ ] Update flavors/default.toml to include the bundled defaults
- [ ] Add animus flavor CLI subcommand
- [ ] Update docs/reference/cli/index.md
Known limitations in v0.5 (must be documented in plugin READMEs)
- Phase event volatility on crash. workflow/execute returns phase_events as a final vector. If the workflow_runner plugin crashes mid-execution, in-memory phase events are lost. The daemon detects unfinished runs via durable_store/recover_in_flight (when a durable_store is installed) and restarts the workflow, but the original event timeline up to the crash is not recoverable from the runner alone. Plugins SHOULD persist critical decision events (Decision, Completed { status: "manual_pending" }) to disk under <project_root>/.animus/workflow-events/<run_id>.jsonl for post-crash inspection. (Addresses codex P2-1.)
- memory_store eventual consistency. Zep ingestion is async; put followed immediately by query may miss. Use PutMemoryResponse.indexed_immediately to detect.
- durable_store fence is not a transaction across the IPC boundary. A daemon crash between the side effect and commit_step leaves the reservation outstanding until TTL; the recovered workflow sees IN_PROGRESS and the caller (daemon) decides retry policy. End-to-end exactly-once is deferred to v0.7+ Option B.
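The recommended event persistence for the first limitation can be sketched as an append-only JSONL writer. This is an illustration under assumptions, not a prescribed implementation: `append_event` is a hypothetical name, the event is passed in already serialized as a single-line JSON string, and `run_id` is assumed to be safe to use as a file name.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Appends one pre-serialized JSON event as a line to
/// `<project_root>/.animus/workflow-events/<run_id>.jsonl`.
/// Assumes `event_json` contains no newline and `run_id` is a safe file name.
pub fn append_event(project_root: &Path, run_id: &str, event_json: &str) -> io::Result<PathBuf> {
    let dir = project_root.join(".animus").join("workflow-events");
    fs::create_dir_all(&dir)?;
    let path = dir.join(format!("{}.jsonl", run_id));
    // Append mode keeps earlier events intact across plugin restarts.
    let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
    writeln!(file, "{}", event_json)?;
    // Ask the OS to flush file data so the line survives a process crash.
    file.sync_data()?;
    Ok(path)
}
```

Writing one line per event and syncing after each write trades throughput for durability, which seems reasonable for the low volume of critical decision events described above.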
Open questions deferred to v0.6+
- Phase event real-time streaming: v0.5 returns events at workflow end. A workflow/events/poll cursor method is deferred to v0.6.
- Distributed queue: animus-queue-default is in-memory + JSON file. Multi-machine coordination requires a future animus-queue-redis or animus-queue-postgres plugin.
- Workflow runner mode = runner: v0.5 ships journal-mode durable_store + in-process workflow_runner. The Option B "runner mode" (DBOS owns the workflow loop) is deferred to v0.7+ and depends on detached agent execution as a prerequisite capability.
- Memory_store consistency: v0.5 documents eventual consistency. A future strong-consistency profile (for backends that support it) may extend the capability declaration.
This document is the source of truth for Wave 1 protocol authoring. Wave 2 agents read this spec plus the kernel-extraction map.