diff options
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java | 91 |
1 files changed, 54 insertions, 37 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index fb8cf0d9c4..708475d882 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -135,45 +135,62 @@ class GrpcRemoteExecutor { return retrier.execute( () -> { Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr); - while (replies.hasNext()) { - ChangeBatch cb = replies.next(); - for (Change ch : cb.getChangesList()) { - switch (ch.getState()) { - case INITIAL_STATE_SKIPPED: - continue; - case ERROR: - try { - throw StatusProto.toStatusRuntimeException( - ch.getData().unpack(Status.class)); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - case DOES_NOT_EXIST: - // TODO(olaola): either make this retriable, or use a different exception. - throw new IOException( - String.format("Operation %s lost on the remote server.", op.getName())); - case EXISTS: - Operation o; - try { - o = ch.getData().unpack(Operation.class); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - try { - ExecuteResponse r = getOperationResponse(o); - if (r != null) { - return r; + try { + while (replies.hasNext()) { + ChangeBatch cb = replies.next(); + for (Change ch : cb.getChangesList()) { + switch (ch.getState()) { + case INITIAL_STATE_SKIPPED: + continue; + case ERROR: + try { + throw StatusProto.toStatusRuntimeException( + ch.getData().unpack(Status.class)); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); } - } catch (StatusRuntimeException e) { - // Pass through the Watch retry and retry the whole execute+watch call. - throw new RemoteRetrier.PassThroughException(e); - } - continue; - default: - // This can only happen if the enum gets unexpectedly extended. - throw new IOException( - String.format("Illegal change state: %s", ch.getState())); + case DOES_NOT_EXIST: + // TODO(olaola): either make this retriable, or use a different exception. + throw new IOException( + String.format( + "Operation %s lost on the remote server.", op.getName())); + case EXISTS: + Operation o; + try { + o = ch.getData().unpack(Operation.class); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + try { + ExecuteResponse r = getOperationResponse(o); + if (r != null) { + return r; + } + } catch (StatusRuntimeException e) { + // Pass through the Watch retry and retry the whole execute+watch call. + throw new RemoteRetrier.PassThroughException(e); + } + continue; + default: + // This can only happen if the enum gets unexpectedly extended. + throw new IOException( + String.format("Illegal change state: %s", ch.getState())); + } + } + } + } finally { + // The blocking streaming call closes correctly only when trailers and a Status + // are received from the server so that onClose() is called on this call's + // CallListener. Under normal circumstances (no cancel/errors), these are + // guaranteed to be sent by the server only if replies.hasNext() has been called + // after all replies from the stream have been consumed. + try { + while (replies.hasNext()) { + replies.next(); } + } catch (StatusRuntimeException e) { + // Cleanup: ignore exceptions, because the meaningful errors have already been + // propagated. } } throw new IOException( |