aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventstream
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2018-07-06 04:01:37 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-07-06 04:02:46 -0700
commit1d4c707e3e03ab21f04783e99db9ee9115ba4fb2 (patch)
tree53650d18efb5da46afad270c0b7a6af97a2ae03a /src/main/java/com/google/devtools/build/lib/buildeventstream
parentbc898cabe7cece0cf868447392f6863cb134d85c (diff)
bep: guarantee event ordering in file transports.
Commit d3f7f7ae introduced a bug in the FileTransport implementations where events might not be written to file in the correct order. This change fixes this. Fixes #5515 RELNOTES: None PiperOrigin-RevId: 203457636
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventstream')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java32
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java186
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java41
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java23
4 files changed, 165 insertions, 117 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java
index 72c800e24c..5cba921d92 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java
@@ -14,16 +14,14 @@
package com.google.devtools.build.lib.buildeventstream.transports;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
import com.google.devtools.build.lib.util.AbruptExitException;
+import com.google.protobuf.CodedOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.function.Consumer;
@@ -32,6 +30,7 @@ import java.util.function.Consumer;
* {@link BuildEvent} protocol buffers to a file.
*/
public final class BinaryFormatFileTransport extends FileTransport {
+
BinaryFormatFileTransport(
String path,
BuildEventProtocolOptions options,
@@ -47,19 +46,16 @@ public final class BinaryFormatFileTransport extends FileTransport {
}
@Override
- public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
- Futures.addCallback(asStreamProto(event, namer),
- new FutureCallback<BuildEventStreamProtos.BuildEvent>() {
- @Override
- public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) {
- write(protoEvent);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Intentionally left empty. The error handling happens in
- // FileTransport.
- }
- }, MoreExecutors.directExecutor());
+ protected byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent) {
+ final int size = buildEvent.getSerializedSize();
+ ByteArrayOutputStream bos =
+ new ByteArrayOutputStream(CodedOutputStream.computeUInt32SizeNoTag(size) + size);
+ try {
+ buildEvent.writeDelimitedTo(bos);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unexpected error serializing protobuf to in memory outputstream.", e);
+ }
+ return bos.toByteArray();
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index cce4f2d213..186a6a5f0c 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -15,6 +15,7 @@
package com.google.devtools.build.lib.buildeventstream.transports;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -24,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
@@ -37,12 +39,19 @@ import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.Message;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.concurrent.ThreadSafe;
/**
* Non-blocking file transport.
@@ -57,55 +66,131 @@ abstract class FileTransport implements BuildEventTransport {
private final BuildEventProtocolOptions options;
private final BuildEventArtifactUploader uploader;
private final Consumer<AbruptExitException> exitFunc;
-
- @VisibleForTesting
- final AsynchronousFileOutputStream out;
+ @VisibleForTesting final SequentialWriter writer;
FileTransport(
String path,
BuildEventProtocolOptions options,
BuildEventArtifactUploader uploader,
- Consumer<AbruptExitException> exitFunc)
- throws IOException {
+ Consumer<AbruptExitException> exitFunc) {
this.uploader = uploader;
this.options = options;
this.exitFunc = exitFunc;
- out = new AsynchronousFileOutputStream(path);
+ this.writer = new SequentialWriter(path, this::serializeEvent, exitFunc);
}
- // Silent wrappers to AsynchronousFileOutputStream methods.
+ @ThreadSafe
+ @VisibleForTesting
+ static final class SequentialWriter implements Runnable {
+ private static final Logger logger = Logger.getLogger(SequentialWriter.class.getName());
+ private static final ListenableFuture<BuildEventStreamProtos.BuildEvent> CLOSE =
+ Futures.immediateCancelledFuture();
+
+ private final Thread writerThread;
+ @VisibleForTesting OutputStream out;
+ private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
+ private final Consumer<AbruptExitException> exitFunc;
+
+ @VisibleForTesting
+ final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
+ new LinkedBlockingDeque<>();
+
+ private final SettableFuture<Void> closeFuture = SettableFuture.create();
- protected void write(Message m) {
- try {
- out.write(m);
- } catch (Exception e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
+ SequentialWriter(
+ String path,
+ Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc,
+ Consumer<AbruptExitException> exitFunc) {
+ try {
+ this.out = new BufferedOutputStream(new FileOutputStream(path));
+ } catch (FileNotFoundException e) {
+ this.out = new ByteArrayOutputStream(0);
+ closeNow();
+ exitFunc.accept(
+ new AbruptExitException(
+ format("Couldn't open BEP file '%s' for writing.", path),
+ ExitCode.PUBLISH_ERROR,
+ e));
+ }
+ this.writerThread = new Thread(this);
+ this.serializeFunc = serializeFunc;
+ this.exitFunc = exitFunc;
+ writerThread.start();
+ }
+
+ @Override
+ public void run() {
+ ListenableFuture<BuildEventStreamProtos.BuildEvent> buildEventF;
+ try {
+ while ((buildEventF = pendingWrites.take()) != CLOSE) {
+ BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
+ byte[] serialized = serializeFunc.apply(buildEvent);
+ out.write(serialized);
+ }
+ } catch (Exception e) {
+ exitFunc.accept(
+ new AbruptExitException(
+ "Failed to write BEP events to file.", ExitCode.PUBLISH_ERROR, e));
+ pendingWrites.clear();
+ logger.log(Level.SEVERE, "Failed to write BEP events to file.", e);
+ } finally {
+ try {
+ out.flush();
+ out.close();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "Failed to close BEP file output stream.", e);
+ }
+ closeFuture.set(null);
+ }
+ }
+
+ public void closeNow() {
+ if (closeFuture.isDone()) {
+ return;
+ }
+ try {
+ pendingWrites.clear();
+ pendingWrites.put(CLOSE);
+ } catch (InterruptedException e) {
+ logger.log(Level.SEVERE, "Failed to immediately close the sequential writer.", e);
+ }
+ }
+
+ public ListenableFuture<Void> close() {
+ if (closeFuture.isDone()) {
+ return closeFuture;
+ }
+ try {
+ pendingWrites.put(CLOSE);
+ } catch (InterruptedException e) {
+ closeNow();
+ logger.log(Level.SEVERE, "Failed to close the sequential writer.", e);
+ closeFuture.set(null);
+ }
+ return closeFuture;
}
}
- protected void write(String s) {
- try {
- out.write(s);
- } catch (Exception e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
+ @Override
+ public void sendBuildEvent(BuildEvent event, ArtifactGroupNamer namer) {
+ if (writer.closeFuture.isDone()) {
+ return;
+ }
+ if (!writer.pendingWrites.add(asStreamProto(event, namer))) {
+ logger.log(Level.SEVERE, "Failed to add BEP event to the write queue");
}
}
+ protected abstract byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent);
+
@Override
- public synchronized ListenableFuture<Void> close() {
- return Futures.catching(
- out.closeAsync(),
- Throwable.class,
- (t) -> {
- logger.log(Level.SEVERE, t.getMessage(), t);
- return null;
- },
- MoreExecutors.directExecutor());
+ public ListenableFuture<Void> close() {
+ return writer.close();
}
@Override
- public void closeNow() {
- out.closeNow();
+ public synchronized void closeNow() {
+ writer.closeNow();
}
/**
@@ -113,34 +198,31 @@ abstract class FileTransport implements BuildEventTransport {
* a side effect. May return {@code null} if there was an interrupt. This method is not
* thread-safe.
*/
- protected ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto(
+ private ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto(
BuildEvent event, ArtifactGroupNamer namer) {
checkNotNull(event);
return Futures.transform(
uploadReferencedFiles(event.referencedLocalFiles()),
- new Function<PathConverter, BuildEventStreamProtos.BuildEvent>() {
- @Override
- public BuildEventStreamProtos.BuildEvent apply(PathConverter pathConverter) {
- BuildEventContext context =
- new BuildEventContext() {
- @Override
- public PathConverter pathConverter() {
- return pathConverter;
- }
-
- @Override
- public ArtifactGroupNamer artifactGroupNamer() {
- return namer;
- }
-
- @Override
- public BuildEventProtocolOptions getOptions() {
- return options;
- }
- };
- return event.asStreamProto(context);
- }
+ pathConverter -> {
+ BuildEventContext context =
+ new BuildEventContext() {
+ @Override
+ public PathConverter pathConverter() {
+ return pathConverter;
+ }
+
+ @Override
+ public ArtifactGroupNamer artifactGroupNamer() {
+ return namer;
+ }
+
+ @Override
+ public BuildEventProtocolOptions getOptions() {
+ return options;
+ }
+ };
+ return event.asStreamProto(context);
},
MoreExecutors.directExecutor());
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java
index 3b67d6c96e..d888036f74 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java
@@ -14,11 +14,7 @@
package com.google.devtools.build.lib.buildeventstream.transports;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
-import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.common.base.Charsets;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
@@ -49,28 +45,17 @@ public final class JsonFormatFileTransport extends FileTransport {
}
@Override
- public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
- Futures.addCallback(asStreamProto(event, namer),
- new FutureCallback<BuildEventStreamProtos.BuildEvent>() {
- @Override
- public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) {
- String protoJsonRepresentation;
- try {
- protoJsonRepresentation =
- JsonFormat.printer().omittingInsignificantWhitespace().print(protoEvent) + "\n";
- } catch (InvalidProtocolBufferException e) {
- // We don't expect any unknown Any fields in our protocol buffer. Nevertheless, handle
- // the exception gracefully and, at least, return valid JSON with an id field.
- protoJsonRepresentation =
- "{\"id\" : \"unknown\", \"exception\" : \"InvalidProtocolBufferException\"}\n";
- }
- write(protoJsonRepresentation);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Intentionally left empty. The error handling happens in FileTransport.
- }
- }, MoreExecutors.directExecutor());
+ protected byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent) {
+ String protoJsonRepresentation;
+ try {
+ protoJsonRepresentation =
+ JsonFormat.printer().omittingInsignificantWhitespace().print(buildEvent) + "\n";
+ } catch (InvalidProtocolBufferException e) {
+ // We don't expect any unknown Any fields in our protocol buffer. Nevertheless, handle
+ // the exception gracefully and, at least, return valid JSON with an id field.
+ protoJsonRepresentation =
+ "{\"id\" : \"unknown\", \"exception\" : \"InvalidProtocolBufferException\"}\n";
+ }
+ return protoJsonRepresentation.getBytes(Charsets.UTF_8);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java
index fcdf545e66..ff439b1e5a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java
@@ -14,11 +14,7 @@
package com.google.devtools.build.lib.buildeventstream.transports;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
-import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.common.base.Charsets;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
@@ -50,19 +46,8 @@ public final class TextFormatFileTransport extends FileTransport {
}
@Override
- public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
- Futures.addCallback(asStreamProto(event, namer),
- new FutureCallback<BuildEventStreamProtos.BuildEvent>() {
- @Override
- public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) {
- String protoTextRepresentation = TextFormat.printToString(protoEvent);
- write("event {\n" + protoTextRepresentation + "}\n\n");
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Intentionally left empty. The error handling happens in FileTransport.
- }
- }, MoreExecutors.directExecutor());
+ protected byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent) {
+ String protoTextRepresentation = TextFormat.printToString(buildEvent);
+ return ("event {\n" + protoTextRepresentation + "}\n\n").getBytes(Charsets.UTF_8);
}
}