diff options
author | 2017-09-12 13:49:21 +0200 | |
---|---|---|
committer | 2017-09-12 14:09:13 +0200 | |
commit | 0ca8f4bbfb3673cf8a0a34cc5b487023d70561c5 (patch) | |
tree | f2c1a2a4d9fa0f95e26c1d167277da2135180d6f /src/main/java/com | |
parent | bbb4a401d45c1a814e89a056781b635282e010f7 (diff) |
bes: cancel stream on bazel shutdown
Introduce a new method BuildEventTransport#closeNow() which forcefully
closes the BES upload even if there are still events to upload. This
is executed if the bazel server is being shutdown.
The implementation in the BuildEventServiceTransport works by shutting
down the ExecutorService handling the upload, which in turn will
interrupt the upload thread, which in turn will cancel the streaming RPC.
The BuildEventServiceTransport will wait up to 100ms for that
cancellation to happen.
Also fix a bug where the BES transport would not wait for ACKS.
PiperOrigin-RevId: 168359947
Diffstat (limited to 'src/main/java/com')
5 files changed, 130 insertions, 32 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java index 93bf8cf080..4c05cd548a 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java @@ -62,6 +62,8 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions private OutErr outErr; + private Set<BuildEventTransport> transports = ImmutableSet.of(); + @Override public Iterable<Class<? extends OptionsBase>> getCommandOptions(Command command) { return ImmutableList.of(optionsClass(), AuthAndTLSOptions.class, BuildEventStreamOptions.class); @@ -183,7 +185,7 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions transportsBuilder.add(besTransport); } - ImmutableSet<BuildEventTransport> transports = transportsBuilder.build(); + transports = transportsBuilder.build(); if (!transports.isEmpty()) { return new BuildEventStreamer(transports, reporter); } @@ -239,6 +241,13 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions } } + @Override + public void blazeShutdown() { + for (BuildEventTransport transport : transports) { + transport.closeNow(); + } + } + protected abstract Class<T> optionsClass(); protected abstract BuildEventServiceClient createBesClient(T besOptions, diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index fa9148a030..f453add769 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -165,8 +165,24 @@ public class BuildEventServiceTransport implements BuildEventTransport { } @Override - public synchronized ListenableFuture<Void> close() { + public ListenableFuture<Void> close() { + return close(/*now=*/false); + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void closeNow() { + close(/*now=*/true); + } + + private synchronized ListenableFuture<Void> close(boolean now) { if (shutdownFuture != null) { + if (now) { + cancelUpload(); + if (!shutdownFuture.isDone()) { + shutdownFuture.set(null); + } + } return shutdownFuture; } @@ -175,6 +191,12 @@ public class BuildEventServiceTransport implements BuildEventTransport { // The future is completed once the close succeeded or failed. shutdownFuture = SettableFuture.create(); + if (now) { + cancelUpload(); + shutdownFuture.set(null); + return shutdownFuture; + } + uploaderExecutorService.execute( () -> { try { @@ -223,6 +245,23 @@ public class BuildEventServiceTransport implements BuildEventTransport { return shutdownFuture; } + private void cancelUpload() { + if (!uploaderExecutorService.isShutdown()) { + logger.log(Level.INFO, "Forcefully closing the build event service transport."); + // This will interrupt the thread doing the BES upload. + if (uploadComplete != null) { + uploadComplete.cancel(true); + } + uploaderExecutorService.shutdownNow(); + try { + uploaderExecutorService.awaitTermination(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Ignore this exception. We are shutting down independently no matter what the BES + // upload does. + } + } + } + @Override public String name() { // TODO(buchgr): Also display the hostname / IP. @@ -415,7 +454,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend, final BuildEventServiceClient besClient) throws Exception { - ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient)); + ListenableFuture<Status> streamDone = besClient + .openStream(ackCallback(pendingAck, besClient)); try { PublishBuildToolEventStreamRequest event; do { @@ -423,7 +463,13 @@ public class BuildEventServiceTransport implements BuildEventTransport { pendingAck.add(event); besClient.sendOverStream(event); } while (!isLastEvent(event)); - besClient.closeStream(); + } catch (InterruptedException e) { + // By convention the interrupted flag should have been cleared, + // but just to be sure clear it. + Thread.interrupted(); + String additionalDetails = "Sending build events."; + besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails)); + throw e; } catch (Exception e) { String additionalDetail = e.getMessage(); logger.log(Level.WARNING, "Aborting publishBuildToolEventStream RPC: " + additionalDetail); @@ -435,9 +481,17 @@ public class BuildEventServiceTransport implements BuildEventTransport { Status status = streamDone.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); logger.log(Level.INFO, "Done with publishEventStream(). Status: " + status); + } catch (InterruptedException e) { + // By convention the interrupted flag should have been cleared, + // but just to be sure clear it. + Thread.interrupted(); + String additionalDetails = "Waiting for ACK messages."; + besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails)); + throw e; } catch (TimeoutException e) { String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages"; - logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail); + logger + .log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail); besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail)); throw e; } @@ -466,6 +520,7 @@ public class BuildEventServiceTransport implements BuildEventTransport { PublishBuildToolEventStreamRequest event = pendingAck.removeFirst(); if (isLastEvent(event)) { logger.log(Level.INFO, "Last ACK received."); + besClient.closeStream(); } return null; }; @@ -487,8 +542,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { c.call(); lastKnownError = null; return; - // TODO(buchgr): Narrow the exception to not catch InterruptedException and - // RuntimeException's. + } catch (InterruptedException e) { + throw e; } catch (Exception e) { tries++; lastKnownError = e; diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java index 7fd3cc20ff..f26060a701 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java @@ -68,9 +68,15 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient { @Override public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception { - besBlocking - .withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS) - .publishLifecycleEvent(lifecycleEvent); + try { + besBlocking + .withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS) + .publishLifecycleEvent(lifecycleEvent); + } catch (StatusRuntimeException e) { + Throwable rootCause = Throwables.getRootCause(e); + Throwables.throwIfInstanceOf(rootCause, InterruptedException.class); + throw e; + } return Status.OK; } @@ -87,32 +93,44 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient { private StreamObserver<PublishBuildToolEventStreamRequest> createStream( final Function<PublishBuildToolEventStreamResponse, Void> ack, - final SettableFuture<Status> streamFinished) { - return besAsync.publishBuildToolEventStream( - new StreamObserver<PublishBuildToolEventStreamResponse>() { - @Override - public void onNext(PublishBuildToolEventStreamResponse response) { - ack.apply(response); - } - - @Override - public void onError(Throwable t) { - streamReference.set(null); - streamFinished.setException(t); - } - - @Override - public void onCompleted() { - streamReference.set(null); - streamFinished.set(Status.OK); - } - }); + final SettableFuture<Status> streamFinished) throws InterruptedException { + try { + return besAsync.publishBuildToolEventStream( + new StreamObserver<PublishBuildToolEventStreamResponse>() { + @Override + public void onNext(PublishBuildToolEventStreamResponse response) { + ack.apply(response); + } + + @Override + public void onError(Throwable t) { + streamReference.set(null); + streamFinished.setException(t); + } + + @Override + public void onCompleted() { + streamReference.set(null); + streamFinished.set(Status.OK); + } + }); + } catch (StatusRuntimeException e) { + Throwable rootCause = Throwables.getRootCause(e); + Throwables.throwIfInstanceOf(rootCause, InterruptedException.class); + throw e; + } } @Override public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception { - checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream") - .onNext(buildEvent); + try { + checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream") + .onNext(buildEvent); + } catch (StatusRuntimeException e) { + Throwable rootCause = Throwables.getRootCause(e); + Throwables.throwIfInstanceOf(rootCause, InterruptedException.class); + throw e; + } } @Override diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java index 84845ab80c..9b930b9ecd 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java @@ -55,4 +55,14 @@ public interface BuildEventTransport { * <p>This method should not throw any exceptions. */ ListenableFuture<Void> close(); + + /** + * Similar to {@link #close()}. Instructs the transport to close as soon as possible even if + * some build events will be lost. + * + * <p>This method might be called multiple times without any effect after the first call. + * + * <p>This method should not throw any exceptions. + */ + void closeNow(); } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java index 0d5fabf214..528e6d6d25 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java @@ -111,6 +111,12 @@ abstract class FileTransport implements BuildEventTransport { } } + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void closeNow() { + close(); + } + private boolean closing() { return closeFuture != null; } |