aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/server
diff options
context:
space:
mode:
authorGravatar Michajlo Matijkiw <michajlo@google.com>2016-09-23 16:41:22 +0000
committerGravatar Laszlo Csomor <laszlocsomor@google.com>2016-09-26 07:46:52 +0000
commitfc23b324013fb6b14422f00d420c0d5ba6f6ec99 (patch)
tree13a023eb60e101956a3ef2e15038af7771fa27f6 /src/main/java/com/google/devtools/build/lib/server
parent5c7f4b0a488a859a94e073cf0946a9552e68ae7c (diff)
Chunk large stdout/err writes in RpcOutputStream
Prevents overly large responses from overwhelming grpc. -- MOS_MIGRATED_REVID=134083479
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/server')
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java66
1 files changed, 37 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index ba7868c298..42e57370f4 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -145,7 +145,8 @@ public class GrpcServerImpl extends RPCServer {
}
}
- private enum StreamType {
+ @VisibleForTesting
+ enum StreamType {
STDOUT,
STDERR,
}
@@ -350,43 +351,50 @@ public class GrpcServerImpl extends RPCServer {
}
// TODO(lberki): Maybe we should implement line buffering?
- private class RpcOutputStream extends OutputStream {
+ @VisibleForTesting
+ static class RpcOutputStream extends OutputStream {
+ private static final int CHUNK_SIZE = 8192;
+
private final String commandId;
+ private final String responseCookie;
private final StreamType type;
private final GrpcSink sink;
- private RpcOutputStream(String commandId, StreamType type, GrpcSink sink) {
+ RpcOutputStream(String commandId, String responseCookie, StreamType type, GrpcSink sink) {
this.commandId = commandId;
+ this.responseCookie = responseCookie;
this.type = type;
this.sink = sink;
}
@Override
- public void write(byte[] b, int off, int inlen) throws IOException {
- ByteString input = ByteString.copyFrom(b, off, inlen);
- RunResponse.Builder response = RunResponse
- .newBuilder()
- .setCookie(responseCookie)
- .setCommandId(commandId);
-
- switch (type) {
- case STDOUT: response.setStandardOutput(input); break;
- case STDERR: response.setStandardError(input); break;
- default: throw new IllegalStateException();
- }
+ public synchronized void write(byte[] b, int off, int inlen) throws IOException {
+ for (int i = 0; i < inlen; i += CHUNK_SIZE) {
+ ByteString input = ByteString.copyFrom(b, off + i, Math.min(CHUNK_SIZE, inlen - i));
+ RunResponse.Builder response = RunResponse
+ .newBuilder()
+ .setCookie(responseCookie)
+ .setCommandId(commandId);
- // Send the chunk to the streamer thread. May block.
- if (!sink.offer(response.build())) {
- // Client disconnected. Terminate the current command as soon as possible. Note that
- // throwing IOException is not enough because we are in the habit of swallowing it. Note
- // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler)
- // we interrupt the command thread, which should be enough to make the server come around as
- // soon as possible.
- log.info(
- String.format(
- "Client disconnected received for command %s on thread %s",
- commandId, Thread.currentThread().getName()));
- throw new IOException("Client disconnected");
+ switch (type) {
+ case STDOUT: response.setStandardOutput(input); break;
+ case STDERR: response.setStandardError(input); break;
+ default: throw new IllegalStateException();
+ }
+
+ // Send the chunk to the streamer thread. May block.
+ if (!sink.offer(response.build())) {
+ // Client disconnected. Terminate the current command as soon as possible. Note that
+ // throwing IOException is not enough because we are in the habit of swallowing it. Note
+ // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler)
+ // we interrupt the command thread, which should be enough to make the server come around
+ // as soon as possible.
+ log.info(
+ String.format(
+ "Client disconnected received for command %s on thread %s",
+ commandId, Thread.currentThread().getName()));
+ throw new IOException("Client disconnected");
+ }
}
}
@@ -640,8 +648,8 @@ public class GrpcServerImpl extends RPCServer {
OutErr rpcOutErr =
OutErr.create(
- new RpcOutputStream(command.id, StreamType.STDOUT, sink),
- new RpcOutputStream(command.id, StreamType.STDERR, sink));
+ new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink),
+ new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink));
exitCode =
commandExecutor.exec(