// Copyright 2017 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.remote; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AsyncCallable; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; /** * Supports retrying the execution of a {@link Callable} in case of failure. * *

The errors that are retried are configurable via a {@link Predicate}. The * delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports * circuit breaking to stop execution in case of high failure rates. */ @ThreadSafe public class Retrier { /** A backoff strategy. */ public interface Backoff { /** * Returns the next delay in milliseconds, or a value less than {@code 0} if we should stop * retrying. */ long nextDelayMillis(); /** * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls * that returned less than {@code 0}. */ int getRetryAttempts(); } /** * The circuit breaker allows to reject execution when failure rates are high. * *

The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are executed * and retried in this state. However, if error rates are high a circuit breaker can choose to * transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with a {@link * RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS} can * periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once and * in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}. * *

A circuit breaker implementation must be thread-safe. * * @see CircuitBreaker */ public interface CircuitBreaker { /** The state of the circuit breaker. */ enum State { /** * Calls are executed and retried in case of failure. * *

The circuit breaker can transition into state {@link State#REJECT_CALLS}. */ ACCEPT_CALLS, /** * A call is executed and not retried in case of failure. * *

The circuit breaker can transition into any state. */ TRIAL_CALL, /** * All calls are rejected. * *

The circuit breaker can transition into state {@link State#TRIAL_CALL}. */ REJECT_CALLS } /** Returns the current {@link State} of the circuit breaker. */ State state(); /** Called after an execution failed. */ void recordFailure(); /** Called after an execution succeeded. */ void recordSuccess(); } /** * {@link Sleeper#sleep(long)} is called to pause between synchronous retries ({@link * #execute(Callable)}. */ public interface Sleeper { void sleep(long millis) throws InterruptedException; } /** * Wraps around the actual cause for the retry. Contains information about the number of retry * attempts. */ public static class RetryException extends IOException { private final int attempts; public RetryException(String message, int numRetries, Exception cause) { super(message, cause); this.attempts = numRetries + 1; } protected RetryException(String message) { super(message); this.attempts = 0; } /** * Returns the number of times a {@link Callable} has been executed before this exception was * thrown. */ public int getAttempts() { return attempts; } } /** Thrown if the call was stopped by a circuit breaker. */ public static class CircuitBreakerException extends RetryException { private CircuitBreakerException(String message, int numRetries, Exception cause) { super(message, numRetries, cause); } private CircuitBreakerException() { super("Call not executed due to a high failure rate."); } } /** Disables circuit breaking. */ public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() { @Override public State state() { return State.ACCEPT_CALLS; } @Override public void recordFailure() {} @Override public void recordSuccess() {} }; /** Disables retries. */ public static final Backoff RETRIES_DISABLED = new Backoff() { @Override public long nextDelayMillis() { return -1; } @Override public int getRetryAttempts() { return 0; } }; /** No backoff. */ public static class ZeroBackoff implements Backoff { private final int maxRetries; private int retries; public ZeroBackoff(int maxRetries) { this.maxRetries = maxRetries; } @Override public long nextDelayMillis() { if (retries >= maxRetries) { return -1; } retries++; return 0; } @Override public int getRetryAttempts() { return retries; } } private final Supplier backoffSupplier; private final Predicate shouldRetry; private final CircuitBreaker circuitBreaker; private final ListeningScheduledExecutorService retryService; private final Sleeper sleeper; public Retrier( Supplier backoffSupplier, Predicate shouldRetry, ListeningScheduledExecutorService retryScheduler, CircuitBreaker circuitBreaker) { this( backoffSupplier, shouldRetry, retryScheduler, circuitBreaker, TimeUnit.MILLISECONDS::sleep); } @VisibleForTesting Retrier( Supplier backoffSupplier, Predicate shouldRetry, ListeningScheduledExecutorService retryService, CircuitBreaker circuitBreaker, Sleeper sleeper) { this.backoffSupplier = backoffSupplier; this.shouldRetry = shouldRetry; this.retryService = retryService; this.circuitBreaker = circuitBreaker; this.sleeper = sleeper; } /** * Execute a {@link Callable}, retrying execution in case of failure and returning the result in * case of success. * *

{@link InterruptedException} is not retried. * * @param call the {@link Callable} to execute. * @throws RetryException if the {@code call} didn't succeed within the framework specified by * {@code backoffSupplier} and {@code shouldRetry}. * @throws CircuitBreakerException in case a call was rejected because the circuit breaker * tripped. * @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the * current thread's interrupted flag is set. */ public T execute(Callable call) throws RetryException, InterruptedException { final Backoff backoff = newBackoff(); while (true) { final State circuitState; circuitState = circuitBreaker.state(); if (State.REJECT_CALLS.equals(circuitState)) { throw new CircuitBreakerException(); } try { if (Thread.interrupted()) { throw new InterruptedException(); } T r = call.call(); circuitBreaker.recordSuccess(); return r; } catch (InterruptedException e) { circuitBreaker.recordFailure(); throw e; } catch (Exception e) { circuitBreaker.recordFailure(); if (e instanceof RetryException) { // Support nested retry calls. e = (Exception) e.getCause(); } if (State.TRIAL_CALL.equals(circuitState)) { throw new CircuitBreakerException( "Call failed in circuit breaker half open state.", 0, e); } int attempts = backoff.getRetryAttempts(); if (!shouldRetry.test(e)) { throw new RetryException("Call failed with not retriable error: " + e, attempts, e); } final long delayMillis = backoff.nextDelayMillis(); if (delayMillis < 0) { throw new RetryException( "Call failed after " + attempts + " retry attempts: " + e, attempts, e); } sleeper.sleep(delayMillis); } } } /** * Executes an {@link AsyncCallable}, retrying execution in case of failure and returning a {@link * ListenableFuture} pointing to the result/error. */ public ListenableFuture executeAsync(AsyncCallable call) { SettableFuture f = SettableFuture.create(); executeAsync(call, f); return f; } /** * Executes an {@link AsyncCallable}, retrying execution in case of failure and uses the provided * {@code promise} to point to the result/error. */ public void executeAsync(AsyncCallable call, SettableFuture promise) { Preconditions.checkNotNull(call); Preconditions.checkNotNull(promise); Backoff backoff = newBackoff(); executeAsync(call, promise, backoff); } private void executeAsync(AsyncCallable call, SettableFuture outerF, Backoff backoff) { Preconditions.checkState(!outerF.isDone(), "outerF completed already."); try { Futures.addCallback( call.call(), new FutureCallback() { @Override public void onSuccess(T t) { outerF.set(t); } @Override public void onFailure(Throwable t) { onExecuteAsyncFailure(t, call, outerF, backoff); } }, MoreExecutors.directExecutor()); } catch (Exception e) { onExecuteAsyncFailure(e, call, outerF, backoff); } } private void onExecuteAsyncFailure( Throwable t, AsyncCallable call, SettableFuture outerF, Backoff backoff) { long waitMillis = backoff.nextDelayMillis(); if (waitMillis >= 0 && t instanceof Exception && isRetriable((Exception) t)) { try { ListenableScheduledFuture sf = retryService.schedule( () -> executeAsync(call, outerF, backoff), waitMillis, TimeUnit.MILLISECONDS); Futures.addCallback( sf, new FutureCallback() { @Override public void onSuccess(Object o) { // Submitted successfully. Intentionally left empty. } @Override public void onFailure(Throwable t) { Exception e = t instanceof Exception ? (Exception) t : new Exception(t); outerF.setException( new RetryException( "Scheduled execution errored.", backoff.getRetryAttempts(), e)); } }, MoreExecutors.directExecutor()); } catch (RejectedExecutionException e) { // May be thrown by .schedule(...) if i.e. the executor is shutdown. outerF.setException( new RetryException("Rejected by executor.", backoff.getRetryAttempts(), e)); } } else { Exception e = t instanceof Exception ? (Exception) t : new Exception(t); String message = waitMillis >= 0 ? "Status not retriable." : "Exhaused retry attempts (" + backoff.getRetryAttempts() + ")"; RetryException error = new RetryException(message, backoff.getRetryAttempts(), e); outerF.setException(error); } } public Backoff newBackoff() { return backoffSupplier.get(); } public boolean isRetriable(Exception e) { return shouldRetry.test(e); } }