aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java88
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java20
-rw-r--r--src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java78
4 files changed, 200 insertions, 5 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java
index 2e363c382a..2cb98293d2 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeCommandDispatcher.java
@@ -419,6 +419,7 @@ public class BlazeCommandDispatcher {
BlazeCommandEventHandler.Options eventHandlerOptions =
optionsParser.getOptions(BlazeCommandEventHandler.Options.class);
OutErr colorfulOutErr = outErr;
+
if (!eventHandlerOptions.useColor()) {
outErr = ansiStripOut(ansiStripErr(outErr));
if (!commandAnnotation.binaryStdOut()) {
@@ -429,6 +430,14 @@ public class BlazeCommandDispatcher {
}
}
+ if (!commandAnnotation.binaryStdOut()) {
+ outErr = lineBufferOut(outErr);
+ }
+
+ if (!commandAnnotation.binaryStdErr()) {
+ outErr = lineBufferErr(outErr);
+ }
+
CommonCommandOptions commonOptions = optionsParser.getOptions(CommonCommandOptions.class);
BlazeRuntime.setupLogging(commonOptions.verbosity);
@@ -627,6 +636,16 @@ public class BlazeCommandDispatcher {
accumulator.add(commandAnnotation.name());
}
+ private OutErr lineBufferOut(OutErr outErr) {
+ OutputStream wrappedOut = new LineBufferedOutputStream(outErr.getOutputStream());
+ return OutErr.create(wrappedOut, outErr.getErrorStream());
+ }
+
+ private OutErr lineBufferErr(OutErr outErr) {
+ OutputStream wrappedErr = new LineBufferedOutputStream(outErr.getErrorStream());
+ return OutErr.create(outErr.getOutputStream(), wrappedErr);
+ }
+
private OutErr ansiStripOut(OutErr outErr) {
OutputStream wrappedOut = new AnsiStrippingOutputStream(outErr.getOutputStream());
return OutErr.create(wrappedOut, outErr.getErrorStream());
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java b/src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java
new file mode 100644
index 0000000000..3d0c44f93c
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStream.java
@@ -0,0 +1,88 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.runtime;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A decorator output stream that does line buffering.
+ */
+public class LineBufferedOutputStream extends OutputStream {
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+ private final OutputStream wrapped;
+ private final byte[] buffer;
+ private int pos;
+
+ public LineBufferedOutputStream(OutputStream wrapped) {
+ this(wrapped, DEFAULT_BUFFER_SIZE);
+ }
+
+ public LineBufferedOutputStream(OutputStream wrapped, int bufferSize) {
+ this.wrapped = wrapped;
+ this.buffer = new byte[bufferSize];
+ this.pos = 0;
+ }
+
+ @Override
+ public synchronized void write(byte[] b, int off, int inlen) throws IOException {
+ if (inlen > buffer.length * 2) {
+ // Do not buffer large writes
+ if (pos > 0) {
+ wrapped.write(buffer, 0, pos);
+ pos = 0;
+ }
+ wrapped.write(b, off, inlen);
+ return;
+ }
+
+ int next = off;
+ while (next < off + inlen) {
+ buffer[pos++] = b[next];
+ if (b[next] == '\n') {
+ wrapped.write(buffer, 0, pos);
+ pos = 0;
+ }
+
+ if (pos == buffer.length) {
+ wrapped.write(buffer, 0, pos);
+ pos = 0;
+ }
+
+ next++;
+ }
+ }
+
+ @Override
+ public void write(int byteAsInt) throws IOException {
+ byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons
+ write(new byte[] {b}, 0, 1);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (pos != 0) {
+ wrapped.write(buffer, 0, pos);
+ pos = 0;
+ }
+ wrapped.flush();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ flush();
+ wrapped.close();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 42e57370f4..a3678c72c9 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -350,7 +350,18 @@ public class GrpcServerImpl extends RPCServer {
}
}
- // TODO(lberki): Maybe we should implement line buffering?
+ /**
+ * An output stream that forwards the data written to it over the gRPC command stream.
+ *
+ * <p>Note that wraping this class with a {@code Channel} can cause a deadlock if there is an
+ * {@link OutputStream} in between that synchronizes both on {@code #close()} and {@code #write()}
+ * because then if an interrupt happens in {@link GrpcSink#exchange(SinkThreadItem, boolean)},
+ * the thread on which {@code interrupt()} was called will wait until the {@code Channel} closes
+ * itself while holding a lock for interrupting the thread on which {@code #exchange()} is
+ * being executed and that thread will hold a lock that is needed for the {@code Channel} to be
+ * closed and call {@code interrupt()} in {@code #exchange()}, which will in turn try to acquire
+ * the interrupt lock.
+ */
@VisibleForTesting
static class RpcOutputStream extends OutputStream {
private static final int CHUNK_SIZE = 8192;
@@ -646,10 +657,9 @@ public class GrpcServerImpl extends RPCServer {
"The client cancelled the command before receiving the command id: " + e.getMessage());
}
- OutErr rpcOutErr =
- OutErr.create(
- new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink),
- new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink));
+ OutErr rpcOutErr = OutErr.create(
+ new RpcOutputStream(command.id, responseCookie, StreamType.STDOUT, sink),
+ new RpcOutputStream(command.id, responseCookie, StreamType.STDERR, sink));
exitCode =
commandExecutor.exec(
diff --git a/src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java
new file mode 100644
index 0000000000..bc136b41fb
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/runtime/LineBufferedOutputStreamTest.java
@@ -0,0 +1,78 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.runtime;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Strings;
+import com.google.devtools.build.lib.testutil.Suite;
+import com.google.devtools.build.lib.testutil.TestSpec;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link LineBufferedOutputStream} .
+ */
+@TestSpec(size = Suite.SMALL_TESTS)
+@RunWith(JUnit4.class)
+public class LineBufferedOutputStreamTest {
+ private static class MockOutputStream extends OutputStream {
+ private final List<String> writes = new ArrayList<>();
+
+ @Override
+ public void write(int byteAsInt) throws IOException {
+ byte b = (byte) byteAsInt; // make sure we work with bytes in comparisons
+ write(new byte[] {b}, 0, 1);
+ }
+
+ @Override
+ public synchronized void write(byte[] b, int off, int inlen) throws IOException {
+ writes.add(new String(b, off, inlen, StandardCharsets.UTF_8));
+ }
+ }
+
+ private List<String> lineBuffer(String... inputs) throws Exception {
+ MockOutputStream mockOutputStream = new MockOutputStream();
+ try (LineBufferedOutputStream cut = new LineBufferedOutputStream(mockOutputStream, 6)) {
+ for (String input : inputs) {
+ cut.write(input.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ return mockOutputStream.writes;
+ }
+
+ @Test
+ public void testLineBuffering() throws Exception {
+ String large = Strings.repeat("a", 100);
+
+ assertThat(lineBuffer("foo\nbar")).containsExactly("foo\n", "bar");
+ assertThat(lineBuffer("foobarfoobar")).containsExactly("foobar", "foobar");
+ assertThat(lineBuffer("fivey\none\n")).containsExactly("fivey\n", "one\n");
+ assertThat(lineBuffer("sixish\none\n")).containsExactly("sixish", "\n", "one\n");
+ assertThat(lineBuffer("s")).containsExactly("s");
+ assertThat(lineBuffer("\n\n\n\n")).containsExactly("\n", "\n", "\n", "\n");
+ assertThat(lineBuffer("foo\n\nbar\n")).containsExactly("foo\n", "\n", "bar\n");
+
+ assertThat(lineBuffer("a", "a", large, large, "a")).containsExactly(
+ "aa", large, large, "a");
+
+ }
+}