diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index e1f888d521..19e038f552 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -184,7 +184,7 @@ public class GrpcServerImpl implements RPCServer { } /** - * A class that handles communicating through a gRPC interface. + * A class that handles communicating through a gRPC interface for a streaming rpc call. * * <p>It can do four things: * <li>Send a response message over the wire. If the channel is ready, it's sent immediately, if @@ -208,7 +208,10 @@ public class GrpcServerImpl implements RPCServer { private final AtomicLong receivedEventCount = new AtomicLong(0); @VisibleForTesting - GrpcSink(ServerCallStreamObserver<RunResponse> observer, ExecutorService executor) { + GrpcSink( + final String rpcCommandName, + ServerCallStreamObserver<RunResponse> observer, + ExecutorService executor) { // This queue is intentionally unbounded: we always act on it fairly quickly so filling up // RAM is not a concern but we don't want to block in the gRPC cancel/onready handlers. this.actionQueue = new LinkedBlockingQueue<>(); @@ -222,7 +225,10 @@ public class GrpcServerImpl implements RPCServer { if (commandThread != null) { log.info( String.format( - "Interrupting thread %s due to gRPC cancel", commandThread.getName())); + "Interrupting thread %s due to the streaming %s call being cancelled " + + "(likely client hang up or explicit gRPC-level cancellation)", + commandThread.getName(), + rpcCommandName)); commandThread.interrupt(); } @@ -887,7 +893,9 @@ public class GrpcServerImpl implements RPCServer { new CommandServerGrpc.CommandServerImplBase() { @Override public void run(final RunRequest request, final StreamObserver<RunResponse> observer) { - final GrpcSink sink = new GrpcSink((ServerCallStreamObserver<RunResponse>) observer, + final GrpcSink sink = new GrpcSink( + "Run", + (ServerCallStreamObserver<RunResponse>) observer, streamExecutorPool); // Switch to our own threads so that onReadyStateHandler can be called (see class-level // comment) @@ -918,7 +926,7 @@ public class GrpcServerImpl implements RPCServer { @Override public void cancel( final CancelRequest request, final StreamObserver<CancelResponse> streamObserver) { - log.info("Got cancel message for " + request.getCommandId()); + log.info(String.format("Got CancelRequest for command id %s", request.getCommandId())); if (!request.getCookie().equals(requestCookie)) { streamObserver.onCompleted(); return; |