Skip to content

Commit 36e609e

Browse files
authored
fix race in SubscriberImplTest::receiveMessage (#1586)
This bug manifests as testBundleAcks deadlocking. The test 1. publishes a series of messages to a mock server, 2. waits to receive them back, 3. and acknowledges them. For performance reasons, the client does not immediately send the acknowledgement request to the server. Instead, it sends acks in batch every 100ms. Using a fake clock, the test advances the time by 100ms, sending the ack-batch, then verify that the mock server receives the acks. The bug is in step 2. The test thread waits by waiting on a CountDownLatch, which is counted down by GRPC thread calling receiveMessage(). However, the method decrements the latch before acking the message. On a bad day, the test thread can wake up, advance the clock, and send the ack-batch before the GRPC thread could add the message to the batch. The test then waits for the server to receive an ack it never sent, deadlocking the test. The fix is for receiveMessage() to ack the message before decrementing the counter.
1 parent d325afb commit 36e609e

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,6 @@ void waitForExpectedMessages() throws InterruptedException {
115115

116116
@Override
117117
public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response) {
118-
if (messageCountLatch.isPresent()) {
119-
messageCountLatch.get().countDown();
120-
}
121-
122118
if (explicitAckReplies) {
123119
try {
124120
outstandingMessageReplies.put(response);
@@ -128,6 +124,10 @@ public void receiveMessage(PubsubMessage message, SettableFuture<AckReply> respo
128124
} else {
129125
replyTo(response);
130126
}
127+
128+
if (messageCountLatch.isPresent()) {
129+
messageCountLatch.get().countDown();
130+
}
131131
}
132132

133133
public void replyNextOutstandingMessage() {

0 commit comments

Comments
 (0)