diff options
author | Googler <noreply@google.com> | 2017-10-30 14:26:29 -0400 |
---|---|---|
committer | John Cater <jcater@google.com> | 2017-10-31 10:37:20 -0400 |
commit | 1fef5277c8c47dd976e7028890246df612216456 (patch) | |
tree | 6ec8ea4550d1408b35e55af7d3b9cb05f7e1f938 /src/main/java/com/google/devtools/build/lib/buildeventservice | |
parent | 1c62beca3a1cb5d6b7e55c199bc889d0dc957516 (diff) |
Make BuildEventServiceTransport sender thread verify the streaming RPC is active, restarting the RPC if required.
- BuildEventServiceTransport sender thread checks the RPC is still active every 1s, failing the RPC if it was ended before we reached the end of the BEP stream.
- "fail_fast" is the default behavior on gRPC (it needs to be disabled using .withWaitForReady).
PiperOrigin-RevId: 173921837
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
2 files changed, 22 insertions, 7 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index 8c79989c99..3aba07f73d 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -83,6 +83,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { /** Max wait time until for the Streaming RPC to finish after all events were sent. */ private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(30); + /** Max wait time between isStreamActive checks of the PublishBuildToolEventStream RPC. */ + private static final int STREAMING_RPC_POLL_IN_SECS = 1; private final ListeningExecutorService uploaderExecutorService; private final Duration uploadTimeout; @@ -471,12 +473,20 @@ public class BuildEventServiceTransport implements BuildEventTransport { ListenableFuture<Status> streamDone = besClient .openStream(ackCallback(pendingAck, besClient)); try { - PublishBuildToolEventStreamRequest event; + @Nullable PublishBuildToolEventStreamRequest event; do { - event = pendingSend.takeFirst(); - pendingAck.add(event); - besClient.sendOverStream(event); + event = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS); + if (event != null) { + pendingAck.add(event); + besClient.sendOverStream(event); + } + checkState(besClient.isStreamActive(), "Stream was closed prematurely."); } while (!isLastEvent(event)); + logger.log( + Level.INFO, + String.format( + "Will end publishEventStream() isLastEvent: %s isStreamActive: %s", + isLastEvent(event), besClient.isStreamActive())); } catch (InterruptedException e) { // By convention the interrupted flag should have been cleared, // but just to be sure clear it. @@ -485,8 +495,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails)); throw e; } catch (Exception e) { + Status status = streamDone.isDone() ? streamDone.get() : null; String additionalDetail = e.getMessage(); - logger.log(Level.WARNING, "Aborting publishBuildToolEventStream RPC: " + additionalDetail); + logger.log( + Level.WARNING, + String.format( + "Aborting publishBuildToolEventStream RPC (status=%s): %s", status, additionalDetail), + e); besClient.abortStream(Status.INTERNAL.augmentDescription(additionalDetail)); throw e; } @@ -511,7 +526,7 @@ public class BuildEventServiceTransport implements BuildEventTransport { } } - private static boolean isLastEvent(PublishBuildToolEventStreamRequest event) { + private static boolean isLastEvent(@Nullable PublishBuildToolEventStreamRequest event) { return event != null && event.getOrderedBuildEvent().getEvent().getEventCase() == COMPONENT_STREAM_FINISHED; } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java index 5721118352..f4f688ed9a 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java @@ -62,7 +62,7 @@ public interface BuildEventServiceClient { /** * Closes the currently opened stream with error. This method does not block. Callers should block * on the Future returned by {@link #openStream(Function)} if in order to make sure that all - * ackCallback calls have been received. + * ackCallback calls have been received. This method is NOOP if the stream was already finished. */ void abortStream(Status status); |