aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java91
1 files changed, 54 insertions, 37 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index fb8cf0d9c4..708475d882 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -135,45 +135,62 @@ class GrpcRemoteExecutor {
return retrier.execute(
() -> {
Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
- while (replies.hasNext()) {
- ChangeBatch cb = replies.next();
- for (Change ch : cb.getChangesList()) {
- switch (ch.getState()) {
- case INITIAL_STATE_SKIPPED:
- continue;
- case ERROR:
- try {
- throw StatusProto.toStatusRuntimeException(
- ch.getData().unpack(Status.class));
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- case DOES_NOT_EXIST:
- // TODO(olaola): either make this retriable, or use a different exception.
- throw new IOException(
- String.format("Operation %s lost on the remote server.", op.getName()));
- case EXISTS:
- Operation o;
- try {
- o = ch.getData().unpack(Operation.class);
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- try {
- ExecuteResponse r = getOperationResponse(o);
- if (r != null) {
- return r;
+ try {
+ while (replies.hasNext()) {
+ ChangeBatch cb = replies.next();
+ for (Change ch : cb.getChangesList()) {
+ switch (ch.getState()) {
+ case INITIAL_STATE_SKIPPED:
+ continue;
+ case ERROR:
+ try {
+ throw StatusProto.toStatusRuntimeException(
+ ch.getData().unpack(Status.class));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(e);
}
- } catch (StatusRuntimeException e) {
- // Pass through the Watch retry and retry the whole execute+watch call.
- throw new RemoteRetrier.PassThroughException(e);
- }
- continue;
- default:
- // This can only happen if the enum gets unexpectedly extended.
- throw new IOException(
- String.format("Illegal change state: %s", ch.getState()));
+ case DOES_NOT_EXIST:
+ // TODO(olaola): either make this retriable, or use a different exception.
+ throw new IOException(
+ String.format(
+ "Operation %s lost on the remote server.", op.getName()));
+ case EXISTS:
+ Operation o;
+ try {
+ o = ch.getData().unpack(Operation.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(e);
+ }
+ try {
+ ExecuteResponse r = getOperationResponse(o);
+ if (r != null) {
+ return r;
+ }
+ } catch (StatusRuntimeException e) {
+ // Pass through the Watch retry and retry the whole execute+watch call.
+ throw new RemoteRetrier.PassThroughException(e);
+ }
+ continue;
+ default:
+ // This can only happen if the enum gets unexpectedly extended.
+ throw new IOException(
+ String.format("Illegal change state: %s", ch.getState()));
+ }
+ }
+ }
+ } finally {
+ // The blocking streaming call closes correctly only when trailers and a Status
+ // are received from the server so that onClose() is called on this call's
+ // CallListener. Under normal circumstances (no cancel/errors), these are
+ // guaranteed to be sent by the server only if replies.hasNext() has been called
+ // after all replies from the stream have been consumed.
+ try {
+ while (replies.hasNext()) {
+ replies.next();
}
+ } catch (StatusRuntimeException e) {
+ // Cleanup: ignore exceptions, because the meaningful errors have already been
+ // propagated.
}
}
throw new IOException(