diff options
author | 2018-07-06 04:01:37 -0700 | |
---|---|---|
committer | 2018-07-06 04:02:46 -0700 | |
commit | 1d4c707e3e03ab21f04783e99db9ee9115ba4fb2 (patch) | |
tree | 53650d18efb5da46afad270c0b7a6af97a2ae03a /src/main/java/com/google/devtools/build/lib/buildeventstream | |
parent | bc898cabe7cece0cf868447392f6863cb134d85c (diff) |
bep: guarantee event ordering in file transports.
Commit d3f7f7ae introduced a bug in the FileTransport implementations
where events might not be written to file in the correct order. This
change fixes this.
Fixes #5515
RELNOTES: None
PiperOrigin-RevId: 203457636
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventstream')
4 files changed, 165 insertions, 117 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java index 72c800e24c..5cba921d92 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java @@ -14,16 +14,14 @@ package com.google.devtools.build.lib.buildeventstream.transports; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; import com.google.devtools.build.lib.buildeventstream.BuildEventTransport; import com.google.devtools.build.lib.util.AbruptExitException; +import com.google.protobuf.CodedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.function.Consumer; @@ -32,6 +30,7 @@ import java.util.function.Consumer; * {@link BuildEvent} protocol buffers to a file. */ public final class BinaryFormatFileTransport extends FileTransport { + BinaryFormatFileTransport( String path, BuildEventProtocolOptions options, @@ -47,19 +46,16 @@ public final class BinaryFormatFileTransport extends FileTransport { } @Override - public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - Futures.addCallback(asStreamProto(event, namer), - new FutureCallback<BuildEventStreamProtos.BuildEvent>() { - @Override - public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) { - write(protoEvent); - } - - @Override - public void onFailure(Throwable t) { - // Intentionally left empty. The error handling happens in - // FileTransport. - } - }, MoreExecutors.directExecutor()); + protected byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent) { + final int size = buildEvent.getSerializedSize(); + ByteArrayOutputStream bos = + new ByteArrayOutputStream(CodedOutputStream.computeUInt32SizeNoTag(size) + size); + try { + buildEvent.writeDelimitedTo(bos); + } catch (IOException e) { + throw new RuntimeException( + "Unexpected error serializing protobuf to in memory outputstream.", e); + } + return bos.toByteArray(); } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java index cce4f2d213..186a6a5f0c 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java @@ -15,6 +15,7 @@ package com.google.devtools.build.lib.buildeventstream.transports; import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; @@ -24,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; @@ -37,12 +39,19 @@ import com.google.devtools.build.lib.util.AbruptExitException; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream; import com.google.devtools.build.lib.vfs.Path; -import com.google.protobuf.Message; +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.concurrent.ThreadSafe; /** * Non-blocking file transport. @@ -57,55 +66,131 @@ abstract class FileTransport implements BuildEventTransport { private final BuildEventProtocolOptions options; private final BuildEventArtifactUploader uploader; private final Consumer<AbruptExitException> exitFunc; - - @VisibleForTesting - final AsynchronousFileOutputStream out; + @VisibleForTesting final SequentialWriter writer; FileTransport( String path, BuildEventProtocolOptions options, BuildEventArtifactUploader uploader, - Consumer<AbruptExitException> exitFunc) - throws IOException { + Consumer<AbruptExitException> exitFunc) { this.uploader = uploader; this.options = options; this.exitFunc = exitFunc; - out = new AsynchronousFileOutputStream(path); + this.writer = new SequentialWriter(path, this::serializeEvent, exitFunc); } - // Silent wrappers to AsynchronousFileOutputStream methods. + @ThreadSafe + @VisibleForTesting + static final class SequentialWriter implements Runnable { + private static final Logger logger = Logger.getLogger(SequentialWriter.class.getName()); + private static final ListenableFuture<BuildEventStreamProtos.BuildEvent> CLOSE = + Futures.immediateCancelledFuture(); + + private final Thread writerThread; + @VisibleForTesting OutputStream out; + private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc; + private final Consumer<AbruptExitException> exitFunc; + + @VisibleForTesting + final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites = + new LinkedBlockingDeque<>(); + + private final SettableFuture<Void> closeFuture = SettableFuture.create(); - protected void write(Message m) { - try { - out.write(m); - } catch (Exception e) { - logger.log(Level.SEVERE, e.getMessage(), e); + SequentialWriter( + String path, + Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc, + Consumer<AbruptExitException> exitFunc) { + try { + this.out = new BufferedOutputStream(new FileOutputStream(path)); + } catch (FileNotFoundException e) { + this.out = new ByteArrayOutputStream(0); + closeNow(); + exitFunc.accept( + new AbruptExitException( + format("Couldn't open BEP file '%s' for writing.", path), + ExitCode.PUBLISH_ERROR, + e)); + } + this.writerThread = new Thread(this); + this.serializeFunc = serializeFunc; + this.exitFunc = exitFunc; + writerThread.start(); + } + + @Override + public void run() { + ListenableFuture<BuildEventStreamProtos.BuildEvent> buildEventF; + try { + while ((buildEventF = pendingWrites.take()) != CLOSE) { + BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get(); + byte[] serialized = serializeFunc.apply(buildEvent); + out.write(serialized); + } + } catch (Exception e) { + exitFunc.accept( + new AbruptExitException( + "Failed to write BEP events to file.", ExitCode.PUBLISH_ERROR, e)); + pendingWrites.clear(); + logger.log(Level.SEVERE, "Failed to write BEP events to file.", e); + } finally { + try { + out.flush(); + out.close(); + } catch (IOException e) { + logger.log(Level.SEVERE, "Failed to close BEP file output stream.", e); + } + closeFuture.set(null); + } + } + + public void closeNow() { + if (closeFuture.isDone()) { + return; + } + try { + pendingWrites.clear(); + pendingWrites.put(CLOSE); + } catch (InterruptedException e) { + logger.log(Level.SEVERE, "Failed to immediately close the sequential writer.", e); + } + } + + public ListenableFuture<Void> close() { + if (closeFuture.isDone()) { + return closeFuture; + } + try { + pendingWrites.put(CLOSE); + } catch (InterruptedException e) { + closeNow(); + logger.log(Level.SEVERE, "Failed to close the sequential writer.", e); + closeFuture.set(null); + } + return closeFuture; } } - protected void write(String s) { - try { - out.write(s); - } catch (Exception e) { - logger.log(Level.SEVERE, e.getMessage(), e); + @Override + public void sendBuildEvent(BuildEvent event, ArtifactGroupNamer namer) { + if (writer.closeFuture.isDone()) { + return; + } + if (!writer.pendingWrites.add(asStreamProto(event, namer))) { + logger.log(Level.SEVERE, "Failed to add BEP event to the write queue"); } } + protected abstract byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent); + @Override - public synchronized ListenableFuture<Void> close() { - return Futures.catching( - out.closeAsync(), - Throwable.class, - (t) -> { - logger.log(Level.SEVERE, t.getMessage(), t); - return null; - }, - MoreExecutors.directExecutor()); + public ListenableFuture<Void> close() { + return writer.close(); } @Override - public void closeNow() { - out.closeNow(); + public synchronized void closeNow() { + writer.closeNow(); } /** @@ -113,34 +198,31 @@ abstract class FileTransport implements BuildEventTransport { * a side effect. May return {@code null} if there was an interrupt. This method is not * thread-safe. */ - protected ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto( + private ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto( BuildEvent event, ArtifactGroupNamer namer) { checkNotNull(event); return Futures.transform( uploadReferencedFiles(event.referencedLocalFiles()), - new Function<PathConverter, BuildEventStreamProtos.BuildEvent>() { - @Override - public BuildEventStreamProtos.BuildEvent apply(PathConverter pathConverter) { - BuildEventContext context = - new BuildEventContext() { - @Override - public PathConverter pathConverter() { - return pathConverter; - } - - @Override - public ArtifactGroupNamer artifactGroupNamer() { - return namer; - } - - @Override - public BuildEventProtocolOptions getOptions() { - return options; - } - }; - return event.asStreamProto(context); - } + pathConverter -> { + BuildEventContext context = + new BuildEventContext() { + @Override + public PathConverter pathConverter() { + return pathConverter; + } + + @Override + public ArtifactGroupNamer artifactGroupNamer() { + return namer; + } + + @Override + public BuildEventProtocolOptions getOptions() { + return options; + } + }; + return event.asStreamProto(context); }, MoreExecutors.directExecutor()); } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java index 3b67d6c96e..d888036f74 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java @@ -14,11 +14,7 @@ package com.google.devtools.build.lib.buildeventstream.transports; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; -import com.google.devtools.build.lib.buildeventstream.BuildEvent; +import com.google.common.base.Charsets; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; @@ -49,28 +45,17 @@ public final class JsonFormatFileTransport extends FileTransport { } @Override - public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - Futures.addCallback(asStreamProto(event, namer), - new FutureCallback<BuildEventStreamProtos.BuildEvent>() { - @Override - public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) { - String protoJsonRepresentation; - try { - protoJsonRepresentation = - JsonFormat.printer().omittingInsignificantWhitespace().print(protoEvent) + "\n"; - } catch (InvalidProtocolBufferException e) { - // We don't expect any unknown Any fields in our protocol buffer. Nevertheless, handle - // the exception gracefully and, at least, return valid JSON with an id field. - protoJsonRepresentation = - "{\"id\" : \"unknown\", \"exception\" : \"InvalidProtocolBufferException\"}\n"; - } - write(protoJsonRepresentation); - } - - @Override - public void onFailure(Throwable t) { - // Intentionally left empty. The error handling happens in FileTransport. - } - }, MoreExecutors.directExecutor()); + protected byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent) { + String protoJsonRepresentation; + try { + protoJsonRepresentation = + JsonFormat.printer().omittingInsignificantWhitespace().print(buildEvent) + "\n"; + } catch (InvalidProtocolBufferException e) { + // We don't expect any unknown Any fields in our protocol buffer. Nevertheless, handle + // the exception gracefully and, at least, return valid JSON with an id field. + protoJsonRepresentation = + "{\"id\" : \"unknown\", \"exception\" : \"InvalidProtocolBufferException\"}\n"; + } + return protoJsonRepresentation.getBytes(Charsets.UTF_8); } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java index fcdf545e66..ff439b1e5a 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java @@ -14,11 +14,7 @@ package com.google.devtools.build.lib.buildeventstream.transports; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; -import com.google.devtools.build.lib.buildeventstream.BuildEvent; +import com.google.common.base.Charsets; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; @@ -50,19 +46,8 @@ public final class TextFormatFileTransport extends FileTransport { } @Override - public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - Futures.addCallback(asStreamProto(event, namer), - new FutureCallback<BuildEventStreamProtos.BuildEvent>() { - @Override - public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) { - String protoTextRepresentation = TextFormat.printToString(protoEvent); - write("event {\n" + protoTextRepresentation + "}\n\n"); - } - - @Override - public void onFailure(Throwable t) { - // Intentionally left empty. The error handling happens in FileTransport. - } - }, MoreExecutors.directExecutor()); + protected byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent) { + String protoTextRepresentation = TextFormat.printToString(buildEvent); + return ("event {\n" + protoTextRepresentation + "}\n\n").getBytes(Charsets.UTF_8); } } |