0

I am developing an Android application that publishes ECG packets using the Java HiveMQ MQTT Client library to a HiveMQ broker. The application works as follows:

The user enters a userId(topic), and upon clicking a Start Publish button, a scheduled task generates data packets every 900 ms and adds them to a queue. After adding a packet to the queue, the task calls the publishAllQueuedPackets method, which publishes the queued packets with QoS 1 via the Mqtt5AsyncClient. Finally, the queue is cleared after publishing.

However, I am facing two recurring issues:

  1. Repeated Publishing of Previously Sent Packets: Some previously sent queued packets seem to be resent multiple times (20–40 times). I suspect this could be due to either mishandling of the queue (e.g., the previous publish operation takes time, causing the queued messages not to be cleared and subsequently being considered in the next publishing cycle, effectively publishing some packets twice) or an asynchronous issue with the publishAllQueuedPackets method. This issue is primarily observed when the client disconnects and reconnects.
  2. Intermittent Stopping of Publishing Despite Stable Connectivity: Occasionally, even when the client is connected to a stable internet connection, the publishing process pauses and then resumes randomly after some time.

Note: My implementation for publishing packets may not follow best practices, as I am a beginner in MQTT and Android development. I am open to any suggestions for improvement. Also, please note that in-order publishing of packets to the broker is important.

Below is my full code for reference:

