Async kafka client in pure Rust.
- Multiple async runtime (
tokio,async-std, etc.) - All versions of kafka are supported
- Compression (
gzip,snappy,lz4)
- Producer
- Consumer
- Streams
- Connect
- Admin client
[dependencies] kafkas = { git = "https://github.com/iamazy/kafkas", branch = "main" }To get started using kafkas:
- Producer
#[tokio::main] async fn main() -> Result<(), Box<Error>> { let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?; let producer = Producer::new(client, ProducerOptions::default()).await?; let (mut tx, mut rx) = futures::channel::mpsc::unbounded(); tokio::task::spawn(Box::pin(async move { while let Some(fut) = rx.next().await { if let Err(e) = fut.await { error!("{e}"); } } })); let topic = topic_name("kafka"); for _ in 0..10000_0000 { let record = TestData::new("hello kafka"); let ret = producer.send(&topic, record).await?; let _ = tx.send(ret).await; } }- Consumer
#[tokio::main] async fn main() -> Result<(), Box<Error>> { let client = Kafka::new("127.0.0.1:9092", KafkaOptions::default(), TokioExecutor).await?; let mut consumer_options = ConsumerOptions::new("default"); consumer_options.auto_commit_enabled = false; let mut consumer = Consumer::new(kafka_client, consumer_options).await?; let consume_stream = consumer.subscribe::<&str, ConsumerRecord>(vec!["kafka"]).await?; pin_mut!(consume_stream); while let Some(records) = consume_stream.next().await { for record in records { if let Some(value) = record.value { println!("{:?} - {}", String::from_utf8(value.to_vec())?, record.offset); } } // needed only when `auto_commit_enabled` is false consumer.commit_async().await?; } }Examples can be found in examples.
The rust version used for kafkas development is 1.65.
- kafka-protocol-rs : Rust implementation of the Kafka wire protocol.
- pulsar-rs : Rust Client library for Apache Pulsar
- rskafka : A minimal Rust client for Apache Kafka