Skip to content

Commit 789406b

Browse files
authored
support Spring AMQP's SimpleMessageListenerContainer (#1657)
1 parent f16425b commit 789406b

File tree

37 files changed

+1644
-90
lines changed

37 files changed

+1644
-90
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ to assist with such efforts.
4343
* Adding capability to automatically create ECS-JSON-formatted version of the original application log files, through
4444
the <<config-log-ecs-reformatting>> config option. This allows effortless ingestion of logs to Elasticsearch without
4545
any further configuration. Supports log4j1, log4j2 and Logback. {pull}1261[#1261]
46+
* Add support to Spring AMQP - {pull}1657[#1657]
4647
4748
[float]
4849
===== Bug fixes

apm-agent-core/src/main/java/co/elastic/apm/agent/configuration/MetricsConfiguration.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*-
2+
* #%L
3+
* Elastic APM Java agent
4+
* %%
5+
* Copyright (C) 2018 - 2021 Elastic and contributors
6+
* %%
7+
* Licensed to Elasticsearch B.V. under one or more contributor
8+
* license agreements. See the NOTICE file distributed with
9+
* this work for additional information regarding copyright
10+
* ownership. Elasticsearch B.V. licenses this file to you under
11+
* the Apache License, Version 2.0 (the "License"); you may
12+
* not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing,
18+
* software distributed under the License is distributed on an
19+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20+
* KIND, either express or implied. See the License for the
21+
* specific language governing permissions and limitations
22+
* under the License.
23+
* #L%
24+
*/
125
package co.elastic.apm.agent.configuration;
226

327
import org.stagemonitor.configuration.ConfigurationOption;
Lines changed: 30 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,20 @@
2727
import co.elastic.apm.agent.bci.TracerAwareInstrumentation;
2828
import co.elastic.apm.agent.configuration.CoreConfiguration;
2929
import co.elastic.apm.agent.configuration.MessagingConfiguration;
30-
import co.elastic.apm.agent.impl.ElasticApmTracer;
3130
import co.elastic.apm.agent.impl.GlobalTracer;
3231
import co.elastic.apm.agent.impl.context.Message;
3332
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
3433
import co.elastic.apm.agent.matcher.WildcardMatcher;
35-
import com.rabbitmq.client.AMQP;
3634

3735
import javax.annotation.Nullable;
38-
import java.util.Collection;
39-
import java.util.Collections;
4036
import java.util.Date;
4137
import java.util.Map;
4238

43-
public abstract class BaseInstrumentation extends TracerAwareInstrumentation {
39+
public abstract class AbstractBaseInstrumentation extends TracerAwareInstrumentation {
4440

4541
private static final CoreConfiguration coreConfiguration = GlobalTracer.requireTracerImpl().getConfig(CoreConfiguration.class);
4642
private static final MessagingConfiguration messagingConfiguration = GlobalTracer.requireTracerImpl().getConfig(MessagingConfiguration.class);
4743

48-
@Override
49-
public Collection<String> getInstrumentationGroupNames() {
50-
return Collections.singletonList("rabbitmq");
51-
}
52-
5344
/**
5445
* @param name name of the exchange or queue
5546
* @return {@literal true} when exchange or queue is ignored, {@literal false otherwise}
@@ -58,26 +49,49 @@ protected static boolean isIgnored(String name) {
5849
return WildcardMatcher.isAnyMatch(messagingConfiguration.getIgnoreMessageQueues(), name);
5950
}
6051

61-
private static boolean isCaptureHeaders() {
52+
protected static boolean isCaptureHeaders() {
6253
return coreConfiguration.isCaptureHeaders();
6354
}
6455

65-
private static boolean captureHeaderKey(String key) {
56+
protected static boolean captureHeaderKey(String key) {
6657
return !WildcardMatcher.isAnyMatch(coreConfiguration.getSanitizeFieldNames(), key);
6758
}
6859

6960
/**
70-
* Captures que name and optional timestamp
61+
* Captures queue name and optional timestamp
7162
*
7263
* @param queueOrExchange queue or exchange name to use in message.queue.name
73-
* @param properties properties (if any)
64+
* @param age age
7465
* @param context span/transaction context
7566
* @return captured message
7667
*/
77-
protected static Message captureMessage(String queueOrExchange, @Nullable AMQP.BasicProperties properties, AbstractSpan<?> context) {
68+
protected static Message captureMessage(String queueOrExchange, long age, AbstractSpan<?> context) {
7869
return context.getContext().getMessage()
7970
.withQueue(queueOrExchange)
80-
.withAge(getTimestamp(properties));
71+
.withAge(age);
72+
}
73+
74+
protected static long getTimestamp(@Nullable Date timestamp) {
75+
long age = -1L;
76+
if (timestamp != null) {
77+
long now = System.currentTimeMillis();
78+
long time = timestamp.getTime();
79+
age = time <= now ? (now - time) : 0;
80+
}
81+
return age;
82+
}
83+
84+
protected static void captureHeaders(@Nullable Map<String, Object> headers, Message message) {
85+
if (!isCaptureHeaders() || headers == null || headers.size() <= 0) {
86+
return;
87+
}
88+
89+
for (Map.Entry<String, Object> entry : headers.entrySet()) {
90+
if (captureHeaderKey(entry.getKey())) {
91+
// headers aren't stored as String instances here
92+
message.addHeader(entry.getKey(), String.valueOf(entry.getValue()));
93+
}
94+
}
8195
}
8296

8397
protected static String normalizeExchangeName(@Nullable String exchange) {
@@ -100,33 +114,4 @@ protected static String normalizeQueueName(@Nullable String queue) {
100114
}
101115
return queue;
102116
}
103-
104-
private static long getTimestamp(@Nullable AMQP.BasicProperties properties) {
105-
long age = -1L;
106-
if (null != properties) {
107-
108-
Date timestamp = properties.getTimestamp();
109-
if (timestamp != null) {
110-
long now = System.currentTimeMillis();
111-
long time = timestamp.getTime();
112-
age = time <= now ? (now - time) : 0;
113-
}
114-
}
115-
return age;
116-
}
117-
118-
protected static void captureHeaders(@Nullable AMQP.BasicProperties properties, Message message) {
119-
Map<String, Object> headers = properties != null ? properties.getHeaders() : null;
120-
if (!isCaptureHeaders() || headers == null || headers.size() <= 0) {
121-
return;
122-
}
123-
124-
for (Map.Entry<String, Object> entry : headers.entrySet()) {
125-
if (captureHeaderKey(entry.getKey())) {
126-
// headers aren't stored as String instances here
127-
message.addHeader(entry.getKey(), String.valueOf(entry.getValue()));
128-
}
129-
}
130-
131-
}
132117
}

apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ChannelInstrumentation.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
*/
2525
package co.elastic.apm.agent.rabbitmq;
2626

27-
import co.elastic.apm.agent.impl.ElasticApmTracer;
2827
import co.elastic.apm.agent.impl.context.Destination;
2928
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
3029
import co.elastic.apm.agent.impl.transaction.Span;
@@ -63,7 +62,7 @@
6362
/**
6463
* Instruments implementations of {@link com.rabbitmq.client.Channel}
6564
*/
66-
public abstract class ChannelInstrumentation extends BaseInstrumentation {
65+
public abstract class ChannelInstrumentation extends RabbitmqBaseInstrumentation {
6766

6867
@Override
6968
public ElementMatcher<? super NamedElement> getTypeMatcherPreFilter() {
@@ -148,7 +147,7 @@ public static Object[] onBasicPublish(@Advice.This Channel channel,
148147

149148
properties = propagateTraceContext(exitSpan, properties);
150149

151-
captureMessage(exchange, properties, exitSpan);
150+
captureMessage(exchange, getTimestamp(properties.getTimestamp()), exitSpan);
152151
captureDestination(exchange, channel, exitSpan);
153152

154153
return new Object[]{properties, exitSpan};
@@ -247,7 +246,7 @@ public static void onExit(@Advice.This Channel channel,
247246
span.requestDiscarding();
248247
}
249248

250-
captureMessage(queue, properties, span);
249+
captureMessage(queue, getTimestamp(properties != null ? properties.getTimestamp() : null), span);
251250
captureDestination(exchange, channel, span);
252251

253252
span.captureException(thrown)

apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ConsumerInstrumentation.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
*/
2525
package co.elastic.apm.agent.rabbitmq;
2626

27-
import co.elastic.apm.agent.impl.ElasticApmTracer;
2827
import co.elastic.apm.agent.impl.context.Message;
2928
import co.elastic.apm.agent.impl.transaction.Transaction;
3029
import co.elastic.apm.agent.rabbitmq.header.RabbitMQTextHeaderGetter;
3130
import com.rabbitmq.client.AMQP;
3231
import com.rabbitmq.client.Consumer;
3332
import com.rabbitmq.client.Envelope;
3433
import net.bytebuddy.asm.Advice;
34+
import net.bytebuddy.description.NamedElement;
3535
import net.bytebuddy.description.method.MethodDescription;
3636
import net.bytebuddy.description.type.TypeDescription;
3737
import net.bytebuddy.matcher.ElementMatcher;
@@ -41,6 +41,7 @@
4141
import static co.elastic.apm.agent.bci.bytebuddy.CustomElementMatchers.classLoaderCanLoadClass;
4242
import static net.bytebuddy.matcher.ElementMatchers.any;
4343
import static net.bytebuddy.matcher.ElementMatchers.isBootstrapClassLoader;
44+
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
4445
import static net.bytebuddy.matcher.ElementMatchers.named;
4546
import static net.bytebuddy.matcher.ElementMatchers.not;
4647

@@ -50,7 +51,13 @@
5051
* <li>{@link com.rabbitmq.client.Consumer#handleDelivery}</li>
5152
* </ul>
5253
*/
53-
public class ConsumerInstrumentation extends BaseInstrumentation {
54+
public class ConsumerInstrumentation extends RabbitmqBaseInstrumentation {
55+
56+
@Override
57+
public ElementMatcher<? super NamedElement> getTypeMatcherPreFilter() {
58+
// Spring RabbitMQ is supported through Spring interfaces, rather than the RabbitMQ API (apm-rabbitmq-spring module)
59+
return not(nameStartsWith("org.springframework."));
60+
}
5461

5562
@Override
5663
public ElementMatcher<? super TypeDescription> getTypeMatcher() {
@@ -88,7 +95,6 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,
8895
if (!tracer.isRunning()) {
8996
return null;
9097
}
91-
9298
String exchange = envelope != null ? envelope.getExchange() : null;
9399

94100
if (null == exchange || isIgnored(exchange)) {
@@ -110,13 +116,12 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,
110116

111117
transaction.setFrameworkName("RabbitMQ");
112118

113-
Message message = captureMessage(exchange, properties, transaction);
119+
Message message = captureMessage(exchange, getTimestamp(properties != null ? properties.getTimestamp() : null), transaction);
114120
// only capture incoming messages headers for now (consistent with other messaging plugins)
115-
captureHeaders(properties, message);
116-
121+
if (properties != null) {
122+
captureHeaders(properties.getHeaders(), message);
123+
}
117124
return transaction.activate();
118-
119-
120125
}
121126

122127
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*-
2+
* #%L
3+
* Elastic APM Java agent
4+
* %%
5+
* Copyright (C) 2018 - 2020 Elastic and contributors
6+
* %%
7+
* Licensed to Elasticsearch B.V. under one or more contributor
8+
* license agreements. See the NOTICE file distributed with
9+
* this work for additional information regarding copyright
10+
* ownership. Elasticsearch B.V. licenses this file to you under
11+
* the Apache License, Version 2.0 (the "License"); you may
12+
* not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing,
18+
* software distributed under the License is distributed on an
19+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20+
* KIND, either express or implied. See the License for the
21+
* specific language governing permissions and limitations
22+
* under the License.
23+
* #L%
24+
*/
25+
package co.elastic.apm.agent.rabbitmq;
26+
27+
import java.util.Collection;
28+
import java.util.Collections;
29+
30+
public abstract class RabbitmqBaseInstrumentation extends AbstractBaseInstrumentation {
31+
32+
@Override
33+
public Collection<String> getInstrumentationGroupNames() {
34+
return Collections.singletonList("rabbitmq");
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*-
2+
* #%L
3+
* Elastic APM Java agent
4+
* %%
5+
* Copyright (C) 2018 - 2021 Elastic and contributors
6+
* %%
7+
* Licensed to Elasticsearch B.V. under one or more contributor
8+
* license agreements. See the NOTICE file distributed with
9+
* this work for additional information regarding copyright
10+
* ownership. Elasticsearch B.V. licenses this file to you under
11+
* the Apache License, Version 2.0 (the "License"); you may
12+
* not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing,
18+
* software distributed under the License is distributed on an
19+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20+
* KIND, either express or implied. See the License for the
21+
* specific language governing permissions and limitations
22+
* under the License.
23+
* #L%
24+
*/
25+
package co.elastic.apm.agent.rabbitmq.header;
26+
27+
import co.elastic.apm.agent.impl.transaction.TextHeaderGetter;
28+
29+
import javax.annotation.Nullable;
30+
import java.util.Map;
31+
32+
public abstract class AbstractTextHeaderGetter<T> implements TextHeaderGetter<T> {
33+
34+
@Nullable
35+
@Override
36+
public String getFirstHeader(String headerName, T carrier) {
37+
Map<String, Object> headers = getHeaders(carrier);
38+
if (headers == null || headers.isEmpty()) {
39+
return null;
40+
}
41+
Object headerValue = headers.get(headerName);
42+
if (headerValue != null) {
43+
// com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString
44+
return headerValue.toString();
45+
}
46+
return null;
47+
}
48+
49+
@Override
50+
public <S> void forEach(String headerName, T carrier, S state, HeaderConsumer<String, S> consumer) {
51+
String header = getFirstHeader(headerName, carrier);
52+
if (header != null) {
53+
consumer.accept(header, state);
54+
}
55+
}
56+
57+
@Nullable
58+
protected abstract Map<String, Object> getHeaders(T carrier);
59+
60+
}

apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/header/RabbitMQTextHeaderGetter.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,39 +24,19 @@
2424
*/
2525
package co.elastic.apm.agent.rabbitmq.header;
2626

27-
import co.elastic.apm.agent.impl.transaction.TextHeaderGetter;
2827
import com.rabbitmq.client.AMQP;
2928

30-
import javax.annotation.Nullable;
3129
import java.util.Map;
3230

33-
public class RabbitMQTextHeaderGetter implements TextHeaderGetter<AMQP.BasicProperties> {
31+
public class RabbitMQTextHeaderGetter extends AbstractTextHeaderGetter<AMQP.BasicProperties> {
3432

3533
public static final RabbitMQTextHeaderGetter INSTANCE = new RabbitMQTextHeaderGetter();
3634

3735
private RabbitMQTextHeaderGetter() {
3836
}
3937

40-
@Nullable
4138
@Override
42-
public String getFirstHeader(String headerName, AMQP.BasicProperties carrier) {
43-
Map<String, Object> headers = carrier.getHeaders();
44-
if (headers == null || headers.isEmpty()) {
45-
return null;
46-
}
47-
Object headerValue = headers.get(headerName);
48-
if (headerValue != null) {
49-
// com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString
50-
return headerValue.toString();
51-
}
52-
return null;
53-
}
54-
55-
@Override
56-
public <S> void forEach(String headerName, AMQP.BasicProperties carrier, S state, HeaderConsumer<String, S> consumer) {
57-
String header = getFirstHeader(headerName, carrier);
58-
if (header != null) {
59-
consumer.accept(header, state);
60-
}
39+
protected Map<String, Object> getHeaders(AMQP.BasicProperties carrier) {
40+
return carrier.getHeaders();
6141
}
6242
}

0 commit comments

Comments
 (0)