package com.example.hivemqpublisher; import android.annotation.SuppressLint; import android.os.Bundle; import android.text.TextUtils; import android.util.Log; import android.view.View; import android.widget.Button; import android.widget.EditText; import android.widget.TextView; import androidx.appcompat.app.AppCompatActivity; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; import org.json.JSONArray; import org.json.JSONObject; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class MainActivity extends AppCompatActivity { private static final String TAG = "HiveMQPublisher"; private static final double[] DATA = {0.0, -0.004, /* ... 2500 data points ... */}; private static final int PACKET_SIZE = 125; private static final long GENERATE_INTERVAL = 900L; // 900ms private Mqtt5AsyncClient mqttClient; private ScheduledExecutorService scheduler; private int packetNumber = 0; private Queue<JSONObject> packetQueue = new ConcurrentLinkedQueue<>(); private EditText userIdEditText; private Button startButton, stopButton; private TextView clientStatusTextView, lastPublishedTextView, queueEndTextView; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); userIdEditText = findViewById(R.id.userIdEditText); startButton = findViewById(R.id.startPublishingButton); stopButton = findViewById(R.id.stopPublishingButton); clientStatusTextView = findViewById(R.id.clientStatusTextView); lastPublishedTextView = findViewById(R.id.lastPublishedPacketTextView); queueEndTextView = findViewById(R.id.queueEndPacketTextView); startButton.setOnClickListener(v -> startPublishing()); stopButton.setOnClickListener(v -> stopPublishing()); Log.d(TAG, "onCreate: UI initialized."); } @SuppressLint("SetTextI18n") private void startPublishing() { Log.d(TAG, "startPublishing: Attempting to start publishing."); String userId = userIdEditText.getText().toString().trim(); if (TextUtils.isEmpty(userId)) { clientStatusTextView.setText("User ID is required!"); Log.e(TAG, "startPublishing: User ID is empty."); return; } userIdEditText.setEnabled(false); String topic = "hivemq/ff/" + userId; // Create MQTT Client Log.d(TAG, "startPublishing: Creating MQTT client."); mqttClient = Mqtt5Client.builder() .serverHost("broker.hivemq.com") .identifier("android_ecg_publisher_" + userId) .serverPort(8883) .sslWithDefaultConfig() .automaticReconnect() .initialDelay(1, TimeUnit.SECONDS) .maxDelay(10, TimeUnit.SECONDS) .applyAutomaticReconnect() .buildAsync(); // Connect to Broker mqttClient.connectWith() .cleanStart(true) .keepAlive(1) .sessionExpiryInterval(300) .send() .thenAccept(connAck -> { Log.i(TAG, "Connected to MQTT Broker successfully."); runOnUiThread(() -> clientStatusTextView.setText("Connected to MQTT Broker")); startScheduler(topic); }) .exceptionally(throwable -> { Log.e(TAG, "Failed to connect to MQTT Broker: " + throwable.getMessage(), throwable); runOnUiThread(() -> clientStatusTextView.setText("Failed to connect: " + throwable.getMessage())); return null; }); startButton.setVisibility(View.GONE); stopButton.setVisibility(View.VISIBLE); } @SuppressLint("SetTextI18n") private void stopPublishing() { Log.d(TAG, "stopPublishing: Attempting to stop publishing."); if (scheduler != null) { scheduler.shutdownNow(); Log.d(TAG, "stopPublishing: Scheduler shut down."); } packetQueue.clear(); // Clear the queue to reset Log.d(TAG, "stopPublishing: Packet queue cleared."); if (mqttClient != null && mqttClient.getState().isConnected()) { mqttClient.disconnect() .thenRun(() -> Log.i(TAG, "MQTT Client disconnected")) .exceptionally(throwable -> { Log.e(TAG, "Failed to disconnect MQTT client: " + throwable.getMessage(), throwable); return null; }); } packetNumber = 1; // Reset the packet number runOnUiThread(() -> { startButton.setVisibility(View.VISIBLE); stopButton.setVisibility(View.GONE); clientStatusTextView.setText("Publishing stopped"); lastPublishedTextView.setText("Last Published Packet No: 0"); queueEndTextView.setText("Queue End Packet No: 0"); }); } private void startScheduler(String topic) { Log.d(TAG, "startScheduler: Starting scheduler for packet generation."); scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleWithFixedDelay(() -> generatePacket(topic), 0, GENERATE_INTERVAL, TimeUnit.MILLISECONDS); } private void generatePacket(String topic) { try { Log.d(TAG, "generatePacket: Preparing data for packetNumber: " + packetNumber); int startIdx = (packetNumber * PACKET_SIZE) % DATA.length; // Prepare Data Packet JSONArray jsonArray = new JSONArray(); for (int i = startIdx; i < startIdx + PACKET_SIZE; i++) { jsonArray.put(DATA[i % DATA.length]); } JSONObject packetJson = new JSONObject(); packetJson.put("type", "livecg-data"); packetJson.put("data", jsonArray); packetJson.put("packetNo", packetNumber); packetJson.put("Timestamp", System.currentTimeMillis()); packetQueue.add(packetJson); Log.d(TAG, "generatePacket: Packet added to queue: " + packetNumber); packetNumber++; runOnUiThread(() -> queueEndTextView.setText("Queue End Packet No: " + (packetNumber - 1))); publishAllQueuedPackets(topic); } catch (Exception e) { Log.e(TAG, "generatePacket: Error preparing packet: " + e.getMessage(), e); } } private void publishAllQueuedPackets(String topic) { if (packetQueue.isEmpty()) { return; // Nothing to publish } JSONArray batch = new JSONArray(); AtomicInteger lastPacketNumber = new AtomicInteger(-1); // Atomic wrapper for lambda compatibility // Build the batch and keep track of the last packet number for (JSONObject packet : packetQueue) { batch.put(packet); lastPacketNumber.set(packet.optInt("packetNo", -1)); // Update the atomic variable } mqttClient.publishWith() .topic(topic) .qos(MqttQos.AT_LEAST_ONCE) .payload(batch.toString().getBytes()) .send() .thenAccept(publishResult -> { int finalPacketNumber = lastPacketNumber.get(); // Get the last packet number Log.i(TAG, "publishAllQueuedPackets: Published batch successfully. Last packet: " + finalPacketNumber); // Update UI with the last published packet runOnUiThread(() -> { lastPublishedTextView.setText("Last Published Packet No: " + finalPacketNumber); }); // Clear the queue after successful publish packetQueue.clear(); }) .exceptionally(throwable -> { Log.e(TAG, "publishAllQueuedPackets: Failed to publish batch: " + throwable.getMessage(), throwable); // Retain the queue for retrying later return null; }); } } 

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.