Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ exclude known static files in all applications might be changed to `/app/static/
===== Features
* Improved capturing of logged exceptions when using Log4j2 - {pull}2139[#2139]
* Update to async-profiler 1.8.7 and set configured `safemode` at load time though a new system property - {pull}2165[#2165]
* Added support to capture `context.message.routing-key` in rabbitmq, spring amqp instrumentations - {pull}1767[#1767]

[float]
===== Performance improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public void recycle(StringBuilder object) {
@Nullable
private StringBuilder body;

@Nullable
private String routingKey;

/**
* Represents the message age in milliseconds. Since 0 is a valid value (can occur due to clock skews between
* sender and receiver) - a negative value represents invalid or unavailable age.
Expand All @@ -72,6 +75,16 @@ public Message withQueue(String queueName) {
return this;
}

public Message withRoutingKey(String routingKey) {
this.routingKey = routingKey;
return this;
}

@Nullable
public String getRoutingKey() {
return routingKey;
}

/**
* Gets a body StringBuilder to write content to. If this message's body is not initializes, this method will
* initialize from the StringBuilder pool.
Expand Down Expand Up @@ -149,6 +162,7 @@ public void resetState() {
stringBuilderPool.recycle(body);
body = null;
}
routingKey = null;
}

public void copyFrom(Message other) {
Expand All @@ -159,5 +173,6 @@ public void copyFrom(Message other) {
}
this.headers.copyFrom(other.getHeaders());
this.age = other.getAge();
this.routingKey = other.getRoutingKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ private void serializeMessageContext(final Message message) {
jw.writeByte(OBJECT_END);
jw.writeByte(COMMA);
}

if (message.getRoutingKey() != null && !message.getRoutingKey().isEmpty()) {
writeField("routing_key", message.getRoutingKey());
}
writeFieldName("queue");
jw.writeByte(OBJECT_START);
writeLastField("name", message.getQueueName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package co.elastic.apm.agent.impl.context;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

class MessageTest {

@Test
void testResetState() {
Message message = createMessage(100L, "test-body", "test-q", "test-*", Map.of("test-header", "test-value"));

message.resetState();

assertThat(message.getAge()).isEqualTo(-1L);
assertThat(message.getQueueName()).isNull();
assertThat(message.getRoutingKey()).isNull();
assertThat(message.getHeaders()).isEmpty();
assertThat(message.getBodyForRead()).isNull();
}

@Test
void testCopyFrom() {
Message firstMessage = createMessage(100L, "test-body", "test-q", "test-*", Map.of("test-header", "test-value"));
Message secondMessage = createMessage(999L, "updated-body", "updated-test-q", "updated-test-*", Map.of("updated-test-header", "updated-test-value"));

firstMessage.copyFrom(secondMessage);

assertThat(firstMessage.getAge()).isEqualTo(999L);
assertThat(firstMessage.getQueueName()).isEqualTo("updated-test-q");
assertThat(firstMessage.getRoutingKey()).isEqualTo("updated-test-*");
Headers firstMessageHeaders = firstMessage.getHeaders();
assertThat(firstMessageHeaders.size()).isEqualTo(1);
Headers.Header firstMessageHeader = firstMessageHeaders.iterator().next();
assertThat(firstMessageHeader.getKey()).isEqualTo("updated-test-header");
assertThat(firstMessageHeader.getValue()).isEqualTo("updated-test-value");
StringBuilder firstMessageBody = firstMessage.getBodyForRead();
assertThat(firstMessageBody).isNotNull();
assertThat(firstMessageBody.toString()).isEqualTo("updated-body");
}

private Message createMessage(long age, String body, String queue, String routingKey, Map<String, String> headers) {
Message message = new Message()
.withAge(age)
.withBody(body)
.withQueue(queue)
.withRoutingKey(routingKey);
for (Map.Entry<String, String> entry : headers.entrySet()) {
message.addHeader(entry.getKey(), entry.getValue());
}
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ void testSpanValidServiceAndAddressResourceSerialization() {
void testSpanMessageContextSerialization() {
Span span = new Span(MockTracer.create());
span.getContext().getMessage()
.withRoutingKey("routing-key")
.withQueue("test-queue")
.withBody("test-body")
.addHeader("text-header", "text-value")
Expand All @@ -561,6 +562,23 @@ void testSpanMessageContextSerialization() {
JsonNode ms = age.get("ms");
assertThat(ms).isNotNull();
assertThat(ms.longValue()).isEqualTo(20);
JsonNode routingKey = message.get("routing_key");
assertThat(routingKey.textValue()).isEqualTo("routing-key");
}

@Test
void testSpanMessageContextSerializationWithoutRoutingKey() {
Span span = new Span(MockTracer.create());
span.getContext().getMessage()
.withQueue("test-queue")
.withBody("test-body")
.addHeader("text-header", "text-value")
.addHeader("binary-header", "binary-value".getBytes(StandardCharsets.UTF_8))
.withAge(20);

JsonNode spanJson = readJsonString(serializer.toJsonString(span));
JsonNode routingKey = spanJson.get("context").get("message").get("routing_key");
assertThat(routingKey).isNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ protected static boolean captureHeaderKey(String key) {
* @param context span/transaction context
* @return captured message
*/
protected static Message captureMessage(String queueOrExchange, long age, AbstractSpan<?> context) {
protected static Message captureMessage(String queueOrExchange, @Nullable String routingKey, long age, AbstractSpan<?> context) {
return context.getContext().getMessage()
.withQueue(queueOrExchange)
.withRoutingKey(routingKey)
.withAge(age);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public static class AdviceClass {
@Nullable
public static Object[] onBasicPublish(@Advice.This Channel channel,
@Advice.Argument(0) @Nullable String exchange,
@Advice.Argument(1) @Nullable String routingKey,
@Advice.Argument(4) @Nullable AMQP.BasicProperties properties) {
if (!tracer.isRunning()) {
return null;
Expand All @@ -144,7 +145,7 @@ public static Object[] onBasicPublish(@Advice.This Channel channel,

properties = propagateTraceContext(exitSpan, properties);

captureMessage(exchange, getTimestamp(properties.getTimestamp()), exitSpan);
captureMessage(exchange, routingKey, getTimestamp(properties.getTimestamp()), exitSpan);
Connection connection = channel.getConnection();
RabbitMqHelper.captureDestination(exchange, connection.getAddress(), connection.getPort(), exitSpan);

Expand Down Expand Up @@ -246,7 +247,7 @@ public static void onExit(@Advice.This Channel channel,
span.requestDiscarding();
}

captureMessage(queue, getTimestamp(properties != null ? properties.getTimestamp() : null), span);
captureMessage(queue, envelope != null ? envelope.getRoutingKey() : null, getTimestamp(properties != null ? properties.getTimestamp() : null), span);
Connection connection = channel.getConnection();
RabbitMqHelper.captureDestination(exchange, connection.getAddress(), connection.getPort(), span);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,

transaction.setFrameworkName("RabbitMQ");

Message message = captureMessage(exchange, getTimestamp(properties != null ? properties.getTimestamp() : null), transaction);
Message message = captureMessage(exchange, envelope.getRoutingKey(), getTimestamp(properties != null ? properties.getTimestamp() : null), transaction);
// only capture incoming messages headers for now (consistent with other messaging plugins)
if (properties != null) {
captureHeaders(properties.getHeaders(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ void testPollingWithinTransactionNoMessage() throws IOException {
reporter.awaitSpanCount(1);

Span pollingSpan = reporter.getFirstSpan();
checkPollSpan(pollingSpan, queueName, "<unknown>");
checkPollSpan(pollingSpan, queueName, "<unknown>", false);
}

@Test
Expand All @@ -340,7 +340,7 @@ void testPollingWithinTransactionGetMessage() throws IOException {
reporter.awaitSpanCount(1);

Span pollingSpan = reporter.getFirstSpan();
checkPollSpan(pollingSpan, queueName, exchange);
checkPollSpan(pollingSpan, queueName, exchange, true);
}


Expand Down Expand Up @@ -617,14 +617,19 @@ static void checkTransaction(Transaction transaction, String exchange, String fr

assertThat(transaction.getOutcome()).isEqualTo(Outcome.SUCCESS);

checkMessage(transaction.getContext().getMessage(), exchange);
checkMessage(transaction.getContext().getMessage(), exchange, true);
}

private static void checkMessage(Message message, String queueName) {
private static void checkMessage(Message message, String queueName, boolean withRoutingKeyCheck) {
assertThat(message.getQueueName()).isEqualTo(queueName);

// RabbitMQ does not provide timestamp by default
assertThat(message.getAge()).isLessThan(0);
if (withRoutingKeyCheck) {
assertThat(message.getRoutingKey()).isNotBlank();
} else {
assertThat(message.getRoutingKey()).isNull();
}
}


Expand Down Expand Up @@ -677,17 +682,19 @@ static void checkSendSpan(Span span, String exchange, String host, int port) {
checkSpanCommon(span,
"send",
String.format("RabbitMQ SEND to %s", exchangeName),
exchangeName
exchangeName,
true
);

checkSpanDestination(span, host, port, String.format("rabbitmq/%s", exchangeName));
}

private static void checkPollSpan(Span span, String queue, String normalizedExchange) {
private static void checkPollSpan(Span span, String queue, String normalizedExchange, boolean withRoutingKeyCheck) {
checkSpanCommon(span,
"poll",
String.format("RabbitMQ POLL from %s", queue),
queue);
queue,
withRoutingKeyCheck);

checkSpanDestination(span,
connection.getAddress().getHostAddress(),
Expand All @@ -696,15 +703,15 @@ private static void checkPollSpan(Span span, String queue, String normalizedExch
);
}

private static void checkSpanCommon(Span span, String expectedAction, String expectedName, String expectedQueueName) {
private static void checkSpanCommon(Span span, String expectedAction, String expectedName, String expectedQueueName, boolean withRoutingKeyCheck) {
assertThat(span.getType()).isEqualTo("messaging");
assertThat(span.getSubtype()).isEqualTo("rabbitmq");
assertThat(span.getAction()).isEqualTo(expectedAction);

assertThat(span.getNameAsString())
.isEqualTo(expectedName);

checkMessage(span.getContext().getMessage(), expectedQueueName);
checkMessage(span.getContext().getMessage(), expectedQueueName, withRoutingKeyCheck);

assertThat(span.getOutcome()).isEqualTo(Outcome.SUCCESS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static Object beforeMessageHandle(@Advice.This MessageListener listener,
transaction.setFrameworkName("Spring AMQP");

long timestamp = getTimestamp(messageProperties.getTimestamp());
co.elastic.apm.agent.impl.context.Message internalMessage = captureMessage(exchangeOrQueue, timestamp, transaction);
co.elastic.apm.agent.impl.context.Message internalMessage = captureMessage(exchangeOrQueue, messageProperties.getReceivedRoutingKey(), timestamp, transaction);
// only capture incoming messages headers for now (consistent with other messaging plugins)
captureHeaders(messageProperties.getHeaders(), internalMessage);
return transaction.activate();
Expand Down