diff options
author | 2016-10-11 07:51:25 +0000 | |
---|---|---|
committer | 2016-10-11 08:46:39 +0000 | |
commit | c925f34ee5a04126da64882ad63c7a29a0b560c4 (patch) | |
tree | 09f0595c38b01d4e5c487cc2b1094dbb74be8357 /src | |
parent | 5e5aa9d9a4c2eb2b0c185a8108b3bdce0b66d552 (diff) |
Avoid deadlock when a client disconnects with the gRPC queue full.
--
MOS_MIGRATED_REVID=135768834
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 16 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java | 31 |
2 files changed, 47 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index ffdf473a78..03af9d78a6 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -198,6 +198,7 @@ public class GrpcServerImpl implements RPCServer { private final Future<?> future; private final AtomicReference<Thread> commandThread = new AtomicReference<>(); private final AtomicBoolean disconnected = new AtomicBoolean(false); + private final AtomicLong receivedEventCount = new AtomicLong(0); @VisibleForTesting GrpcSink(ServerCallStreamObserver<RunResponse> observer, ExecutorService executor) { @@ -240,6 +241,11 @@ public class GrpcServerImpl implements RPCServer { } @VisibleForTesting + long getReceivedEventCount() { + return receivedEventCount.get(); + } + + @VisibleForTesting void setCommandThread(Thread thread) { Thread old = commandThread.getAndSet(thread); if (old != null) { @@ -320,8 +326,14 @@ public class GrpcServerImpl implements RPCServer { while (true) { SinkThreadAction action; action = Uninterruptibles.takeUninterruptibly(actionQueue); + receivedEventCount.incrementAndGet(); switch (action) { case FINISH: + if (itemPending) { + exchange(new SinkThreadItem(false, null), true); + itemPending = false; + } + // Reset the interrupted bit so that it doesn't stay set for the next command that is // handled by this thread Thread.interrupted(); @@ -337,6 +349,10 @@ public class GrpcServerImpl implements RPCServer { case DISCONNECT: log.info("Client disconnected for stream thread " + Thread.currentThread().getName()); disconnected.set(true); + if (itemPending) { + exchange(new SinkThreadItem(false, null), true); + itemPending = false; + } break; case SEND: diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java index 7f83d477da..7d504d347e 100644 --- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java +++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java @@ -28,6 +28,7 @@ import com.google.devtools.build.lib.server.GrpcServerImpl.StreamType; import com.google.devtools.build.lib.testutil.Suite; import com.google.devtools.build.lib.testutil.TestSpec; import com.google.devtools.build.lib.testutil.TestThread; +import com.google.devtools.build.lib.testutil.TestUtils; import com.google.devtools.build.lib.util.Preconditions; import com.google.protobuf.ByteString; import io.grpc.Status; @@ -270,6 +271,36 @@ public class GrpcServerTest { } @Test + public void testDeadlockWhenDisconnectedWithQueueFull() throws Exception { + MockObserver observer = new MockObserver(); + final GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor); + + observer.ready.set(false); + TestThread sender = new TestThread() { + @Override + public void runTest() { + // Should return false due to the disconnect + assertThat(sink.offer(runResponse())).isFalse(); + } + }; + + sender.setDaemon(true); + sender.start(); + + // Wait until the sink thread has processed the SEND message from #offer() + while (sink.getReceivedEventCount() < 1) { + Thread.sleep(200); + } + + // Disconnect while there is an item pending + observer.onCancelHandler.run(); + + // Make sure that both the sink and the sender thread finish + assertThat(sink.finish()).isTrue(); + sender.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); + } + + @Test public void testRpcOutputStreamChunksLargeResponses() throws Exception { GrpcServerImpl.GrpcSink mockSink = mock(GrpcServerImpl.GrpcSink.class); @SuppressWarnings("resource") |