Skip to content

Commit 8036e49

Browse files
authored
KAFKA-17554 Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest (#18298)
Jira: https://issues.apache.org/jira/browse/KAFKA-17554 In the previous workflow, the test passes under two conditions: 1. The `t1` thread is waiting for the main thread's `client.wakeup()`. If successful, `t1` will wake up `t2`, allowing `t2` to complete the future. 2. If `t1` fails to receive the `client.wakeup()` from the main thread, `t2` will be woken up by the main thread. In the previous implementation, we used a `CountDownLatch` to control the execution of three threads, but it often led to race conditions. Currently, we have modified it to use two threads to test this scenario. I run `I=0; while ./gradlew :clients:test --tests ConsumerNetworkClientTest.testFutureCompletionOutsidePoll --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done` and pass 3000+ times. ![image](https://github.com/user-attachments/assets/3b8d804e-fbe0-4030-8686-4960fc717d07) Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 1f7631c commit 8036e49

File tree

1 file changed

+12
-24
lines changed

1 file changed

+12
-24
lines changed

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.apache.kafka.common.utils.MockTime;
4242
import org.apache.kafka.test.TestUtils;
4343

44-
import org.junit.jupiter.api.Disabled;
4544
import org.junit.jupiter.api.Test;
4645

4746
import java.time.Duration;
@@ -266,45 +265,34 @@ public void testMetadataFailurePropagated() {
266265
assertEquals(metadataException, exc);
267266
}
268267

269-
@Disabled("KAFKA-17554")
270268
@Test
271269
public void testFutureCompletionOutsidePoll() throws Exception {
272270
// Tests the scenario in which the request that is being awaited in one thread
273271
// is received and completed in another thread.
274-
275-
final CountDownLatch t1TheardCountDownLatch = new CountDownLatch(1);
276-
final CountDownLatch t2ThreadCountDownLatch = new CountDownLatch(2);
277-
278272
final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
279273
consumerClient.pollNoWakeup(); // dequeue and send the request
280274

275+
CountDownLatch bothThreadsReady = new CountDownLatch(2);
276+
281277
client.enableBlockingUntilWakeup(2);
282-
Thread t1 = new Thread(() -> {
283-
t1TheardCountDownLatch.countDown();
278+
279+
Thread t1 = new Thread(() -> {
280+
bothThreadsReady.countDown();
284281
consumerClient.pollNoWakeup();
285-
t2ThreadCountDownLatch.countDown();
286282
});
287-
288-
t1.start();
289283

290284
Thread t2 = new Thread(() -> {
291-
try {
292-
t2ThreadCountDownLatch.await();
293-
consumerClient.poll(future);
294-
} catch (InterruptedException e) {
295-
throw new RuntimeException(e);
296-
}
285+
bothThreadsReady.countDown();
286+
consumerClient.poll(future);
297287
});
288+
289+
t1.start();
298290
t2.start();
299-
300-
// Simulate a network response and return from the poll in t1
291+
292+
// Wait until both threads are blocked in poll
293+
bothThreadsReady.await();
301294
client.respond(heartbeatResponse(Errors.NONE));
302-
// Wait for t1 to block in poll
303-
t1TheardCountDownLatch.await();
304-
305295
client.wakeup();
306-
// while t1 is blocked in poll, t2 should be able to complete the future
307-
t2ThreadCountDownLatch.countDown();
308296

309297
// Both threads should complete since t1 should wakeup t2
310298
t1.join();

0 commit comments

Comments
 (0)