aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java18
1 files changed, 13 insertions, 5 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index e1f888d521..19e038f552 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -184,7 +184,7 @@ public class GrpcServerImpl implements RPCServer {
}
/**
- * A class that handles communicating through a gRPC interface.
+ * A class that handles communicating through a gRPC interface for a streaming rpc call.
*
* <p>It can do four things:
* <li>Send a response message over the wire. If the channel is ready, it's sent immediately, if
@@ -208,7 +208,10 @@ public class GrpcServerImpl implements RPCServer {
private final AtomicLong receivedEventCount = new AtomicLong(0);
@VisibleForTesting
- GrpcSink(ServerCallStreamObserver<RunResponse> observer, ExecutorService executor) {
+ GrpcSink(
+ final String rpcCommandName,
+ ServerCallStreamObserver<RunResponse> observer,
+ ExecutorService executor) {
// This queue is intentionally unbounded: we always act on it fairly quickly so filling up
// RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers.
this.actionQueue = new LinkedBlockingQueue<>();
@@ -222,7 +225,10 @@ public class GrpcServerImpl implements RPCServer {
if (commandThread != null) {
log.info(
String.format(
- "Interrupting thread %s due to gRPC cancel", commandThread.getName()));
+ "Interrupting thread %s due to the streaming %s call being cancelled "
+ + "(likely client hang up or explicit gRPC-level cancellation)",
+ commandThread.getName(),
+ rpcCommandName));
commandThread.interrupt();
}
@@ -887,7 +893,9 @@ public class GrpcServerImpl implements RPCServer {
new CommandServerGrpc.CommandServerImplBase() {
@Override
public void run(final RunRequest request, final StreamObserver<RunResponse> observer) {
- final GrpcSink sink = new GrpcSink((ServerCallStreamObserver<RunResponse>) observer,
+ final GrpcSink sink = new GrpcSink(
+ "Run",
+ (ServerCallStreamObserver<RunResponse>) observer,
streamExecutorPool);
// Switch to our own threads so that onReadyStateHandler can be called (see class-level
// comment)
@@ -918,7 +926,7 @@ public class GrpcServerImpl implements RPCServer {
@Override
public void cancel(
final CancelRequest request, final StreamObserver<CancelResponse> streamObserver) {
- log.info("Got cancel message for " + request.getCommandId());
+ log.info(String.format("Got CancelRequest for command id %s", request.getCommandId()));
if (!request.getCookie().equals(requestCookie)) {
streamObserver.onCompleted();
return;