aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2018-07-16 12:55:34 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-07-16 12:57:50 -0700
commitde3d8bf821dba97471ab4ccfc1f1b1559f0a1cac (patch)
tree4e9541ae787d0eab36831b9a19dbf0f8520ceada
parent48821a723af41b1561653178e547c7fa86a2a4a6 (diff)
Ensure BEP file output stream is flushed promptly.
Tulsi uses BEP json output in its UI, to simulate Bazel terminal outptut. This means we have to promptly flush the stream. It's sufficient to do flushing at the granularity of whole events, not any specific count of bytes, since whole events are what's being consumed by the user. To balance IO throughput and interactivity, let's flush at a regular sub-second interval. (The alternative solution of using a stream with smaller buffer could still end up with small-sized event descriptions buffered arbitrarily long; and abandoning buffering altogether would be suboptimal for throughput when writing a long sequence of small build events.) RELNOTES: None. PiperOrigin-RevId: 204790794
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java22
-rw-r--r--src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java76
2 files changed, 94 insertions, 4 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index de29f4b7d4..e8391a37d9 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -45,9 +45,12 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -88,6 +91,7 @@ abstract class FileTransport implements BuildEventTransport {
private final Thread writerThread;
@VisibleForTesting OutputStream out;
+ @VisibleForTesting static final Duration FLUSH_INTERVAL = Duration.ofMillis(250);
private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
private final Consumer<AbruptExitException> exitFunc;
private final BuildEventArtifactUploader uploader;
@@ -125,10 +129,20 @@ abstract class FileTransport implements BuildEventTransport {
public void run() {
ListenableFuture<BuildEventStreamProtos.BuildEvent> buildEventF;
try {
- while ((buildEventF = pendingWrites.take()) != CLOSE) {
- BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
- byte[] serialized = serializeFunc.apply(buildEvent);
- out.write(serialized);
+ Instant prevFlush = Instant.now();
+ while ((buildEventF = pendingWrites.poll(FLUSH_INTERVAL.toMillis(), TimeUnit.MILLISECONDS))
+ != CLOSE) {
+ if (buildEventF != null) {
+ BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
+ byte[] serialized = serializeFunc.apply(buildEvent);
+ out.write(serialized);
+ }
+ Instant now = Instant.now();
+ if (buildEventF == null || now.compareTo(prevFlush.plus(FLUSH_INTERVAL)) > 0) {
+ // Some users, e.g. Tulsi, expect prompt BEP stream flushes for interactive use.
+ out.flush();
+ prevFlush = now;
+ }
}
} catch (Exception e) {
exitFunc.accept(
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
index 83ce626c22..a49e42e90e 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java
@@ -29,7 +29,9 @@ import com.google.devtools.common.options.Options;
import com.google.protobuf.util.JsonFormat;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.io.Reader;
import org.junit.After;
import org.junit.Before;
@@ -89,4 +91,78 @@ public class JsonFormatFileTransportTest {
assertThat(builder.build()).isEqualTo(started);
}
}
+
+ /**
+ * A thin wrapper around an OutputStream that counts number of bytes written and verifies flushes.
+ */
+ private static final class WrappedOutputStream extends OutputStream {
+ private final OutputStream out;
+ private long byteCount;
+ private int flushCount;
+
+ public WrappedOutputStream(OutputStream out) {
+ this.out = out;
+ }
+
+ public long getByteCount() {
+ return byteCount;
+ }
+
+ public int getFlushCount() {
+ return flushCount;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ byteCount++;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ byteCount += b.length;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ byteCount += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ flushCount++;
+ }
+ }
+
+ @Test
+ public void testFlushesStreamAfterSmallWrites() throws Exception {
+ File output = tmp.newFile();
+
+ BuildEventStreamProtos.BuildEvent started =
+ BuildEventStreamProtos.BuildEvent.newBuilder()
+ .setStarted(BuildStarted.newBuilder().setCommand("build"))
+ .build();
+ when(buildEvent.asStreamProto(Matchers.<BuildEventContext>any())).thenReturn(started);
+ JsonFormatFileTransport transport =
+ new JsonFormatFileTransport(
+ output.getAbsolutePath(), defaultOpts, LOCAL_FILES_UPLOADER, (e) -> {});
+ WrappedOutputStream out = new WrappedOutputStream(transport.writer.out);
+ transport.writer.out = out;
+ transport.sendBuildEvent(buildEvent, artifactGroupNamer);
+ Thread.sleep(FileTransport.SequentialWriter.FLUSH_INTERVAL.toMillis() * 3);
+
+ // Some users, e.g. Tulsi, use JSON build event output for interactive use and expect the stream
+ // to be flushed at regular short intervals.
+ assertThat(out.getFlushCount()).isGreaterThan(0);
+
+ // We know that large writes get flushed; test is valuable only if we check small writes,
+ // meaning smaller than 8192, the default buffer size used by BufferedOutputStream.
+ assertThat(out.getByteCount()).isLessThan(8192L);
+ assertThat(out.getByteCount()).isGreaterThan(0L);
+
+ transport.close().get();
+ }
}