diff options
author | Googler <noreply@google.com> | 2017-08-24 21:11:50 +0200 |
---|---|---|
committer | Damien Martin-Guillerez <dmarting@google.com> | 2017-08-25 12:53:44 +0200 |
commit | e7e0f87cb69ecf69503b3fbe82ab12c62f383666 (patch) | |
tree | f11ffa28132ec173f7a8ebf5ddd92b2cdf5aaeab /src/main/java/com/google/devtools/build/lib/buildeventservice | |
parent | e005adf491c8f86c38439c377b27e29336841057 (diff) |
Abort stream if BuildEventServiceTransport does not wait for all ACKs.
This change handling for when BuildEventServiceTransport would timesout waiting for pendingAcks.
RELNOTES: None.
PiperOrigin-RevId: 166375844
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java | 44 |
1 files changed, 28 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index 60c35302e8..cb0082eae8 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -79,8 +79,8 @@ public class BuildEventServiceTransport implements BuildEventTransport { private static final Logger logger = Logger.getLogger(BuildEventServiceTransport.class.getName()); - /** Max wait time until for the Streaming RPC to finish after all events were enqueued. */ - private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(120); + /** Max wait time until for the Streaming RPC to finish after all events were sent. */ + private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(30); private final ListeningExecutorService uploaderExecutorService; private final Duration uploadTimeout; @@ -396,42 +396,51 @@ public class BuildEventServiceTransport implements BuildEventTransport { /** * Used as method reference, responsible for the entire Streaming RPC. Safe to retry. This method - * it carries states between consecutive calls (pendingAck messages will be added to the head of + * carries over the state between consecutive calls (pendingAck messages will be added to the head * of the pendingSend queue), but that is intended behavior. */ - private Status publishEventStream() throws Exception { + private void publishEventStream() throws Exception { // Reschedule unacked messages if required, keeping its original order. PublishBuildToolEventStreamRequest unacked; while ((unacked = pendingAck.pollLast()) != null) { pendingSend.addFirst(unacked); } pendingAck = new ConcurrentLinkedDeque<>(); - - return publishEventStream(pendingAck, pendingSend, besClient) - .get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + publishEventStream(pendingAck, pendingSend, besClient); } /** Method responsible for a single Streaming RPC. */ - private static ListenableFuture<Status> publishEventStream( + private static void publishEventStream( final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck, final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend, final BuildEventServiceClient besClient) throws Exception { - PublishBuildToolEventStreamRequest event; ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient)); try { + PublishBuildToolEventStreamRequest event; do { event = pendingSend.takeFirst(); pendingAck.add(event); besClient.sendOverStream(event); } while (!isLastEvent(event)); besClient.closeStream(); - logger.log(Level.INFO, "Closing the build event stream."); } catch (Exception e) { - logger.log(Level.WARNING, "Aborting publishEventStream.", e); - besClient.abortStream(Status.INTERNAL.augmentDescription(e.getMessage())); + String additionalDetail = e.getMessage(); + logger.log(Level.WARNING, "Aborting publishBuildToolEventStream RPC: " + additionalDetail); + besClient.abortStream(Status.INTERNAL.augmentDescription(additionalDetail)); + throw e; + } + + try { + Status status = + streamDone.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + logger.log(Level.INFO, "Done with publishEventStream(). Status: " + status); + } catch (TimeoutException e) { + String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages"; + logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail); + besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail)); + throw e; } - return streamDone; } private static boolean isLastEvent(PublishBuildToolEventStreamRequest event) { @@ -451,9 +460,12 @@ public class BuildEventServiceTransport implements BuildEventTransport { if (pendingSeq != ackSeq) { besClient.abortStream( Status.INTERNAL.augmentDescription( - format("Expected ack %s but was %s.", pendingSeq, ackSeq))); - } else { - pendingAck.removeFirst(); + format("Expected ACK %s but was %s.", pendingSeq, ackSeq))); + return null; + } + PublishBuildToolEventStreamRequest event = pendingAck.removeFirst(); + if (isLastEvent(event)) { + logger.log(Level.INFO, "Last ACK received."); } return null; }; |