I’m working with BullMQ (v5+) and need to implement a worker to process messages for different chats, with the following requirements:
Rate Limit:
- Each queue should have a rate limit of 1 job per 3 seconds, but rate limits should be specific to each chat / group, not global.
Behavior:
- The worker should process jobs in the order they were added.
- Jobs should be processed with the rate limit in place.
- Each chat / group should have a separate processing order with its own rate limit.
The code below consumes an incredible amount of memory (0.5 GB+). I can run only one instance of the application, so I am trying to find a way to optimize the part that keeps a separate queue and worker per chat in JS Maps. It seems there should be a simpler way to implement a per-chat queue.
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');

const redisConnection = {
  host: 'localhost',
  port: 6379,
  // BullMQ workers use blocking Redis commands and require this to be null.
  maxRetriesPerRequest: null,
};

// Single ioredis client handed to every Queue and Worker created below.
const redis = new Redis(redisConnection);

/**
 * Builds the BullMQ queue name used for one chat / group.
 *
 * @param {string} group - Chat / group identifier.
 * @returns {string} Queue name of the form `chat_messages__<group>`.
 */
const createQueueName = (group) => `chat_messages__${group}`;

/**
 * Keeps one BullMQ Queue and one rate-limited Worker per chat / group, so each
 * group has its own job order and its own limit of 1 job per 3 seconds.
 *
 * NOTE(review): resource usage grows linearly with the number of groups. Per
 * the BullMQ connection docs, a Worker that is handed an existing ioredis
 * instance still duplicates it for its own blocking connection, so every group
 * costs at least one extra Redis connection. Per-group limits inside a single
 * queue would avoid that but need a different design (e.g. BullMQ Pro groups)
 * -- confirm before relying on this class for a large number of chats.
 */
class ChatMessageWorker {
  constructor() {
    // group -> Queue
    this.queues = new Map();
    // group -> Worker
    this.workers = new Map();
  }

  /**
   * Creates (or returns the already-created) queue for a group together with
   * the worker that processes it.
   *
   * @param {string} group - Chat / group identifier.
   * @returns {Queue} The queue that jobs for this group are added to.
   */
  createWorkerForGroup(group) {
    // Calling this twice for the same group used to overwrite the Map entries
    // and orphan the first Queue/Worker, which close() could then never reach.
    const existingQueue = this.queues.get(group);
    if (existingQueue) {
      return existingQueue;
    }

    const queueName = createQueueName(group);

    const queue = new Queue(queueName, {
      connection: redis,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: true,
      },
    });

    const worker = new Worker(
      queueName,
      async (job) => {
        const processingTime = new Date().toLocaleTimeString();
        console.log(`>>> Processing job ${job.id} \ for group ${group} at ${processingTime} - message sent to chat!`);
      },
      {
        connection: redis,
        // At most one job per 3000 ms for this queue, i.e. for this group.
        limiter: {
          max: 1,
          duration: 3000,
        },
      }
    );

    worker.on('error', (error) => {
      console.error(`Worker for group ${group} encountered an error:`, error);
    });

    this.queues.set(group, queue);
    this.workers.set(group, worker);

    return queue;
  }

  /**
   * Adds a job to the group's queue, creating the queue and worker on first use.
   *
   * @param {string} group - Chat / group identifier; also used as the job name.
   * @param {object} data - Job payload; an `addedAt` time string is appended.
   * @returns {Promise<Job>} The job created by `queue.add`.
   */
  async addJobToGroup(group, data) {
    const queue = this.queues.get(group) ?? this.createWorkerForGroup(group);
    const addTime = new Date().toLocaleTimeString();

    const job = await queue.add(group, {
      ...data,
      addedAt: addTime,
    });

    console.log(`Job added to group ${group} at ${addTime}`);
    return job;
  }

  /**
   * Closes every worker, then every queue, and forgets them.
   *
   * A failure to close one instance no longer prevents the others from being
   * closed; failures are collected and rethrown once everything was attempted.
   *
   * @returns {Promise<void>}
   * @throws {Error} The single close error, or an AggregateError when several
   *   instances failed to close.
   */
  async close() {
    const workers = [...this.workers.values()];
    const queues = [...this.queues.values()];

    // Drop the references up front so closed instances are not reused and can
    // be garbage-collected.
    this.workers.clear();
    this.queues.clear();

    // Workers first (as before), so no job is being processed while the
    // queues are shut down.
    const workerResults = await Promise.allSettled(
      workers.map((worker) => worker.close())
    );
    const queueResults = await Promise.allSettled(
      queues.map((queue) => queue.close())
    );

    const failures = [...workerResults, ...queueResults]
      .filter((result) => result.status === 'rejected')
      .map((result) => result.reason);

    if (failures.length === 1) {
      throw failures[0];
    }
    if (failures.length > 1) {
      throw new AggregateError(failures, 'Failed to close some workers or queues');
    }
  }
}

// Example usage
/**
 * Demo: adds 5 messages to each of 3 chats, waits 30 seconds for the workers
 * to process them, then shuts everything down.
 *
 * @returns {Promise<void>}
 */
async function runChatMessageExample() {
  const chatMessageWorker = new ChatMessageWorker();
  const chatIds = ['chat1', 'chat2', 'chat3'];

  try {
    // Add 5 messages to each chat group
    for (const chatId of chatIds) {
      for (let i = 1; i <= 5; i++) {
        await chatMessageWorker.addJobToGroup(chatId, {
          message: `Message ${i} for ${chatId}`,
        });
      }
    }

    // Optional: Keep the process running to allow workers to process jobs
    await new Promise((resolve) => setTimeout(resolve, 30000));
  } finally {
    try {
      // Close workers and queues
      await chatMessageWorker.close();
    } finally {
      // The shared client was created by this file, so it must be closed here;
      // otherwise the open socket keeps the process alive after the demo ends.
      // The status guard avoids a "Connection is closed" rejection in case the
      // client has already been shut down.
      if (redis.status !== 'end') {
        await redis.quit();
      }
    }
  }
}

runChatMessageExample().catch(console.error);