Stateful Programs

"Stateless is useless!" – my boss

Almost every streaming program except the simplest ETL pipelines must utilise program state. The state may be very simple, like the total count of messages received, or very complex, like the current relations in a graph.

Let's see how we can make our program stateful.

rs
//! A stateful program
use malstrom::keyed::partitioners::rendezvous_select;
use malstrom::operators::*;
use malstrom::runtime::SingleThreadRuntime;
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;
use std::time::Duration;

fn main() {
    SingleThreadRuntime::builder()
        .snapshots(Duration::from_secs(300))
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .key_distribute("key-by-value", |_| 0, rendezvous_select)
        .stateful_map("sum", |_key, value, state: i32| {
            let state = state + value;
            // i32 is Copy, so no clone is needed to both emit and keep the state
            (state, Some(state))
        })
        .inspect("print", |x, _ctx| println!("{}", x.value));
}

This program prints the running sum of the numbers from 0 to 100. Let's dissect the stateful_map operator.

rust
.stateful_map("sum", |_key, value, state: i32| {
    let state = state + value;
    (state, Some(state))
})
  • "sum" is the name of the operator
  • _key is a reference to the key of the message
  • value is the value of the message to be processed
  • state is the state this operator last emitted, or the default value of the state type if no state has been emitted yet or the last emitted state was None

We return a tuple of two values here:

  1. The value of the message being passed downstream
  2. Our new operator state

The emitted state may be Some or None. If we return Some, we receive that state as input again on the next invocation; if we return None, the state is dropped and the next invocation receives the default state value.
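The following minimal, std-only sketch makes the Some/None rule concrete by threading a single operator state through successive calls. `StatefulCell` and `resetting_sums` are hypothetical names for illustration, not Malstrom APIs, and the sketch ignores keys, distribution and snapshots entirely.

```rust
/// Hypothetical helper (not part of Malstrom) that stores one operator state
/// and threads it through successive calls following the Some/None rule.
struct StatefulCell<S> {
    state: Option<S>,
}

impl<S: Default> StatefulCell<S> {
    fn new() -> Self {
        Self { state: None }
    }

    /// Pass the stored state, or `S::default()` if none is stored, to `f`,
    /// then keep whatever `Option<S>` `f` returns for the next call.
    fn apply<V, O>(&mut self, value: V, f: impl FnOnce(V, S) -> (O, Option<S>)) -> O {
        let state = self.state.take().unwrap_or_default();
        let (out, next) = f(value, state);
        self.state = next;
        out
    }
}

/// Running sum that drops its state (returns `None`) once the sum exceeds 5,
/// so the following call starts again from `i32::default()`, which is 0.
fn resetting_sums(values: &[i32]) -> Vec<i32> {
    let mut cell: StatefulCell<i32> = StatefulCell::new();
    values
        .iter()
        .map(|&v| {
            cell.apply(v, |value, state| {
                let sum = state + value;
                (sum, if sum > 5 { None } else { Some(sum) })
            })
        })
        .collect()
}

fn main() {
    // prints [1, 3, 6, 4, 9]
    println!("{:?}", resetting_sums(&[1, 2, 3, 4, 5]));
}
```

After the sum reaches 6, the closure returns None, so the input 4 starts from 0 again instead of from 6.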

Keyed State

In Malstrom, all state is keyed. This means our state is partitioned by the same key as the data stream being processed. For every invocation of the stateful function in stateful_map, the input state is the state for the key of the message. In the example above we used only a single key, giving us a single state, but this is usually not what you would do in a real application.

Let's look at an example with multiple keys:

rs
//! Example using stateful_map with multiple keys
use malstrom::keyed::partitioners::rendezvous_select;
use malstrom::operators::*;
use malstrom::runtime::SingleThreadRuntime;
use malstrom::sinks::{StatelessSink, StdOutSink};
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap();
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .key_distribute("key-by-value", |x| x.value & 1 == 1, rendezvous_select)
        .stateful_map("sum", |_key, value, state: i32| {
            let state = state + value;
            (state, Some(state))
        })
        .sink("stdout", StatelessSink::new(StdOutSink));
}

Now, instead of a running sum of all numbers, we get separate running sums of the even and the odd numbers. This is because our state is keyed by the parity of the numbers.
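One way to picture keyed state is as a map from key to state, where each message reads and updates only the entry for its own key. The sketch below models the parity example in plain Rust with a `HashMap`. `keyed_running_sums` is a hypothetical helper and is not how Malstrom stores state internally. The final values follow from simple arithmetic: 0 + 2 + ... + 100 = 2550 and 1 + 3 + ... + 99 = 2500.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model of keyed state (not Malstrom's implementation): one
/// running sum per key, created with `i32::default()` on the key's first message.
fn keyed_running_sums<K, F>(
    values: impl IntoIterator<Item = i32>,
    key_fn: F,
) -> (Vec<(K, i32)>, HashMap<K, i32>)
where
    K: Eq + Hash + Clone,
    F: Fn(i32) -> K,
{
    let mut states: HashMap<K, i32> = HashMap::new();
    let mut outputs = Vec::new();
    for value in values {
        let key = key_fn(value);
        // Only the state belonging to this message's key is read and updated.
        let state = states.entry(key.clone()).or_default();
        *state += value;
        outputs.push((key, *state));
    }
    (outputs, states)
}

fn main() {
    // Key by parity, like `|x| x.value & 1 == 1` in the example above.
    let (outputs, states) = keyed_running_sums(0..=100, |v| v & 1 == 1);
    println!("first outputs: {:?}", &outputs[..4]); // [(false, 0), (true, 1), (false, 2), (true, 4)]
    println!("even: {}, odd: {}", states[&false], states[&true]); // even: 2550, odd: 2500
}
```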

For more information about keying, see Keyed Streams.

Persistent State

At runtime Malstrom keeps all state in memory, but what if we want to make our program resilient against machine failures or pause execution for some time and resume where we left off? This is where "persistent" state comes in. Malstrom uses snapshotting for persistence, which means periodically saving the program state to a permanent location. If there is a failure, the program can resume from the last snapshot instead of starting with a blank state.
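The following sketch illustrates the general snapshot idea independently of Malstrom. It saves a keyed state map to a file and reads it back on the next start, falling back to blank state when no snapshot exists yet. The `key=value` text format, the file name and both helper functions are assumptions made for this example. Malstrom itself delegates storage to a persistence backend.

```rust
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical snapshot format (one `key=value` line per key); Malstrom's
/// real format and storage layout are not shown here.
fn save_snapshot(path: &Path, state: &BTreeMap<String, i64>) -> io::Result<()> {
    let body: String = state.iter().map(|(k, v)| format!("{k}={v}\n")).collect();
    // Write a temporary file, then rename it over the old snapshot, so a crash
    // mid-write cannot corrupt the previously saved snapshot.
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, body)?;
    fs::rename(&tmp, path)
}

/// Restore the last saved snapshot, or start with blank state if none exists.
fn load_snapshot(path: &Path) -> io::Result<BTreeMap<String, i64>> {
    let body = match fs::read_to_string(path) {
        Ok(body) => body,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(BTreeMap::new()),
        Err(e) => return Err(e),
    };
    let mut state = BTreeMap::new();
    for line in body.lines() {
        if let Some((key, value)) = line.split_once('=') {
            let value: i64 = value
                .parse()
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
            state.insert(key.to_string(), value);
        }
    }
    Ok(state)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("running-sums.snapshot");
    let _ = fs::remove_file(&path); // start the demo without an old snapshot
    // First run: nothing to restore, so the state is blank.
    let mut state = load_snapshot(&path)?;
    state.insert("even".to_string(), 2550);
    save_snapshot(&path, &state)?;
    // After a "restart", the state is read back instead of starting blank.
    println!("{:?}", load_snapshot(&path)?); // {"even": 2550}
    fs::remove_file(&path)
}
```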

Persistence Backends

Persistence backends are interfaces to persistent storage where snapshots can be saved. Currently Malstrom comes with two different backends to choose from:

  • NoPersistence: As the name implies, this is a no-op backend which does not persist state. On program restarts all state is lost. This is useful for tests, stateless programs or programs which do not need to recover state on restarts.
  • SlateDbBackend: This backend is available with the slatedb feature. It uses SlateDB on top of an object store, so snapshots can be saved either to local disk or to a cloud store like S3, GCS or Azure Blob Storage.
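The following sketch uses a hypothetical `SnapshotStore` trait to show the difference between the two kinds of backend. Malstrom's real backend trait is not shown on this page, so the trait, its methods and the byte encoding are all assumptions. The no-op store forgets everything it is given. The storing variant lets a restarted program recover its sum. It is kept in memory here for brevity, where SlateDbBackend would use durable storage.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a persistence backend; Malstrom's actual
/// backend trait, method names and snapshot encoding may differ.
trait SnapshotStore {
    fn save(&mut self, epoch: u64, bytes: Vec<u8>);
    fn load_latest(&self) -> Option<(u64, Vec<u8>)>;
}

/// Mimics a no-op backend such as NoPersistence: saved snapshots are discarded.
struct NoopStore;

impl SnapshotStore for NoopStore {
    fn save(&mut self, _epoch: u64, _bytes: Vec<u8>) {}

    fn load_latest(&self) -> Option<(u64, Vec<u8>)> {
        None
    }
}

/// Keeps snapshots in memory; a durable backend such as SlateDbBackend would
/// write them to disk or object storage instead.
#[derive(Default)]
struct MemoryStore {
    snapshots: HashMap<u64, Vec<u8>>,
}

impl SnapshotStore for MemoryStore {
    fn save(&mut self, epoch: u64, bytes: Vec<u8>) {
        self.snapshots.insert(epoch, bytes);
    }

    fn load_latest(&self) -> Option<(u64, Vec<u8>)> {
        self.snapshots
            .iter()
            .max_by_key(|(epoch, _)| **epoch)
            .map(|(epoch, bytes)| (*epoch, bytes.clone()))
    }
}

/// On restart, restore a running sum from the newest snapshot, or start at 0.
fn restore_sum(store: &dyn SnapshotStore) -> i64 {
    store
        .load_latest()
        .map(|(_, bytes)| i64::from_le_bytes(bytes.try_into().expect("8-byte snapshot")))
        .unwrap_or(0)
}

fn main() {
    let mut noop = NoopStore;
    let mut memory = MemoryStore::default();
    noop.save(1, 2550i64.to_le_bytes().to_vec());
    memory.save(1, 2550i64.to_le_bytes().to_vec());
    // prints "noop: 0, memory: 2550"
    println!("noop: {}, memory: {}", restore_sum(&noop), restore_sum(&memory));
}
```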

Let's see how we can make our program's state persistent.

First, enable Malstrom's slatedb feature: cargo add malstrom -F slatedb.

rs
//! Using SlateDB as a persistence backend
use malstrom::keyed::partitioners::rendezvous_select;
use malstrom::operators::*;
use malstrom::sinks::{StatelessSink, StdOutSink};
use malstrom::{
    runtime::SingleThreadRuntime,
    snapshot::SlateDbBackend,
    sources::{SingleIteratorSource, StatelessSource},
    worker::StreamProvider,
};
use malstrom::snapshot::slatedb::object_store::{local::LocalFileSystem, path::Path};
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let filesystem = LocalFileSystem::new();
    let persistence = SlateDbBackend::new(Arc::new(filesystem), Path::from("/tmp")).unwrap();

    SingleThreadRuntime::builder()
        .persistence(persistence)
        .snapshots(Duration::from_secs(10))
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .key_distribute("key-by-value", |x| x.value & 1 == 1, rendezvous_select)
        .stateful_map("sum", |_key, value, state: i32| {
            let state = state + value;
            (state, Some(state))
        })
        .sink("stdout", StatelessSink::new(StdOutSink));
}

Just like that, we have made our program's state persistent. Let's review the changes we introduced:

  • .snapshots(Duration::from_secs(10)): This tells Malstrom to take a state snapshot every 10 seconds. This means, in the worst case, our program will have to re-process 10 seconds of data on restarts.
  • SlateDbBackend::new(...): This is our persistence backend. For this example we save snapshots to a temporary directory.
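The worst-case bound from the first bullet gives a quick estimate of how many messages are replayed after a restart: the snapshot interval multiplied by the input rate. This sketch assumes a steady, hypothetical input rate and ignores the time a snapshot itself needs to complete.

```rust
use std::time::Duration;

/// Upper bound on how many messages are replayed after a crash: everything
/// received since the last completed snapshot. Assumes a steady input rate
/// (a hypothetical parameter) and ignores how long a snapshot takes.
fn worst_case_replay(snapshot_interval: Duration, msgs_per_sec: u64) -> u64 {
    (snapshot_interval.as_secs_f64() * msgs_per_sec as f64).ceil() as u64
}

fn main() {
    // `.snapshots(Duration::from_secs(10))` at an assumed 1_000 messages/s
    println!("{}", worst_case_replay(Duration::from_secs(10), 1_000)); // 10000
}
```

Shorter intervals reduce replay work after a failure but snapshot more often, so the interval is a trade-off against snapshotting overhead.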

Unfortunately, we currently have so little data that the program finishes before taking even the first snapshot. Let's use more data and introduce some failures:

rs
//! Using SlateDB as a persistence backend
use malstrom::keyed::partitioners::rendezvous_select;
use malstrom::operators::*;
use malstrom::sinks::{StatelessSink, StdOutSink};
use malstrom::sources::{StatefulSource, StatefulSourceImpl, StatefulSourcePartition};
use malstrom::{runtime::SingleThreadRuntime, snapshot::SlateDbBackend, worker::StreamProvider};
use malstrom::snapshot::slatedb::object_store::{local::LocalFileSystem, path::Path};
use std::sync::Arc;
use std::thread::sleep;
use std::time::{Duration, Instant};

fn main() {
    let filesystem = LocalFileSystem::new();
    let persistence = SlateDbBackend::new(Arc::new(filesystem), Path::from("/tmp")).unwrap();

    loop {
        let job = SingleThreadRuntime::builder()
            .persistence(persistence.clone())
            .snapshots(Duration::from_secs(1))
            .build(build_dataflow);
        let thread = std::thread::spawn(move || job.execute().unwrap());
        match thread.join() {
            Ok(_) => return,
            Err(_) => {
                println!("Restarting worker");
                continue;
            }
        }
    }
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    let start_time = Instant::now();
    let fail_interval = Duration::from_secs(10);
    provider
        .new_stream()
        .source("iter-source", StatefulSource::new(StatefulNumberSource(0)))
        .key_distribute("key-by-value", |x| x.value & 1 == 1, rendezvous_select)
        .stateful_map("sum", |_key, value, state: i32| {
            let state = state + value;
            (state, Some(state))
        })
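        .inspect("expensive-operation", |_msg, _ctx| {
            // throttle the stream so the i32 sums stay small before the simulated crash
            sleep(Duration::from_millis(100))
        })
        .inspect("fail-random", move |_msg, _ctx| {
            // not actually random: panic once `fail_interval` has elapsed since
            // build_dataflow ran, which makes main() restart the worker thread
            if Instant::now().duration_since(start_time) > fail_interval {
                panic!("Oh no!")
            }
        })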
        .sink("stdout", StatelessSink::new(StdOutSink));
}

struct StatefulNumberSource(i32);

impl StatefulSourceImpl<i32, i32> for StatefulNumberSource {
    type Part = ();
    type PartitionState = i32;
    type SourcePartition = Self;

    fn list_parts(&self) -> Vec<Self::Part> {
        vec![()]
    }

    fn build_part(
        &mut self,
        _part: &Self::Part,
        part_state: Option<Self::PartitionState>,
    ) -> Self::SourcePartition {
        println!("Build with {part_state:?}");
        Self(part_state.unwrap_or_default())
    }
}

impl StatefulSourcePartition<i32, i32> for StatefulNumberSource {
    type PartitionState = i32;

    fn poll(&mut self) -> Option<(i32, i32)> {
        let out = Some((self.0, self.0));
        self.0 += 1;
        out
    }

    fn is_finished(&mut self) -> bool {
        false
    }

    fn snapshot(&self) -> Self::PartitionState {
        println!("SNAPSHOTTING SOURCE");
        self.0
    }

    fn collect(self) -> Self::PartitionState {
        self.0
    }
}

Our program will now "fail" and restart every 10 seconds. You may observe some duplicate outputs, but the calculated running totals remain correct, i.e. every integer is added exactly once.
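A simplified single-threaded model shows why duplicates appear while the totals stay correct. The source position and the keyed state are snapshotted together, outputs leave the system immediately, and after a crash both are rolled back to the last snapshot. This is a hypothetical plain-Rust simulation, not Malstrom's actual snapshot protocol, and `run_with_one_crash` and `Snapshot` are illustrative names.

```rust
use std::collections::HashMap;

/// A consistent snapshot: the source position and the operator state are
/// captured together, after a message has been fully processed.
#[derive(Clone, Default)]
struct Snapshot {
    next_offset: i64,
    sums: HashMap<bool, i64>,
}

/// Hypothetical single-threaded simulation (not Malstrom code): keep a running
/// sum of 0..=last per parity, snapshot every `snapshot_every` (> 0) messages and
/// crash once after `crash_after` messages. Returns (final sums, all outputs).
fn run_with_one_crash(
    last: i64,
    snapshot_every: i64,
    crash_after: usize,
) -> (HashMap<bool, i64>, Vec<(bool, i64)>) {
    let mut emitted = Vec::new();
    let mut durable = Snapshot::default(); // stands in for the persistence backend
    let mut crashed = false;
    loop {
        // (Re)start: restore the source position and state from the last snapshot.
        let mut offset = durable.next_offset;
        let mut sums = durable.sums.clone();
        let mut processed = 0;
        while offset <= last {
            if !crashed && processed == crash_after {
                crashed = true;
                break; // simulated failure: progress since the snapshot is lost
            }
            let value = offset;
            offset += 1;
            processed += 1;
            let key = value & 1 == 1;
            let sum = sums.entry(key).or_insert(0);
            *sum += value;
            emitted.push((key, *sum)); // outputs leave the system immediately
            if offset % snapshot_every == 0 {
                durable = Snapshot { next_offset: offset, sums: sums.clone() };
            }
        }
        if offset > last {
            return (sums, emitted);
        }
    }
}

fn main() {
    let (sums, emitted) = run_with_one_crash(100, 10, 25);
    println!("even: {}, odd: {}", sums[&false], sums[&true]); // even: 2550, odd: 2500
    println!("{} outputs for 101 inputs", emitted.len()); // 106 outputs for 101 inputs
}
```

With a snapshot every 10 messages and a crash after 25, the inputs 20 to 24 are processed twice, so 5 outputs are emitted twice. Each integer still contributes to the restored sums exactly once.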

Exactly Once

Malstrom's checkpointing system guarantees that every datapoint affects the program state exactly once. Notably, this means you will always get correct end results, but in failure cases messages may be processed and emitted more than once. While this may sound like a weak compromise, it is actually the strongest guarantee any stream processing system offers to date (to the best of the author's knowledge). You can, however, still make your outputs observably exactly-once by using idempotent output operations or atomic commits.
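An idempotent output operation can be sketched as an upsert keyed by a deterministic id, so a replayed output overwrites its earlier copy instead of adding a second one. The `IdempotentSink` type and the choice of the input offset as id are assumptions for illustration. The sketch also relies on replayed outputs being identical to the originals, which holds when state and source position are restored together.

```rust
use std::collections::BTreeMap;

/// Hypothetical idempotent sink: each output is written under a
/// deterministic id (here, the offset of the input that produced it), so a
/// replayed message overwrites its earlier copy instead of adding a new row.
#[derive(Default)]
struct IdempotentSink {
    rows: BTreeMap<u64, i64>,
}

impl IdempotentSink {
    fn write(&mut self, id: u64, value: i64) {
        // An upsert: writing the same (id, value) twice leaves a single row.
        self.rows.insert(id, value);
    }
}

fn main() {
    // Running sums for input offsets 0..=4, where offsets 2 and 3 were
    // delivered twice after a simulated crash.
    let delivered = [(0, 0), (1, 1), (2, 3), (3, 6), (2, 3), (3, 6), (4, 10)];
    let mut sink = IdempotentSink::default();
    for (id, value) in delivered {
        sink.write(id, value);
    }
    println!("{:?}", sink.rows); // {0: 0, 1: 1, 2: 3, 3: 6, 4: 10}
}
```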