aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventservice
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2017-08-24 21:11:50 +0200
committerGravatar Damien Martin-Guillerez <dmarting@google.com>2017-08-25 12:53:44 +0200
commite7e0f87cb69ecf69503b3fbe82ab12c62f383666 (patch)
treef11ffa28132ec173f7a8ebf5ddd92b2cdf5aaeab /src/main/java/com/google/devtools/build/lib/buildeventservice
parente005adf491c8f86c38439c377b27e29336841057 (diff)
Abort stream if BuildEventServiceTransport does not wait for all ACKs.
This change handling for when BuildEventServiceTransport would timesout waiting for pendingAcks. RELNOTES: None. PiperOrigin-RevId: 166375844
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java44
1 files changed, 28 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 60c35302e8..cb0082eae8 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -79,8 +79,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
private static final Logger logger = Logger.getLogger(BuildEventServiceTransport.class.getName());
- /** Max wait time until for the Streaming RPC to finish after all events were enqueued. */
- private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(120);
+ /** Max wait time until for the Streaming RPC to finish after all events were sent. */
+ private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(30);
private final ListeningExecutorService uploaderExecutorService;
private final Duration uploadTimeout;
@@ -396,42 +396,51 @@ public class BuildEventServiceTransport implements BuildEventTransport {
/**
* Used as method reference, responsible for the entire Streaming RPC. Safe to retry. This method
- * it carries states between consecutive calls (pendingAck messages will be added to the head of
+ * carries over the state between consecutive calls (pendingAck messages will be added to the head
* of the pendingSend queue), but that is intended behavior.
*/
- private Status publishEventStream() throws Exception {
+ private void publishEventStream() throws Exception {
// Reschedule unacked messages if required, keeping its original order.
PublishBuildToolEventStreamRequest unacked;
while ((unacked = pendingAck.pollLast()) != null) {
pendingSend.addFirst(unacked);
}
pendingAck = new ConcurrentLinkedDeque<>();
-
- return publishEventStream(pendingAck, pendingSend, besClient)
- .get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ publishEventStream(pendingAck, pendingSend, besClient);
}
/** Method responsible for a single Streaming RPC. */
- private static ListenableFuture<Status> publishEventStream(
+ private static void publishEventStream(
final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck,
final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend,
final BuildEventServiceClient besClient)
throws Exception {
- PublishBuildToolEventStreamRequest event;
ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
try {
+ PublishBuildToolEventStreamRequest event;
do {
event = pendingSend.takeFirst();
pendingAck.add(event);
besClient.sendOverStream(event);
} while (!isLastEvent(event));
besClient.closeStream();
- logger.log(Level.INFO, "Closing the build event stream.");
} catch (Exception e) {
- logger.log(Level.WARNING, "Aborting publishEventStream.", e);
- besClient.abortStream(Status.INTERNAL.augmentDescription(e.getMessage()));
+ String additionalDetail = e.getMessage();
+ logger.log(Level.WARNING, "Aborting publishBuildToolEventStream RPC: " + additionalDetail);
+ besClient.abortStream(Status.INTERNAL.augmentDescription(additionalDetail));
+ throw e;
+ }
+
+ try {
+ Status status =
+ streamDone.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ logger.log(Level.INFO, "Done with publishEventStream(). Status: " + status);
+ } catch (TimeoutException e) {
+ String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages";
+ logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
+ besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail));
+ throw e;
}
- return streamDone;
}
private static boolean isLastEvent(PublishBuildToolEventStreamRequest event) {
@@ -451,9 +460,12 @@ public class BuildEventServiceTransport implements BuildEventTransport {
if (pendingSeq != ackSeq) {
besClient.abortStream(
Status.INTERNAL.augmentDescription(
- format("Expected ack %s but was %s.", pendingSeq, ackSeq)));
- } else {
- pendingAck.removeFirst();
+ format("Expected ACK %s but was %s.", pendingSeq, ackSeq)));
+ return null;
+ }
+ PublishBuildToolEventStreamRequest event = pendingAck.removeFirst();
+ if (isLastEvent(event)) {
+ logger.log(Level.INFO, "Last ACK received.");
}
return null;
};