aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventservice
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java43
1 files changed, 24 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index acf5686671..f5afa63a22 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
@@ -50,14 +50,12 @@ import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.JavaSleeper;
import com.google.devtools.build.lib.util.Sleeper;
-import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import io.grpc.Status;
-import java.io.IOException;
import java.time.Duration;
import java.util.Deque;
import java.util.Set;
@@ -312,8 +310,11 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
}
- sendOrderedBuildEvent(
- new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber()));
+ ListenableFuture<PathConverter> upload =
+ artifactUploader.upload(event.referencedLocalFiles());
+ InternalOrderedBuildEvent buildEvent = new DefaultInternalOrderedBuildEvent(event, namer,
+ upload, besProtoUtil.nextSequenceNumber());
+ sendOrderedBuildEvent(buildEvent);
}
private String errorMessageFromException(Throwable t) {
@@ -484,10 +485,11 @@ public class BuildEventServiceTransport implements BuildEventTransport {
do {
orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
if (orderedBuildEvent != null) {
- final PathConverter pathConverter;
+ PathConverter pathConverter;
try {
- pathConverter = artifactUploader.upload(orderedBuildEvent.referencedLocalFiles());
- } catch (IOException e) {
+ // Wait for the local file upload to have been completed.
+ pathConverter = orderedBuildEvent.localFileUploadProgress().get();
+ } catch (ExecutionException e) {
logger.log(
Level.WARNING,
String.format(
@@ -508,15 +510,13 @@ public class BuildEventServiceTransport implements BuildEventTransport {
pendingAck.add(orderedBuildEvent);
besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter));
}
-
checkState(besClient.isStreamActive(), "Stream was closed prematurely.");
} while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent());
logger.log(
Level.INFO,
String.format(
"Will end publishEventStream() isLastEvent: %s isStreamActive: %s",
- orderedBuildEvent != null && orderedBuildEvent.isLastEvent(),
- besClient.isStreamActive()));
+ orderedBuildEvent.isLastEvent(), besClient.isStreamActive()));
} catch (InterruptedException e) {
// By convention the interrupted flag should have been cleared,
// but just to be sure clear it.
@@ -646,20 +646,24 @@ public class BuildEventServiceTransport implements BuildEventTransport {
int getSequenceNumber();
- Set<Path> referencedLocalFiles();
+ ListenableFuture<PathConverter> localFileUploadProgress();
- PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter);
+ PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter)
+ throws ExecutionException, InterruptedException;
}
private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
private final BuildEvent event;
private final ArtifactGroupNamer artifactGroupNamer;
+ private final ListenableFuture<PathConverter> artifactUpload;
private final int sequenceNumber;
DefaultInternalOrderedBuildEvent(
- BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) {
+ BuildEvent event, ArtifactGroupNamer artifactGroupNamer,
+ ListenableFuture<PathConverter> artifactUpload, int sequenceNumber) {
this.event = Preconditions.checkNotNull(event);
this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer);
+ this.artifactUpload = artifactUpload;
this.sequenceNumber = sequenceNumber;
}
@@ -674,12 +678,13 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
@Override
- public Set<Path> referencedLocalFiles() {
- return event.referencedLocalFiles();
+ public ListenableFuture<PathConverter> localFileUploadProgress() {
+ return artifactUpload;
}
@Override
- public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+ public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter)
+ throws ExecutionException, InterruptedException {
BuildEventStreamProtos.BuildEvent eventProto =
event.asStreamProto(
new BuildEventContext() {
@@ -720,8 +725,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
@Override
- public Set<Path> referencedLocalFiles() {
- return ImmutableSet.of();
+ public ListenableFuture<PathConverter> localFileUploadProgress() {
+ return Futures.immediateFuture(PathConverter.NO_CONVERSION);
}
@Override