
Implementing Custom Sinks

Sinks are how data leaves a Malstrom computation. This usually means sending or writing data somewhere like a message broker, database or file.

Currently Malstrom supports three types of sinks:

  • Kafka compatible message brokers via the malstrom-kafka crate. You can find more information about that here.
  • Printing records to stdout via the built-in StdOutSink
  • Collecting records in a vector via the built-in VecSink

We aim to support more types of sinks in the future. Until then, you can implement any sink yourself, and this guide will teach you how to do that. Make sure to read about Keying and Event Time first, as these concepts are important to fully understand how sinks work.

Custom Stateless Sink

If you read the guide on Custom Sources before, the following will feel very familiar.

As an example we will implement a sink which writes records to files on the local filesystem, using one file per key.

This means that if we have the keys "foo", "bar" and "baz", our directory should look like this:

/the/directory
  |_ foo.txt
  |_ bar.txt
  |_ baz.txt

and we would like to construct the sink in Rust like this:

rs
StatelessSink::new(FileSink::new("/path/to/the/directory".to_string()))

The StatelessSinkImpl Trait

To turn any type into a sink, we need to implement the StatelessSinkImpl trait on it. Example:

rs
/// Write records as lines to a file
struct FileSink {
    directory: String,
}

impl FileSink {
    pub fn new(directory: String) -> Self {
        Self { directory }
    }
}

impl<T> StatelessSinkImpl<String, String, T> for FileSink {
    fn sink(&mut self, msg: DataMessage<String, String, T>) {
        let file_path = format!("{}/{}.txt", self.directory, msg.key);
        // open file in append-mode, creating it if it does not exist
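        // `create` (unlike `create_new`) does not fail if the file already exists
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(file_path)
            .unwrap();
        // `write_all` ensures the whole buffer is written
        file.write_all(msg.value.as_bytes()).unwrap();
        file.write_all(b"\n").unwrap();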
    }
}

Let's see what is going on here:

The implementation impl<T> StatelessSinkImpl<String, String, T> for FileSink means we implement FileSink as a sink for any datastream where the records have a Key of type String (the first type parameter) and a value of type String (the second type parameter). The generic T indicates the records may have a timestamp of any type.

For a stateless sink, we only need to implement a single method: fn sink(&mut self, msg: DataMessage<String, String, T>). This method gets called for every record arriving at the sink and we can do with that record as we wish.

And that's it. That is a fully functioning sink!
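To see the file handling in isolation, here is a minimal sketch using only the standard library, without any Malstrom types. The helpers path_for_key and append_line are hypothetical names introduced for this illustration. They mirror what the body of sink does for each record, assuming the file should be created on first use and appended to afterwards.

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};

/// Hypothetical helper: the file a given key is written to.
fn path_for_key(directory: &Path, key: &str) -> PathBuf {
    directory.join(format!("{key}.txt"))
}

/// Hypothetical helper: append one record as a line,
/// creating the file if it does not exist yet.
fn append_line(directory: &Path, key: &str, value: &str) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path_for_key(directory, key))?;
    writeln!(file, "{value}")
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("file-sink-sketch");
    // start from a clean directory so repeated runs behave the same
    let _ = fs::remove_dir_all(&dir);
    fs::create_dir_all(&dir)?;

    // two records with key "foo", one with key "bar"
    append_line(&dir, "foo", "first")?;
    append_line(&dir, "foo", "second")?;
    append_line(&dir, "bar", "other")?;

    // each key ends up in its own file, records in arrival order
    assert_eq!(fs::read_to_string(path_for_key(&dir, "foo"))?, "first\nsecond\n");
    assert_eq!(fs::read_to_string(path_for_key(&dir, "bar"))?, "other\n");
    Ok(())
}
```

Note that the second record for "foo" only succeeds because the file is opened with create rather than create_new, which would return an error once the file exists.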

Full Code

You can find the fully functional, runnable example code below:

rs
//! Example of a stateless sink writing to files on the local filesystem
use malstrom::{
    keyed::{partitioners::rendezvous_select, KeyDistribute},
    operators::{Map, Sink, Source},
    runtime::SingleThreadRuntime,
    sinks::{StatelessSink, StatelessSinkImpl},
    snapshot::NoPersistence,
    sources::{SingleIteratorSource, StatelessSource},
    types::DataMessage,
    worker::StreamProvider,
};
use std::{fs::OpenOptions, io::Write};
// #region sink_impl
/// Write records as lines to a file
struct FileSink {
    directory: String,
}

impl FileSink {
    pub fn new(directory: String) -> Self {
        Self { directory }
    }
}

impl<T> StatelessSinkImpl<String, String, T> for FileSink {
    fn sink(&mut self, msg: DataMessage<String, String, T>) {
        let file_path = format!("{}/{}.txt", self.directory, msg.key);
        // open file in append-mode, creating it if it does not exist
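        // `create` (unlike `create_new`) does not fail if the file already exists
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(file_path)
            .unwrap();
        // `write_all` ensures the whole buffer is written
        file.write_all(msg.value.as_bytes()).unwrap();
        file.write_all(b"\n").unwrap();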
    }
}
// #endregion sink_impl

fn build_dataflow(provider: &mut dyn StreamProvider) {
    std::fs::create_dir_all("/tmp/file-sink").unwrap();
    provider
        .new_stream()
        .source(
            "number",
            StatelessSource::new(SingleIteratorSource::new(0..5)),
        )
        .key_distribute(
            "key-by-value",
            |msg| msg.value.to_string(),
            rendezvous_select,
        )
        .map("int-to-string", |value| value.to_string())
        .sink(
            "file-sink",
            StatelessSink::new(FileSink::new("/tmp/file-sink".to_string())),
        );
}

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap();
}

Custom Stateful Sinks

The sink we implemented above is not able to retain any state across job restarts or rescalings. Generally data sinks rarely need to be stateful. This is also the reason the example used in this guide may seem a bit contrived, because it is!

We will use the same sink as above, but this time adding line numbers to the files we write. Note that you could achieve the same with a stateful_map before a stateless sink.

The StatefulSinkImpl Trait

Instead of StatelessSinkImpl we must implement StatefulSinkImpl for our sink type:

rs
/// Write records as lines to a file with line numbers
struct FileSink {
    directory: String,
}

impl FileSink {
    pub fn new(directory: String) -> Self {
        Self { directory }
    }
}

impl<T> StatefulSinkImpl<String, String, T> for FileSink {
    type Part = String;
    type PartitionState = usize;
    type SinkPartition = FileSinkPartition;

    fn assign_part(&self, msg: &DataMessage<String, String, T>) -> Self::Part {
        format!("{}/{}.txt", self.directory, msg.key)
    }

    fn build_part(
        &mut self,
        part: &Self::Part,
        part_state: Option<Self::PartitionState>,
    ) -> Self::SinkPartition {
        FileSinkPartition::new(part.clone(), part_state)
    }
}

There is a lot more going on here than in the stateless version. First we must understand that stateful sinks in Malstrom are explicitly partitioned. This means each sink is really composed of many smaller sinks, its partitions.

In our case we will create one partition per file we write, as you will see below. Each partition belongs to a Part. You can think of the Part as the name of the partition. Here we use the filepath of the file the partition is writing to as the Part.

Each partition also has a state, the PartitionState. In our example we will keep the next line number to be used in a file as the partition's state, which is of type usize.

For the StatefulSinkImpl trait we must implement two methods:

  • assign_part: This function determines the mapping between a record's key and the partition it will go to. This function must be stable and deterministic. In this example we simply use the key to construct a filepath, which is our target partition's Part.
  • build_part: This function constructs partitions. It is called every time we encounter a key for which we do not yet have a partition, when the partition needs to be recreated because its state was moved, or when we are restoring the sink from a snapshot.
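The interplay of the two methods can be pictured with a simplified, standard-library-only model. This is a sketch of the idea, not Malstrom's actual runtime: PartitionedSink and Partition are hypothetical types, and the partition only tracks a line number instead of writing to a file.

```rust
use std::collections::HashMap;

/// Stand-in for a partition: remembers only the next line number.
struct Partition {
    next_line_no: usize,
}

/// Simplified model of a partitioned sink (hypothetical, not Malstrom's runtime).
struct PartitionedSink {
    directory: String,
    /// one partition per Part, here keyed by file path
    partitions: HashMap<String, Partition>,
}

impl PartitionedSink {
    fn new(directory: &str) -> Self {
        Self {
            directory: directory.to_string(),
            partitions: HashMap::new(),
        }
    }

    /// Same mapping as `assign_part`: the same key always yields the same Part.
    fn assign_part(&self, key: &str) -> String {
        format!("{}/{}.txt", self.directory, key)
    }

    /// Same role as `build_part`: resume from saved state or start fresh.
    fn build_part(state: Option<usize>) -> Partition {
        Partition {
            next_line_no: state.unwrap_or(0),
        }
    }

    /// Route a record to its partition, building the partition on first use.
    /// Returns the line number the record would be written with.
    fn sink(&mut self, key: &str) -> usize {
        let part = self.assign_part(key);
        let partition = self
            .partitions
            .entry(part)
            .or_insert_with(|| Self::build_part(None));
        let line_no = partition.next_line_no;
        partition.next_line_no += 1;
        line_no
    }
}

fn main() {
    let mut sink = PartitionedSink::new("/tmp/file-sink");
    assert_eq!(sink.sink("foo"), 0); // builds the partition for foo.txt
    assert_eq!(sink.sink("bar"), 0); // builds the partition for bar.txt
    assert_eq!(sink.sink("foo"), 1); // reuses the existing foo.txt partition
    assert_eq!(sink.partitions.len(), 2);
}
```

Because assign_part is deterministic, every record with the same key reaches the same partition, which is what makes per-partition state such as a line counter meaningful.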

Next we need to implement the partition itself.

The StatefulSinkPartition Trait

The trait StatefulSinkPartition is what our sink's partitions must implement. Let's look at the implementation for our example:

rs
struct FileSinkPartition {
    file: std::fs::File,
    next_line_no: usize,
}
impl FileSinkPartition {
    fn new(file_path: String, next_line_no: Option<usize>) -> Self {
        // `create` (unlike `create_new`) also works when the partition is
        // rebuilt for a file that already exists
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(file_path)
            .unwrap();
        Self {
            file,
            next_line_no: next_line_no.unwrap_or(0),
        }
    }
}

impl<T> StatefulSinkPartition<String, String, T> for FileSinkPartition {
    type PartitionState = usize;

    fn sink(&mut self, msg: DataMessage<String, String, T>) {
        // `write_all` ensures the whole buffer is written
        self.file
            .write_all(format!("{} ", self.next_line_no).as_bytes())
            .unwrap();
        self.file.write_all(msg.value.as_bytes()).unwrap();
        self.file.write_all(b"\n").unwrap();
        self.next_line_no += 1;
    }

    fn snapshot(&self) -> Self::PartitionState {
        self.next_line_no
    }

    fn collect(self) -> Self::PartitionState {
        self.next_line_no
    }
}

What is happening in this code?

  • In the new method, we open or create the file we want to write to and set our internal state, the next line number to use, to either the state passed in or a default of 0 for the first line.
  • type PartitionState = usize, as above, declares the persistent state of our partition to be of type usize.
  • The method sink is called for every record reaching this sink. For every record we first write the line number followed by a space, then the record and then a newline. Finally we increment the line number by one.
  • snapshot gets called by Malstrom when it takes a snapshot for persistence. In this method we must return the state we wish to persist, in our case the next line number to be used.
  • collect gets called by Malstrom when it moves the partition to another worker. Note how collect takes self by value and not by reference, as the partition will be dropped here and recreated on another worker using the returned state.

WARNING

It is very easy to introduce off-by-one errors when implementing snapshot and collect. Think carefully about the state being persisted and how it is used in build_part.
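To make the warning concrete, here is a small standard-library-only sketch of the round trip. NumberedLines is a hypothetical in-memory stand-in for FileSinkPartition that collects lines in a vector instead of writing to a file. It persists the next line number to use, so a rebuilt partition continues numbering without a gap or a repeat.

```rust
/// In-memory stand-in for the file partition (hypothetical type).
struct NumberedLines {
    lines: Vec<String>,
    next_line_no: usize,
}

impl NumberedLines {
    /// Mirrors `FileSinkPartition::new`: resume from saved state or start at 0.
    fn new(state: Option<usize>) -> Self {
        Self {
            lines: Vec::new(),
            next_line_no: state.unwrap_or(0),
        }
    }

    /// Write the line number, a space and the record, then advance the counter.
    fn sink(&mut self, value: &str) {
        self.lines.push(format!("{} {}", self.next_line_no, value));
        self.next_line_no += 1;
    }

    /// State to persist: the NEXT line number, not the last one written.
    fn snapshot(&self) -> usize {
        self.next_line_no
    }

    /// Consume the partition and hand over its state.
    fn collect(self) -> usize {
        self.next_line_no
    }
}

fn main() {
    let mut first = NumberedLines::new(None);
    first.sink("a"); // line "0 a"
    first.sink("b"); // line "1 b"
    assert_eq!(first.snapshot(), 2);

    // simulate moving the partition: collect the state, rebuild from it
    let state = first.collect();
    let mut second = NumberedLines::new(Some(state));
    second.sink("c");
    assert_eq!(second.lines, vec!["2 c".to_string()]);
}
```

Had snapshot and collect returned the last line number written (1 in this run) while new still treated the state as the next line number, the rebuilt partition would have emitted "1 c" and repeated a line number.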

Full Code

You can find the full code and usage below.

rs
//! Example of a stateful sink writing to files on the local filesystem
use malstrom::{
    keyed::{partitioners::rendezvous_select, KeyDistribute},
    operators::{Map, Sink, Source},
    runtime::SingleThreadRuntime,
    sinks::{StatefulSink, StatefulSinkImpl, StatefulSinkPartition},
    snapshot::NoPersistence,
    sources::{SingleIteratorSource, StatelessSource},
    types::DataMessage,
    worker::StreamProvider,
};
use std::{fs::OpenOptions, io::Write};
// #region sink_impl
/// Write records as lines to a file with line numbers
struct FileSink {
    directory: String,
}

impl FileSink {
    pub fn new(directory: String) -> Self {
        Self { directory }
    }
}

impl<T> StatefulSinkImpl<String, String, T> for FileSink {
    type Part = String;
    type PartitionState = usize;
    type SinkPartition = FileSinkPartition;

    fn assign_part(&self, msg: &DataMessage<String, String, T>) -> Self::Part {
        format!("{}/{}.txt", self.directory, msg.key)
    }

    fn build_part(
        &mut self,
        part: &Self::Part,
        part_state: Option<Self::PartitionState>,
    ) -> Self::SinkPartition {
        FileSinkPartition::new(part.clone(), part_state)
    }
}
// #endregion sink_impl

// #region partition
struct FileSinkPartition {
    file: std::fs::File,
    next_line_no: usize,
}
impl FileSinkPartition {
    fn new(file_path: String, next_line_no: Option<usize>) -> Self {
        // `create` (unlike `create_new`) also works when the partition is
        // rebuilt for a file that already exists
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(file_path)
            .unwrap();
        Self {
            file,
            next_line_no: next_line_no.unwrap_or(0),
        }
    }
}

impl<T> StatefulSinkPartition<String, String, T> for FileSinkPartition {
    type PartitionState = usize;

    fn sink(&mut self, msg: DataMessage<String, String, T>) {
        // `write_all` ensures the whole buffer is written
        self.file
            .write_all(format!("{} ", self.next_line_no).as_bytes())
            .unwrap();
        self.file.write_all(msg.value.as_bytes()).unwrap();
        self.file.write_all(b"\n").unwrap();
        self.next_line_no += 1;
    }

    fn snapshot(&self) -> Self::PartitionState {
        self.next_line_no
    }

    fn collect(self) -> Self::PartitionState {
        self.next_line_no
    }
}
// #endregion partition

fn build_dataflow(provider: &mut dyn StreamProvider) {
    std::fs::create_dir_all("/tmp/file-sink").unwrap();
    provider
        .new_stream()
        .source(
            "number",
            StatelessSource::new(SingleIteratorSource::new(0..50)),
        )
        .key_distribute(
            "key-by-mod",
            |msg| (msg.value % 5).to_string(),
            rendezvous_select,
        )
        .map("int-to-string", |value| value.to_string())
        .sink(
            "file-sink",
            StatefulSink::new(FileSink::new("/tmp/file-sink".to_string())),
        );
}

fn main() {
    SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow)
        .execute()
        .unwrap();
}