NATS as backend with Queue package (Connective Technology for Adaptive Edge & Distributed Systems)
Set up the NATS server
docker run -d --name nats-main -p 4222:4222 -p 8222:8222 nats:latest
docker run -d --name nats-slave -p 4223:4222 -p 8223:8222 nats:latest

Run the test
go test -v ./...package main import ( "context" "encoding/json" "fmt" "log" "time" "github.com/golang-queue/nats" "github.com/golang-queue/queue" ) type job struct { Message string } func (j *job) Bytes() []byte { b, err := json.Marshal(j) if err != nil { panic(err) } return b } func main() { taskN := 100 rets := make(chan string, taskN) // define the worker w := nats.NewWorker( nats.WithAddr("127.0.0.1:4222"), nats.WithSubj("example"), nats.WithQueue("foobar"), nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error { var v *job if err := json.Unmarshal(m.Bytes(), &v); err != nil { return err } rets <- v.Message return nil }), ) // define the queue q, err := queue.NewQueue( queue.WithWorkerCount(10), queue.WithWorker(w), ) if err != nil { log.Fatal(err) } // start the five worker q.Start() // assign tasks in queue for i := 0; i < taskN; i++ { go func(i int) { if err := q.Queue(&job{ Message: fmt.Sprintf("handle the job: %d", i+1), }); err != nil { log.Fatal(err) } }(i) } // wait until all tasks done for i := 0; i < taskN; i++ { fmt.Println("message:", <-rets) time.Sleep(50 * time.Millisecond) } // shutdown the service and notify all the worker q.Release() }