diff options
4 files changed, 200 insertions, 5 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java index 2e363c382a..2cb98293d2 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java @@ -419,6 +419,7 @@ public class BlazeCommandDispatcher { BlazeCommandEventHandler.Options eventHandlerOptions = optionsParser.getOptions(BlazeCommandEventHandler.Options.class); OutErr colorfulOutErr = outErr; + if (!eventHandlerOptions.useColor()) { outErr = ansiStripOut(ansiStripErr(outErr)); if (!commandAnnotation.binaryStdOut()) { @@ -429,6 +430,14 @@ public class BlazeCommandDispatcher { } } + if (!commandAnnotation.binaryStdOut()) { + outErr = lineBufferOut(outErr); + } + + if (!commandAnnotation.binaryStdErr()) { + outErr = lineBufferErr(outErr); + } + CommonCommandOptions commonOptions = optionsParser.getOptions(CommonCommandOptions.class); BlazeRuntime.setupLogging(commonOptions.verbosity); @@ -627,6 +636,16 @@ public class BlazeCommandDispatcher { accumulator.add(commandAnnotation.name()); } + private OutErr lineBufferOut(OutErr outErr) { + OutputStream wrappedOut = new LineBufferedOutputStream(outErr.getOutputStream()); + return OutErr.create(wrappedOut, outErr.getErrorStream()); + } + + private OutErr lineBufferErr(OutErr outErr) { + OutputStream wrappedErr = new LineBufferedOutputStream(outErr.getErrorStream()); + return OutErr.create(outErr.getOutputStream(), wrappedErr); + } + private OutErr ansiStripOut(OutErr outErr) { OutputStream wrappedOut = new AnsiStrippingOutputStream(outErr.getOutputStream()); return OutErr.create(wrappedOut, outErr.getErrorStream()); diff --git a/src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java b/src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java new file mode 100644 index 0000000000..3d0c44f93c --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java @@ -0,0 +1,88 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.runtime; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A decorator output stream that does line buffering. + */ +public class LineBufferedOutputStream extends OutputStream { + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private final OutputStream wrapped; + private final byte[] buffer; + private int pos; + + public LineBufferedOutputStream(OutputStream wrapped) { + this(wrapped, DEFAULT_BUFFER_SIZE); + } + + public LineBufferedOutputStream(OutputStream wrapped, int bufferSize) { + this.wrapped = wrapped; + this.buffer = new byte[bufferSize]; + this.pos = 0; + } + + @Override + public synchronized void write(byte[] b, int off, int inlen) throws IOException { + if (inlen > buffer.length * 2) { + // Do not buffer large writes + if (pos > 0) { + wrapped.write(buffer, 0, pos); + pos = 0; + } + wrapped.write(b, off, inlen); + return; + } + + int next = off; + while (next < off + inlen) { + buffer[pos++] = b[next]; + if (b[next] == '\n') { + wrapped.write(buffer, 0, pos); + pos = 0; + } + + if (pos == buffer.length) { + wrapped.write(buffer, 0, pos); + pos = 0; + } + + next++; + } + } + + @Override + public void write(int byteAsInt) throws IOException { + byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons + write(new byte[] {b}, 0, 1); + } + + @Override + public synchronized void flush() throws IOException { + if (pos != 0) { + wrapped.write(buffer, 0, pos); + pos = 0; + } + wrapped.flush(); + } + + @Override + public synchronized void close() throws IOException { + flush(); + wrapped.close(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 42e57370f4..a3678c72c9 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -350,7 +350,18 @@ public class GrpcServerImpl extends RPCServer { } } - // TODO(lberki): Maybe we should implement line buffering? + /** + * An output stream that forwards the data written to it over the gRPC command stream. + * + * <p>Note that wraping this class with a {@code Channel} can cause a deadlock if there is an + * {@link OutputStream} in between that synchronizes both on {@code #close()} and {@code #write()} + * because then if an interrupt happens in {@link GrpcSink#exchange(SinkThreadItem, boolean)}, + * the thread on which {@code interrupt()} was called will wait until the {@code Channel} closes + * itself while holding a lock for interrupting the thread on which {@code #exchange()} is + * being executed and that thread will hold a lock that is needed for the {@code Channel} to be + * closed and call {@code interrupt()} in {@code #exchange()}, which will in turn try to acquire + * the interrupt lock. + */ @VisibleForTesting static class RpcOutputStream extends OutputStream { private static final int CHUNK_SIZE = 8192; @@ -646,10 +657,9 @@ public class GrpcServerImpl extends RPCServer { "The client cancelled the command before receiving the command id: " + e.getMessage()); } - OutErr rpcOutErr = - OutErr.create( - new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink), - new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink)); + OutErr rpcOutErr = OutErr.create( + new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink), + new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink)); exitCode = commandExecutor.exec( diff --git a/src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java new file mode 100644 index 0000000000..bc136b41fb --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java @@ -0,0 +1,78 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.runtime; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.base.Strings; +import com.google.devtools.build.lib.testutil.Suite; +import com.google.devtools.build.lib.testutil.TestSpec; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link LineBufferedOutputStream} . + */ +@TestSpec(size = Suite.SMALL_TESTS) +@RunWith(JUnit4.class) +public class LineBufferedOutputStreamTest { + private static class MockOutputStream extends OutputStream { + private final List<String> writes = new ArrayList<>(); + + @Override + public void write(int byteAsInt) throws IOException { + byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons + write(new byte[] {b}, 0, 1); + } + + @Override + public synchronized void write(byte[] b, int off, int inlen) throws IOException { + writes.add(new String(b, off, inlen, StandardCharsets.UTF_8)); + } + } + + private List<String> lineBuffer(String... inputs) throws Exception { + MockOutputStream mockOutputStream = new MockOutputStream(); + try (LineBufferedOutputStream cut = new LineBufferedOutputStream(mockOutputStream, 6)) { + for (String input : inputs) { + cut.write(input.getBytes(StandardCharsets.UTF_8)); + } + } + + return mockOutputStream.writes; + } + + @Test + public void testLineBuffering() throws Exception { + String large = Strings.repeat("a", 100); + + assertThat(lineBuffer("foo\nbar")).containsExactly("foo\n", "bar"); + assertThat(lineBuffer("foobarfoobar")).containsExactly("foobar", "foobar"); + assertThat(lineBuffer("fivey\none\n")).containsExactly("fivey\n", "one\n"); + assertThat(lineBuffer("sixish\none\n")).containsExactly("sixish", "\n", "one\n"); + assertThat(lineBuffer("s")).containsExactly("s"); + assertThat(lineBuffer("\n\n\n\n")).containsExactly("\n", "\n", "\n", "\n"); + assertThat(lineBuffer("foo\n\nbar\n")).containsExactly("foo\n", "\n", "bar\n"); + + assertThat(lineBuffer("a", "a", large, large, "a")).containsExactly( + "aa", large, large, "a"); + + } +} |