aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Retrier.java145
1 files changed, 11 insertions, 134 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
index 4a248a17f4..a84e4418d3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
@@ -19,14 +19,12 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.util.Preconditions;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -45,108 +43,6 @@ import java.util.concurrent.TimeUnit;
* }
* });
* </pre>
- *
- * <p>When you need to retry multiple asynchronous calls, you can do:
- *
- * <pre>
- * Retrier.Backoff backoff = retrier.newBackoff();
- * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
- * while (true) {
- * CountDownLatch finishLatch = new CountDownLatch(items.size());
- * for (Item item : items) {
- * requestObserver = myStub.asyncCall(
- * request,
- * new StreamObserver<Response>() {
- * ...
- *
- * @Override
- * public void onError(Throwable t) {
- * // Need to handle non Status errors here!
- * errors.add(Status.fromThrowable(t));
- * finishLatch.countDown();
- * }
- * @Override
- * public void onCompleted() {
- * finishLatch.countDown();
- * }
- * });
- * requestObserver.onNext(i1);
- * requestObserver.onNext(i2);
- * ...
- * requestObserver.onCompleted();
- * }
- * finishLatch.await(someTime, TimeUnit.SECONDS);
- * if (errors.isEmpty()) {
- * return;
- * }
- * retrier.onFailures(backoff, errors); // Sleep once for the whole batch of failures.
- * items = failingItems; // this needs to be collected from the observers as well.
- * }
- * </pre>
- *
- * <p>This retries the multiple calls in bulk. Another way to do it is retry each call separately as
- * it occurs:
- *
- * <pre>
- * class RetryingObserver extends StreamObserver<Response> {
- * private final CountDownLatch finishLatch;
- * private final Backoff backoff;
- * private final AtomicReference<RuntimeException> exception;
- *
- * RetryingObserver(
- * CountDownLatch finishLatch, Backoff backoff, AtomicReference<RuntimeException> exception) {
- * this.finishLatch = finishLatch;
- * this.backoff = backoff;
- * this.exception = exception;
- * }
- *
- * @Override
- * public void onError(Throwable t) {
- * // Need to handle non Status errors here first!
- * try {
- * retrier.onFailure(backoff, Status.fromThrowable(t));
- *
- * // This assumes you passed through the relevant info to recreate the original request:
- * requestObserver = myStub.asyncCall(
- * request,
- * new RetryingObserver(finishLatch, backoff)); // Recursion!
- * requestObserver.onNext(i1);
- * requestObserver.onNext(i2);
- * ...
- * requestObserver.onCompleted();
- *
- * } catch (RetryException e) {
- * exception.compareAndSet(null, e);
- * finishLatch.countDown();
- * }
- * }
- * @Override
- * public void onCompleted() {
- * finishLatch.countDown();
- * }
- * }
- *
- * Retrier.Backoff backoff = retrier.newBackoff();
- * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
- * while (true) {
- * CountDownLatch finishLatch = new CountDownLatch(items.size());
- * for (Item item : items) {
- * requestObserver = myStub.asyncCall(
- * request,
- * new RetryingObserver(finishLatch, backoff));
- * requestObserver.onNext(i1);
- * requestObserver.onNext(i2);
- * ...
- * requestObserver.onCompleted();
- * }
- * finishLatch.await(someTime, TimeUnit.SECONDS);
- * if (exception.get() != null) {
- * throw exception.get(); // Re-throw the first encountered exception.
- * }
- * }
- * </pre>
- *
- * In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry.
*/
public class Retrier {
/** Wraps around a StatusRuntimeException to make it pass through a single layer of retries. */
@@ -160,7 +56,7 @@ public class Retrier {
* Backoff is a stateful object providing a sequence of durations that are used to time delays
* between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather
* than a static map of attempt number to delay, is to enable using the retrier via the manual
- * onFailure(backoff, e) method (see multiple async gRPC calls example above).
+ * calling isRetriable and nextDelayMillis manually (see ByteStreamUploader example).
*/
public interface Backoff {
@@ -309,7 +205,16 @@ public class Retrier {
} catch (RetryException e) {
throw e; // Nested retries are always pass-through.
} catch (StatusException | StatusRuntimeException e) {
- onFailure(backoff, Status.fromThrowable(e));
+ Status st = Status.fromThrowable(e);
+ long delay = backoff.nextDelayMillis();
+ if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ if (delay < 0 || !isRetriable.apply(st)) {
+ throw new RetryException(st.asRuntimeException(), backoff.getRetryAttempts());
+ }
+ sleep(delay);
} catch (Exception e) {
// Generic catch because Callable is declared to throw Exception, we rethrow any unchecked
// exception as well as any exception we declared above.
@@ -331,32 +236,4 @@ public class Retrier {
public Backoff newBackoff() {
return backoffSupplier.get();
}
-
- public void onFailure(Backoff backoff, Status s) throws RetryException, InterruptedException {
- onFailures(backoff, ImmutableList.of(s));
- }
-
- /**
- * Handles failures according to the current backoff/retry policy. If any of the errors are not
- * retriable, the first such error is thrown. Otherwise, if backoff still allows, this sleeps for
- * the specified duration. Otherwise, the first error is thrown.
- *
- * @param backoff The backoff object to get delays from.
- * @param errors The errors that occurred (must be non-empty).
- */
- public void onFailures(Backoff backoff, List<Status> errors)
- throws InterruptedException, RetryException {
- Preconditions.checkArgument(!errors.isEmpty(), "errors must be non-empty");
- long delay = backoff.nextDelayMillis();
- for (Status st : errors) {
- if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) {
- Thread.currentThread().interrupt();
- throw new InterruptedException();
- }
- if (delay < 0 || !isRetriable.apply(st)) {
- throw new RetryException(st.asRuntimeException(), backoff.getRetryAttempts());
- }
- }
- sleep(delay);
- }
}