aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2018-04-09 14:10:07 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-04-09 14:12:09 -0700
commit5c20c949188641db1376dd4b7ed958658ccb3670 (patch)
tree5a8ec73814b91256d787ccc3823aa5553a8237e8
parent1a77a347ca87dfa31158e94420cae3863777cc74 (diff)
Consume the rest of the response stream for Watch RPC before returning. This necessary for the call to close correctly (e.g. for listeners to receive Status/trailers).
RELNOTES: PiperOrigin-RevId: 192185329
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java91
1 files changed, 54 insertions, 37 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index fb8cf0d9c4..708475d882 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -135,45 +135,62 @@ class GrpcRemoteExecutor {
return retrier.execute(
() -> {
Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
- while (replies.hasNext()) {
- ChangeBatch cb = replies.next();
- for (Change ch : cb.getChangesList()) {
- switch (ch.getState()) {
- case INITIAL_STATE_SKIPPED:
- continue;
- case ERROR:
- try {
- throw StatusProto.toStatusRuntimeException(
- ch.getData().unpack(Status.class));
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- case DOES_NOT_EXIST:
- // TODO(olaola): either make this retriable, or use a different exception.
- throw new IOException(
- String.format("Operation %s lost on the remote server.", op.getName()));
- case EXISTS:
- Operation o;
- try {
- o = ch.getData().unpack(Operation.class);
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- try {
- ExecuteResponse r = getOperationResponse(o);
- if (r != null) {
- return r;
+ try {
+ while (replies.hasNext()) {
+ ChangeBatch cb = replies.next();
+ for (Change ch : cb.getChangesList()) {
+ switch (ch.getState()) {
+ case INITIAL_STATE_SKIPPED:
+ continue;
+ case ERROR:
+ try {
+ throw StatusProto.toStatusRuntimeException(
+ ch.getData().unpack(Status.class));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(e);
}
- } catch (StatusRuntimeException e) {
- // Pass through the Watch retry and retry the whole execute+watch call.
- throw new RemoteRetrier.PassThroughException(e);
- }
- continue;
- default:
- // This can only happen if the enum gets unexpectedly extended.
- throw new IOException(
- String.format("Illegal change state: %s", ch.getState()));
+ case DOES_NOT_EXIST:
+ // TODO(olaola): either make this retriable, or use a different exception.
+ throw new IOException(
+ String.format(
+ "Operation %s lost on the remote server.", op.getName()));
+ case EXISTS:
+ Operation o;
+ try {
+ o = ch.getData().unpack(Operation.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(e);
+ }
+ try {
+ ExecuteResponse r = getOperationResponse(o);
+ if (r != null) {
+ return r;
+ }
+ } catch (StatusRuntimeException e) {
+ // Pass through the Watch retry and retry the whole execute+watch call.
+ throw new RemoteRetrier.PassThroughException(e);
+ }
+ continue;
+ default:
+ // This can only happen if the enum gets unexpectedly extended.
+ throw new IOException(
+ String.format("Illegal change state: %s", ch.getState()));
+ }
+ }
+ }
+ } finally {
+ // The blocking streaming call closes correctly only when trailers and a Status
+ // are received from the server so that onClose() is called on this call's
+ // CallListener. Under normal circumstances (no cancel/errors), these are
+ // guaranteed to be sent by the server only if replies.hasNext() has been called
+ // after all replies from the stream have been consumed.
+ try {
+ while (replies.hasNext()) {
+ replies.next();
}
+ } catch (StatusRuntimeException e) {
+ // Cleanup: ignore exceptions, because the meaningful errors have already been
+ // propagated.
}
}
throw new IOException(