spring-stream插件发布了
Posted 2024-08-25 05:19:42 ‐ 1 min read
spring-stream插件是实时流式处理消息的编程工具,可以大大简化基于文件、redis stream、kafka的消息处理
下面是一个简单的生产者:
#[auto_config(WebConfigurator)] #[tokio::main] async fn main() { App::new() .add_plugin(StreamPlugin) .add_plugin(WebPlugin) .run() .await } #[get("/")] async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> { let now = SystemTime::now(); let json = json!({ "success": true, "msg": format!("This message was sent at {:?}", now), }); let resp = producer .send_json("topic", json) .await .context("send msg failed")?; let seq = resp.sequence(); Ok(Json(json!({"seq":seq}))) } Producer用于向消息存储发送消息。spring-stream底层使用sea-streamer实现,它抽象了file、stdio、redis、kafka消息存储层,使开发者可以用统一的接口来发送和处理消息。
下面是一个简单的消费者的代码:
#[tokio::main] async fn main() { App::new() .add_plugin(StreamPlugin) .add_consumer(consumers()) .run() .await } fn consumers() -> Consumers { Consumers::new().typed_consumer(listen_topic_do_something) } #[stream_listener("topic")] async fn listen_topic_do_something(Json(payload): Json<Payload>) { tracing::info!("{:#?}", payload); // do something } 点击这里可以查看相关文档。