
Actor State

cross.stream actors use the same closure shape as the streaming form of Nushell’s generate command. This tutorial teaches that shape in plain Nushell first, then applies it to the stream to build a materialized aggregate, a bread-and-butter task in stream processing.

Prerequisites

Part 1: generate with input

When you pipe a stream into generate, the closure receives two arguments: the current element and the carried state. It returns a record that controls what to emit and what state to carry forward.

Running sum

[3, 1, 4, 1, 5] | generate {|n, sum = 0|
    let sum = $sum + $n
    {out: $sum, next: $sum}
}
0 | 3
1 | 4
2 | 8
3 | 9
4 | 14

Each element flows through the closure. out is the value emitted downstream. next is the state passed to the next invocation. The default parameter sum = 0 sets the initial state.
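The carried state does not have to be a single number. As a sketch (not taken from the tutorial itself), the variant below keeps a two-element list of [sum, count] so it can emit a running mean. The list layout and the variable names are illustrative choices.

```nushell
# State is a two-element list: [sum, count].
[3, 1, 4, 1, 5] | generate {|n, state = [0, 0]|
    let sum = $state.0 + $n
    let count = $state.1 + 1
    # Emit the mean so far; carry the updated sum and count forward.
    {out: ($sum / $count), next: [$sum, $count]}
}
```

Only the shape of next changes. The out/next contract is the same as in the running sum.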

Skipping output

Not every element needs to produce output. Return {next: $state} without an out key to carry state forward silently:

[1, 2, 3, 4, 5, 6] | generate {|n, sum = 0|
    let sum = $sum + $n
    if $n mod 2 == 0 {
        {out: $"sum at ($n): ($sum)", next: $sum}
    } else {
        {next: $sum}
    }
}
0 | sum at 2: 3
1 | sum at 4: 10
2 | sum at 6: 21

Odd numbers contribute to the sum but produce no output.

Stopping early

Omit the next key (or return nothing) to stop generation:

[1, 2, 3, 4, 5] | generate {|n, sum = 0|
    let sum = $sum + $n
    if $sum > 6 {
        {out: $"stopped at ($sum)"}
    } else {
        {out: $sum, next: $sum}
    }
}
0 | 1
1 | 3
2 | 6
3 | stopped at 10

The final record has out but no next, so the stream ends.

The contract

Return               Effect
{out: v, next: s}    Emit v, continue with state s
{next: s}            No output, continue with state s
{out: v}             Emit v, then stop
nothing              Stop

cross.stream actors use this exact contract. The only difference is the input: instead of list elements, the closure receives frames from the store.
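To see several of these return shapes side by side, here is a minimal sketch in plain Nushell. The scenario is hypothetical: it sums the input in batches of two and treats a 0 as a stop signal. The state is a [sum, count] list chosen for illustration.

```nushell
[4, 6, 1, 2, 0, 9] | generate {|n, state = [0, 0]|
    if $n == 0 {
        null                          # nothing: stop without output
    } else {
        let sum = $state.0 + $n
        let count = $state.1 + 1
        if $count < 2 {
            {next: [$sum, $count]}    # no output, keep accumulating
        } else {
            {out: $sum, next: [0, 0]} # emit the batch total, reset the state
        }
    }
}
```

Under this contract the pipeline should emit 10 and then 3, and stop at the 0 without ever seeing the trailing 9.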

Prerequisites for Part 2

  • xs installed and on your PATH (see Installation)
  • Two terminal windows, both running Nushell with use xs.nu *

Part 2: Running sum on the stream

Start a store

In terminal 1:

xs serve ./store

Monitor

In terminal 2, start a live monitor:

.cat -f | each { print ($in | table -e) }

Keep this running.

Register the actor

Same running sum, now as an actor. The actor sees every frame on the stream, so it guards on topic first: non-sale frames carry state forward with no output. Sale frames pull the amount from metadata and add it to the sum:

r#'{
    run: {|frame, sum = 0|
        if $frame.topic != "sale" { return {next: $sum} }
        let sum = $sum + ($frame.meta.amount | into float)
        {out: {total: $sum}, next: $sum}
    }
    return_options: { suffix: ".total", ttl: "last:1" }
}'# | .append revenue.register

The monitor shows revenue.active. The actor is live.

Append some data

.append sale --meta {amount: 49.99}
.append sale --meta {amount: 12.50}
.append sale --meta {amount: 7.99}

Three revenue.total frames appear. Check the latest:

.last revenue.total | get meta.total
70.48

The out record is stored directly in the frame’s metadata, so reading the total requires no lookup in content-addressable storage (CAS). The last:1 TTL keeps only the most recent total, giving you a materialized view that’s always queryable.

Frames the actor ignores

Append something on a different topic:

.append heartbeat

No revenue.total frame appears. The actor returned {next: $sum} for the non-sale frame: state carried forward, no output emitted.
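Because the contract is shared, the actor’s closure can be dry-run in plain Nushell before it is registered. The sketch below pipes hand-built records into generate. These records are stand-ins that carry only the topic and meta fields the closure reads; real frames carry more.

```nushell
# Stand-in records, not real frames: only topic and meta.amount are modeled.
[
    {topic: "sale", meta: {amount: 49.99}}
    {topic: "heartbeat"}
    {topic: "sale", meta: {amount: 12.50}}
    {topic: "sale", meta: {amount: 7.99}}
] | generate {|frame, sum = 0|
    # Same body as the run closure registered above.
    if $frame.topic != "sale" { return {next: $sum} }
    let sum = $sum + ($frame.meta.amount | into float)
    {out: {total: $sum}, next: $sum}
}
```

This should emit one {total: ...} record per sale and nothing for the heartbeat, mirroring what the monitor shows on the stream.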

Recap

generate with input                    cross.stream actor
[data] | generate {|el, state| ...}    Frames flow through {|frame, state| ...}
{out: v, next: s} emits downstream     out record stored as frame metadata
{next: s} skips                        No frame emitted, state carried forward
Omit next to stop                      Actor self-terminates, emits .unregistered
Default param sets initial state       Default param or initial config field
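
The “omit next to stop” row can be sketched as a variant of the revenue actor that stops once a target is reached. The goal topic name, the 100 threshold, and the goal_reached field are hypothetical; the registration shape is copied from the revenue actor above and is assumed to behave the same way here.

```nushell
r#'{
    run: {|frame, sum = 0|
        if $frame.topic != "sale" { return {next: $sum} }
        let sum = $sum + ($frame.meta.amount | into float)
        if $sum >= 100 {
            # No next key: emit the final total, then stop.
            {out: {total: $sum, goal_reached: true}}
        } else {
            {out: {total: $sum}, next: $sum}
        }
    }
    return_options: { suffix: ".total", ttl: "last:1" }
}'# | .append goal.register
```

If the contract holds as described, the last frame this actor emits carries goal_reached, and the monitor then shows the actor unregistering.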