diff options
author | 2018-06-29 13:00:36 -0700 | |
---|---|---|
committer | 2018-06-29 13:02:14 -0700 | |
commit | d3f7f7ae1369834b5f81d62474d20541d8b1a918 (patch) | |
tree | 98fc8fba21092224e0950f5e27556c3ae07b19e2 /src/main/java/com/google/devtools/build/lib/buildeventservice | |
parent | 6e92f97f2953890055c6d724ce2aa6b3c0fca4dc (diff) |
bep: Make the BuildEventArtifactUploader async.
This changes the BuildEventArtifactUploader to an async interface,
thereby no longer potentially delaying event delivery over the
eventbus. Additionally, the BES transport is changed to start
uploading local files immediately as the events are delivered.
RELNOTES: None
PiperOrigin-RevId: 202694121
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index acf5686671..f5afa63a22 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.SettableFuture; @@ -50,14 +50,12 @@ import com.google.devtools.build.lib.util.AbruptExitException; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.JavaSleeper; import com.google.devtools.build.lib.util.Sleeper; -import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.v1.BuildStatus.Result; import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest; import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; import com.google.devtools.build.v1.PublishLifecycleEventRequest; import com.google.protobuf.Any; import io.grpc.Status; -import java.io.IOException; import java.time.Duration; import java.util.Deque; import java.util.Set; @@ -312,8 +310,11 @@ public class BuildEventServiceTransport implements BuildEventTransport { } } - sendOrderedBuildEvent( - new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber())); + ListenableFuture<PathConverter> upload = + artifactUploader.upload(event.referencedLocalFiles()); + InternalOrderedBuildEvent buildEvent = new DefaultInternalOrderedBuildEvent(event, namer, + upload, besProtoUtil.nextSequenceNumber()); + sendOrderedBuildEvent(buildEvent); } private String errorMessageFromException(Throwable t) { @@ -484,10 +485,11 @@ public class BuildEventServiceTransport implements BuildEventTransport { do { orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS); if (orderedBuildEvent != null) { - final PathConverter pathConverter; + PathConverter pathConverter; try { - pathConverter = artifactUploader.upload(orderedBuildEvent.referencedLocalFiles()); - } catch (IOException e) { + // Wait for the local file upload to have been completed. + pathConverter = orderedBuildEvent.localFileUploadProgress().get(); + } catch (ExecutionException e) { logger.log( Level.WARNING, String.format( @@ -508,15 +510,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { pendingAck.add(orderedBuildEvent); besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter)); } - checkState(besClient.isStreamActive(), "Stream was closed prematurely."); } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent()); logger.log( Level.INFO, String.format( "Will end publishEventStream() isLastEvent: %s isStreamActive: %s", - orderedBuildEvent != null && orderedBuildEvent.isLastEvent(), - besClient.isStreamActive())); + orderedBuildEvent.isLastEvent(), besClient.isStreamActive())); } catch (InterruptedException e) { // By convention the interrupted flag should have been cleared, // but just to be sure clear it. @@ -646,20 +646,24 @@ public class BuildEventServiceTransport implements BuildEventTransport { int getSequenceNumber(); - Set<Path> referencedLocalFiles(); + ListenableFuture<PathConverter> localFileUploadProgress(); - PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter); + PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) + throws ExecutionException, InterruptedException; } private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent { private final BuildEvent event; private final ArtifactGroupNamer artifactGroupNamer; + private final ListenableFuture<PathConverter> artifactUpload; private final int sequenceNumber; DefaultInternalOrderedBuildEvent( - BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) { + BuildEvent event, ArtifactGroupNamer artifactGroupNamer, + ListenableFuture<PathConverter> artifactUpload, int sequenceNumber) { this.event = Preconditions.checkNotNull(event); this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer); + this.artifactUpload = artifactUpload; this.sequenceNumber = sequenceNumber; } @@ -674,12 +678,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public Set<Path> referencedLocalFiles() { - return event.referencedLocalFiles(); + public ListenableFuture<PathConverter> localFileUploadProgress() { + return artifactUpload; } @Override - public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) { + public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) + throws ExecutionException, InterruptedException { BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto( new BuildEventContext() { @@ -720,8 +725,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public Set<Path> referencedLocalFiles() { - return ImmutableSet.of(); + public ListenableFuture<PathConverter> localFileUploadProgress() { + return Futures.immediateFuture(PathConverter.NO_CONVERSION); } @Override |