Actors
cross.stream actors use Nushell closures to process and act on incoming frames as they are appended to the store.

    {
      run: {|frame|
        if $frame.topic == "ping" {
          "pong" # Will be appended to actor.out
        }
      }
    }

The actor closure receives each new frame and can:

- Process the frame’s content
- Return a value (which gets automatically appended to <actor-name>.out)
- Explicitly append new frames using the .append command
- Filter which frames to process using conditionals
Registering
To register an actor, append a registration script with the topic
<actor-name>.register. The script must return a record that configures the
actor’s behavior:

    r###'{
      # Required: Actor closure
      run: {|frame|
        if $frame.topic == "ping" {
          "pong" # Appended to echo.response (see return_options)
        }
      }

      # Optional: Where to start processing from
      # "new" (default), "first", or scru128 ID
      start: "new"

      # Optional: Heartbeat interval in ms
      pulse: 1000

      # Optional: Control output frame behavior
      return_options: {
        suffix: ".response" # Output topic suffix
        ttl: "last:1" # Keep only most recent frame
      }
    }'### | .append echo.register

The run closure must accept exactly one positional argument, which is the incoming frame.
The registration script is stored in CAS and evaluated to obtain the actor’s configuration.
Upon a successful start the actor appends a <actor-name>.active frame
with metadata:
- actor_id: the ID of the actor instance
- new: whether processing started from the end of the topic (new items only)
- after: the frame ID that processing resumed after (if any)
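
To check that a registration took effect, one option is to read the latest frame on the actor’s .active topic. The sketch below assumes an actor named echo has been registered and that the .head command from the xs Nushell module is available in the session; neither is stated in this section.

```nushell
# Assumption: `.head <topic>` returns the most recent frame on that topic.
let active = (.head echo.active)

# The metadata record should carry the actor_id, new, and after fields listed above.
$active.meta
```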
Configuration Record Fields
| Field | Description |
|---|---|
| run | Required actor closure that processes each frame |
| start | "new" (default), "first", or scru128 ID to control where processing starts |
| pulse | Interval in milliseconds to send synthetic xs.pulse events |
| return_options | Controls output frames: see Return Options |
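
As an illustration of how start and pulse might combine, the sketch below registers a hypothetical actor named ticker that replays the stream from the first frame, counts what it sees, and reports the count on each heartbeat. It assumes that pulse events reach the closure as frames whose topic is xs.pulse; the table only says that such events are sent, so treat that detail as unverified.

```nushell
r###'{
  # Process existing frames before new ones
  start: "first"

  # Request a synthetic xs.pulse event every 5 seconds
  pulse: 5000

  run: {|frame|
    if $frame.topic == "xs.pulse" {
      # Assumption: heartbeats arrive with this topic.
      # The returned string is appended to ticker.out.
      $"seen ($env | get -i seen | default 0) frames so far"
    } else {
      # Count every other frame; returning null produces no output frame
      $env.seen = ($env | get -i seen | default 0) + 1
      null
    }
  }
}'### | .append ticker.register
```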
Return Options
The return_options field controls how return values are handled:
- suffix: String appended to actor’s name for output topic (default: ".out")
- ttl: Time-to-live for output frames
  - "forever": Never expire
  - "ephemeral": Not stored; only active subscribers receive it
  - "time:<milliseconds>": Expire after duration
  - "last:<n>": Keep only N most recent frames
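
A minimal sketch of these options in use, with a hypothetical actor named monitor and a hypothetical sensor.reading topic. With this configuration, return values should land on monitor.latest and expire after one minute.

```nushell
r###'{
  run: {|frame|
    if $frame.topic == "sensor.reading" {
      # Returned string becomes the content of the output frame
      $"last reading frame: ($frame.id)"
    }
  }

  return_options: {
    suffix: ".latest"   # Output topic becomes monitor.latest
    ttl: "time:60000"   # Expire output frames after 60000 ms
  }
}'### | .append monitor.register
```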
Modules
Actors can use modules registered via *.nu topics. An actor sees the modules as they existed when it was registered. See the modules documentation for details.

    r###'{
      run: {|frame|
        use xs/my-math
        my-math double 8
      }
    }'### | .append processor.register

State and Environment
Actors can maintain state using environment variables which persist between calls:

    r#'{
      run: {|frame|
        # Initialize or increment counter
        $env.count = ($env | get -i count | default 0) + 1
        $"Processed ($env.count) frames"
      }
    }'# | .append counter.register

Output
Actors can produce output in two ways:
- Return Values: Any non-null return value is automatically appended to the
  actor’s output topic (<actor-name>.out by default, unless modified by
  return_options.suffix)

      {|frame|
        if $frame.topic == "ping" {
          "pong" # Automatically appended to actor.out
        }
      }

- Explicit Appends: Use the .append command to create frames on any topic

      {|frame|
        if $frame.topic == "ping" {
          "pong" | .append response.topic --meta { "type": "response" }
          "logged" | .append audit.topic
        }
      }

All output frames automatically include:

- actor_id: ID of the actor that created the frame
- frame_id: ID of the frame that triggered the actor

Frames with meta.actor_id equal to the actor’s ID are ignored to avoid reacting to the actor’s own output.
Lifecycle
See the lifecycle documentation for all processor suffixes.
Unregistering
An actor can be unregistered by:
- Appending <actor-name>.unregister
- Registering a new actor with the same name
- A runtime error occurring in the actor closure
When unregistered, the actor appends a confirmation frame
<actor-name>.unregistered. If unregistered due to an error, the frame
includes an error field in its metadata.
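
A manual unregistration might look like the sketch below. It assumes an actor named echo, that .append accepts a frame with no piped content, and that .head is available to read the latest frame on a topic; none of these is confirmed by this section. The confirmation frame may also take a moment to appear.

```nushell
# Assumption: an empty frame on this topic is enough to request unregistration
.append echo.unregister

# Inspect the confirmation frame; an error field is expected in its metadata
# only when the actor was unregistered because of an error
.head echo.unregistered | get meta
```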
Error Handling
If an actor encounters an error during execution:
- The actor is automatically unregistered
- A frame is appended to <actor-name>.unregistered with:
  - The error message in metadata
  - Reference to the triggering frame
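
Because an uncaught error unregisters the actor, a closure that handles untrusted input may want to catch failures itself. The following is one possible pattern rather than a documented recommendation. It uses Nushell’s try/catch; the actor name doubler, the numbers topic, and the meta.value field are all hypothetical.

```nushell
r###'{
  run: {|frame|
    if $frame.topic == "numbers" {
      try {
        # Fails if meta is missing or value is not numeric
        let n = ($frame.meta.value | into int)
        $"($n * 2)"
      } catch {|err|
        # Report the failure as ordinary output so the actor keeps running
        $"error: ($err.msg)"
      }
    }
  }
}'### | .append doubler.register
```

With this shape, a malformed frame produces an error message on doubler.out instead of a doubler.unregistered frame.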