diff options
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 66 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java | 43 |
2 files changed, 80 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index ba7868c298..42e57370f4 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -145,7 +145,8 @@ public class GrpcServerImpl extends RPCServer { } } - private enum StreamType { + @VisibleForTesting + enum StreamType { STDOUT, STDERR, } @@ -350,43 +351,50 @@ public class GrpcServerImpl extends RPCServer { } // TODO(lberki): Maybe we should implement line buffering? - private class RpcOutputStream extends OutputStream { + @VisibleForTesting + static class RpcOutputStream extends OutputStream { + private static final int CHUNK_SIZE = 8192; + private final String commandId; + private final String responseCookie; private final StreamType type; private final GrpcSink sink; - private RpcOutputStream(String commandId, StreamType type, GrpcSink sink) { + RpcOutputStream(String commandId, String responseCookie, StreamType type, GrpcSink sink) { this.commandId = commandId; + this.responseCookie = responseCookie; this.type = type; this.sink = sink; } @Override - public void write(byte[] b, int off, int inlen) throws IOException { - ByteString input = ByteString.copyFrom(b, off, inlen); - RunResponse.Builder response = RunResponse - .newBuilder() - .setCookie(responseCookie) - .setCommandId(commandId); - - switch (type) { - case STDOUT: response.setStandardOutput(input); break; - case STDERR: response.setStandardError(input); break; - default: throw new IllegalStateException(); - } + public synchronized void write(byte[] b, int off, int inlen) throws IOException { + for (int i = 0; i < inlen; i += CHUNK_SIZE) { + ByteString input = ByteString.copyFrom(b, off + i, Math.min(CHUNK_SIZE, inlen - i)); + RunResponse.Builder response = RunResponse + .newBuilder() + .setCookie(responseCookie) + .setCommandId(commandId); - // Send the chunk to the streamer thread. May block. - if (!sink.offer(response.build())) { - // Client disconnected. Terminate the current command as soon as possible. Note that - // throwing IOException is not enough because we are in the habit of swallowing it. Note - // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler) - // we interrupt the command thread, which should be enough to make the server come around as - // soon as possible. - log.info( - String.format( - "Client disconnected received for command %s on thread %s", - commandId, Thread.currentThread().getName())); - throw new IOException("Client disconnected"); + switch (type) { + case STDOUT: response.setStandardOutput(input); break; + case STDERR: response.setStandardError(input); break; + default: throw new IllegalStateException(); + } + + // Send the chunk to the streamer thread. May block. + if (!sink.offer(response.build())) { + // Client disconnected. Terminate the current command as soon as possible. Note that + // throwing IOException is not enough because we are in the habit of swallowing it. Note + // that when gRPC notifies us about the disconnection (see the call to setOnCancelHandler) + // we interrupt the command thread, which should be enough to make the server come around + // as soon as possible. + log.info( + String.format( + "Client disconnected received for command %s on thread %s", + commandId, Thread.currentThread().getName())); + throw new IOException("Client disconnected"); + } } } @@ -640,8 +648,8 @@ public class GrpcServerImpl extends RPCServer { OutErr rpcOutErr = OutErr.create( - new RpcOutputStream(command.id, StreamType.STDOUT, sink), - new RpcOutputStream(command.id, StreamType.STDERR, sink)); + new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink), + new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink)); exitCode = commandExecutor.exec( diff --git a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java index a8e67acce3..7f83d477da 100644 --- a/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java +++ b/src/test/java/com/google/devtools/build/lib/server/GrpcServerTest.java @@ -15,10 +15,16 @@ package com.google.devtools.build.lib.server; import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.google.common.base.Strings; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.server.CommandProtos.RunResponse; +import com.google.devtools.build.lib.server.GrpcServerImpl.StreamType; import com.google.devtools.build.lib.testutil.Suite; import com.google.devtools.build.lib.testutil.TestSpec; import com.google.devtools.build.lib.testutil.TestThread; @@ -27,6 +33,7 @@ import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.ServerCallStreamObserver; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,6 +44,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.InOrder; /** * Unit tests for the gRPC server. @@ -260,4 +268,39 @@ public class GrpcServerTest { sender.joinAndAssertState(1000); observer.waitForMessages(2, 100, TimeUnit.MILLISECONDS); } + + @Test + public void testRpcOutputStreamChunksLargeResponses() throws Exception { + GrpcServerImpl.GrpcSink mockSink = mock(GrpcServerImpl.GrpcSink.class); + @SuppressWarnings("resource") + GrpcServerImpl.RpcOutputStream underTest = new GrpcServerImpl.RpcOutputStream( + "command_id", "cookie", StreamType.STDOUT, mockSink); + + when(mockSink.offer(any(RunResponse.class))).thenReturn(true); + + String chunk1 = Strings.repeat("a", 8192); + String chunk2 = Strings.repeat("b", 8192); + String chunk3 = Strings.repeat("c", 1024); + + underTest.write((chunk1 + chunk2 + chunk3).getBytes(StandardCharsets.ISO_8859_1)); + InOrder inOrder = inOrder(mockSink); + inOrder.verify(mockSink).offer( + RunResponse.newBuilder() + .setCommandId("command_id") + .setCookie("cookie") + .setStandardOutput(ByteString.copyFrom(chunk1.getBytes(StandardCharsets.ISO_8859_1))) + .build()); + inOrder.verify(mockSink).offer( + RunResponse.newBuilder() + .setCommandId("command_id") + .setCookie("cookie") + .setStandardOutput(ByteString.copyFrom(chunk2.getBytes(StandardCharsets.ISO_8859_1))) + .build()); + inOrder.verify(mockSink).offer( + RunResponse.newBuilder() + .setCommandId("command_id") + .setCookie("cookie") + .setStandardOutput(ByteString.copyFrom(chunk3.getBytes(StandardCharsets.ISO_8859_1))) + .build()); + } } |