aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java66
-rw-r--r--src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java43
2 files changed, 80 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index ba7868c298..42e57370f4 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -145,7 +145,8 @@ public class GrpcServerImpl extends RPCServer {
}
}
- private enum StreamType {
+ @VisibleForTesting
+ enum StreamType {
STDOUT,
STDERR,
}
@@ -350,43 +351,50 @@ public class GrpcServerImpl extends RPCServer {
}
// TODO(lberki): Maybe we should implement line buffering?
- private class RpcOutputStream extends OutputStream {
+ @VisibleForTesting
+ static class RpcOutputStream extends OutputStream {
+ private static final int CHUNK_SIZE = 8192;
+
private final String commandId;
+ private final String responseCookie;
private final StreamType type;
private final GrpcSink sink;
- private RpcOutputStream(String commandId, StreamType type, GrpcSink sink) {
+ RpcOutputStream(String commandId, String responseCookie, StreamType type, GrpcSink sink) {
this.commandId = commandId;
+ this.responseCookie = responseCookie;
this.type = type;
this.sink = sink;
}
@Override
- public void write(byte[] b, int off, int inlen) throws IOException {
- ByteString input = ByteString.copyFrom(b, off, inlen);
- RunResponse.Builder response = RunResponse
- .newBuilder()
- .setCookie(responseCookie)
- .setCommandId(commandId);
-
- switch (type) {
- case STDOUT: response.setStandardOutput(input); break;
- case STDERR: response.setStandardError(input); break;
- default: throw new IllegalStateException();
- }
+ public synchronized void write(byte[] b, int off, int inlen) throws IOException {
+ for (int i = 0; i < inlen; i += CHUNK_SIZE) {
+ ByteString input = ByteString.copyFrom(b, off + i, Math.min(CHUNK_SIZE, inlen - i));
+ RunResponse.Builder response = RunResponse
+ .newBuilder()
+ .setCookie(responseCookie)
+ .setCommandId(commandId);
- // Send the chunk to the streamer thread. May block.
- if (!sink.offer(response.build())) {
- // Client disconnected. Terminate the current command as soon as possible. Note that
- // throwing IOException is not enough because we are in the habit of swallowing it. Note
- // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler)
- // we interrupt the command thread, which should be enough to make the server come around as
- // soon as possible.
- log.info(
- String.format(
- "Client disconnected received for command %s on thread %s",
- commandId, Thread.currentThread().getName()));
- throw new IOException("Client disconnected");
+ switch (type) {
+ case STDOUT: response.setStandardOutput(input); break;
+ case STDERR: response.setStandardError(input); break;
+ default: throw new IllegalStateException();
+ }
+
+ // Send the chunk to the streamer thread. May block.
+ if (!sink.offer(response.build())) {
+ // Client disconnected. Terminate the current command as soon as possible. Note that
+ // throwing IOException is not enough because we are in the habit of swallowing it. Note
+ // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler)
+ // we interrupt the command thread, which should be enough to make the server come around
+ // as soon as possible.
+ log.info(
+ String.format(
+ "Client disconnected received for command %s on thread %s",
+ commandId, Thread.currentThread().getName()));
+ throw new IOException("Client disconnected");
+ }
}
}
@@ -640,8 +648,8 @@ public class GrpcServerImpl extends RPCServer {
OutErr rpcOutErr =
OutErr.create(
- new RpcOutputStream(command.id, StreamType.STDOUT, sink),
- new RpcOutputStream(command.id, StreamType.STDERR, sink));
+ new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink),
+ new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink));
exitCode =
commandExecutor.exec(
diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
index a8e67acce3..7f83d477da 100644
--- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java
@@ -15,10 +15,16 @@ package com.google.devtools.build.lib.server;
import static com.google.common.truth.Truth.assertThat;
import static junit.framework.TestCase.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
+import com.google.devtools.build.lib.server.GrpcServerImpl.StreamType;
import com.google.devtools.build.lib.testutil.Suite;
import com.google.devtools.build.lib.testutil.TestSpec;
import com.google.devtools.build.lib.testutil.TestThread;
@@ -27,6 +33,7 @@ import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -37,6 +44,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
/**
* Unit tests for the gRPC server.
@@ -260,4 +268,39 @@ public class GrpcServerTest {
sender.joinAndAssertState(1000);
observer.waitForMessages(2, 100, TimeUnit.MILLISECONDS);
}
+
+ @Test
+ public void testRpcOutputStreamChunksLargeResponses() throws Exception {
+ GrpcServerImpl.GrpcSink mockSink = mock(GrpcServerImpl.GrpcSink.class);
+ @SuppressWarnings("resource")
+ GrpcServerImpl.RpcOutputStream underTest = new GrpcServerImpl.RpcOutputStream(
+ "command_id", "cookie", StreamType.STDOUT, mockSink);
+
+ when(mockSink.offer(any(RunResponse.class))).thenReturn(true);
+
+ String chunk1 = Strings.repeat("a", 8192);
+ String chunk2 = Strings.repeat("b", 8192);
+ String chunk3 = Strings.repeat("c", 1024);
+
+ underTest.write((chunk1 + chunk2 + chunk3).getBytes(StandardCharsets.ISO_8859_1));
+ InOrder inOrder = inOrder(mockSink);
+ inOrder.verify(mockSink).offer(
+ RunResponse.newBuilder()
+ .setCommandId("command_id")
+ .setCookie("cookie")
+ .setStandardOutput(ByteString.copyFrom(chunk1.getBytes(StandardCharsets.ISO_8859_1)))
+ .build());
+ inOrder.verify(mockSink).offer(
+ RunResponse.newBuilder()
+ .setCommandId("command_id")
+ .setCookie("cookie")
+ .setStandardOutput(ByteString.copyFrom(chunk2.getBytes(StandardCharsets.ISO_8859_1)))
+ .build());
+ inOrder.verify(mockSink).offer(
+ RunResponse.newBuilder()
+ .setCommandId("command_id")
+ .setCookie("cookie")
+ .setStandardOutput(ByteString.copyFrom(chunk3.getBytes(StandardCharsets.ISO_8859_1)))
+ .build());
+ }
}