Joining and Splitting Streams
We have already seen that a stream has one input and one output. However, this does not prevent us from consuming multiple data sources or writing to multiple sinks.
Let's extend the example from the beginning to take multiple inputs:
```rust
//! Combining multiple streams
use malstrom::operators::*;
use malstrom::runtime::SingleThreadRuntime;
use malstrom::sinks::{StatelessSink, StdOutSink};
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    // Two independent sources, each emitting the numbers 0 to 100
    let numbers = provider.new_stream().source(
        "iter-source",
        StatelessSource::new(SingleIteratorSource::new(0..=100)),
    );
    let more_numbers = provider.new_stream().source(
        "other-iter-source",
        StatelessSource::new(SingleIteratorSource::new(0..=100)),
    );
    // Merge both streams into one and print every message
    numbers
        .union([more_numbers].into_iter())
        .sink("std-out-sink", StatelessSink::new(StdOutSink));
}
```
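If you run this example, you will see each number printed twice. The `union` operator merges the messages of the stream it is called on with those of every stream in the iterator passed to it, producing a single output stream. Note that this is different from a `zip` operation: `union` does not pair up messages and does not necessarily alternate between the left and right streams, so you should not rely on any particular interleaving.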
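The difference between `union` and `zip` can be modelled in plain Rust. The sketch below uses only the standard library, not Malstrom's API, and `union_model` and `zip_model` are hypothetical names for illustration. Each input is sent from its own thread into one `std::sync::mpsc` channel, so every message arrives exactly once but the interleaving depends on the thread scheduler, much like the merged stream above. `zip_model`, in contrast, pairs messages strictly one-to-one.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical model of `union`: every input's messages end up in one output.
/// The order in which the inputs interleave is not defined.
fn union_model(inputs: Vec<Vec<u32>>) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for input in inputs {
        let tx = tx.clone();
        // Each input runs on its own thread, so arrival order depends on scheduling.
        handles.push(thread::spawn(move || {
            for msg in input {
                tx.send(msg).unwrap();
            }
        }));
    }
    // Drop the original sender so the receiver ends once all producers finish.
    drop(tx);
    let merged: Vec<u32> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    merged
}

/// Hypothetical model of `zip`: messages are paired strictly one-to-one.
fn zip_model(left: Vec<u32>, right: Vec<u32>) -> Vec<(u32, u32)> {
    left.into_iter().zip(right).collect()
}

fn main() {
    let numbers: Vec<u32> = (0..=100).collect();
    let more_numbers: Vec<u32> = (0..=100).collect();

    let mut merged = union_model(vec![numbers.clone(), more_numbers.clone()]);
    // Sorting only makes the result comparable; the merged order itself is unspecified.
    merged.sort();
    println!("union: {} messages, first four sorted: {:?}", merged.len(), &merged[..4]);

    let zipped = zip_model(numbers, more_numbers);
    println!("zip: {} pairs, first: {:?}", zipped.len(), zipped[0]);
}
```

Running the model repeatedly without the `sort` call may print the merged messages in a different order each time, which is exactly why a dataflow consuming a union should not depend on ordering between its inputs.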
For multiple outputs we can split our stream just as easily:
```rust
//! Splitting a stream into multiple outputs
use malstrom::operators::*;
use malstrom::runtime::SingleThreadRuntime;
use malstrom::sinks::{StatelessSink, StdOutSink};
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    // Clone every message into two output streams
    let [numbers, more_numbers] = provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .const_cloned("clone-values");
    // Each output gets its own sink, so every number is printed twice
    numbers.sink("numbers-sink", StatelessSink::new(StdOutSink));
    more_numbers.sink("more-numbers-sink", StatelessSink::new(StdOutSink));
}
```
Here the `const_cloned` operator clones each message into a fixed number of output streams; that number is given by the length of the array it returns, two in this case. There is also the `cloned` operator, which lets you choose the number of output streams at runtime.
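The difference between the two cloning operators comes down to when the number of outputs is known. As a rough model in plain Rust (standard library only; `const_cloned_model` and `cloned_model` are hypothetical helpers, not Malstrom functions), a const-generic function can return a fixed-size array whose length the compiler infers from the destructuring pattern, while the runtime variant has to return a `Vec`:

```rust
/// Hypothetical model of `const_cloned`: the output count `N` is a
/// compile-time constant, inferred here from the array pattern at the call site.
fn const_cloned_model<T: Clone, const N: usize>(input: &[T]) -> [Vec<T>; N] {
    // Every output receives its own clone of every message.
    std::array::from_fn(|_| input.to_vec())
}

/// Hypothetical model of `cloned`: the output count is only known at runtime.
fn cloned_model<T: Clone>(input: &[T], n: usize) -> Vec<Vec<T>> {
    (0..n).map(|_| input.to_vec()).collect()
}

fn main() {
    let source: Vec<u32> = (0..=100).collect();

    // N = 2 is inferred from the two-element destructuring pattern.
    let [numbers, more_numbers] = const_cloned_model(&source);
    println!("{} and {} messages", numbers.len(), more_numbers.len());

    // Here the output count could come from configuration or user input.
    let outputs = cloned_model(&source, 3);
    println!("{} outputs", outputs.len());
}
```

The array-returning form is what allows the example above to destructure the outputs with `let [numbers, more_numbers] = ...` and have the compiler check that the count matches.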
If we want to choose which output stream a message goes to, we can use the `const_split` and `split` operators:
```rust
//! Routing messages into multiple streams
use malstrom::operators::*;
use malstrom::runtime::SingleThreadRuntime;
use malstrom::sinks::{StatelessSink, StdOutSink};
use malstrom::snapshot::NoPersistence;
use malstrom::sources::{SingleIteratorSource, StatelessSource};
use malstrom::worker::StreamProvider;

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap()
}

fn build_dataflow(provider: &mut dyn StreamProvider) {
    let [even_nums, all_nums] = provider
        .new_stream()
        .source(
            "iter-source",
            StatelessSource::new(SingleIteratorSource::new(0..=100)),
        )
        .const_split("split-even-odd", |msg, outputs| {
            if msg.value % 2 == 0 {
                // Even: send to both outputs
                *outputs = [true, true];
            } else {
                // Odd: send only to the second output
                *outputs = [false, true];
            }
        });
    even_nums.sink("even-sink", StatelessSink::new(StdOutSink));
    all_nums.sink("all-sink", StatelessSink::new(StdOutSink));
}
```
The `split` and `const_split` operators take a function that determines where each message is routed. The function receives the message and mutable access to a list of booleans, one per output; for `const_split` this is a fixed-size array, which is why the example can assign `[true, true]` to it directly. All entries start out as `false`. Setting an entry to `true` sends the message to that output: for example, `[true, false]` sends a message only to the first (left) output, `[true, true]` to both, and `[false, false]` to neither. The message is cloned as many times as needed to reach every selected output.
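The routing rule can be sketched in plain Rust as well. This is a standard-library model, not Malstrom's implementation, and `const_split_model` is a hypothetical name: for each message a fresh `[bool; N]` is created with every entry `false`, the routing function sets the entries it wants, and the message is cloned once into each selected output.

```rust
/// Hypothetical model of `const_split`: `route` sees each message together with
/// a fresh array of `N` flags, all `false`, and sets the flag of every output
/// that should receive the message.
fn const_split_model<T: Clone, const N: usize>(
    input: &[T],
    route: impl Fn(&T, &mut [bool; N]),
) -> [Vec<T>; N] {
    let mut outputs: [Vec<T>; N] = std::array::from_fn(|_| Vec::new());
    for msg in input {
        // Flags start out false, so a message nobody selects is dropped.
        let mut flags = [false; N];
        route(msg, &mut flags);
        for (output, selected) in outputs.iter_mut().zip(flags.iter()) {
            if *selected {
                // One clone per selected output.
                output.push(msg.clone());
            }
        }
    }
    outputs
}

fn main() {
    let numbers: Vec<u32> = (0..=100).collect();
    // Same routing as the example: evens go to both outputs, odds only to the second.
    let [even_nums, all_nums] = const_split_model(&numbers, |msg, outputs| {
        *outputs = [*msg % 2 == 0, true];
    });
    println!("{} even, {} total", even_nums.len(), all_nums.len());
}
```

Creating the flags fresh for every message is what gives the "all `false` by default" behaviour: a routing function that never touches the array simply drops the message.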