Actor State
cross.stream actors use the same closure shape as the streaming form of
Nushell's `generate` command. This tutorial teaches that shape in plain
Nushell first, then applies it to the stream to build a materialized
aggregate, a bread-and-butter pattern in stream processing.
Prerequisites
- Nushell installed
Part 1: generate with input
When you pipe a stream into generate, the closure receives two arguments: the
current element and the carried state. It returns a record that controls what
to emit and what state to carry forward.
Running sum
```nushell
[3, 1, 4, 1, 5] | generate {|n, sum = 0|
    let sum = $sum + $n
    {out: $sum, next: $sum}
}
```

```
0 | 3
1 | 4
2 | 8
3 | 9
4 | 14
```

Each element flows through the closure. `out` is the value emitted downstream.
`next` is the state passed to the next invocation. The default parameter
`sum = 0` sets the initial state.
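The carried state does not have to be a single number. As a minimal sketch, assuming a record is accepted as the default parameter in the same way a number is, a running mean can carry both a count and a sum. The names `acc` and `means` are illustrative only:

```nushell
# Running mean: the carried state is a record holding a count and a sum.
let means = [3, 1, 4, 1, 5] | generate {|n, acc = {count: 0, sum: 0}|
    # Build the new state from the previous state and the current element.
    let acc = {count: ($acc.count + 1), sum: ($acc.sum + $n)}
    # Emit the mean so far and carry the whole record forward.
    {out: ($acc.sum / $acc.count), next: $acc}
}
$means
```

The closure still returns the same `out`/`next` record; only the shape of the state changes.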
Skipping output
Not every element needs to produce output. Return `{next: $state}` without an
`out` key to carry state forward silently:
```nushell
[1, 2, 3, 4, 5, 6] | generate {|n, sum = 0|
    let sum = $sum + $n
    if $n mod 2 == 0 {
        {out: $"sum at ($n): ($sum)", next: $sum}
    } else {
        {next: $sum}
    }
}
```

```
0 | sum at 2: 3
1 | sum at 4: 10
2 | sum at 6: 21
```

Odd numbers contribute to the sum but produce no output.
Stopping early
Omit the `next` key (or return nothing) to stop generation:
```nushell
[1, 2, 3, 4, 5] | generate {|n, sum = 0|
    let sum = $sum + $n
    if $sum > 6 {
        {out: $"stopped at ($sum)"}
    } else {
        {out: $sum, next: $sum}
    }
}
```

```
0 | 1
1 | 3
2 | 6
3 | stopped at 10
```

The final record has `out` but no `next`, so the stream ends.
The contract
| Return | Effect |
|---|---|
| `{out: v, next: s}` | Emit `v`, continue with state `s` |
| `{next: s}` | No output, continue with state `s` |
| `{out: v}` | Emit `v`, then stop |
| nothing | Stop |
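The first three rows of the contract can be exercised in a single closure. This is a small sketch with made-up input values: negative numbers are skipped, positive numbers are summed and emitted, and a zero ends the stream with a final message.

```nushell
# One closure covering three rows of the contract table.
let result = [5, -2, 7, 0, 9] | generate {|n, sum = 0|
    if $n == 0 {
        {out: $"final: ($sum)"}        # out without next: emit, then stop
    } else if $n < 0 {
        {next: $sum}                   # next without out: skip, keep state
    } else {
        let sum = $sum + $n
        {out: $sum, next: $sum}        # both keys: emit and continue
    }
}
$result
```

The trailing `9` is never processed, because the record returned for `0` has no `next` key.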
cross.stream actors use this exact contract. Instead of a list, the input is a stream of frames from the store.
Prerequisites for Part 2
- xs installed and on your PATH (see Installation)
- Two terminal windows, both running Nushell with `use xs.nu *` loaded
Part 2: Running sum on the stream
Start a store
In terminal 1:
```nushell
xs serve ./store
```
Monitor
In terminal 2, start a live monitor:
```nushell
.cat -f | each { print ($in | table -e) }
```

Keep this running.
Register the actor
Same running sum, now as an actor. The actor sees every frame on the stream, so
it guards on topic first: non-sale frames carry state forward with no output.
Sale frames pull the amount from metadata and add it to the sum:
```nushell
r#'{
    run: {|frame, sum = 0|
        if $frame.topic != "sale" { return {next: $sum} }
        let sum = $sum + ($frame.meta.amount | into float)
        {out: {total: $sum}, next: $sum}
    }
    return_options: {
        suffix: ".total",
        ttl: "last:1"
    }
}'# | .append revenue.register
```

The monitor shows `revenue.active`. The actor is live.
Append some data
```nushell
.append sale --meta {amount: 49.99}
.append sale --meta {amount: 12.50}
.append sale --meta {amount: 7.99}
```

Three `revenue.total` frames appear. Check the latest:

```nushell
.last revenue.total | get meta.total
```

```
70.48
```
The out record is stored directly in the frame’s metadata, so reading the
total requires no CAS lookup. The last:1 TTL keeps only the most recent
total, giving you a materialized view that’s always queryable.
Frames the actor ignores
Append something on a different topic:
```nushell
.append heartbeat
```

No `revenue.total` frame appears. The actor returned `{next: $sum}` for the
non-sale frame: state carried forward, no output emitted.
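Because the actor follows the `generate` contract, its logic can be tried out in plain Nushell before registering it. The sketch below does not touch the store: the records in `frames` are hand-written stand-ins that assume only the `topic` and `meta` fields used above (real frames carry more), and the closure mirrors the actor's `run` logic using `if`/`else` in place of the early `return`.

```nushell
# Hand-written stand-ins for frames; not read from a real store.
let frames = [
    {topic: "sale", meta: {amount: 49.99}}
    {topic: "heartbeat"}
    {topic: "sale", meta: {amount: 12.50}}
]

# Same decision logic as the registered actor's run closure.
let run = {|frame, sum = 0|
    if $frame.topic != "sale" {
        {next: $sum}                             # ignore: carry state, no output
    } else {
        let sum = $sum + ($frame.meta.amount | into float)
        {out: {total: $sum}, next: $sum}         # emit the new running total
    }
}

let totals = $frames | generate $run
$totals
```

Two records come out, one per sale; the heartbeat stand-in produces nothing, matching what the monitor shows for the live actor.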
Recap
| `generate` with input | cross.stream actor |
|---|---|
| `[data] \| generate {\|el, state\| ...}` | Frames flow through `{\|frame, state\| ...}` |
| `{out: v, next: s}` emits downstream | `out` record stored as frame metadata |
| `{next: s}` skips | No frame emitted, state carried forward |
| Omit `next` to stop | Actor self-terminates, emits `.unregistered` |
| Default param sets initial state | Default param or `initial` config field |