Actors
cross.stream actors use Nushell
closures
to process and act on incoming frames as they are appended to the store.
{ run: {|frame, state| if $frame.topic == "ping" { {out: {reply: "pong"}, next: $state} } else { {next: $state} } }}The actor closure receives each new frame and the current state value. It returns a record that controls output and state:
{out: value, next: state}, emitvalue, continue with newstate{next: state}, no output, continue with newstate{out: value}, emitvalue, then self-terminatenull/ nothing, self-terminate, no output
The out value must be a record. It is stored as the output frame’s metadata,
alongside actor_id and frame_id. The output frame is appended to
<actor-name>.out by default. The closure can also explicitly append frames
using the .append command.
Registering
To register an actor, append a registration script with the topic
xs.actor.<name>.create. The script must return a record that configures the
actor’s behavior:
r###'{ # Required: Actor closure (frame, state) -> {out?, next?} run: {|frame, state| if $frame.topic == "ping" { {out: {reply: "pong"}, next: $state} } else { {next: $state} } }
# Optional: Where to start processing from # "new" (default), "first", or scru128 ID start: "new"
# Optional: Heartbeat interval in ms pulse: 1000
# Optional: a topic filter (see Read Options). Omit to receive every # frame. The actor's own lifecycle frames and synthetic xs.pulse / # xs.threshold are always delivered regardless of the filter. topics: ["ping"]
# Optional: Control output frame behavior return_options: { suffix: ".response" # Output topic suffix ttl: "last:1" # Keep only most recent frame }}'### | .append xs.actor.echo.createThe run closure must accept exactly two positional arguments: the incoming
frame and the current state value.
The registration script is stored in CAS and evaluated to obtain the actor’s configuration.
Upon a successful start the actor appends an xs.actor.<name>.active frame
with metadata:
actor_id, the ID of the actor instance (thecreateframe’s id)start, the start configuration:"new","first", or{"after": "<frame-id>"}
Configuration Record Fields
| Field | Description |
|---|---|
run | Required actor closure {|frame, state| -> {out?, next?}} |
initial | Initial state value (default: null, or the closure param default) |
start | Where to start: “new” (default), “first”, or scru128 ID (see Read Options) |
pulse | Heartbeat interval in ms (see Read Options) |
topics | Topic filter: list of strings or one comma-separated string (see Read Options) |
return_options | Controls output frames: see Return Options |
Return Options
The return_options field controls how return values are handled:
suffix: String appended to actor’s name for output topic (default: “.out”)ttl: Time-to-live for output frames (default: “forever”)"forever": Never expire"ephemeral": Not stored; only active subscribers receive it"time:<milliseconds>": Expire after duration"last:<n>": Keep only N most recent frames
target: Storage target for output values- Default (omitted):
outmust be a record; stored as frame metadata "cas":outis stored in CAS; any type is accepted including binary
- Default (omitted):
Modules
Actors can use modules registered via xs.module.<name> topics. An actor sees
the modules as they existed when it was registered. See
Module Topics for details.
r###'{ run: {|frame, state| use my-math {out: {result: (my-math double 8)}, next: $state} }}'### | .append xs.actor.processor.createState
Actors thread state explicitly through the next key in the return record. The
second closure parameter receives the current state, and next sets the state
for the next invocation:
r#'{ run: {|frame, state| let count = $state + 1 {out: {count: $count}, next: $count} } initial: 0}'# | .append xs.actor.counter.createThe initial state is resolved in order:
- The
initialfield in the config record - The default value on the closure’s second parameter (e.g.
state = 0) nullif neither is provided
Output
Actors can produce output in two ways:
- Return Values: The
outrecord is stored as frame metadata and appended to the actor’s output topic (<actor-name>.outby default unless modified byreturn_options.suffix).
{|frame, state| if $frame.topic == "ping" { {out: {reply: "pong"}, next: $state} } else { {next: $state} }}- Explicit Appends: Use the
.appendcommand to create frames on any topic.
{|frame, state| if $frame.topic == "ping" { "pong" | .append response.topic --meta { "type": "response" } "logged" | .append audit.topic {next: $state} } else { {next: $state} }}All output frames automatically include:
actor_id: ID of the actor that created the frameframe_id: ID of the frame that triggered the actor- Frames with
meta.actor_idequal to the actor’s ID are ignored to avoid reacting to the actor’s own output.
Built-in Store Commands
Inside the closure an actor has the same store helper commands as actions and services:
.append, append a new frame (merged withactor_idandframe_id)..cat, read frames from the store..last, fetch the most recent frame(s), optionally filtered by topic..cas, read content from CAS by hash..cas-post, write content to CAS and return its hash..get, retrieve a frame by ID..remove, delete a frame from the stream..import, insert a frame verbatim, preserving its ID.
Lifecycle
Actors share the unified lifecycle vocabulary documented in the lifecycle reference. In summary:
| User input | Runtime ack on success | Runtime ack on failure |
|---|---|---|
xs.actor.<name>.create | xs.actor.<name>.active | xs.actor.<name>.invalid |
xs.actor.<name>.term | xs.actor.<name>.fin.term | , |
Plus the runtime emits:
xs.actor.<name>.fin.okwhen the actor self-terminates (returns nonext).xs.actor.<name>.fin.erroron a runtime crash.xs.actor.<name>.replacedwhen a newerxs.actor.<name>.createdisplaces it.
See the lifecycle reference for the full event list, the compaction algorithm, and the invariants the dispatcher honors.