aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventservice
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2018-06-29 13:00:36 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-06-29 13:02:14 -0700
commitd3f7f7ae1369834b5f81d62474d20541d8b1a918 (patch)
tree98fc8fba21092224e0950f5e27556c3ae07b19e2 /src/main/java/com/google/devtools/build/lib/buildeventservice
parent6e92f97f2953890055c6d724ce2aa6b3c0fca4dc (diff)
bep: Make the BuildEventArtifactUploader async.
This changes the BuildEventArtifactUploader to an async interface, thereby no longer potentially delaying event delivery over the eventbus. Additionally, the BES transport is changed to start uploading local files immediately as the events are delivered. RELNOTES: None PiperOrigin-RevId: 202694121
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java43
1 files changed, 24 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index acf5686671..f5afa63a22 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
@@ -50,14 +50,12 @@ import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.JavaSleeper;
import com.google.devtools.build.lib.util.Sleeper;
-import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import io.grpc.Status;
-import java.io.IOException;
import java.time.Duration;
import java.util.Deque;
import java.util.Set;
@@ -312,8 +310,11 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
}
- sendOrderedBuildEvent(
- new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber()));
+ ListenableFuture<PathConverter> upload =
+ artifactUploader.upload(event.referencedLocalFiles());
+ InternalOrderedBuildEvent buildEvent = new DefaultInternalOrderedBuildEvent(event, namer,
+ upload, besProtoUtil.nextSequenceNumber());
+ sendOrderedBuildEvent(buildEvent);
}
private String errorMessageFromException(Throwable t) {
@@ -484,10 +485,11 @@ public class BuildEventServiceTransport implements BuildEventTransport {
do {
orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
if (orderedBuildEvent != null) {
- final PathConverter pathConverter;
+ PathConverter pathConverter;
try {
- pathConverter = artifactUploader.upload(orderedBuildEvent.referencedLocalFiles());
- } catch (IOException e) {
+ // Wait for the local file upload to have been completed.
+ pathConverter = orderedBuildEvent.localFileUploadProgress().get();
+ } catch (ExecutionException e) {
logger.log(
Level.WARNING,
String.format(
@@ -508,15 +510,13 @@ public class BuildEventServiceTransport implements BuildEventTransport {
pendingAck.add(orderedBuildEvent);
besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter));
}
-
checkState(besClient.isStreamActive(), "Stream was closed prematurely.");
} while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent());
logger.log(
Level.INFO,
String.format(
"Will end publishEventStream() isLastEvent: %s isStreamActive: %s",
- orderedBuildEvent != null && orderedBuildEvent.isLastEvent(),
- besClient.isStreamActive()));
+ orderedBuildEvent.isLastEvent(), besClient.isStreamActive()));
} catch (InterruptedException e) {
// By convention the interrupted flag should have been cleared,
// but just to be sure clear it.
@@ -646,20 +646,24 @@ public class BuildEventServiceTransport implements BuildEventTransport {
int getSequenceNumber();
- Set<Path> referencedLocalFiles();
+ ListenableFuture<PathConverter> localFileUploadProgress();
- PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter);
+ PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter)
+ throws ExecutionException, InterruptedException;
}
private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
private final BuildEvent event;
private final ArtifactGroupNamer artifactGroupNamer;
+ private final ListenableFuture<PathConverter> artifactUpload;
private final int sequenceNumber;
DefaultInternalOrderedBuildEvent(
- BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) {
+ BuildEvent event, ArtifactGroupNamer artifactGroupNamer,
+ ListenableFuture<PathConverter> artifactUpload, int sequenceNumber) {
this.event = Preconditions.checkNotNull(event);
this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer);
+ this.artifactUpload = artifactUpload;
this.sequenceNumber = sequenceNumber;
}
@@ -674,12 +678,13 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
@Override
- public Set<Path> referencedLocalFiles() {
- return event.referencedLocalFiles();
+ public ListenableFuture<PathConverter> localFileUploadProgress() {
+ return artifactUpload;
}
@Override
- public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+ public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter)
+ throws ExecutionException, InterruptedException {
BuildEventStreamProtos.BuildEvent eventProto =
event.asStreamProto(
new BuildEventContext() {
@@ -720,8 +725,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
@Override
- public Set<Path> referencedLocalFiles() {
- return ImmutableSet.of();
+ public ListenableFuture<PathConverter> localFileUploadProgress() {
+ return Futures.immediateFuture(PathConverter.NO_CONVERSION);
}
@Override