aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java18
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java12
2 files changed, 19 insertions, 11 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index e1f888d521..19e038f552 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -184,7 +184,7 @@ public class GrpcServerImpl implements RPCServer {
}
/**
- * A class that handles communicating through a gRPC interface.
+ * A class that handles communicating through a gRPC interface for a streaming rpc call.
*
* <p>It can do four things:
* <li>Send a response message over the wire. If the channel is ready, it's sent immediately, if
@@ -208,7 +208,10 @@ public class GrpcServerImpl implements RPCServer {
private final AtomicLong receivedEventCount = new AtomicLong(0);
@VisibleForTesting
- GrpcSink(ServerCallStreamObserver<RunResponse> observer, ExecutorService executor) {
+ GrpcSink(
+ final String rpcCommandName,
+ ServerCallStreamObserver<RunResponse> observer,
+ ExecutorService executor) {
// This queue is intentionally unbounded: we always act on it fairly quickly so filling up
// RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers.
this.actionQueue = new LinkedBlockingQueue<>();
@@ -222,7 +225,10 @@ public class GrpcServerImpl implements RPCServer {
if (commandThread != null) {
log.info(
String.format(
- "Interrupting thread %s due to gRPC cancel", commandThread.getName()));
+ "Interrupting thread %s due to the streaming %s call being cancelled "
+ + "(likely client hang up or explicit gRPC-level cancellation)",
+ commandThread.getName(),
+ rpcCommandName));
commandThread.interrupt();
}
@@ -887,7 +893,9 @@ public class GrpcServerImpl implements RPCServer {
new CommandServerGrpc.CommandServerImplBase() {
@Override
public void run(final RunRequest request, final StreamObserver<RunResponse> observer) {
- final GrpcSink sink = new GrpcSink((ServerCallStreamObserver<RunResponse>) observer,
+ final GrpcSink sink = new GrpcSink(
+ "Run",
+ (ServerCallStreamObserver<RunResponse>) observer,
streamExecutorPool);
// Switch to our own threads so that onReadyStateHandler can be called (see class-level
// comment)
@@ -918,7 +926,7 @@ public class GrpcServerImpl implements RPCServer {
@Override
public void cancel(
final CancelRequest request, final StreamObserver<CancelResponse> streamObserver) {
- log.info("Got cancel message for " + request.getCommandId());
+ log.info(String.format("Got CancelRequest for command id %s", request.getCommandId()));
if (!request.getCookie().equals(requestCookie)) {
streamObserver.onCompleted();
return;
diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
index 7d504d347e..5a0ae62eb5 100644
--- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
@@ -169,7 +169,7 @@ public class GrpcServerTest {
@Test
public void testSendingSimpleMessage() {
MockObserver observer = new MockObserver();
- GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+ GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink("Dummy", observer, executor);
assertThat(sink.offer(runResponse())).isTrue();
assertThat(sink.finish()).isFalse();
@@ -179,7 +179,7 @@ public class GrpcServerTest {
@Test
public void testSurvivesLateOnCancelHandler() {
MockObserver observer = new MockObserver();
- GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+ GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink("Dummy", observer, executor);
// First make the observer cancelled...
observer.cancelled.set(true);
@@ -197,7 +197,7 @@ public class GrpcServerTest {
@Test
public void testCancellationTurnsSinkIntoBlackHole() {
MockObserver observer = new MockObserver();
- GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+ GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink("Dummy", observer, executor);
observer.cancelled.set(true);
observer.onCancelHandler.run();
@@ -226,7 +226,7 @@ public class GrpcServerTest {
victim.start();
MockObserver observer = new MockObserver();
- GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+ GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink("Dummy", observer, executor);
sink.setCommandThread(victim);
observer.cancelled.set(true);
observer.onCancelHandler.run();
@@ -240,7 +240,7 @@ public class GrpcServerTest {
@Test
public void testObeysReadySignal() throws Exception {
MockObserver observer = new MockObserver();
- final GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+ final GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink("Dummy", observer, executor);
// First check if we can send a simple message
assertThat(sink.offer(runResponse())).isTrue();
@@ -273,7 +273,7 @@ public class GrpcServerTest {
@Test
public void testDeadlockWhenDisconnectedWithQueueFull() throws Exception {
MockObserver observer = new MockObserver();
- final GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink(observer, executor);
+ final GrpcServerImpl.GrpcSink sink = new GrpcServerImpl.GrpcSink("Dummy", observer, executor);
observer.ready.set(false);
TestThread sender = new TestThread() {