Getting Started
Malstrom can run distributed across many machines in parallel, but getting started locally is just as simple. Malstrom's API language is Rust. If you are not familiar with Rust, don't worry: check the Rust language's "Getting started" guide. You don't need to be an expert to write programs with Malstrom.
Installing Malstrom
Create a new Rust app and add Malstrom as a dependency using cargo: cargo init --bin && cargo add malstrom
The malstrom crate contains the framework core and essentials. We will look at other crates which add more features later.
Your First Program
In our main.rs file we will want to create two functions:
- A function to build the dataflow graph
- A function to execute the graph
Let's look at the code and then go through it step by step:
//! A basic example which runs a no-op dataflow
use malstrom::runtime::SingleThreadRuntime;
use malstrom::snapshot::NoPersistence;
use malstrom::worker::StreamProvider;

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    provider.new_stream();
}
Feel free to copy and run this snippet. It does absolutely nothing! Let's go through it:
- SingleThreadRuntime::builder: This creates a runtime for Malstrom. A "runtime" tells us where the computation will happen. In this case, all operations run in a single thread, but we will learn more about runtimes shortly.
- .persistence(NoPersistence): This means we do not want to persist any snapshots. You can learn more in the Persistent State documentation.
- fn build_dataflow(provider: &mut dyn StreamProvider): This is the function which builds our execution graph. It takes a special reference to the runtime as input; we call this a "provider". We then use this provider to create data streams.
- provider.new_stream(): Here we create a new data stream and then do... nothing with it. Let's fix that.
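To make the split between building a graph and executing it more concrete, here is a toy model using only the Rust standard library. It is not how Malstrom is implemented: ToyGraph, step, execute and build_graph are hypothetical names invented for this illustration. The point is only that the build function describes the computation, and nothing runs until the execute step.

```rust
// Toy model only: `ToyGraph` and its methods are hypothetical names
// made up for this sketch. They are not part of Malstrom.

/// A linear "graph": an ordered list of named transformation steps.
struct ToyGraph {
    steps: Vec<(String, Box<dyn Fn(i64) -> i64>)>,
}

impl ToyGraph {
    fn new() -> Self {
        ToyGraph { steps: Vec::new() }
    }

    /// Build phase: record a step. Nothing is computed yet.
    fn step(mut self, name: &str, f: impl Fn(i64) -> i64 + 'static) -> Self {
        self.steps.push((name.to_string(), Box::new(f)));
        self
    }

    /// Execute phase: push every input through all recorded steps, in order.
    fn execute(&self, input: impl IntoIterator<Item = i64>) -> Vec<i64> {
        input
            .into_iter()
            .map(|x| self.steps.iter().fold(x, |acc, (_, f)| f(acc)))
            .collect()
    }
}

/// Plays the role of a build function: it only describes the computation.
fn build_graph() -> ToyGraph {
    ToyGraph::new()
        .step("double", |x| x * 2)
        .step("add-one", |x| x + 1)
}

fn main() {
    let graph = build_graph(); // build: no data has been processed yet
    let out = graph.execute(0..=3); // execute: data flows through the steps
    println!("{out:?}");
}
```

Keeping the two phases apart is what lets a runtime decide where and how the described graph is executed.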
Adding Operators
[[Operators]] are a core concept of Malstrom. Operators are the nodes of our execution graph, and in the most general sense they do ✨something✨ with our data. Malstrom comes with many pre-built operators (though you can create your own). Let's import them and use the .source operator to add some data to our program.
//! A basic example which doubles numbers and prints them
use malstrom::operators::*;
use malstrom::runtime::SingleThreadRuntime;
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    provider
        .new_stream()
        .source(
            // <-- this is an operator
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .map("double", |x| x * 2)
        .inspect("print", |x, _| println!("{}", x.value)); // <-- and this too
}
If you now run this code, you'll see every even number from 0 to 200 printed to the console. Let's look at what we did:
- .source(name, source): The source operator adds data into a stream. It can only be applied to streams which do not yet have a source; see [[Joining Streams]] for how to use multiple sources. The source operator takes a "Source" as input. In this case we used a stateless source which emits items taken from an iterator.
- .map(name, value): The map operator applies a simple transformation to every value.
- .inspect(name, value, context): This operator allows us to observe values without manipulating them. This is ideal for debugging or exporting metrics. We will learn about the context argument shortly.
As you can see, all operators must have a unique name. Choosing a good name is important: it greatly helps with debugging and tracing and is essential for Persistent State.
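If you already know Rust's standard iterators, the source, map and inspect operators may feel familiar. The sketch below is only an analogy written with std::iter and does not use Malstrom at all: the range stands in for the source, and Iterator::map and Iterator::inspect play roles similar to the operators of the same name. The function name doubled is made up for this example. Unlike the Malstrom operators, these iterator adapters take no name and no context argument.

```rust
/// Analogy only: the same shape of pipeline, written with plain iterators.
/// `doubled` is a hypothetical helper name, not a Malstrom function.
fn doubled() -> Vec<i32> {
    (0..=100) // stands in for the source: yields 0, 1, ..., 100
        .map(|x| x * 2) // transform every value, like the "double" operator
        .inspect(|x| println!("{x}")) // observe each value without changing it
        .collect()
}

fn main() {
    let values = doubled();
    // 101 inputs (0 through 100) produce 101 doubled outputs.
    assert_eq!(values.len(), 101);
}
```

The analogy stops at a single thread: iterators like these run in one place, whereas a Malstrom dataflow is handed to a runtime that decides where the work happens.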
More Power
What if we want to do something more complex than doubling numbers? We will need more power! Luckily, going from a single thread to multiple threads (or machines) is super easy:
//! A multithreaded program
use malstrom::keyed::partitioners::rendezvous_select;
use malstrom::operators::*;
use malstrom::runtime::MultiThreadRuntime;
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;

fn main() {
    MultiThreadRuntime::builder()
        .parrallelism(4)
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .key_distribute("key-by-value", |x| x.value, rendezvous_select)
        .map("double", |x| x * 2)
        .inspect("print", |x, ctx| {
            println!("{x:?} @ Worker {}", ctx.worker_id)
        });
}
You will again see all numbers printed, along with the ID of the thread where they were processed (a number between 0 and 3). Let's reflect on the changes we made:
- MultiThreadRuntime::builder().parrallelism(4): We swapped the SingleThreadRuntime for a MultiThreadRuntime and created it with 4 threads.
- .key_distribute("key-by-value", |x| x.value, rendezvous_select): This operator distributes our data across multiple workers (i.e. threads). If you want to know more about how this works, check the Keyed Streams documentation.
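The name rendezvous_select hints at rendezvous hashing (also called highest-random-weight hashing), a general technique for assigning keys to workers. The sketch below shows that general technique with the standard library only. It is an assumption that Malstrom's partitioner follows the same idea, and this is not its actual implementation; score and pick_worker are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Score a (key, worker) pair by hashing both together.
/// Hypothetical helper for this sketch, not a Malstrom function.
fn score<K: Hash>(key: &K, worker: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    worker.hash(&mut hasher);
    hasher.finish()
}

/// Rendezvous selection: the worker with the highest score owns the key.
/// Returns `None` if `workers` is empty.
fn pick_worker<K: Hash>(key: &K, workers: &[u64]) -> Option<u64> {
    workers.iter().copied().max_by_key(|&w| score(key, w))
}

fn main() {
    let workers: [u64; 4] = [0, 1, 2, 3];
    for key in 0..=5i64 {
        // The same key always maps to the same worker.
        let owner = pick_worker(&key, &workers).unwrap();
        println!("key {key} -> worker {owner}");
    }
}
```

A useful property of this scheme is that removing a worker only reassigns the keys that worker owned. Every other key keeps its owner, because its highest-scoring worker is still in the set.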