I have a LinkedBlockingQueue where I am seeing duplicate messages when processing the queue. Does anyone have any idea how duplicates could be produced this way? Below is the declaration of the queue, followed by the run() methods for the producer and the consumer.

When I do this in one thread, I don't get duplicates. That is, I don't use a queue at all: I read from the UDP input and directly call new ProcessScadaMsg(byte[]) in a single thread. That approach caused problems because UDP messages were arriving too quickly and some were missed, so I split the work into a producer and a consumer.

    // Declaration
    public static BlockingQueue<byte[]> UDPMessageQueue = new LinkedBlockingQueue<byte[]>();

    // Producer
    public void run() {
        DatagramSocket receiveSock = null;
        // Create socket for receiving data
        try {
            receiveSock = new DatagramSocket(port);
        } catch (SocketException e2) {
            errorLog.error("Unable to open socket.");
        }

        // buffer to receive incoming data
        byte[] buffer = new byte[DataAdapterFB1.HEADER_SIZE + DataAdapterFB1.MAX_DATA_BYTES];
        DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);

        while (true) {
            try {
                receiveSock.receive(incoming);
            } catch (IOException e1) {
                errorLog.fatal("Failed to read from IO port.");
                System.exit(1);
            } catch (NullPointerException e2) {
                errorLog.fatal("IO Port unavailable or in use.");
                System.exit(1);
            }
            DataAdapterFB1.UDPMessageQueue.add(incoming.getData());
        }
    }

    // Consumer
    public void run() {
        while (true) {
            try {
                ProcessScadaMsg newMessage = new ProcessScadaMsg(DataAdapterFB1.UDPMessageQueue.take());
            } catch (InterruptedException e) {
                errorLog.warn("Queue processing interrupted.");
            }
        }
    }

1 Answer

You are creating a single DatagramPacket object, containing a buffer, then receiving multiple times in a loop on that same packet. So, suppose you receive two packets back-to-back. When the first is received, the buffer is filled with its data and enqueued as the first node of the queue. When the second packet is received, it is written into the very same buffer in memory, and that same buffer is enqueued as the second node.

So, from the consumer's perspective, the first node dequeued points to the same buffer as the second node, and its content has already been overwritten by the second read.
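The aliasing can be shown without any sockets at all. The sketch below (class and variable names are mine, not from the question) enqueues one shared byte[] twice, the way a single receive buffer would be, and shows that both dequeued "messages" are the same array holding the second packet's contents:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AliasingDemo {
    // Returns true when both dequeued "messages" are the same array
    // holding the second packet's contents -- the duplicate symptom.
    static boolean demonstrate() throws InterruptedException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();

        // One shared buffer, allocated once outside the receive loop:
        byte[] buffer = new byte[4];

        buffer[0] = 1;      // "receive" packet #1 into the buffer
        queue.add(buffer);  // enqueue a reference to the buffer

        buffer[0] = 2;      // "receive" packet #2 into the SAME buffer
        queue.add(buffer);  // enqueue another reference to the same buffer

        byte[] first = queue.take();
        byte[] second = queue.take();
        // Both entries alias one array, so packet #1 is lost and the
        // consumer sees packet #2 twice.
        return first == second && first[0] == 2 && second[0] == 2;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demonstrate()); // prints "true"
    }
}
```

The queue stores references, not copies, so every node added with the same array "sees" later writes to it.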

You need to allocate the buffer and DatagramPacket in the loop:

    while (true) {
        // Allocate a fresh buffer and packet on every iteration
        byte[] buffer = new byte[DataAdapterFB1.HEADER_SIZE + DataAdapterFB1.MAX_DATA_BYTES];
        DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
        ...
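An alternative (my suggestion, not part of the answer above) is to keep reusing one receive buffer but enqueue a trimmed copy of each datagram. This also fixes a second subtlety: getData() returns the whole backing array, while getLength() is the size of the datagram actually received, so the copy carries exactly the payload. The class and helper names here are hypothetical:

```java
import java.net.DatagramPacket;
import java.util.Arrays;

public class CopyOnEnqueue {
    // Copy exactly the bytes the datagram carried: getData() returns the
    // whole backing buffer, while getLength() is the actual payload size.
    static byte[] messageBytes(DatagramPacket incoming) {
        return Arrays.copyOfRange(incoming.getData(), 0, incoming.getLength());
    }

    public static void main(String[] args) {
        byte[] buffer = new byte[8];
        DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);
        incoming.setLength(3); // simulate a 3-byte datagram arriving

        byte[] msg = messageBytes(incoming);
        System.out.println(msg.length);    // prints "3"
        System.out.println(msg == buffer); // prints "false": a fresh copy
    }
}
```

In the producer you would enqueue messageBytes(incoming) instead of incoming.getData(); each node then owns an independent array.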

11 Comments

Thanks Roberto. I'll have to try this when I get back to the office after the new year.
This fixed the duplicates, but it has slowed processing enough that the queue grows continually and can't be drained quickly enough. Any ideas?
Also CPU usage is a concern.
Not sure if you're already doing that, but you want more than one thread consuming from the queue, to take advantage of multicore architectures. This won't save CPU for the processing itself, but it should make it possible to consume fast enough that the queue doesn't grow too much; the garbage collector won't kick in as often, and that should indirectly save CPU cycles.
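A minimal sketch of that multi-consumer setup, assuming the per-message work (ProcessScadaMsg in the question) is safe to run from several threads at once; the class and method names are illustrative only:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiConsumer {
    // Drains `total` messages with several consumer threads and returns
    // how many were processed.
    static int runDemo(int total) throws InterruptedException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        AtomicInteger processed = new AtomicInteger();

        int consumers = Math.max(2, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        byte[] msg = queue.take();
                        // new ProcessScadaMsg(msg);  // per-message work goes here
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // exit the loop
                    }
                }
            });
        }

        // Stand-in for the producer enqueuing received datagrams
        for (int i = 0; i < total; i++) {
            queue.put(new byte[1]);
        }
        while (processed.get() < total) {
            Thread.sleep(5); // crude wait for the queue to drain
        }
        pool.shutdownNow();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(100)); // prints "100"
    }
}
```

LinkedBlockingQueue is already safe for multiple consumers calling take() concurrently, so only the processing step needs to be checked for thread safety.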
I think the queue is fine. You may want to make it bounded (specify a capacity in the constructor) so that you can apply back-pressure to the sender. If the packet size is relatively small, you could also investigate using a single byte[] buffer and slicing it when creating DatagramPacket instances, the idea being that there would be more memory locality across subsequent packets, which should use the caches more efficiently.
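A small sketch of the bounded-queue idea, assuming the producer switches from add() to put()/offer(); the class name and tiny capacity are illustrative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueDemo {
    // Returns whether a full bounded queue accepts another element via offer().
    static boolean offerWhenFull() throws InterruptedException {
        // Capacity of 2 for illustration; a real limit would be sized to
        // the expected burst rate.
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(2);
        queue.put(new byte[1]);
        queue.put(new byte[1]);

        // add() would now throw IllegalStateException, and put() would
        // block until a consumer frees a slot -- that blocking is the
        // back-pressure. offer() just reports the fullness.
        return queue.offer(new byte[1]);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerWhenFull()); // prints "false"
    }
}
```

Note the producer in the question uses add(), which throws on a full bounded queue; with a capacity set it should use put() (block) or offer() (drop or count the overflow) instead.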