In this article, we will use SeaStreamer to build a basic streaming system on top of Kafka, including both a producer and a consumer. This is useful for building async services that do not block the user experience.
Kafka Setup
In order to run Kafka, we will simply create a `docker-compose.yml` as shown here:
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
and run it like so:
docker compose up
This sets up a local Kafka instance ready to stream messages.
Producer
Now, let’s use SeaStreamer to create a very simple producer.
Setup a producer crate
To do so, let’s first create a new Rust crate:
cargo new producer
and add the following dependencies to our `Cargo.toml`
[dependencies]
anyhow = { version = "1" }
tokio = { version = "1.10", features = ["full"] }
sea-streamer = { version = "0.2", features = [ "kafka", "socket", "runtime-tokio" ] }
Note that we have enabled the features `kafka` and `socket` for our sea-streamer dependency. Since we are using `tokio` for async APIs, we have also added `runtime-tokio` in the feature set.
Create a streamer
Now, let’s modify our `main.rs` to create an async main that connects to a Kafka stream:
use anyhow::Result;
use sea_streamer::{
    export::url::Url, Producer, SeaProducer, SeaStreamer, StreamKey, Streamer, StreamerUri,
};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let stream_uri = StreamerUri::one(Url::parse("kafka://localhost:9092")?);
    let streamer = SeaStreamer::connect(stream_uri, Default::default()).await?;
    Ok(())
}
Create a producer
This can be done like so:
#[tokio::main]
async fn main() -> Result<()> {
    // ....
    let stream_key = StreamKey::new("hello1".to_string())?;
    let producer: SeaProducer = streamer
        .create_producer(stream_key, Default::default())
        .await?;
    Ok(())
}
The code above creates a producer capable of pushing messages to a stream named `hello1`.
Produce some messages
#[tokio::main]
async fn main() -> Result<()> {
    // ....
    for tick in 0..100 {
        let message = format!(r#""tick {tick}""#);
        eprintln!("{message}");
        producer.send(message)?;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    producer.end().await?;
    Ok(())
}
The code above sends a message like `"tick 0"`, `"tick 1"`, and so on, once per second.
We have now successfully created a Kafka Producer that sends a message every second to a stream.
Consumer
Now, it's time to consume these messages!
Create a consumer crate and streamer
Similar to the producer crate above, we create a new one:
cargo new consumer
and add the exact same dependencies as above.
Create a streamer
We will also modify `main.rs` as above to create a streamer.
use anyhow::Result;
use sea_streamer::{
    export::url::Url, Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, SeaConsumer,
    SeaConsumerOptions, SeaMessage, SeaStreamReset, SeaStreamer, StreamKey, Streamer, StreamerUri,
};

#[tokio::main]
async fn main() -> Result<()> {
    let stream_uri = StreamerUri::one(Url::parse("kafka://localhost:9092")?);
    let streamer = SeaStreamer::connect(stream_uri, Default::default()).await?;
    Ok(())
}
Create a consumer
Now, let’s set up our consumer to read from the `hello1` stream in real time. This is done like so:
#[tokio::main]
async fn main() -> Result<()> {
    // ...
    let stream_key = StreamKey::new("hello1".to_string())?;
    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(SeaStreamReset::Earliest);
    let consumer: SeaConsumer = streamer
        .create_consumer(&[stream_key], options)
        .await?;
    Ok(())
}
Consume messages
Finally, let’s set up our loop to process the streamed messages like so:
#[tokio::main]
async fn main() -> Result<()> {
    // ...
    loop {
        let mess: SeaMessage = consumer.next().await?;
        println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
    }
}
If you run the consumer, it will start to print the messages produced above :)