diff options
author | Michajlo Matijkiw <michajlo@google.com> | 2016-09-23 16:41:22 +0000 |
---|---|---|
committer | Laszlo Csomor <laszlocsomor@google.com> | 2016-09-26 07:46:52 +0000 |
commit | fc23b324013fb6b14422f00d420c0d5ba6f6ec99 (patch) | |
tree | 13a023eb60e101956a3ef2e15038af7771fa27f6 /src/main/java/com/google/devtools/build/lib/server | |
parent | 5c7f4b0a488a859a94e073cf0946a9552e68ae7c (diff) |
Chunk large stdout/err writes in RpcOutputStream
Prevents overly large responses from overwhelming grpc.
--
MOS_MIGRATED_REVID=134083479
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 66 |
1 files changed, 37 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index ba7868c298..42e57370f4 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -145,7 +145,8 @@ public class GrpcServerImpl extends RPCServer { } } - private enum StreamType { + @VisibleForTesting + enum StreamType { STDOUT, STDERR, } @@ -350,43 +351,50 @@ public class GrpcServerImpl extends RPCServer { } // TODO(lberki): Maybe we should implement line buffering? - private class RpcOutputStream extends OutputStream { + @VisibleForTesting + static class RpcOutputStream extends OutputStream { + private static final int CHUNK_SIZE = 8192; + private final String commandId; + private final String responseCookie; private final StreamType type; private final GrpcSink sink; - private RpcOutputStream(String commandId, StreamType type, GrpcSink sink) { + RpcOutputStream(String commandId, String responseCookie, StreamType type, GrpcSink sink) { this.commandId = commandId; + this.responseCookie = responseCookie; this.type = type; this.sink = sink; } @Override - public void write(byte[] b, int off, int inlen) throws IOException { - ByteString input = ByteString.copyFrom(b, off, inlen); - RunResponse.Builder response = RunResponse - .newBuilder() - .setCookie(responseCookie) - .setCommandId(commandId); - - switch (type) { - case STDOUT: response.setStandardOutput(input); break; - case STDERR: response.setStandardError(input); break; - default: throw new IllegalStateException(); - } + public synchronized void write(byte[] b, int off, int inlen) throws IOException { + for (int i = 0; i < inlen; i += CHUNK_SIZE) { + ByteString input = ByteString.copyFrom(b, off + i, Math.min(CHUNK_SIZE, inlen - i)); + RunResponse.Builder response = RunResponse + .newBuilder() + .setCookie(responseCookie) + .setCommandId(commandId); - // Send the chunk to the streamer thread. May block. - if (!sink.offer(response.build())) { - // Client disconnected. Terminate the current command as soon as possible. Note that - // throwing IOException is not enough because we are in the habit of swallowing it. Note - // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler) - // we interrupt the command thread, which should be enough to make the server come around as - // soon as possible. - log.info( - String.format( - "Client disconnected received for command %s on thread %s", - commandId, Thread.currentThread().getName())); - throw new IOException("Client disconnected"); + switch (type) { + case STDOUT: response.setStandardOutput(input); break; + case STDERR: response.setStandardError(input); break; + default: throw new IllegalStateException(); + } + + // Send the chunk to the streamer thread. May block. + if (!sink.offer(response.build())) { + // Client disconnected. Terminate the current command as soon as possible. Note that + // throwing IOException is not enough because we are in the habit of swallowing it. Note + // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler) + // we interrupt the command thread, which should be enough to make the server come around + // as soon as possible. + log.info( + String.format( + "Client disconnected received for command %s on thread %s", + commandId, Thread.currentThread().getName())); + throw new IOException("Client disconnected"); + } } } @@ -640,8 +648,8 @@ public class GrpcServerImpl extends RPCServer { OutErr rpcOutErr = OutErr.create( - new RpcOutputStream(command.id, StreamType.STDOUT, sink), - new RpcOutputStream(command.id, StreamType.STDERR, sink)); + new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink), + new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink)); exitCode = commandExecutor.exec( |