Skip to content

Conversation

@shashankhs11
Copy link
Contributor

@shashankhs11 shashankhs11 commented Sep 16, 2025

Changes made

  • Additional setUpTaskManager() overloaded method -- Created this
    temporarily to pass the CI pipelines so that I can work on the failing
    tests incrementally
  • Rewrote 3 tests to use stateUpdater thread

Reviewers: Lucas Brutschy lbrutschy@confluent.io

@github-actions github-actions bot added triage PRs from the community streams tests Test fixes (including flaky tests) labels Sep 16, 2025
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link
Contributor Author

@shashankhs11 shashankhs11 Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines added are all temporary. Once we rewrite all the tests, we can do this once in the setUp()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly. these changes to setUpTaskManager are quite confusing and I don't understand why you did it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, it is definitely a bit confusing 😅

The reason I did this is because, I wanted to identify all the tests that would fail after we removed the stateUpdaterEnabled flag. I thought the safest way to rewrite these tests incrementally would be to add another overloaded method without the flag, so we don’t break the CI checks in the meantime. This would temporarily add in a lot of unnecessary code, but my plan was to clean it up once all the tests are updated.

Do you think this approach make sense? I would really appreciate your thoughts, and I’m open to any suggestions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is confusing. Maybe you want to rename it more explicitly (setUpTaskManagerWithStateUpdater or setUpTaskManagerWithoutStateUpdater)?

Comment on lines 2283 to 2285
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);

final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
public void shouldNotCommitCorruptedTasksOnTaskCorruptedException() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this test from shouldCommitNonCorruptedTasksOnTaskCorruptedException. Based on my understanding, the commit logic happens at the StreamThread level, but only the exception propagation happens in TaskManager with checkStateUpdater. So I decided to omit the check for commit logic and rewrite the test.

And hence I propose to rename to shouldNotCommitCorruptedTasksOnTaskCorruptedException

Please correct if I am wrong or If I misunderstood!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think I agree with this

The key for this test is that non-corrupted tasks are still committed as usual, the the offsets for the corrupted tasks are reset.

 assertTrue(nonCorruptedTask.commitPrepared); assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet())); assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions)); // check that we should not commit empty map either verify(consumer, never()).commitSync(emptyMap()); verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); 

This is still a valid test!

But maybe we can skip the handle Assignment / complete restoration part if we immediatelly mock a RUNNING task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote this test again in 6df4e79

@shashankhs11
Copy link
Contributor Author

I rewrote only 3 tests for now. I wanted to ensure that my approach is correct before proceeding further.
@lucasbru -- tagging for review

@github-actions github-actions bot removed the triage PRs from the community label Sep 17, 2025
@lucasbru lucasbru self-assigned this Sep 18, 2025
@lucasbru lucasbru requested a review from Copilot September 18, 2025 08:21
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR removes dead tests and rewrites 3 existing tests in TaskManagerTest to use the stateUpdater thread pattern. An additional overloaded setUpTaskManager() method was temporarily created to pass CI pipelines while working on failing tests incrementally.

  • Removed 3 dead tests that were no longer needed
  • Rewrote 3 tests to use stateUpdater thread instead of direct task manipulation
  • Added temporary overloaded setup method for incremental CI fixes

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@BeforeEach
public void setUp() {
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false, false);
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method call now has two boolean parameters without clear meaning. Consider using named parameters or method overloading to make the intent clearer. The current call setUpTaskManager(..., false, false) is ambiguous about what each boolean controls.

Copilot uses AI. Check for mistakes.
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

Copilot uses AI. Check for mistakes.
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

Copilot uses AI. Check for mistakes.
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

Copilot uses AI. Check for mistakes.
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

Copilot uses AI. Check for mistakes.
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link

Copilot AI Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

Copilot uses AI. Check for mistakes.
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I left some comments!


final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);

assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding - why do we actually need to call checkStateUpdater here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right! I think it's not actually required for this specific test case, but included it more as a safety check to ensure that the punctuation should happen only when the system is "ready". But, we can safely omit the line

Comment on lines 2283 to 2285
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);

final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
public void shouldNotCommitCorruptedTasksOnTaskCorruptedException() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think I agree with this

The key for this test is that non-corrupted tasks are still committed as usual, the the offsets for the corrupted tasks are reset.

 assertTrue(nonCorruptedTask.commitPrepared); assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet())); assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions)); // check that we should not commit empty map either verify(consumer, never()).commitSync(emptyMap()); verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); 

This is still a valid test!

But maybe we can skip the handle Assignment / complete restoration part if we immediatelly mock a RUNNING task?

.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly. these changes to setUpTaskManager are quite confusing and I don't understand why you did it.

@lucasbru
Copy link
Member

@shashankhs11 let me know when you need another review here

@shashankhs11
Copy link
Contributor Author

@lucasbru I have made the changes as suggested. Tagging for review

@lucasbru lucasbru requested a review from Copilot October 8, 2025 12:36
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated 1 comment.


Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, still got two comments

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. The test "shouldCommitNonCorruptedTasksOnTaskCorruptedException" doesn't seem very thorough, but I see that it was the case before as well, so I think this is good to go.

@shashankhs11
Copy link
Contributor Author

shashankhs11 commented Oct 8, 2025

Awesome! Thanks a lot for your time and patience, Lucas.
I have worked on more tests and will be making the next PR as soon as this PR has been merged. Does that sound good?

The test "shouldCommitNonCorruptedTasksOnTaskCorruptedException" doesn't seem very thorough

I've made a note of this. Maybe, we can come back to this at the end?

@lucasbru
Copy link
Member

We seem to have problems with CI and Java 25. Can you try rebasing on latest trunk please?

@shashankhs11
Copy link
Contributor Author

Can you try rebasing on latest trunk please?

Done!

@lucasbru lucasbru merged commit 59f51fb into apache:trunk Oct 10, 2025
20 checks passed
@shashankhs11 shashankhs11 deleted the KAFKA-19683-2 branch October 11, 2025 13:36
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…rTest [2/N] (apache#20544) Changes made - Additional `setUpTaskManager()` overloaded method -- Created this temporarily to pass the CI pipelines so that I can work on the failing tests incrementally - Rewrote 3 tests to use stateUpdater thread Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

2 participants