aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventservice
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2017-10-30 14:26:29 -0400
committerGravatar John Cater <jcater@google.com>2017-10-31 10:37:20 -0400
commit1fef5277c8c47dd976e7028890246df612216456 (patch)
tree6ec8ea4550d1408b35e55af7d3b9cb05f7e1f938 /src/main/java/com/google/devtools/build/lib/buildeventservice
parent1c62beca3a1cb5d6b7e55c199bc889d0dc957516 (diff)
Make BuildEventServiceTransport sender thread verify the streaming RPC is active, restarting the RPC if required.
- BuildEventServiceTransport sender thread checks the RPC is still active every 1s, failing the RPC if it was ended before we reached the end of the BEP stream. - "fail_fast" is the default behavior on gRPC (it needs to be disabled using .withWaitForReady). PiperOrigin-RevId: 173921837
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java27
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java2
2 files changed, 22 insertions, 7 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 8c79989c99..3aba07f73d 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -83,6 +83,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
/** Max wait time until for the Streaming RPC to finish after all events were sent. */
private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(30);
+ /** Max wait time between isStreamActive checks of the PublishBuildToolEventStream RPC. */
+ private static final int STREAMING_RPC_POLL_IN_SECS = 1;
private final ListeningExecutorService uploaderExecutorService;
private final Duration uploadTimeout;
@@ -471,12 +473,20 @@ public class BuildEventServiceTransport implements BuildEventTransport {
ListenableFuture<Status> streamDone = besClient
.openStream(ackCallback(pendingAck, besClient));
try {
- PublishBuildToolEventStreamRequest event;
+ @Nullable PublishBuildToolEventStreamRequest event;
do {
- event = pendingSend.takeFirst();
- pendingAck.add(event);
- besClient.sendOverStream(event);
+ event = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
+ if (event != null) {
+ pendingAck.add(event);
+ besClient.sendOverStream(event);
+ }
+ checkState(besClient.isStreamActive(), "Stream was closed prematurely.");
} while (!isLastEvent(event));
+ logger.log(
+ Level.INFO,
+ String.format(
+ "Will end publishEventStream() isLastEvent: %s isStreamActive: %s",
+ isLastEvent(event), besClient.isStreamActive()));
} catch (InterruptedException e) {
// By convention the interrupted flag should have been cleared,
// but just to be sure clear it.
@@ -485,8 +495,13 @@ public class BuildEventServiceTransport implements BuildEventTransport {
besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails));
throw e;
} catch (Exception e) {
+ Status status = streamDone.isDone() ? streamDone.get() : null;
String additionalDetail = e.getMessage();
- logger.log(Level.WARNING, "Aborting publishBuildToolEventStream RPC: " + additionalDetail);
+ logger.log(
+ Level.WARNING,
+ String.format(
+ "Aborting publishBuildToolEventStream RPC (status=%s): %s", status, additionalDetail),
+ e);
besClient.abortStream(Status.INTERNAL.augmentDescription(additionalDetail));
throw e;
}
@@ -511,7 +526,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
}
- private static boolean isLastEvent(PublishBuildToolEventStreamRequest event) {
+ private static boolean isLastEvent(@Nullable PublishBuildToolEventStreamRequest event) {
return event != null
&& event.getOrderedBuildEvent().getEvent().getEventCase() == COMPONENT_STREAM_FINISHED;
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
index 5721118352..f4f688ed9a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -62,7 +62,7 @@ public interface BuildEventServiceClient {
/**
* Closes the currently opened stream with error. This method does not block. Callers should block
* on the Future returned by {@link #openStream(Function)} if in order to make sure that all
- * ackCallback calls have been received.
+ * ackCallback calls have been received. This method is NOOP if the stream was already finished.
*/
void abortStream(Status status);