// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.remote; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.util.Preconditions; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; /** * Supports execution with retries on particular gRPC Statuses. The retrier is ThreadSafe. * *
 * <p>Example usage: The simple use case is to call {@code retrier.execute}, e.g.:
 *
 * <pre>{@code
 * foo = retrier.execute(
 *     new Callable<Foo>() {
 *       @Override
 *       public Foo call() {
 *         return grpcStub.getFoo(fooRequest);
 *       }
 *     });
 * }</pre>
 *
When you need to retry multiple asynchronous calls, you can do: * *
 * <pre>{@code
 * Retrier.Backoff backoff = retrier.newBackoff();
 * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
 * while (true) {
 *   CountDownLatch finishLatch = new CountDownLatch(items.size());
 *   for (Item item : items) {
 *     requestObserver = myStub.asyncCall(
 *         request,
 *         new StreamObserver<Response>() {
 *           ...
 *
 *           @Override
 *           public void onError(Throwable t) {
 *             // Need to handle non Status errors here!
 *             errors.add(Status.fromThrowable(t));
 *             finishLatch.countDown();
 *           }
 *           @Override
 *           public void onCompleted() {
 *             finishLatch.countDown();
 *           }
 *         });
 *     requestObserver.onNext(i1);
 *     requestObserver.onNext(i2);
 *     ...
 *     requestObserver.onCompleted();
 *   }
 *   finishLatch.await(someTime, TimeUnit.SECONDS);
 *   if (errors.isEmpty()) {
 *     return;
 *   }
 *   retrier.onFailures(backoff, errors); // Sleep once for the whole batch of failures.
 *   items = failingItems; // this needs to be collected from the observers as well.
 * }
 * }</pre>
 *
 * <p>This retries the multiple calls in bulk. Another way to do it is to retry each call
 * separately as it occurs:
 *
* class RetryingObserver extends StreamObserver* * In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry. */ public class Retrier { /** Wraps around a StatusRuntimeException to make it pass through a single layer of retries. */ public static class PassThroughException extends Exception { public PassThroughException(StatusRuntimeException e) { super(e); } } /** * Backoff is a stateful object providing a sequence of durations that are used to time delays * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather * than a static map of attempt number to delay, is to enable using the retrier via the manual * onFailure(backoff, e) method (see multiple async gRPC calls example above). */ public interface Backoff { /** Indicates that no more retries should be made for use in {@link #nextDelayMillis()}. */ static final long STOP = -1L; /** Returns the next delay in milliseconds, or < 0 if we should not continue retrying. */ long nextDelayMillis(); /** * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls * that returned STOP. */ int getRetryAttempts(); /** * Creates a Backoff supplier for a Backoff which does not support any retries. Both the * Supplier and the Backoff are stateless and thread-safe. */ static final Supplier{ * private final CountDownLatch finishLatch; * private final Backoff backoff; * private final AtomicReference exception; * * RetryingObserver( * CountDownLatch finishLatch, Backoff backoff, AtomicReference exception) { * this.finishLatch = finishLatch; * this.backoff = backoff; * this.exception = exception; * } * * @Override * public void onError(Throwable t) { * // Need to handle non Status errors here first! 
* try { * retrier.onFailure(backoff, Status.fromThrowable(t)); * * // This assumes you passed through the relevant info to recreate the original request: * requestObserver = myStub.asyncCall( * request, * new RetryingObserver(finishLatch, backoff)); // Recursion! * requestObserver.onNext(i1); * requestObserver.onNext(i2); * ... * requestObserver.onCompleted(); * * } catch (RetryException e) { * exception.compareAndSet(null, e); * finishLatch.countDown(); * } * } * @Override * public void onCompleted() { * finishLatch.countDown(); * } * } * * Retrier.Backoff backoff = retrier.newBackoff(); * List errors = Collections.synchronizedList(new ArrayList ()); * while (true) { * CountDownLatch finishLatch = new CountDownLatch(items.size()); * for (Item item : items) { * requestObserver = myStub.asyncCall( * request, * new RetryingObserver(finishLatch, backoff)); * requestObserver.onNext(i1); * requestObserver.onNext(i2); * ... * requestObserver.onCompleted(); * } * finishLatch.await(someTime, TimeUnit.SECONDS); * if (exception.get() != null) { * throw exception.get(); // Re-throw the first encountered exception. * } * } *
This method never throws {@link StatusRuntimeException} even if the passed-in Callable does.
*
* @param c The callable to execute.
*/
public