aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-10-11 07:51:25 +0000
committerGravatar Yue Gan <yueg@google.com>2016-10-11 08:46:39 +0000
commitc925f34ee5a04126da64882ad63c7a29a0b560c4 (patch)
tree09f0595c38b01d4e5c487cc2b1094dbb74be8357 /src
parent5e5aa9d9a4c2eb2b0c185a8108b3bdce0b66d552 (diff)
Avoid deadlock when a client disconnects with the gRPC queue full.
-- MOS_MIGRATED_REVID=135768834
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java16
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java31
2 files changed, 47 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index ffdf473a78..03af9d78a6 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -198,6 +198,7 @@ public class GrpcServerImpl implements RPCServer {
private final Future<?> future;
private final AtomicReference<Thread> commandThread = new AtomicReference<>();
private final AtomicBoolean disconnected = new AtomicBoolean(false);
+ private final AtomicLong receivedEventCount = new AtomicLong(0);
@VisibleForTesting
GrpcSink(ServerCallStreamObserver<RunResponse> observer, ExecutorService executor) {
@@ -240,6 +241,11 @@ public class GrpcServerImpl implements RPCServer {
}
@VisibleForTesting
+ long getReceivedEventCount() {
+ return receivedEventCount.get();
+ }
+
+ @VisibleForTesting
void setCommandThread(Thread thread) {
Thread old = commandThread.getAndSet(thread);
if (old != null) {
@@ -320,8 +326,14 @@ public class GrpcServerImpl implements RPCServer {
while (true) {
SinkThreadAction action;
action = Uninterruptibles.takeUninterruptibly(actionQueue);
+ receivedEventCount.incrementAndGet();
switch (action) {
case FINISH:
+ if (itemPending) {
+ exchange(new SinkThreadItem(false, null), true);
+ itemPending = false;
+ }
+
// Reset the interrupted bit so that it doesn't stay set for the next command that is
// handled by this thread
Thread.interrupted();
@@ -337,6 +349,10 @@ public class GrpcServerImpl implements RPCServer {
case DISCONNECT:
log.info("Client disconnected for stream thread " + Thread.currentThread().getName());
disconnected.set(true);
+ if (itemPending) {
+ exchange(new SinkThreadItem(false, null), true);
+ itemPending = false;
+ }
break;
case SEND:
diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
index 7f83d477da..7d504d347e 100644
--- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
@@ -28,6 +28,7 @@ import com.google.devtools.build.lib.server.GrpcServerImpl.StreamType;
import com.google.devtools.build.lib.testutil.Suite;
import com.google.devtools.build.lib.testutil.TestSpec;
import com.google.devtools.build.lib.testutil.TestThread;
+import com.google.devtools.build.lib.testutil.TestUtils;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.Status;
@@ -270,6 +271,36 @@ public class GrpcServerTest {
}
@Test
+ public void testDeadlockWhenDisconnectedWithQueueFull() throws Exception {
+ MockObserver observer = new MockObserver();
+ final GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+
+ observer.ready.set(false);
+ TestThread sender = new TestThread() {
+ @Override
+ public void runTest() {
+ // Should return false due to the disconnect
+ assertThat(sink.offer(runResponse())).isFalse();
+ }
+ };
+
+ sender.setDaemon(true);
+ sender.start();
+
+ // Wait until the sink thread has processed the SEND message from #offer()
+ while (sink.getReceivedEventCount() < 1) {
+ Thread.sleep(200);
+ }
+
+ // Disconnect while there is an item pending
+ observer.onCancelHandler.run();
+
+ // Make sure that both the sink and the sender thread finish
+ assertThat(sink.finish()).isTrue();
+ sender.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS);
+ }
+
+ @Test
public void testRpcOutputStreamChunksLargeResponses() throws Exception {
GrpcServerImpl.GrpcSink mockSink = mock(GrpcServerImpl.GrpcSink.class);
@SuppressWarnings("resource")