Skip to content

Commit 931852e

Browse files
committed
address comments
1 parent dd1aa46 commit 931852e

File tree

1 file changed

+4
-9
lines changed

1 file changed

+4
-9
lines changed

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
import java.util.Optional;
8686
import java.util.Set;
8787
import java.util.concurrent.CompletableFuture;
88-
import java.util.concurrent.atomic.AtomicReference;
8988
import java.util.stream.Collectors;
9089

9190
import static java.util.Arrays.asList;
@@ -1999,10 +1998,10 @@ public void shouldComputeOffsetSumFromCheckpointFileForCreatedAndClosedTasks(fin
19991998
.build();
20001999

20012000
final TasksRegistry tasks = mock(TasksRegistry.class);
2002-
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
2003-
20042001
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
20052002

2003+
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
2004+
20062005
expectLockObtainedFor(taskId00);
20072006
makeTaskFolders(taskId00.toString());
20082007
writeCheckpointFile(taskId00, changelogOffsets);
@@ -4172,9 +4171,6 @@ public void shouldProcessActiveTasks() {
41724171

41734172
@Test
41744173
public void shouldNotFailOnTimeoutException() {
4175-
final AtomicReference<TimeoutException> timeoutException = new AtomicReference<>();
4176-
timeoutException.set(new TimeoutException("Skip me!"));
4177-
41784174
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
41794175
.inState(State.RUNNING)
41804176
.withInputPartitions(taskId00Partitions)
@@ -4195,7 +4191,7 @@ public void shouldNotFailOnTimeoutException() {
41954191
.thenReturn(false);
41964192

41974193
when(task01.process(anyLong()))
4198-
.thenThrow(timeoutException.get()) // throws TimeoutException
4194+
.thenThrow(new TimeoutException("Skip me!")) // throws TimeoutException
41994195
.thenReturn(true)
42004196
.thenReturn(true)
42014197
.thenReturn(false);
@@ -4214,8 +4210,7 @@ public void shouldNotFailOnTimeoutException() {
42144210
assertThat(taskManager.process(1, time), is(2));
42154211
verify(task01).maybeInitTaskTimeoutOrThrow(anyLong(), any(TimeoutException.class));
42164212

4217-
// retry without error - clear the timeout and update the mock
4218-
timeoutException.set(null);
4213+
// retry without error
42194214
assertThat(taskManager.process(1, time), is(3));
42204215
verify(task01).clearTaskTimeout();
42214216

0 commit comments

Comments
 (0)