diff options
Diffstat (limited to 'src/main/java/com')
7 files changed, 146 insertions, 97 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index acf5686671..f5afa63a22 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.SettableFuture; @@ -50,14 +50,12 @@ import com.google.devtools.build.lib.util.AbruptExitException; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.JavaSleeper; import com.google.devtools.build.lib.util.Sleeper; -import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.v1.BuildStatus.Result; import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest; import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; import com.google.devtools.build.v1.PublishLifecycleEventRequest; import com.google.protobuf.Any; import io.grpc.Status; -import java.io.IOException; import java.time.Duration; import java.util.Deque; import java.util.Set; @@ -312,8 +310,11 @@ public class BuildEventServiceTransport implements BuildEventTransport { } } - sendOrderedBuildEvent( - new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber())); + ListenableFuture<PathConverter> upload = + artifactUploader.upload(event.referencedLocalFiles()); + InternalOrderedBuildEvent buildEvent = new DefaultInternalOrderedBuildEvent(event, namer, + upload, besProtoUtil.nextSequenceNumber()); + sendOrderedBuildEvent(buildEvent); } private String errorMessageFromException(Throwable t) { @@ -484,10 +485,11 @@ public class BuildEventServiceTransport implements BuildEventTransport { do { orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS); if (orderedBuildEvent != null) { - final PathConverter pathConverter; + PathConverter pathConverter; try { - pathConverter = artifactUploader.upload(orderedBuildEvent.referencedLocalFiles()); - } catch (IOException e) { + // Wait for the local file upload to have been completed. + pathConverter = orderedBuildEvent.localFileUploadProgress().get(); + } catch (ExecutionException e) { logger.log( Level.WARNING, String.format( @@ -508,15 +510,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { pendingAck.add(orderedBuildEvent); besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter)); } - checkState(besClient.isStreamActive(), "Stream was closed prematurely."); } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent()); logger.log( Level.INFO, String.format( "Will end publishEventStream() isLastEvent: %s isStreamActive: %s", - orderedBuildEvent != null && orderedBuildEvent.isLastEvent(), - besClient.isStreamActive())); + orderedBuildEvent.isLastEvent(), besClient.isStreamActive())); } catch (InterruptedException e) { // By convention the interrupted flag should have been cleared, // but just to be sure clear it. @@ -646,20 +646,24 @@ public class BuildEventServiceTransport implements BuildEventTransport { int getSequenceNumber(); - Set<Path> referencedLocalFiles(); + ListenableFuture<PathConverter> localFileUploadProgress(); - PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter); + PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) + throws ExecutionException, InterruptedException; } private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent { private final BuildEvent event; private final ArtifactGroupNamer artifactGroupNamer; + private final ListenableFuture<PathConverter> artifactUpload; private final int sequenceNumber; DefaultInternalOrderedBuildEvent( - BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) { + BuildEvent event, ArtifactGroupNamer artifactGroupNamer, + ListenableFuture<PathConverter> artifactUpload, int sequenceNumber) { this.event = Preconditions.checkNotNull(event); this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer); + this.artifactUpload = artifactUpload; this.sequenceNumber = sequenceNumber; } @@ -674,12 +678,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public Set<Path> referencedLocalFiles() { - return event.referencedLocalFiles(); + public ListenableFuture<PathConverter> localFileUploadProgress() { + return artifactUpload; } @Override - public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) { + public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) + throws ExecutionException, InterruptedException { BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto( new BuildEventContext() { @@ -720,8 +725,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public Set<Path> referencedLocalFiles() { - return ImmutableSet.of(); + public ListenableFuture<PathConverter> localFileUploadProgress() { + return Futures.immediateFuture(PathConverter.NO_CONVERSION); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java index 7b7ffb15ab..5c1fe9c5b2 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java @@ -13,26 +13,32 @@ // limitations under the License. package com.google.devtools.build.lib.buildeventstream; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.buildeventstream.PathConverter.FileUriPathConverter; import com.google.devtools.build.lib.vfs.Path; -import java.io.IOException; import java.util.Set; /** Uploads artifacts referenced by the Build Event Protocol (BEP). */ public interface BuildEventArtifactUploader { - public static final BuildEventArtifactUploader LOCAL_FILES_UPLOADER = - new BuildEventArtifactUploader() { + BuildEventArtifactUploader LOCAL_FILES_UPLOADER = new BuildEventArtifactUploader() { + private final ListenableFuture<PathConverter> completedPathConverter = + Futures.immediateFuture(new FileUriPathConverter()); + @Override - public PathConverter upload(Set<Path> files) { - return new FileUriPathConverter(); + public ListenableFuture<PathConverter> upload(Set<Path> files) { + return completedPathConverter; } }; /** - * Uploads a set of files referenced by the protobuf representation of a {@link BuildEvent}. + * Asynchronously uploads a set of files referenced by the protobuf representation of a + * {@link BuildEvent}. This method is expected to return quickly. * - * <p>Returns a {@link PathConverter} that must provide a name for each uploaded file as it should - * appear in the BEP. + * <p>This method must not throw any exceptions. + * + * <p>Returns a future to a {@link PathConverter} that must provide a name for each uploaded file + * as it should appear in the BEP. */ - PathConverter upload(Set<Path> files) throws IOException, InterruptedException; + ListenableFuture<PathConverter> upload(Set<Path> files); } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java index 2f1d15c25a..72c800e24c 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java @@ -14,6 +14,9 @@ package com.google.devtools.build.lib.buildeventstream.transports; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; @@ -45,10 +48,18 @@ public final class BinaryFormatFileTransport extends FileTransport { @Override public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - BuildEventStreamProtos.BuildEvent protoEvent = asStreamProto(event, namer); - if (protoEvent == null) { - return; - } - write(protoEvent); + Futures.addCallback(asStreamProto(event, namer), + new FutureCallback<BuildEventStreamProtos.BuildEvent>() { + @Override + public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) { + write(protoEvent); + } + + @Override + public void onFailure(Throwable t) { + // Intentionally left empty. The error handling happens in + // FileTransport. + } + }, MoreExecutors.directExecutor()); } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java index 540a7e84cd..c193ed094b 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java @@ -17,7 +17,9 @@ package com.google.devtools.build.lib.buildeventstream.transports; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -39,7 +41,6 @@ import java.util.Set; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Non-blocking file transport. @@ -54,7 +55,6 @@ abstract class FileTransport implements BuildEventTransport { private final BuildEventProtocolOptions options; private final BuildEventArtifactUploader uploader; private final Consumer<AbruptExitException> exitFunc; - private boolean errored; @VisibleForTesting final AsynchronousFileOutputStream out; @@ -111,57 +111,62 @@ abstract class FileTransport implements BuildEventTransport { * a side effect. May return {@code null} if there was an interrupt. This method is not * thread-safe. */ - @Nullable - protected BuildEventStreamProtos.BuildEvent asStreamProto( + protected ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto( BuildEvent event, ArtifactGroupNamer namer) { checkNotNull(event); - PathConverter pathConverter = uploadReferencedFiles(event.referencedLocalFiles()); - if (pathConverter == null) { - return null; - } - - BuildEventContext context = - new BuildEventContext() { - @Override - public PathConverter pathConverter() { - return pathConverter; - } - - @Override - public ArtifactGroupNamer artifactGroupNamer() { - return namer; - } + return Futures.transform( + uploadReferencedFiles(event.referencedLocalFiles()), + new Function<PathConverter, BuildEventStreamProtos.BuildEvent>() { @Override - public BuildEventProtocolOptions getOptions() { - return options; + public BuildEventStreamProtos.BuildEvent apply(PathConverter pathConverter) { + BuildEventContext context = + new BuildEventContext() { + @Override + public PathConverter pathConverter() { + return pathConverter; + } + + @Override + public ArtifactGroupNamer artifactGroupNamer() { + return namer; + } + + @Override + public BuildEventProtocolOptions getOptions() { + return options; + } + }; + return event.asStreamProto(context); } - }; - return event.asStreamProto(context); + }, + MoreExecutors.directExecutor()); } /** * Returns a {@link PathConverter} for the uploaded files, or {@code null} when the uploaded * failed. */ - private @Nullable PathConverter uploadReferencedFiles(Set<Path> artifacts) { + private ListenableFuture<PathConverter> uploadReferencedFiles(Set<Path> artifacts) { checkNotNull(artifacts); - if (errored) { - return null; - } - try { - return uploader.upload(artifacts); - } catch (IOException e) { - errored = true; - exitFunc.accept( - new AbruptExitException( - Throwables.getStackTraceAsString(e), ExitCode.PUBLISH_ERROR, e)); - } catch (InterruptedException e) { - errored = true; - exitFunc.accept(new AbruptExitException(ExitCode.INTERRUPTED, e)); - Thread.currentThread().interrupt(); - } - return null; + ListenableFuture<PathConverter> upload = uploader.upload(artifacts); + Futures.addCallback( + upload, + new FutureCallback<PathConverter>() { + @Override + public void onSuccess(PathConverter result) { + // Intentionally left empty. + } + + @Override + public void onFailure(Throwable t) { + exitFunc.accept( + new AbruptExitException( + Throwables.getStackTraceAsString(t), ExitCode.PUBLISH_ERROR, t)); + } + }, + MoreExecutors.directExecutor()); + return upload; } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java index 33b296143e..3b67d6c96e 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java @@ -14,6 +14,9 @@ package com.google.devtools.build.lib.buildeventstream.transports; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; @@ -47,20 +50,27 @@ public final class JsonFormatFileTransport extends FileTransport { @Override public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - BuildEventStreamProtos.BuildEvent protoEvent = asStreamProto(event, namer); - if (protoEvent == null) { - return; - } - String protoJsonRepresentation; - try { - protoJsonRepresentation = - JsonFormat.printer().omittingInsignificantWhitespace().print(protoEvent) + "\n"; - } catch (InvalidProtocolBufferException e) { - // We don't expect any unknown Any fields in our protocol buffer. Nevertheless, handle - // the exception gracefully and, at least, return valid JSON with an id field. - protoJsonRepresentation = - "{\"id\" : \"unknown\", \"exception\" : \"InvalidProtocolBufferException\"}\n"; - } - write(protoJsonRepresentation); + Futures.addCallback(asStreamProto(event, namer), + new FutureCallback<BuildEventStreamProtos.BuildEvent>() { + @Override + public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) { + String protoJsonRepresentation; + try { + protoJsonRepresentation = + JsonFormat.printer().omittingInsignificantWhitespace().print(protoEvent) + "\n"; + } catch (InvalidProtocolBufferException e) { + // We don't expect any unknown Any fields in our protocol buffer. Nevertheless, handle + // the exception gracefully and, at least, return valid JSON with an id field. + protoJsonRepresentation = + "{\"id\" : \"unknown\", \"exception\" : \"InvalidProtocolBufferException\"}\n"; + } + write(protoJsonRepresentation); + } + + @Override + public void onFailure(Throwable t) { + // Intentionally left empty. The error handling happens in FileTransport. + } + }, MoreExecutors.directExecutor()); } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java index af1364f3bf..fcdf545e66 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java @@ -14,6 +14,9 @@ package com.google.devtools.build.lib.buildeventstream.transports; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; @@ -48,11 +51,18 @@ public final class TextFormatFileTransport extends FileTransport { @Override public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - BuildEventStreamProtos.BuildEvent protoEvent = asStreamProto(event, namer); - if (protoEvent == null) { - return; - } - String protoTextRepresentation = TextFormat.printToString(protoEvent); - write("event {\n" + protoTextRepresentation + "}\n\n"); + Futures.addCallback(asStreamProto(event, namer), + new FutureCallback<BuildEventStreamProtos.BuildEvent>() { + @Override + public void onSuccess(BuildEventStreamProtos.BuildEvent protoEvent) { + String protoTextRepresentation = TextFormat.printToString(protoEvent); + write("event {\n" + protoTextRepresentation + "}\n\n"); + } + + @Override + public void onFailure(Throwable t) { + // Intentionally left empty. The error handling happens in FileTransport. + } + }, MoreExecutors.directExecutor()); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 607b6379b5..7cd35bfb34 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -16,6 +16,8 @@ package com.google.devtools.build.lib.remote; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; @@ -101,9 +103,9 @@ public final class RemoteModule extends BlazeModule { public void serverInit(OptionsProvider startupOptions, ServerBuilder builder) { builder.addBuildEventArtifactUploader(new BuildEventArtifactUploader() { @Override - public PathConverter upload(Set<Path> files) { + public ListenableFuture<PathConverter> upload(Set<Path> files) { // TODO(ulfjack): Actually hook up upload here. - return converter; + return Futures.immediateFuture(converter); } }, "remote"); } |