aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2017-09-12 13:49:21 +0200
committerGravatar Philipp Wollermann <philwo@google.com>2017-09-12 14:09:13 +0200
commit0ca8f4bbfb3673cf8a0a34cc5b487023d70561c5 (patch)
treef2c1a2a4d9fa0f95e26c1d167277da2135180d6f /src/main/java/com
parentbbb4a401d45c1a814e89a056781b635282e010f7 (diff)
bes: cancel stream on bazel shutdown
Introduce a new method BuildEventTransport#closeNow() which forcefully closes the BES upload even if there are still events to upload. This is executed if the bazel server is being shutdown. The implementation in the BuildEventServiceTransport works by shutting down the ExecutorService handling the upload, which in turn will interrupt the upload thread, which in turn will cancel the streaming RPC. The BuildEventServiceTransport will wait up to 100ms for that cancellation to happen. Also fix a bug where the BES transport would not wait for ACKS. PiperOrigin-RevId: 168359947
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java11
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java67
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java68
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java10
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java6
5 files changed, 130 insertions, 32 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 93bf8cf080..4c05cd548a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -62,6 +62,8 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions
private OutErr outErr;
+ private Set<BuildEventTransport> transports = ImmutableSet.of();
+
@Override
public Iterable<Class<? extends OptionsBase>> getCommandOptions(Command command) {
return ImmutableList.of(optionsClass(), AuthAndTLSOptions.class, BuildEventStreamOptions.class);
@@ -183,7 +185,7 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions
transportsBuilder.add(besTransport);
}
- ImmutableSet<BuildEventTransport> transports = transportsBuilder.build();
+ transports = transportsBuilder.build();
if (!transports.isEmpty()) {
return new BuildEventStreamer(transports, reporter);
}
@@ -239,6 +241,13 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions
}
}
+ @Override
+ public void blazeShutdown() {
+ for (BuildEventTransport transport : transports) {
+ transport.closeNow();
+ }
+ }
+
protected abstract Class<T> optionsClass();
protected abstract BuildEventServiceClient createBesClient(T besOptions,
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index fa9148a030..f453add769 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -165,8 +165,24 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
@Override
- public synchronized ListenableFuture<Void> close() {
+ public ListenableFuture<Void> close() {
+ return close(/*now=*/false);
+ }
+
+ @Override
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void closeNow() {
+ close(/*now=*/true);
+ }
+
+ private synchronized ListenableFuture<Void> close(boolean now) {
if (shutdownFuture != null) {
+ if (now) {
+ cancelUpload();
+ if (!shutdownFuture.isDone()) {
+ shutdownFuture.set(null);
+ }
+ }
return shutdownFuture;
}
@@ -175,6 +191,12 @@ public class BuildEventServiceTransport implements BuildEventTransport {
// The future is completed once the close succeeded or failed.
shutdownFuture = SettableFuture.create();
+ if (now) {
+ cancelUpload();
+ shutdownFuture.set(null);
+ return shutdownFuture;
+ }
+
uploaderExecutorService.execute(
() -> {
try {
@@ -223,6 +245,23 @@ public class BuildEventServiceTransport implements BuildEventTransport {
return shutdownFuture;
}
+ private void cancelUpload() {
+ if (!uploaderExecutorService.isShutdown()) {
+ logger.log(Level.INFO, "Forcefully closing the build event service transport.");
+ // This will interrupt the thread doing the BES upload.
+ if (uploadComplete != null) {
+ uploadComplete.cancel(true);
+ }
+ uploaderExecutorService.shutdownNow();
+ try {
+ uploaderExecutorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Ignore this exception. We are shutting down independently no matter what the BES
+ // upload does.
+ }
+ }
+ }
+
@Override
public String name() {
// TODO(buchgr): Also display the hostname / IP.
@@ -415,7 +454,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend,
final BuildEventServiceClient besClient)
throws Exception {
- ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
+ ListenableFuture<Status> streamDone = besClient
+ .openStream(ackCallback(pendingAck, besClient));
try {
PublishBuildToolEventStreamRequest event;
do {
@@ -423,7 +463,13 @@ public class BuildEventServiceTransport implements BuildEventTransport {
pendingAck.add(event);
besClient.sendOverStream(event);
} while (!isLastEvent(event));
- besClient.closeStream();
+ } catch (InterruptedException e) {
+ // By convention the interrupted flag should have been cleared,
+ // but just to be sure clear it.
+ Thread.interrupted();
+ String additionalDetails = "Sending build events.";
+ besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails));
+ throw e;
} catch (Exception e) {
String additionalDetail = e.getMessage();
logger.log(Level.WARNING, "Aborting publishBuildToolEventStream RPC: " + additionalDetail);
@@ -435,9 +481,17 @@ public class BuildEventServiceTransport implements BuildEventTransport {
Status status =
streamDone.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
logger.log(Level.INFO, "Done with publishEventStream(). Status: " + status);
+ } catch (InterruptedException e) {
+ // By convention the interrupted flag should have been cleared,
+ // but just to be sure clear it.
+ Thread.interrupted();
+ String additionalDetails = "Waiting for ACK messages.";
+ besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails));
+ throw e;
} catch (TimeoutException e) {
String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages";
- logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
+ logger
+ .log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail));
throw e;
}
@@ -466,6 +520,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
PublishBuildToolEventStreamRequest event = pendingAck.removeFirst();
if (isLastEvent(event)) {
logger.log(Level.INFO, "Last ACK received.");
+ besClient.closeStream();
}
return null;
};
@@ -487,8 +542,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
c.call();
lastKnownError = null;
return;
- // TODO(buchgr): Narrow the exception to not catch InterruptedException and
- // RuntimeException's.
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
tries++;
lastKnownError = e;
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 7fd3cc20ff..f26060a701 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -68,9 +68,15 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient {
@Override
public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception {
- besBlocking
- .withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
- .publishLifecycleEvent(lifecycleEvent);
+ try {
+ besBlocking
+ .withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
+ .publishLifecycleEvent(lifecycleEvent);
+ } catch (StatusRuntimeException e) {
+ Throwable rootCause = Throwables.getRootCause(e);
+ Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
+ throw e;
+ }
return Status.OK;
}
@@ -87,32 +93,44 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient {
private StreamObserver<PublishBuildToolEventStreamRequest> createStream(
final Function<PublishBuildToolEventStreamResponse, Void> ack,
- final SettableFuture<Status> streamFinished) {
- return besAsync.publishBuildToolEventStream(
- new StreamObserver<PublishBuildToolEventStreamResponse>() {
- @Override
- public void onNext(PublishBuildToolEventStreamResponse response) {
- ack.apply(response);
- }
-
- @Override
- public void onError(Throwable t) {
- streamReference.set(null);
- streamFinished.setException(t);
- }
-
- @Override
- public void onCompleted() {
- streamReference.set(null);
- streamFinished.set(Status.OK);
- }
- });
+ final SettableFuture<Status> streamFinished) throws InterruptedException {
+ try {
+ return besAsync.publishBuildToolEventStream(
+ new StreamObserver<PublishBuildToolEventStreamResponse>() {
+ @Override
+ public void onNext(PublishBuildToolEventStreamResponse response) {
+ ack.apply(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ streamReference.set(null);
+ streamFinished.setException(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ streamReference.set(null);
+ streamFinished.set(Status.OK);
+ }
+ });
+ } catch (StatusRuntimeException e) {
+ Throwable rootCause = Throwables.getRootCause(e);
+ Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
+ throw e;
+ }
}
@Override
public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception {
- checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
- .onNext(buildEvent);
+ try {
+ checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
+ .onNext(buildEvent);
+ } catch (StatusRuntimeException e) {
+ Throwable rootCause = Throwables.getRootCause(e);
+ Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
+ throw e;
+ }
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
index 84845ab80c..9b930b9ecd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java
@@ -55,4 +55,14 @@ public interface BuildEventTransport {
* <p>This method should not throw any exceptions.
*/
ListenableFuture<Void> close();
+
+ /**
+ * Similar to {@link #close()}. Instructs the transport to close as soon as possible even if
+ * some build events will be lost.
+ *
+ * <p>This method might be called multiple times without any effect after the first call.
+ *
+ * <p>This method should not throw any exceptions.
+ */
+ void closeNow();
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 0d5fabf214..528e6d6d25 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -111,6 +111,12 @@ abstract class FileTransport implements BuildEventTransport {
}
}
+ @Override
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void closeNow() {
+ close();
+ }
+
private boolean closing() {
return closeFuture != null;
}