From 60eb53bca7348edebdb2b8e6734f6b748e990ec7 Mon Sep 17 00:00:00 2001 From: olaola Date: Thu, 13 Jul 2017 00:00:27 +0200 Subject: Fixing the handling of retries for watch and execute calls. TESTED=remote worker, triggered some errors RELNOTES: fixes #3305, helps #3356 PiperOrigin-RevId: 161722997 --- .../build/lib/remote/GrpcRemoteExecutor.java | 121 +++++++++++++-------- .../google/devtools/build/lib/remote/Retrier.java | 11 ++ 2 files changed, 86 insertions(+), 46 deletions(-) (limited to 'src/main/java/com/google/devtools/build/lib/remote') diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index ad4a4ddbe7..68046fe2ca 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -67,8 +67,7 @@ final class GrpcRemoteExecutor { .withCallCredentials(callCredentials); } - private @Nullable ExecuteResponse getOperationResponse(Operation op) - throws IOException { + private @Nullable ExecuteResponse getOperationResponse(Operation op) throws IOException { if (op.getResultCase() == Operation.ResultCase.ERROR) { StatusRuntimeException e = StatusProto.toStatusRuntimeException(op.getError()); if (e.getStatus().getCode() == Code.DEADLINE_EXCEEDED) { @@ -90,53 +89,83 @@ final class GrpcRemoteExecutor { return null; } + /* Execute has two components: the execute call and the watch call. + * This is the simple flow without any errors: + * + * - A call to execute returns an Operation object. + * - That Operation may already have an inlined result; if so, we return that result. + * - Otherwise, we call watch on that operation to receive a stream of Changes to the Operation + * object, until the first such change is an Operation with a result, which we return. + * + * Error possibilities: + * - Any Operation object can have an error field instead of a result. Such Operations are + * completed and failed; however, some of these errors may be retriable. These errors should + * trigger a retry of the full execute+watch call, resulting in a new Operation. + * - An execute call may fail with a retriable error (raise a StatusRuntimeException). We then + * retry that call. + * - A watch call may fail with a retriable error (either raise a StatusRuntimeException, or + * return an ERROR in the ChangeBatch field). In that case, we retry the watch call only on the + * same operation object. + * */ public ExecuteResponse executeRemotely(ExecuteRequest request) throws IOException, InterruptedException { - Operation op = retrier.execute(() -> execBlockingStub().execute(request)); - ExecuteResponse resp = getOperationResponse(op); - if (resp != null) { - return resp; - } - Request wr = Request.newBuilder().setTarget(op.getName()).build(); - return retrier.execute( - () -> { - Iterator replies = watcherBlockingStub().watch(wr); - while (replies.hasNext()) { - ChangeBatch cb = replies.next(); - for (Change ch : cb.getChangesList()) { - switch (ch.getState()) { - case INITIAL_STATE_SKIPPED: - continue; - case ERROR: - try { - throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - case DOES_NOT_EXIST: - // TODO(olaola): either make this retriable, or use a different exception. - throw new IOException( - String.format("Operation %s lost on the remote server.", op.getName())); - case EXISTS: - Operation o; - try { - o = ch.getData().unpack(Operation.class); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - ExecuteResponse r = getOperationResponse(o); - if (r != null) { - return r; - } - continue; - default: - // This can only happen if the enum gets unexpectedly extended. - throw new IOException(String.format("Illegal change state: %s", ch.getState())); + // The only errors retried here are transient failures of the Action itself on the server, not + // any gRPC errors that occurred during the call. + return retrier.execute(() -> { + // Here all transient gRPC errors will be retried. + Operation op = retrier.execute(() -> execBlockingStub().execute(request)); + ExecuteResponse resp = getOperationResponse(op); + if (resp != null) { + return resp; + } + Request wr = Request.newBuilder().setTarget(op.getName()).build(); + // Here all transient gRPC errors will be retried, while transient failures of the Action + // itself will be propagated. + return retrier.execute( + () -> { + Iterator replies = watcherBlockingStub().watch(wr); + while (replies.hasNext()) { + ChangeBatch cb = replies.next(); + for (Change ch : cb.getChangesList()) { + switch (ch.getState()) { + case INITIAL_STATE_SKIPPED: + continue; + case ERROR: + try { + throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + case DOES_NOT_EXIST: + // TODO(olaola): either make this retriable, or use a different exception. + throw new IOException( + String.format("Operation %s lost on the remote server.", op.getName())); + case EXISTS: + Operation o; + try { + o = ch.getData().unpack(Operation.class); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + try { + ExecuteResponse r = getOperationResponse(o); + if (r != null) { + return r; + } + } catch (StatusRuntimeException e) { + // Pass through the Watch retry and retry the whole execute+watch call. + throw new Retrier.PassThroughException(e); + } + continue; + default: + // This can only happen if the enum gets unexpectedly extended. + throw new IOException(String.format("Illegal change state: %s", ch.getState())); + } } } - } - throw new IOException( - String.format("Watch request for %s terminated with no result.", op.getName())); - }); + throw new IOException( + String.format("Watch request for %s terminated with no result.", op.getName())); + }); + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java index 33d2422107..4a248a17f4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java @@ -149,6 +149,13 @@ import java.util.concurrent.TimeUnit; * In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry. */ public class Retrier { + /** Wraps around a StatusRuntimeException to make it pass through a single layer of retries. */ + public static class PassThroughException extends Exception { + public PassThroughException(StatusRuntimeException e) { + super(e); + } + } + /** * Backoff is a stateful object providing a sequence of durations that are used to time delays * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather @@ -297,6 +304,10 @@ public class Retrier { while (true) { try { return c.call(); + } catch (PassThroughException e) { + throw (StatusRuntimeException) e.getCause(); + } catch (RetryException e) { + throw e; // Nested retries are always pass-through. } catch (StatusException | StatusRuntimeException e) { onFailure(backoff, Status.fromThrowable(e)); } catch (Exception e) { -- cgit v1.2.3