Connecting to Kafka
You can use a Kafka broker as a datasource or sink with Malstrom using the malstrom-kafka crate with cargo add malstrom-kafka
. See the API docs and the example below for usage information.
Example
rs
//! A basic example of reading from and writing to Kafka
use malstrom::operators::{Inspect, Sink};
use malstrom::sinks::StatelessSink;
use malstrom::snapshot::NoPersistence;
use malstrom::sources::StatefulSource;
use malstrom::worker::StreamProvider;
use malstrom::{operators::Source, runtime::SingleThreadRuntime};
use malstrom_kafka::{KafkaSink, KafkaSource};
fn main() {
let _rt = SingleThreadRuntime::builder()
.persistence(NoPersistence)
.build(build_dataflow);
}
fn build_dataflow(provider: &mut dyn StreamProvider) -> () {
let source = KafkaSource::builder()
.broker("some-broker-url.com")
.broker("some-other-broker-url.com")
.topic("foobar")
.group_id("my-group")
.auto_offset_reset("latest")
// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
// for all supported config values
.conf("max.in.flight", "10000")
.build();
let sink = KafkaSink::builder()
.broker("sink-broker.com")
.group_id("my-group")
.build();
provider
.new_stream()
.source("kafka-source", StatefulSource::new(source))
.inspect("print", |msg, _| println!("{msg:?}"))
.sink("kafka-sink", StatelessSink::new(sink));
}