aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java22
1 files changed, 18 insertions, 4 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index de29f4b7d4..e8391a37d9 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -45,9 +45,12 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -88,6 +91,7 @@ abstract class FileTransport implements BuildEventTransport {
private final Thread writerThread;
@VisibleForTesting OutputStream out;
+ @VisibleForTesting static final Duration FLUSH_INTERVAL = Duration.ofMillis(250);
private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
private final Consumer<AbruptExitException> exitFunc;
private final BuildEventArtifactUploader uploader;
@@ -125,10 +129,20 @@ abstract class FileTransport implements BuildEventTransport {
public void run() {
ListenableFuture<BuildEventStreamProtos.BuildEvent> buildEventF;
try {
- while ((buildEventF = pendingWrites.take()) != CLOSE) {
- BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
- byte[] serialized = serializeFunc.apply(buildEvent);
- out.write(serialized);
+ Instant prevFlush = Instant.now();
+ while ((buildEventF = pendingWrites.poll(FLUSH_INTERVAL.toMillis(), TimeUnit.MILLISECONDS))
+ != CLOSE) {
+ if (buildEventF != null) {
+ BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
+ byte[] serialized = serializeFunc.apply(buildEvent);
+ out.write(serialized);
+ }
+ Instant now = Instant.now();
+ if (buildEventF == null || now.compareTo(prevFlush.plus(FLUSH_INTERVAL)) > 0) {
+ // Some users, e.g. Tulsi, expect prompt BEP stream flushes for interactive use.
+ out.flush();
+ prevFlush = now;
+ }
}
} catch (Exception e) {
exitFunc.accept(