I have a LinkedBlockingQueue, and I am seeing duplicate messages when I process its contents. Does anyone have any idea how duplicates could be produced with this approach? Below are the declaration of the queue and the run() methods for the producer and the consumer.
When I do this in a single thread, I don't get duplicates. In that version I don't use a queue: I read from the UDP input and directly call ProcessScadaMsg newMessage = new ProcessScadaMsg(byte[]) in the same thread. That caused problems because UDP messages were arriving too quickly and some were missed, so I had to split the work into a producer and a consumer.
    // Declaration
    public static BlockingQueue<byte[]> UDPMessageQueue = new LinkedBlockingQueue<byte[]>();

    // Producer
    public void run() {
        DatagramSocket receiveSock = null;

        // Create socket for receiving data
        try {
            receiveSock = new DatagramSocket(port);
        } catch (SocketException e2) {
            // TODO Auto-generated catch block
            errorLog.error("Unable to open socket.");
        }

        while (true) {
            // buffer to receive incoming data
            byte[] buffer = new byte[DataAdapterFB1.HEADER_SIZE + DataAdapterFB1.MAX_DATA_BYTES];
            DatagramPacket incoming = new DatagramPacket(buffer, buffer.length);

            try {
                receiveSock.receive(incoming);
            } catch (IOException e1) {
                errorLog.fatal("Failed to read from IO port.");
                System.exit(1);
            } catch (NullPointerException e2) {
                errorLog.fatal("IO Port unavailable or in use.");
                System.exit(1);
            }

            DataAdapterFB1.UDPMessageQueue.add(incoming.getData());
        }
    }

    // Consumer
    public void run() {
        while (true) {
            try {
                ProcessScadaMsg newMessage = new ProcessScadaMsg(DataAdapterFB1.UDPMessageQueue.take());
            } catch (InterruptedException e) {
                errorLog.warn("Queue processing interrupted.");
            }
        }
    }
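One way to narrow this down is to run the same handoff with the socket and ProcessScadaMsg taken out. The sketch below is only an illustration under stated assumptions: the class QueueHandoffSketch, the method runHandoff, and the 4-byte sequence-number payload are all hypothetical and not part of the code above. It pushes distinct byte[] messages through a LinkedBlockingQueue from one thread, takes them on another, and counts how many distinct values come out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical harness; none of these names come from the original code.
final class QueueHandoffSketch {

    // Produces `count` distinct 4-byte messages on one thread, consumes them on
    // another, and returns the sequence numbers in the order they were taken.
    static List<Integer> runHandoff(int count) {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                // A fresh array per message, as in the producer loop above.
                byte[] msg = {
                    (byte) (i >>> 24), (byte) (i >>> 16), (byte) (i >>> 8), (byte) i
                };
                queue.add(msg);
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    byte[] msg = queue.take();
                    // Decode the big-endian sequence number written by the producer.
                    int seq = ((msg[0] & 0xFF) << 24) | ((msg[1] & 0xFF) << 16)
                            | ((msg[2] & 0xFF) << 8) | (msg[3] & 0xFF);
                    consumed.add(seq);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // join() makes the consumer's writes to `consumed` visible here.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for threads", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<Integer> consumed = runHandoff(10_000);
        Set<Integer> unique = new HashSet<>(consumed);
        System.out.println("consumed=" + consumed.size() + " unique=" + unique.size());
    }
}
```

If the two counts match in a run like this, the queue handoff itself is probably not where the duplicates originate, and the sender, the receive path, or the message processing would be the next places to look.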