Skip to content

Commit 535854d

Browse files
fix: removing delivery attempt attribute when dead lettering is not enabled (#72)
* fix: creating fix to not populate delivery attempt attribute when dead lettering is not enabled * Adding unit test for case in which a received message has no delivery attempt
1 parent c618bc8 commit 535854d

File tree

4 files changed

+46
-10
lines changed

4 files changed

+46
-10
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,16 @@ private void processBatch(List<OutstandingMessage> batch) {
346346
}
347347

348348
private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
349-
return PubsubMessage.newBuilder(receivedMessage.getMessage())
350-
.putAttributes(
351-
"googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt()))
352-
.build();
349+
PubsubMessage originalMessage = receivedMessage.getMessage();
350+
int deliveryAttempt = receivedMessage.getDeliveryAttempt();
351+
// Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In
352+
// this case, do not populate the PubsubMessage with the delivery attempt attribute.
353+
if (deliveryAttempt > 0) {
354+
return PubsubMessage.newBuilder(originalMessage)
355+
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
356+
.build();
357+
}
358+
return originalMessage;
353359
}
354360

355361
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
207207
}
208208

209209
/** Returns the delivery attempt count for a received {@link PubsubMessage} */
210-
public static int getDeliveryAttempt(PubsubMessage message) {
211-
return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0"));
210+
public static Integer getDeliveryAttempt(PubsubMessage message) {
211+
if (!message.containsAttributes("googclient_deliveryattempt")) {
212+
return null;
213+
}
214+
return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt"));
212215
}
213216

214217
/** Subscription which the subscriber is subscribed to. */

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.google.cloud.pubsub.v1;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20+
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertTrue;
2022

2123
import com.google.api.gax.batching.FlowControlSettings;
2224
import com.google.api.gax.batching.FlowController;
@@ -59,6 +61,7 @@ public void run() {
5961
private List<ModAckItem> sentModAcks;
6062
private FakeClock clock;
6163
private FlowController flowController;
64+
private boolean messageContainsDeliveryAttempt;
6265

6366
@AutoValue
6467
abstract static class ModAckItem {
@@ -82,8 +85,13 @@ public void setUp() {
8285
@Override
8386
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
8487
assertThat(message.getData()).isEqualTo(MESSAGE_DATA);
85-
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
86-
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
88+
if (messageContainsDeliveryAttempt) {
89+
assertTrue(message.containsAttributes("googclient_deliveryattempt"));
90+
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
91+
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
92+
} else {
93+
assertFalse(message.containsAttributes("googclient_deliveryattempt"));
94+
}
8795
consumers.add(consumer);
8896
}
8997
};
@@ -126,6 +134,8 @@ public void sendAckOperations(
126134
systemExecutor,
127135
clock);
128136
dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
137+
138+
messageContainsDeliveryAttempt = true;
129139
}
130140

131141
@Test
@@ -136,6 +146,22 @@ public void testReceipt() {
136146
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
137147
}
138148

149+
@Test
150+
public void testReceiptNoDeliveryAttempt() {
151+
messageContainsDeliveryAttempt = false;
152+
ReceivedMessage messageNoDeliveryAttempt =
153+
ReceivedMessage.newBuilder()
154+
.setAckId("ackid")
155+
.setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
156+
.build();
157+
dispatcher.processReceivedMessages(Collections.singletonList(messageNoDeliveryAttempt));
158+
dispatcher.processOutstandingAckOperations();
159+
assertThat(sentModAcks)
160+
.contains(
161+
ModAckItem.of(
162+
messageNoDeliveryAttempt.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
163+
}
164+
139165
@Test
140166
public void testAck() throws Exception {
141167
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,16 @@ public void tearDown() throws Exception {
8686

8787
@Test
8888
public void testDeliveryAttemptHelper() {
89-
int deliveryAttempt = 3;
89+
Integer deliveryAttempt = 3;
9090
PubsubMessage message =
9191
PubsubMessage.newBuilder()
9292
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
9393
.build();
9494
assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt);
9595

96+
// In the case where delivery attempt attribute is not populated, expect null
9697
PubsubMessage emptyMessage = PubsubMessage.newBuilder().build();
97-
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0);
98+
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), null);
9899
}
99100

100101
@Test

0 commit comments

Comments
 (0)