spring-stream plugin released

Posted 2024-08-25 05:19:42 ‐ 1 min read

The spring-stream plugin is a tool for real-time stream message processing. It greatly simplifies message processing backed by files, Redis Streams, and Kafka.

Here is a simple producer:

```rust
#[auto_config(WebConfigurator)]
#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_plugin(WebPlugin)
        .run()
        .await
}

#[get("/")]
async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
    let now = SystemTime::now();
    let json = json!({
        "success": true,
        "msg": format!("This message was sent at {:?}", now),
    });
    let resp = producer
        .send_json("topic", json)
        .await
        .context("send msg failed")?;

    let seq = resp.sequence();
    Ok(Json(json!({"seq":seq})))
}
```

Producer sends messages to the message store. Under the hood, spring-stream is built on sea-streamer, which abstracts the file, stdio, Redis, and Kafka message storage layers so that developers can send and process messages through a unified interface.
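To make the idea of a unified interface concrete, here is a minimal sketch in plain Rust. It is not sea-streamer's or spring-stream's actual API: the `MessageStore` trait, the two backends, and `publish_greeting` are hypothetical names invented for illustration. It only shows how application code can depend on one interface while the storage backend is swapped underneath.

```rust
use std::collections::HashMap;

/// Hypothetical unified interface (not the sea-streamer API): every backend
/// can append a message to a topic and report its sequence number.
trait MessageStore {
    fn send(&mut self, topic: &str, payload: &str) -> u64;
    fn read(&self, topic: &str) -> Vec<String>;
}

/// Illustrative in-memory backend that keeps one ordered log per topic.
#[derive(Default)]
struct MemoryStore {
    topics: HashMap<String, Vec<String>>,
}

impl MessageStore for MemoryStore {
    fn send(&mut self, topic: &str, payload: &str) -> u64 {
        let log = self.topics.entry(topic.to_string()).or_default();
        log.push(payload.to_string());
        // The sequence number is the index of the message just appended.
        (log.len() - 1) as u64
    }

    fn read(&self, topic: &str) -> Vec<String> {
        self.topics.get(topic).cloned().unwrap_or_default()
    }
}

/// Illustrative stdout backend: prints each message and retains nothing.
#[derive(Default)]
struct StdoutStore {
    next_seq: u64,
}

impl MessageStore for StdoutStore {
    fn send(&mut self, topic: &str, payload: &str) -> u64 {
        println!("[{}] {}", topic, payload);
        let seq = self.next_seq;
        self.next_seq += 1;
        seq
    }

    fn read(&self, _topic: &str) -> Vec<String> {
        Vec::new()
    }
}

/// Application code depends only on the trait, never on a concrete backend.
fn publish_greeting(store: &mut dyn MessageStore) -> u64 {
    store.send("topic", r#"{"success":true}"#)
}

fn main() {
    let mut mem = MemoryStore::default();
    let mut out = StdoutStore::default();
    println!("memory seq = {}", publish_greeting(&mut mem));
    println!("stdout seq = {}", publish_greeting(&mut out));
}
```

In this sketch, switching backends changes only the value passed to `publish_greeting`; the sending code itself stays the same.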

Here is a simple consumer:

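```rust
#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_consumer(consumers())
        .run()
        .await
}

// Collect every listener that the application should run.
fn consumers() -> Consumers {
    Consumers::new().typed_consumer(listen_topic_do_something)
}

// Assumed shape of the payload: it mirrors the JSON sent by the producer.
#[derive(Debug, serde::Deserialize)]
struct Payload {
    success: bool,
    msg: String,
}

// Invoked for each message received on "topic".
#[stream_listener("topic")]
async fn listen_topic_do_something(Json(payload): Json<Payload>) {
    tracing::info!("{:#?}", payload);
    // do something
}
```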
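The consumer above registers a listener against a topic name. The following is a rough, standard-library-only sketch of that routing idea. It does not show how spring-stream implements it: `Registry`, `Handler`, `listen`, and `dispatch` are hypothetical names, and the handlers here are synchronous closures that return a string so the result is easy to inspect.

```rust
use std::collections::HashMap;

/// Hypothetical handler type: receives the raw payload and returns a result.
type Handler = Box<dyn Fn(&str) -> String>;

/// Hypothetical registry mapping topic names to their listeners.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Vec<Handler>>,
}

impl Registry {
    /// Register a listener for a topic; chainable in builder style.
    fn listen(mut self, topic: &str, handler: impl Fn(&str) -> String + 'static) -> Self {
        self.handlers
            .entry(topic.to_string())
            .or_default()
            .push(Box::new(handler));
        self
    }

    /// Deliver a payload to every listener of the topic and collect results.
    /// Topics without listeners yield an empty list.
    fn dispatch(&self, topic: &str, payload: &str) -> Vec<String> {
        match self.handlers.get(topic) {
            Some(list) => list.iter().map(|h| h(payload)).collect(),
            None => Vec::new(),
        }
    }
}

fn main() {
    let registry = Registry::default()
        .listen("topic", |p| format!("handled: {}", p))
        .listen("audit", |p| format!("audited: {}", p));

    for line in registry.dispatch("topic", "hello") {
        println!("{}", line);
    }
}
```

Only the listener registered for "topic" runs in `main`; the "audit" listener is left untouched because no message was dispatched to it.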

Click here to view the relevant documentation.