spring-stream插件发布了

Posted 2024-08-25 05:19:42 ‐ 1 min read

spring-stream插件是实时流式处理消息的编程工具,可以大大简化基于文件、redis stream、kafka的消息处理

下面是一个简单的生产者:

#[auto_config(WebConfigurator)] #[tokio::main] async fn main() {     App::new()         .add_plugin(StreamPlugin)         .add_plugin(WebPlugin)         .run()         .await }  #[get("/")] async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {     let now = SystemTime::now();     let json = json!({         "success": true,         "msg": format!("This message was sent at {:?}", now),     });     let resp = producer         .send_json("topic", json)         .await         .context("send msg failed")?;      let seq = resp.sequence();     Ok(Json(json!({"seq":seq}))) } 

Producer用于向消息存储发送消息。spring-stream底层使用sea-streamer实现,它抽象了file、stdio、redis、kafka消息存储层,使开发者可以用统一的接口来发送和处理消息。

下面是一个简单的消费者的代码:

#[tokio::main] async fn main() {     App::new()         .add_plugin(StreamPlugin)         .add_consumer(consumers())         .run()         .await }  fn consumers() -> Consumers {     Consumers::new().typed_consumer(listen_topic_do_something) }  #[stream_listener("topic")] async fn listen_topic_do_something(Json(payload): Json<Payload>) {     tracing::info!("{:#?}", payload);     // do something } 

点击这里可以查看相关文档。