Actors
cross.stream actors use Nushell
closures
to process and act on incoming frames as they are appended to the store.
{ run: {|frame, state| if $frame.topic == "ping" { {out: {reply: "pong"}, next: $state} } else { {next: $state} } }}The actor closure receives each new frame and the current state value. It returns a record that controls output and state:
{out: value, next: state}— emitvalue, continue with newstate{next: state}— no output, continue with newstate{out: value}— emitvalue, then self-terminatenull/ nothing — self-terminate, no output
The out value must be a record. It is stored as the output frame’s metadata,
alongside actor_id and frame_id. The output frame is appended to
<actor-name>.out by default. The closure can also explicitly append frames
using the .append command.
Registering
To register an actor, append a registration script with the topic
<actor-name>.register. The script must return a record that configures the
actor’s behavior:
r###'{ # Required: Actor closure (frame, state) -> {out?, next?} run: {|frame, state| if $frame.topic == "ping" { {out: {reply: "pong"}, next: $state} } else { {next: $state} } }
# Optional: Where to start processing from # "new" (default), "first", or scru128 ID start: "new"
# Optional: Heartbeat interval in ms pulse: 1000
# Optional: Control output frame behavior return_options: { suffix: ".response" # Output topic suffix ttl: "last:1" # Keep only most recent frame }}'### | .append echo.registerThe run closure must accept exactly two positional arguments: the incoming
frame and the current state value.
The registration script is stored in CAS and evaluated to obtain the actor’s configuration.
Upon a successful start the actor appends a <actor-name>.active frame
with metadata:
actor_id— the ID of the actor instancestart— the start configuration:"new","first", or{"after": "<frame-id>"}
Configuration Record Fields
| Field | Description |
|---|---|
run | Required actor closure {|frame, state| -> {out?, next?}} |
initial | Initial state value (default: null, or the closure param default) |
start | ”new” (default), “first”, or scru128 ID to control where processing starts |
pulse | Interval in milliseconds to send synthetic xs.pulse events |
return_options | Controls output frames: see Return Options |
Return Options
The return_options field controls how return values are handled:
suffix: String appended to actor’s name for output topic (default: “.out”)ttl: Time-to-live for output frames (default: “forever”)"forever": Never expire"ephemeral": Not stored; only active subscribers receive it"time:<milliseconds>": Expire after duration"last:<n>": Keep only N most recent frames
target: Storage target for output values- Default (omitted):
outmust be a record; stored as frame metadata "cas":outis stored in CAS; any type is accepted including binary
- Default (omitted):
Modules
Actors can use modules registered via *.nu topics. An actor sees the modules as they existed when it was registered. See for details.
r###'{ run: {|frame, state| use my-math {out: {result: (my-math double 8)}, next: $state} }}'### | .append processor.registerState
Actors thread state explicitly through the next key in the return record. The
second closure parameter receives the current state, and next sets the state
for the next invocation:
r#'{ run: {|frame, state| let count = $state + 1 {out: {count: $count}, next: $count} } initial: 0}'# | .append counter.registerThe initial state is resolved in order:
- The
initialfield in the config record - The default value on the closure’s second parameter (e.g.
state = 0) nullif neither is provided
Output
Actors can produce output in two ways:
- Return Values: The
outrecord is stored as frame metadata and appended to the actor’s output topic (<actor-name>.outby default unless modified by return_options.suffix)
{|frame, state| if $frame.topic == "ping" { {out: {reply: "pong"}, next: $state} } else { {next: $state} }}- Explicit Appends: Use the
.appendcommand to create frames on any topic
{|frame, state| if $frame.topic == "ping" { "pong" | .append response.topic --meta { "type": "response" } "logged" | .append audit.topic {next: $state} } else { {next: $state} }}All output frames automatically include:
actor_id: ID of the actor that created the frameframe_id: ID of the frame that triggered the actor- Frames with
meta.actor_idequal to the actor’s ID are ignored to avoid reacting to the actor’s own output
Lifecycle
See for all processor suffixes.
Unregistering
An actor can be unregistered by:
- Appending
<actor-name>.unregister - Registering a new actor with the same name
- Self-termination: returning
{out: value},{}, ornull(nonextkey) - Runtime errors in the actor closure
When unregistered, the actor appends a confirmation frame
<actor-name>.unregistered. If unregistered due to an error, the frame
includes an error field in its metadata.
Error Handling
If an actor encounters an error during execution:
- The actor is automatically unregistered
- A frame is appended to
<actor-name>.unregisteredwith:- The error message in metadata
- Reference to the triggering frame