KAFKA-19683: Remove more dead tests and rewrote 3 tests in TaskManagerTest [2/N] #20544
| .withInputPartitions(taskId00Partitions).build(); | ||
| final TasksRegistry tasks = mock(TasksRegistry.class); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); |
These added lines are all temporary. Once we have rewritten all the tests, we can do this once in setUp().
Honestly, these changes to setUpTaskManager are quite confusing, and I don't understand why you made them.
I agree, it is definitely a bit confusing 😅
The reason I did this is that I wanted to identify all the tests that would fail after we removed the stateUpdaterEnabled flag. I thought the safest way to rewrite these tests incrementally would be to add another overloaded method without the flag, so we don't break the CI checks in the meantime. This temporarily adds a lot of unnecessary code, but my plan is to clean it up once all the tests are updated.
Do you think this approach makes sense? I would really appreciate your thoughts, and I'm open to any suggestions.
It is confusing. Maybe you want to rename it more explicitly (setUpTaskManagerWithStateUpdater or setUpTaskManagerWithoutStateUpdater)?
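One way to read the rename suggestion is as thin, explicitly named wrappers around a flag-taking setup helper. The sketch below is self-contained and only illustrative: `FakeTaskManager`, the class name `SetupHelpers`, and the helper bodies are hypothetical stand-ins, not the real TaskManagerTest code. Only the two wrapper names are taken from the comment above.

```java
final class SetupHelpers {
    // Hypothetical stand-in for whatever the setup helper builds.
    // It is NOT the real Kafka Streams TaskManager.
    static final class FakeTaskManager {
        final boolean stateUpdaterEnabled;
        final boolean processingThreadsEnabled;

        FakeTaskManager(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
            this.stateUpdaterEnabled = stateUpdaterEnabled;
            this.processingThreadsEnabled = processingThreadsEnabled;
        }
    }

    // Flag-based helper: a call such as setUpTaskManager(true, false)
    // does not say which boolean controls what.
    static FakeTaskManager setUpTaskManager(final boolean stateUpdaterEnabled,
                                            final boolean processingThreadsEnabled) {
        return new FakeTaskManager(stateUpdaterEnabled, processingThreadsEnabled);
    }

    // Explicitly named wrappers: the state updater choice is in the method name.
    static FakeTaskManager setUpTaskManagerWithStateUpdater(final boolean processingThreadsEnabled) {
        return setUpTaskManager(true, processingThreadsEnabled);
    }

    static FakeTaskManager setUpTaskManagerWithoutStateUpdater(final boolean processingThreadsEnabled) {
        return setUpTaskManager(false, processingThreadsEnabled);
    }

    public static void main(final String[] args) {
        final FakeTaskManager migrated = setUpTaskManagerWithStateUpdater(false);
        final FakeTaskManager legacy = setUpTaskManagerWithoutStateUpdater(false);
        System.out.println("migrated uses state updater: " + migrated.stateUpdaterEnabled);
        System.out.println("legacy uses state updater: " + legacy.stateUpdaterEnabled);
    }
}
```

With wrappers like these, the temporary helper can later be deleted by searching for a single method name instead of inspecting argument lists.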
| public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { | ||
| final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); | ||
| | ||
| final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); | ||
| final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); | ||
| public void shouldNotCommitCorruptedTasksOnTaskCorruptedException() { |
I renamed this test from shouldCommitNonCorruptedTasksOnTaskCorruptedException. Based on my understanding, the commit logic happens at the StreamThread level, and only the exception propagation happens in TaskManager with checkStateUpdater. So I decided to omit the check for the commit logic and rewrite the test.
Hence I propose renaming it to shouldNotCommitCorruptedTasksOnTaskCorruptedException.
Please correct me if I am wrong or if I misunderstood!
No, I don't think I agree with this.
The key for this test is that non-corrupted tasks are still committed as usual, and the offsets for the corrupted tasks are reset.
assertTrue(nonCorruptedTask.commitPrepared);
assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet()));
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
// check that we should not commit empty map either
verify(consumer, never()).commitSync(emptyMap());
verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
This is still a valid test!
But maybe we can skip the handleAssignment / complete restoration part if we immediately mock a RUNNING task?
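The invariant described in this comment can be illustrated with a toy model that starts from tasks already in a running state, with no assignment or restoration step. This is only a sketch under stated assumptions: `FakeTask`, `handleCorruption`, and `runScenario` are hypothetical and do not reproduce Kafka Streams' actual corruption handling. The field names `commitPrepared` and `partitionsForOffsetReset` simply mirror the assertions quoted above.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CorruptionInvariantSketch {
    // Toy task, assumed to be already running. Not a Kafka Streams class.
    static final class FakeTask {
        final String id;
        final Set<String> inputPartitions;
        boolean commitPrepared = false;
        Set<String> partitionsForOffsetReset = Collections.emptySet();

        FakeTask(final String id, final Set<String> inputPartitions) {
            this.id = id;
            this.inputPartitions = inputPartitions;
        }
    }

    // Hypothetical handler: corrupted tasks get their input partitions marked
    // for offset reset, all other running tasks are prepared for commit.
    static void handleCorruption(final List<FakeTask> runningTasks, final Set<String> corruptedIds) {
        for (final FakeTask task : runningTasks) {
            if (corruptedIds.contains(task.id)) {
                task.partitionsForOffsetReset = new HashSet<>(task.inputPartitions);
            } else {
                task.commitPrepared = true;
            }
        }
    }

    // Returns {corruptedTask, nonCorruptedTask} after handling the corruption.
    static FakeTask[] runScenario() {
        final FakeTask corruptedTask = new FakeTask("0_0", Set.of("topic1-0"));
        final FakeTask nonCorruptedTask = new FakeTask("0_1", Set.of("topic1-1"));
        handleCorruption(List.of(corruptedTask, nonCorruptedTask), Set.of("0_0"));
        return new FakeTask[] {corruptedTask, nonCorruptedTask};
    }

    public static void main(final String[] args) {
        final FakeTask[] tasks = runScenario();
        System.out.println("corrupted reset: " + tasks[0].partitionsForOffsetReset);
        System.out.println("non-corrupted commit prepared: " + tasks[1].commitPrepared);
    }
}
```

A test in this shape checks both sides of the invariant: the corrupted task is reset and not committed, and the other task is committed and not reset.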
I rewrote this test again in 6df4e79
| I rewrote only 3 tests for now. I wanted to ensure that my approach is correct before proceeding further. |
Pull Request Overview
This PR removes dead tests and rewrites 3 existing tests in TaskManagerTest to use the stateUpdater thread pattern. An additional overloaded setUpTaskManager() method was temporarily created to pass CI pipelines while working on failing tests incrementally.
- Removed 3 dead tests that were no longer needed
- Rewrote 3 tests to use stateUpdater thread instead of direct task manipulation
- Added temporary overloaded setup method for incremental CI fixes
| @BeforeEach | ||
| public void setUp() { | ||
| taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); | ||
| taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false, false); |
Copilot AI Sep 18, 2025
The method call now has two boolean parameters without clear meaning. Consider using named parameters or method overloading to make the intent clearer. The current call setUpTaskManager(..., false, false) is ambiguous about what each boolean controls.
| .withInputPartitions(taskId00Partitions).build(); | ||
| final TasksRegistry tasks = mock(TasksRegistry.class); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); |
Copilot AI Sep 18, 2025
Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.
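The ambiguity described here can be reproduced in isolation. In the sketch below, the parameter order of the two overloads follows the comment above. The method name `setUp`, the `String`/`Object` parameter types, and the assumption that the temporary overload always enables the state updater are hypothetical simplifications, not the actual TaskManagerTest code.

```java
final class OverloadAmbiguitySketch {
    // Old helper: the third argument is stateUpdaterEnabled.
    static String setUp(final String mode, final Object tasks,
                        final boolean stateUpdaterEnabled,
                        final boolean processingThreadsEnabled) {
        return "stateUpdater=" + stateUpdaterEnabled
            + ", processingThreads=" + processingThreadsEnabled;
    }

    // Temporary helper: the third argument is processingThreadsEnabled.
    // Assumption for this sketch: the state updater is always on here.
    static String setUp(final String mode, final Object tasks,
                        final boolean processingThreadsEnabled) {
        return setUp(mode, tasks, true, processingThreadsEnabled);
    }

    public static void main(final String[] args) {
        // Both calls pass `true` in the third position, but it controls
        // a different setting depending on which overload is selected.
        System.out.println(setUp("AT_LEAST_ONCE", null, true, false));
        System.out.println(setUp("AT_LEAST_ONCE", null, true));
    }
}
```

Dropping the last argument from a four-argument call therefore still compiles, but the remaining `true` silently changes meaning from the state updater flag to the processing threads flag.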
| .withInputPartitions(taskId01Partitions).build(); | ||
| final TasksRegistry tasks = mock(TasksRegistry.class); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); |
Copilot AI Sep 18, 2025
Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.
| public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() { | ||
| final TasksRegistry tasks = mock(TasksRegistry.class); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); |
Copilot AI Sep 18, 2025
Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.
| .withInputPartitions(taskId01Partitions).build(); | ||
| final TasksRegistry tasks = mock(TasksRegistry.class); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); |
Copilot AI Sep 18, 2025
Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.
| .withInputPartitions(taskId01Partitions).build(); | ||
| final TasksRegistry tasks = mock(TasksRegistry.class); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); |
Copilot AI Sep 18, 2025
Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.
lucasbru left a comment
Thanks. I left some comments!
| | ||
| final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false); | ||
| | ||
| assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); |
For my understanding - why do we actually need to call checkStateUpdater here?
You're right! I think it's not actually required for this specific test case. I included it as a safety check to ensure that punctuation happens only when the system is "ready", but we can safely omit the line.
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
| @shashankhs11 let me know when you need another review here |
6df4e79 to 85d1863 Compare | @lucasbru I have made the changes as suggested. Tagging for review |
Pull Request Overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.
lucasbru left a comment
LGTM, thanks!
lucasbru left a comment
Ah, still got two comments
03483b6 to 54ac38b Compare
lucasbru left a comment
Okay. The test "shouldCommitNonCorruptedTasksOnTaskCorruptedException" doesn't seem very thorough, but I see that it was the case before as well, so I think this is good to go.
| Awesome! Thanks a lot for your time and patience, Lucas.
I've made a note of this. Maybe, we can come back to this at the end? |
| We seem to have problems with CI and Java 25. Can you try rebasing on latest trunk please? |
54ac38b to e4b7e3b Compare
Done! |
…rTest [2/N] (apache#20544) Changes made - Additional `setUpTaskManager()` overloaded method -- Created this temporarily to pass the CI pipelines so that I can work on the failing tests incrementally - Rewrote 3 tests to use stateUpdater thread Reviewers: Lucas Brutschy <lbrutschy@confluent.io>