diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index acf5686671..f5afa63a22 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.SettableFuture; @@ -50,14 +50,12 @@ import com.google.devtools.build.lib.util.AbruptExitException; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.JavaSleeper; import com.google.devtools.build.lib.util.Sleeper; -import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.v1.BuildStatus.Result; import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest; import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; import com.google.devtools.build.v1.PublishLifecycleEventRequest; import com.google.protobuf.Any; import io.grpc.Status; -import java.io.IOException; import java.time.Duration; import java.util.Deque; import java.util.Set; @@ -312,8 +310,11 @@ public class BuildEventServiceTransport implements BuildEventTransport { } } - sendOrderedBuildEvent( - new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber())); + ListenableFuture<PathConverter> upload = + artifactUploader.upload(event.referencedLocalFiles()); + InternalOrderedBuildEvent buildEvent = new DefaultInternalOrderedBuildEvent(event, namer, + upload, besProtoUtil.nextSequenceNumber()); + sendOrderedBuildEvent(buildEvent); } private String errorMessageFromException(Throwable t) { @@ -484,10 +485,11 @@ public class BuildEventServiceTransport implements BuildEventTransport { do { orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS); if (orderedBuildEvent != null) { - final PathConverter pathConverter; + PathConverter pathConverter; try { - pathConverter = artifactUploader.upload(orderedBuildEvent.referencedLocalFiles()); - } catch (IOException e) { + // Wait for the local file upload to have been completed. + pathConverter = orderedBuildEvent.localFileUploadProgress().get(); + } catch (ExecutionException e) { logger.log( Level.WARNING, String.format( @@ -508,15 +510,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { pendingAck.add(orderedBuildEvent); besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter)); } - checkState(besClient.isStreamActive(), "Stream was closed prematurely."); } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent()); logger.log( Level.INFO, String.format( "Will end publishEventStream() isLastEvent: %s isStreamActive: %s", - orderedBuildEvent != null && orderedBuildEvent.isLastEvent(), - besClient.isStreamActive())); + orderedBuildEvent.isLastEvent(), besClient.isStreamActive())); } catch (InterruptedException e) { // By convention the interrupted flag should have been cleared, // but just to be sure clear it. @@ -646,20 +646,24 @@ public class BuildEventServiceTransport implements BuildEventTransport { int getSequenceNumber(); - Set<Path> referencedLocalFiles(); + ListenableFuture<PathConverter> localFileUploadProgress(); - PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter); + PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) + throws ExecutionException, InterruptedException; } private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent { private final BuildEvent event; private final ArtifactGroupNamer artifactGroupNamer; + private final ListenableFuture<PathConverter> artifactUpload; private final int sequenceNumber; DefaultInternalOrderedBuildEvent( - BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) { + BuildEvent event, ArtifactGroupNamer artifactGroupNamer, + ListenableFuture<PathConverter> artifactUpload, int sequenceNumber) { this.event = Preconditions.checkNotNull(event); this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer); + this.artifactUpload = artifactUpload; this.sequenceNumber = sequenceNumber; } @@ -674,12 +678,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public Set<Path> referencedLocalFiles() { - return event.referencedLocalFiles(); + public ListenableFuture<PathConverter> localFileUploadProgress() { + return artifactUpload; } @Override - public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) { + public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) + throws ExecutionException, InterruptedException { BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto( new BuildEventContext() { @@ -720,8 +725,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public Set<Path> referencedLocalFiles() { - return ImmutableSet.of(); + public ListenableFuture<PathConverter> localFileUploadProgress() { + return Futures.immediateFuture(PathConverter.NO_CONVERSION); } @Override |