Skip to content

Commit eab300b

Browse files
authored
Switching to use SpanConcurrentMap in the concurrency plugin (#2173)
1 parent 32bd427 commit eab300b

File tree

3 files changed

+61
-17
lines changed

3 files changed

+61
-17
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ exclude known static files in all applications might be changed to `/app/static/
5353
** `DynamicTransformer.Accessor.get().ensureInstrumented` is now `DynamicTransformer.ensureInstrumented` - {pull}#2164[#2164]
5454
* Switching last instrumentations (`trace_methods`, sparkjava, JDK `HttpServer` and Struts 2) to
5555
`TracerAwareInstrumentation` - {pull}#2170[#2170]
56+
* Replace concurrency plugin maps to `SpanConcurrentHashMap` ones - {pull}#2173[#2173]
5657
5758
[[release-notes-1.x]]
5859
=== Java Agent version 1.x

apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
*/
1919
package co.elastic.apm.agent.concurrent;
2020

21+
import co.elastic.apm.agent.collections.WeakConcurrentProviderImpl;
2122
import co.elastic.apm.agent.impl.Tracer;
2223
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
2324
import co.elastic.apm.agent.sdk.DynamicTransformer;
2425
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
2526
import co.elastic.apm.agent.sdk.state.GlobalState;
26-
import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
2727
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
2828

2929
import javax.annotation.Nullable;
@@ -41,7 +41,8 @@
4141
@GlobalState
4242
public class JavaConcurrent {
4343

44-
private static final WeakMap<Object, AbstractSpan<?>> contextMap = WeakConcurrent.buildMap();
44+
private static final WeakMap<Object, AbstractSpan<?>> contextMap = WeakConcurrentProviderImpl.createWeakSpanMap();
45+
4546
private static final List<Class<? extends ElasticApmInstrumentation>> RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION = Collections.
4647
<Class<? extends ElasticApmInstrumentation>>singletonList(RunnableCallableForkJoinTaskInstrumentation.class);
4748
static final ThreadLocal<Boolean> needsContext = new ThreadLocal<>();
@@ -57,10 +58,7 @@ public class JavaConcurrent {
5758
}
5859

5960
private static void removeContext(Object o) {
60-
AbstractSpan<?> context = contextMap.remove(o);
61-
if (context != null) {
62-
context.decrementReferences();
63-
}
61+
contextMap.remove(o);
6462
}
6563

6664
private static boolean shouldAvoidContextPropagation(@Nullable Object executable) {
@@ -69,21 +67,34 @@ private static boolean shouldAvoidContextPropagation(@Nullable Object executable
6967
needsContext.get() == Boolean.FALSE;
7068
}
7169

70+
/**
71+
* Retrieves the context mapped to the provided task and activates it on the current thread.
72+
* It is the responsibility of the caller to deactivate the returned context at the right time.
73+
* If the mapped context is already the active span of this thread, this method returns {@code null}.
74+
* @param o a task for which running there may be a context to activate
75+
* @param tracer the tracer
76+
* @return the context mapped to the provided task or {@code null} if such does not exist or if the mapped context
77+
* is already the active one on the current thread.
78+
*/
7279
@Nullable
7380
public static AbstractSpan<?> restoreContext(Object o, Tracer tracer) {
7481
// When an Executor executes directly on the current thread we need to enable this thread for context propagation again
7582
needsContext.set(Boolean.TRUE);
76-
AbstractSpan<?> context = contextMap.remove(o);
83+
84+
// we cannot remove yet, as this decrements the reference count, which may cause already ended spans to be recycled ahead of time
85+
AbstractSpan<?> context = contextMap.get(o);
7786
if (context == null) {
7887
return null;
7988
}
80-
if (tracer.getActive() != context) {
81-
context.activate();
82-
context.decrementReferences();
83-
return context;
84-
} else {
85-
context.decrementReferences();
86-
return null;
89+
90+
try {
91+
if (tracer.getActive() != context) {
92+
return context.activate();
93+
} else {
94+
return null;
95+
}
96+
} finally {
97+
contextMap.remove(o);
8798
}
8899
}
89100

@@ -110,7 +121,6 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) {
110121
private static void captureContext(Object task, AbstractSpan<?> active) {
111122
DynamicTransformer.ensureInstrumented(task.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION);
112123
contextMap.put(task, active);
113-
active.incrementReferences();
114124
// Do no discard branches leading to async operations so not to break span references
115125
active.setNonDiscardable();
116126
}

apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ScheduledExecutorServiceTest.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,53 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
5656

5757
@AfterEach
5858
void tearDown() {
59-
assertThat(tracer.getActive()).isEqualTo(transaction);
60-
transaction.deactivate().end();
59+
if (tracer.getActive() != null) {
60+
transaction.deactivate().end();
61+
}
6162
}
6263

6364
@Test
6465
void testScheduleCallable() throws Exception {
6566
final ScheduledFuture<? extends AbstractSpan<?>> future = scheduler.schedule(() -> tracer.getActive(), 0, TimeUnit.SECONDS);
6667
assertThat(future.get()).isEqualTo(transaction);
68+
assertThat(tracer.getActive()).isEqualTo(transaction);
6769
}
6870

6971
@Test
7072
void testScheduleRunnable() throws Exception {
7173
AtomicReference<AbstractSpan<?>> ref = new AtomicReference<>();
7274
scheduler.schedule(() -> ref.set(tracer.getActive()), 0, TimeUnit.SECONDS).get();
7375
assertThat(ref.get()).isEqualTo(transaction);
76+
assertThat(tracer.getActive()).isEqualTo(transaction);
77+
}
78+
79+
@Test
80+
void testScheduleCallable_delayAndEndTransaction() throws Exception {
81+
final ScheduledFuture<? extends AbstractSpan<?>> scheduledFuture = scheduler.schedule(() -> tracer.getActive(), 50, TimeUnit.MILLISECONDS);
82+
verifyEndedTransactionIsStillReferenced(scheduledFuture);
83+
assertThat(scheduledFuture.get()).isEqualTo(transaction);
84+
}
85+
86+
@Test
87+
void testScheduleRunnable_delayAndEndTransaction() throws Exception {
88+
AtomicReference<AbstractSpan<?>> ref = new AtomicReference<>();
89+
ScheduledFuture<?> scheduledTaskFuture = scheduler.schedule(() -> ref.set(tracer.getActive()), 50, TimeUnit.MILLISECONDS);
90+
verifyEndedTransactionIsStillReferenced(scheduledTaskFuture);
91+
assertThat(ref.get()).isEqualTo(transaction);
92+
}
93+
94+
private void verifyEndedTransactionIsStillReferenced(ScheduledFuture<?> scheduledTaskFuture) throws InterruptedException, java.util.concurrent.ExecutionException {
95+
transaction.deactivate().end();
96+
// decrementing references to mock what the real reporter would do
97+
transaction.decrementReferences();
98+
// make sure the transaction is still referenced and not recycled yet
99+
assertThat(transaction.isReferenced()).isTrue();
100+
assertThat(transaction.getTraceContext().getId().isEmpty()).isFalse();
101+
102+
scheduledTaskFuture.get();
103+
104+
// make sure the transaction is recycled after the task has terminated
105+
assertThat(transaction.isReferenced()).isFalse();
106+
assertThat(transaction.getTraceContext().getId().isEmpty()).isTrue();
74107
}
75108
}

0 commit comments

Comments
 (0)