From 44e40bc84d05eea7a3527fed12028ef58e90d607 Mon Sep 17 00:00:00 2001 From: buchgr Date: Mon, 4 Dec 2017 10:44:47 -0800 Subject: remote: Replace Retrier with Retrier2. - Replace the existing Retrier with Retrier2. - Rename Retrier2 to Retrier and remove the old Retrier + RetryException class. RELNOTES: None. PiperOrigin-RevId: 177835070 --- .../build/lib/remote/ByteStreamUploader.java | 43 +-- .../devtools/build/lib/remote/GrpcRemoteCache.java | 11 +- .../build/lib/remote/GrpcRemoteExecutor.java | 120 +++---- .../devtools/build/lib/remote/RemoteModule.java | 4 +- .../devtools/build/lib/remote/RemoteRetrier.java | 12 +- .../build/lib/remote/RemoteRetrierUtils.java | 33 ++ .../build/lib/remote/RemoteSpawnRunner.java | 12 +- .../google/devtools/build/lib/remote/Retrier.java | 363 +++++++++++---------- .../google/devtools/build/lib/remote/Retrier2.java | 260 --------------- .../devtools/build/lib/remote/RetryException.java | 43 --- .../build/lib/remote/ByteStreamUploaderTest.java | 37 ++- .../build/lib/remote/GrpcRemoteCacheTest.java | 4 +- .../lib/remote/GrpcRemoteExecutionClientTest.java | 3 +- .../build/lib/remote/RemoteRetrierTest.java | 30 +- .../devtools/build/lib/remote/Retrier2Test.java | 307 ----------------- .../devtools/build/lib/remote/RetrierTest.java | 339 +++++++++++++------ 16 files changed, 614 insertions(+), 1007 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java delete mode 100644 src/main/java/com/google/devtools/build/lib/remote/Retrier2.java delete mode 100644 src/main/java/com/google/devtools/build/lib/remote/RetryException.java delete mode 100644 src/test/java/com/google/devtools/build/lib/remote/Retrier2Test.java (limited to 'src') diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 0bef2ed1ae..609ee3c669 100644 --- 
a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import com.google.devtools.build.lib.remote.Retrier.RetryException; import com.google.devtools.remoteexecution.v1test.Digest; import io.grpc.CallCredentials; import io.grpc.CallOptions; @@ -68,7 +69,7 @@ final class ByteStreamUploader { private final Channel channel; private final CallCredentials callCredentials; private final long callTimeoutSecs; - private final Retrier retrier; + private final RemoteRetrier retrier; private final ListeningScheduledExecutorService retryService; private final Object lock = new Object(); @@ -89,7 +90,7 @@ final class ByteStreamUploader { * case no authentication is performed * @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be * complete. The timeout resets between retries - * @param retrier the {@link Retrier} whose backoff strategy to use for retry timings. + * @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings. * @param retryService the executor service to schedule retries on. It's the responsibility of the * caller to properly shutdown the service after use. 
Users should avoid shutting down the * service before {@link #shutdown()} has been called @@ -99,7 +100,7 @@ final class ByteStreamUploader { Channel channel, @Nullable CallCredentials callCredentials, long callTimeoutSecs, - Retrier retrier, + RemoteRetrier retrier, ListeningScheduledExecutorService retryService) { checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0."); @@ -112,12 +113,11 @@ final class ByteStreamUploader { } /** - * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code - * ByteStream} service. The call blocks until the upload is complete, or throws an {@link - * Exception} in case of error. + * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. + * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. * - *

<p>Uploads are retried according to the specified {@link Retrier}. Retrying is transparent to - * the user of this API. + *

<p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. * *

Trying to upload the same BLOB multiple times concurrently, results in only one upload being * performed. This is transparent to the user of this API. @@ -125,8 +125,7 @@ final class ByteStreamUploader { * @throws IOException when reading of the {@link Chunker}s input source fails * @throws RetryException when the upload failed after a retry */ - public void uploadBlob(Chunker chunker) - throws IOException, InterruptedException { + public void uploadBlob(Chunker chunker) throws IOException, InterruptedException { uploadBlobs(singletonList(chunker)); } @@ -136,8 +135,8 @@ final class ByteStreamUploader { * upload failed. Any other uploads will continue uploading in the background, until they complete * or the {@link #shutdown()} method is called. Errors encountered by these uploads are swallowed. * - *

<p>Uploads are retried according to the specified {@link Retrier}. Retrying is transparent to - * the user of this API. + *

<p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. * *

Trying to upload the same BLOB multiple times concurrently, results in only one upload being * performed. This is transparent to the user of this API. @@ -145,8 +144,7 @@ final class ByteStreamUploader { * @throws IOException when reading of the {@link Chunker}s input source fails * @throws RetryException when the upload failed after a retry */ - public void uploadBlobs(Iterable chunkers) - throws IOException, InterruptedException { + public void uploadBlobs(Iterable chunkers) throws IOException, InterruptedException { List> uploads = new ArrayList<>(); for (Chunker chunker : chunkers) { @@ -226,9 +224,7 @@ final class ByteStreamUploader { } private void startAsyncUploadWithRetry( - Chunker chunker, - Retrier.Backoff backoffTimes, - SettableFuture overallUploadResult) { + Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture overallUploadResult) { AsyncUpload.Listener listener = new AsyncUpload.Listener() { @@ -241,9 +237,13 @@ final class ByteStreamUploader { public void failure(Status status) { StatusException cause = status.asException(); long nextDelayMillis = backoffTimes.nextDelayMillis(); - if (nextDelayMillis < 0 || !retrier.isRetriable(status)) { + if (nextDelayMillis < 0 || !retrier.isRetriable(cause)) { // Out of retries or status not retriable. 
- RetryException error = new RetryException(cause, backoffTimes.getRetryAttempts()); + RetryException error = + new RetryException( + "Out of retries or status not retriable.", + backoffTimes.getRetryAttempts(), + cause); overallUploadResult.setException(error); } else { retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult); @@ -272,14 +272,15 @@ final class ByteStreamUploader { schedulingResult.get(); } catch (Exception e) { overallUploadResult.setException( - new RetryException(e, backoffTimes.getRetryAttempts())); + new RetryException( + "Scheduled execution errored.", backoffTimes.getRetryAttempts(), e)); } }, MoreExecutors.directExecutor()); } catch (RejectedExecutionException e) { // May be thrown by .schedule(...) if i.e. the executor is shutdown. overallUploadResult.setException( - new RetryException(e, backoffTimes.getRetryAttempts())); + new RetryException("Rejected by executor.", backoffTimes.getRetryAttempts(), e)); } } }; diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index 5c3722e727..908fb9b0cc 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -30,6 +30,7 @@ import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.MetadataProvider; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.DigestUtil.ActionKey; +import com.google.devtools.build.lib.remote.Retrier.RetryException; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -70,7 +71,7 @@ public class GrpcRemoteCache implements RemoteActionCache { private final RemoteOptions options; private final 
CallCredentials credentials; private final Channel channel; - private final Retrier retrier; + private final RemoteRetrier retrier; private final ByteStreamUploader uploader; private final DigestUtil digestUtil; private final ListeningScheduledExecutorService retryScheduler = @@ -81,7 +82,7 @@ public class GrpcRemoteCache implements RemoteActionCache { Channel channel, CallCredentials credentials, RemoteOptions options, - Retrier retrier, + RemoteRetrier retrier, DigestUtil digestUtil) { this.options = options; this.credentials = credentials; @@ -266,7 +267,7 @@ public class GrpcRemoteCache implements RemoteActionCache { * This method can throw {@link StatusRuntimeException}, but the RemoteCache interface does not * allow throwing such an exception. Any caller must make sure to catch the * {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used - * in the context of {@link Retrier#execute}, that's perfectly safe. + * in the context of {@link RemoteRetrier#execute}, that's perfectly safe. * *

This method also converts any NOT_FOUND code returned from the server into a * {@link CacheNotFoundException}. TODO(olaola): this is not enough. NOT_FOUND can also be raised @@ -318,7 +319,7 @@ public class GrpcRemoteCache implements RemoteActionCache { .setActionResult(result) .build())); } catch (RetryException e) { - if (e.causedByStatusCode(Status.Code.UNIMPLEMENTED)) { + if (RemoteRetrierUtils.causedByStatus(e, Status.Code.UNIMPLEMENTED)) { // Silently return without upload. return; } @@ -443,7 +444,7 @@ public class GrpcRemoteCache implements RemoteActionCache { .setActionDigest(actionKey.getDigest()) .build())); } catch (RetryException e) { - if (e.causedByStatusCode(Status.Code.NOT_FOUND)) { + if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) { // Return null to indicate that it was a cache miss. return null; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index e87fa4b305..dbff8e7e88 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -45,10 +45,13 @@ class GrpcRemoteExecutor { private final Channel channel; private final CallCredentials callCredentials; private final int callTimeoutSecs; - private final Retrier retrier; + private final RemoteRetrier retrier; - public GrpcRemoteExecutor(Channel channel, @Nullable CallCredentials callCredentials, - int callTimeoutSecs, Retrier retrier) { + public GrpcRemoteExecutor( + Channel channel, + @Nullable CallCredentials callCredentials, + int callTimeoutSecs, + RemoteRetrier retrier) { Preconditions.checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0."); this.channel = channel; this.callCredentials = callCredentials; @@ -123,61 +126,64 @@ class GrpcRemoteExecutor { throws IOException, InterruptedException { // The only errors retried here are transient 
failures of the Action itself on the server, not // any gRPC errors that occurred during the call. - return retrier.execute(() -> { - // Here all transient gRPC errors will be retried. - Operation op = retrier.execute(() -> execBlockingStub().execute(request)); - ExecuteResponse resp = getOperationResponse(op); - if (resp != null) { - return resp; - } - Request wr = Request.newBuilder().setTarget(op.getName()).build(); - // Here all transient gRPC errors will be retried, while transient failures of the Action - // itself will be propagated. - return retrier.execute( - () -> { - Iterator replies = watcherBlockingStub().watch(wr); - while (replies.hasNext()) { - ChangeBatch cb = replies.next(); - for (Change ch : cb.getChangesList()) { - switch (ch.getState()) { - case INITIAL_STATE_SKIPPED: - continue; - case ERROR: - try { - throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - case DOES_NOT_EXIST: - // TODO(olaola): either make this retriable, or use a different exception. - throw new IOException( - String.format("Operation %s lost on the remote server.", op.getName())); - case EXISTS: - Operation o; - try { - o = ch.getData().unpack(Operation.class); - } catch (InvalidProtocolBufferException e) { - throw new IOException(e); - } - try { - ExecuteResponse r = getOperationResponse(o); - if (r != null) { - return r; - } - } catch (StatusRuntimeException e) { - // Pass through the Watch retry and retry the whole execute+watch call. - throw new Retrier.PassThroughException(e); + return retrier.execute( + () -> { + // Here all transient gRPC errors will be retried. 
+ Operation op = retrier.execute(() -> execBlockingStub().execute(request)); + ExecuteResponse resp = getOperationResponse(op); + if (resp != null) { + return resp; + } + Request wr = Request.newBuilder().setTarget(op.getName()).build(); + // Here all transient gRPC errors will be retried, while transient failures of the Action + // itself will be propagated. + return retrier.execute( + () -> { + Iterator replies = watcherBlockingStub().watch(wr); + while (replies.hasNext()) { + ChangeBatch cb = replies.next(); + for (Change ch : cb.getChangesList()) { + switch (ch.getState()) { + case INITIAL_STATE_SKIPPED: + continue; + case ERROR: + try { + throw StatusProto.toStatusRuntimeException( + ch.getData().unpack(Status.class)); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + case DOES_NOT_EXIST: + // TODO(olaola): either make this retriable, or use a different exception. + throw new IOException( + String.format("Operation %s lost on the remote server.", op.getName())); + case EXISTS: + Operation o; + try { + o = ch.getData().unpack(Operation.class); + } catch (InvalidProtocolBufferException e) { + throw new IOException(e); + } + try { + ExecuteResponse r = getOperationResponse(o); + if (r != null) { + return r; + } + } catch (StatusRuntimeException e) { + // Pass through the Watch retry and retry the whole execute+watch call. + throw new RemoteRetrier.PassThroughException(e); + } + continue; + default: + // This can only happen if the enum gets unexpectedly extended. + throw new IOException( + String.format("Illegal change state: %s", ch.getState())); } - continue; - default: - // This can only happen if the enum gets unexpectedly extended. 
- throw new IOException(String.format("Illegal change state: %s", ch.getState())); + } } - } - } - throw new IOException( - String.format("Watch request for %s terminated with no result.", op.getName())); - }); - }); + throw new IOException( + String.format("Watch request for %s terminated with no result.", op.getName())); + }); + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index c923a4d811..8928142626 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -114,7 +114,9 @@ public final class RemoteModule extends BlazeModule { boolean remoteOrLocalCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions); boolean grpcCache = GrpcRemoteCache.isRemoteCacheOptions(remoteOptions); - Retrier retrier = new Retrier(remoteOptions); + RemoteRetrier retrier = + new RemoteRetrier( + remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS); CallCredentials creds = GrpcUtils.newCallCredentials(authAndTlsOptions); // TODO(davido): The naming is wrong here. "Remote"-prefix in RemoteActionCache class has no // meaning. diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java index 8359f4de99..27de8b7305 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java @@ -36,13 +36,13 @@ import java.util.function.Supplier; * // Not retried. 
* throw PassThroughException(new IOException("fail")); * } - * } catch (RetryException2 e) { + * } catch (RetryException e) { * // e.getCause() is the IOException * System.out.println(e.getCause()); * } * */ -class RemoteRetrier extends Retrier2 { +class RemoteRetrier extends Retrier { /** * Wraps around an {@link Exception} to make it pass through a single layer of retries. @@ -96,13 +96,13 @@ class RemoteRetrier extends Retrier2 { } @Override - public T execute(Callable call) throws RetryException2, InterruptedException { + public T execute(Callable call) throws RetryException, InterruptedException { try { return super.execute(call); - } catch (RetryException2 e) { + } catch (RetryException e) { if (e.getCause() instanceof PassThroughException) { PassThroughException passThrough = (PassThroughException) e.getCause(); - throw new RetryException2("Retries aborted because of PassThroughException", + throw new RetryException("Retries aborted because of PassThroughException", e.getAttempts(), (Exception) passThrough.getCause()); } throw e; @@ -116,7 +116,7 @@ class RemoteRetrier extends Retrier2 { return e -> !(e instanceof PassThroughException) && delegate.test(e); } - static class ExponentialBackoff implements Retrier2.Backoff { + static class ExponentialBackoff implements Retrier.Backoff { private final long maxMillis; private long nextDelayMillis; diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java new file mode 100644 index 0000000000..f518b43682 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java @@ -0,0 +1,33 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.remote.Retrier.RetryException; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; + +/** Methods useful when using the {@link RemoteRetrier}. */ +public final class RemoteRetrierUtils { + + public static boolean causedByStatus(RetryException e, Status.Code expected) { + if (e.getCause() instanceof StatusRuntimeException) { + return ((StatusRuntimeException) e.getCause()).getStatus().getCode() == expected; + } else if (e.getCause() instanceof StatusException) { + return ((StatusException) e.getCause()).getStatus().getCode() == expected; + } + return false; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index 3a1b474000..c67ef489a6 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -33,6 +33,7 @@ import com.google.devtools.build.lib.exec.SpawnExecException; import com.google.devtools.build.lib.exec.SpawnInputExpander; import com.google.devtools.build.lib.exec.SpawnRunner; import com.google.devtools.build.lib.remote.DigestUtil.ActionKey; +import com.google.devtools.build.lib.remote.Retrier.RetryException; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.ExitCode; import 
com.google.devtools.build.lib.util.io.FileOutErr; @@ -223,9 +224,10 @@ class RemoteSpawnRunner implements SpawnRunner { return handleError(cause, policy.getFileOutErr()); } - private SpawnResult handleError(IOException cause, FileOutErr outErr) throws IOException, + private SpawnResult handleError(IOException exception, FileOutErr outErr) throws IOException, ExecException { - if (cause instanceof TimeoutException) { + final Throwable cause = exception.getCause(); + if (exception instanceof TimeoutException || cause instanceof TimeoutException) { // TODO(buchgr): provide stdout/stderr from the action that timed out. // Remove the unsuported message once remote execution tests no longer check for it. try (OutputStream out = outErr.getOutputStream()) { @@ -238,8 +240,8 @@ class RemoteSpawnRunner implements SpawnRunner { .build(); } final Status status; - if (cause instanceof RetryException - && ((RetryException) cause).causedByStatusCode(Code.UNAVAILABLE)) { + if (exception instanceof RetryException + && RemoteRetrierUtils.causedByStatus((RetryException) exception, Code.UNAVAILABLE)) { status = Status.EXECUTION_FAILED_CATASTROPHICALLY; } else if (cause instanceof CacheNotFoundException) { status = Status.REMOTE_CACHE_FAILED; @@ -247,7 +249,7 @@ class RemoteSpawnRunner implements SpawnRunner { status = Status.EXECUTION_FAILED; } throw new SpawnExecException( - Throwables.getStackTraceAsString(cause), + Throwables.getStackTraceAsString(exception), new SpawnResult.Builder() .setStatus(status) .setExitCode(ExitCode.REMOTE_ERROR.getNumericExitCode()) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java index e529109121..b675e51e67 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java @@ -1,4 +1,4 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. 
+// Copyright 2017 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,227 +15,246 @@ package com.google.devtools.build.lib.remote; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import io.grpc.Status; -import io.grpc.StatusException; -import io.grpc.StatusRuntimeException; +import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; import java.io.IOException; -import java.time.Duration; import java.util.concurrent.Callable; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; /** - * Supports execution with retries on particular gRPC Statuses. The retrier is ThreadSafe. + * Supports retrying the execution of a {@link Callable} in case of failure. * - *

<p>Example usage: The simple use-case is to call retrier.execute, e.g: - * - *

- * foo = retrier.execute(
- *     new Callable() {
- *       @Override
- *       public Foo call() {
- *         return grpcStub.getFoo(fooRequest);
- *       }
- *     });
- * 
+ *

The errors that are retried are configurable via a {@link Predicate}. The + * delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports + * circuit breaking to stop execution in case of high failure rates. */ -public class Retrier { - /** Wraps around a StatusRuntimeException to make it pass through a single layer of retries. */ - public static class PassThroughException extends Exception { - public PassThroughException(StatusRuntimeException e) { - super(e); - } - } +// TODO(buchgr): Move to a different package and use it for BES code. +@ThreadSafe +class Retrier { /** - * Backoff is a stateful object providing a sequence of durations that are used to time delays - * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather - * than a static map of attempt number to delay, is to enable using the retrier via the manual - * calling isRetriable and nextDelayMillis manually (see ByteStreamUploader example). + * A backoff strategy. */ public interface Backoff { - /** Indicates that no more retries should be made for use in {@link #nextDelayMillis()}. */ - static final long STOP = -1L; - - /** Returns the next delay in milliseconds, or < 0 if we should not continue retrying. */ + /** + * Returns the next delay in milliseconds, or a value less than {@code 0} if we should stop + * retrying. + */ long nextDelayMillis(); /** * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls - * that returned STOP. + * that returned less than {@code 0}. */ int getRetryAttempts(); + } + + /** + * The circuit breaker allows to reject execution when failure rates are high. + * + *

The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are + * executed and retried in this state. However, if error rates are high a circuit breaker can + * choose to transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with + * a {@link RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS} + * can periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once + * and in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}. + * + *

<p>A circuit breaker implementation must be thread-safe. + * + * @see CircuitBreaker + */ + public interface CircuitBreaker { + + enum State { + /** + * Calls are executed and retried in case of failure. + * + *

<p>The circuit breaker can transition into state {@link State#REJECT_CALLS}. + */ + ACCEPT_CALLS, + + /** + * A call is executed and not retried in case of failure. + * + *

<p>The circuit breaker can transition into any state. + */ + TRIAL_CALL, + + /** + * All calls are rejected. + * + *

The circuit breaker can transition into state {@link State#TRIAL_CALL}. + */ + REJECT_CALLS + } /** - * Creates a Backoff supplier for a Backoff which does not support any retries. Both the - * Supplier and the Backoff are stateless and thread-safe. + * Returns the current {@link State} of the circuit breaker. */ - static final Supplier NO_RETRIES = - () -> - new Backoff() { - @Override - public long nextDelayMillis() { - return STOP; - } - - @Override - public int getRetryAttempts() { - return 0; - } - }; + State state(); /** - * Creates a Backoff supplier for an optionally jittered exponential backoff. The supplier is - * ThreadSafe (non-synchronized calls to get() are fine), but the returned Backoff is not. - * - * @param initial The initial backoff duration. - * @param max The maximum backoff duration. - * @param multiplier The amount the backoff should increase in each iteration. Must be >1. - * @param jitter The amount the backoff should be randomly varied (0-1), with 0 providing no - * jitter, and 1 providing a duration that is 0-200% of the non-jittered duration. - * @param maxAttempts Maximal times to attempt a retry 0 means no retries. + * Called after an execution failed. 
*/ - static Supplier exponential( - Duration initial, Duration max, double multiplier, double jitter, int maxAttempts) { - Preconditions.checkArgument(multiplier > 1, "multipler must be > 1"); - Preconditions.checkArgument(jitter >= 0 && jitter <= 1, "jitter must be in the range (0, 1)"); - Preconditions.checkArgument(maxAttempts >= 0, "maxAttempts must be >= 0"); - return () -> - new Backoff() { - private final long maxMillis = max.toMillis(); - private long nextDelayMillis = initial.toMillis(); - private int attempts = 0; - - @Override - public long nextDelayMillis() { - if (attempts == maxAttempts) { - return STOP; - } - attempts++; - double jitterRatio = jitter * (ThreadLocalRandom.current().nextDouble(2.0) - 1); - long result = (long) (nextDelayMillis * (1 + jitterRatio)); - // Advance current by the non-jittered result. - nextDelayMillis = (long) (nextDelayMillis * multiplier); - if (nextDelayMillis > maxMillis) { - nextDelayMillis = maxMillis; - } - return result; - } - - @Override - public int getRetryAttempts() { - return attempts; - } - }; - } + void recordFailure(); + + /** + * Called after an execution succeeded. 
+ */ + void recordSuccess(); } - public static final Predicate DEFAULT_IS_RETRIABLE = - st -> { - switch (st.getCode()) { - case CANCELLED: - return !Thread.currentThread().isInterrupted(); - case UNKNOWN: - case DEADLINE_EXCEEDED: - case ABORTED: - case INTERNAL: - case UNAVAILABLE: - case UNAUTHENTICATED: - case RESOURCE_EXHAUSTED: - return true; - default: - return false; - } - }; + public interface Sleeper { + void sleep(long millis) throws InterruptedException; + } - public static final Predicate RETRY_ALL = Predicates.alwaysTrue(); - public static final Predicate RETRY_NONE = Predicates.alwaysFalse(); - public static final Retrier NO_RETRIES = new Retrier(Backoff.NO_RETRIES, RETRY_NONE); + public static class RetryException extends IOException { - private final Supplier backoffSupplier; - private final Predicate isRetriable; + private final int attempts; - @VisibleForTesting - Retrier(Supplier backoffSupplier, Predicate isRetriable) { - this.backoffSupplier = backoffSupplier; - this.isRetriable = isRetriable; + public RetryException(String message, int numRetries, Exception cause) { + super(message, cause); + this.attempts = numRetries + 1; + } + + protected RetryException(String message) { + super(message); + this.attempts = 0; + } + + /** + * Returns the number of times a {@link Callable} has been executed before this exception + * was thrown. + */ + public int getAttempts() { + return attempts; + } } - public Retrier(RemoteOptions options) { - this( - options.experimentalRemoteRetry - ? 
Backoff.exponential( - Duration.ofMillis(options.experimentalRemoteRetryStartDelayMillis), - Duration.ofMillis(options.experimentalRemoteRetryMaxDelayMillis), - options.experimentalRemoteRetryMultiplier, - options.experimentalRemoteRetryJitter, - options.experimentalRemoteRetryMaxAttempts) - : Backoff.NO_RETRIES, - DEFAULT_IS_RETRIABLE); + public static class CircuitBreakerException extends RetryException { + + private CircuitBreakerException(String message, int numRetries, Exception cause) { + super(message, numRetries, cause); + } + + private CircuitBreakerException() { + super("Call not executed due to a high failure rate."); + } } - /** - * Returns {@code true} if the {@link Status} is retriable. - */ - public boolean isRetriable(Status s) { - return isRetriable.apply(s); + public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() { + @Override + public State state() { + return State.ACCEPT_CALLS; + } + + @Override + public void recordFailure() { + } + + @Override + public void recordSuccess() { + } + }; + + public static final Backoff RETRIES_DISABLED = new Backoff() { + @Override + public long nextDelayMillis() { + return -1; + } + + @Override + public int getRetryAttempts() { + return 0; + } + }; + + private final Supplier backoffSupplier; + private final Predicate shouldRetry; + private final CircuitBreaker circuitBreaker; + private final Sleeper sleeper; + + public Retrier(Supplier backoffSupplier, Predicate shouldRetry, + CircuitBreaker circuitBreaker) { + this(backoffSupplier, shouldRetry, circuitBreaker, TimeUnit.MILLISECONDS::sleep); + } + + @VisibleForTesting + Retrier(Supplier backoffSupplier, Predicate shouldRetry, + CircuitBreaker circuitBreaker, Sleeper sleeper) { + this.backoffSupplier = backoffSupplier; + this.shouldRetry = shouldRetry; + this.circuitBreaker = circuitBreaker; + this.sleeper = sleeper; } /** - * Executes the given callable in a loop, retrying on retryable errors, as defined by the current - * backoff/retry policy. 
Will raise the last encountered retriable error, or the first - * non-retriable error. + * Execute a {@link Callable}, retrying execution in case of failure and returning the result in + * case of success. * - *

<p>This method never throws {@link StatusRuntimeException} even if the passed-in Callable does. + *

{@link InterruptedException} is not retried. * - * @param c The callable to execute. + * @param call the {@link Callable} to execute. + * @throws RetryException if the {@code call} didn't succeed within the framework specified by + * {@code backoffSupplier} and {@code shouldRetry}. + * @throws CircuitBreakerException in case a call was rejected because the circuit breaker + * tripped. + * @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the + * current thread's interrupted flag is set. */ - public T execute(Callable c) throws InterruptedException, IOException { - Backoff backoff = backoffSupplier.get(); + public T execute(Callable call) throws RetryException, InterruptedException { + final Backoff backoff = newBackoff(); while (true) { + final State circuitState; + circuitState = circuitBreaker.state(); + if (State.REJECT_CALLS.equals(circuitState)) { + throw new CircuitBreakerException(); + } try { - return c.call(); - } catch (PassThroughException e) { - throw (StatusRuntimeException) e.getCause(); - } catch (RetryException e) { - throw e; // Nested retries are always pass-through. - } catch (StatusException | StatusRuntimeException e) { - Status st = Status.fromThrowable(e); - int attempts = backoff.getRetryAttempts(); - long delay = backoff.nextDelayMillis(); - if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) { - Thread.currentThread().interrupt(); + if (Thread.interrupted()) { throw new InterruptedException(); } - if (delay < 0 || !isRetriable.apply(st)) { - throw new RetryException(st.asRuntimeException(), attempts); - } - sleep(delay); + T r = call.call(); + circuitBreaker.recordSuccess(); + return r; + } catch (InterruptedException e) { + circuitBreaker.recordFailure(); + throw e; } catch (Exception e) { - // Generic catch because Callable is declared to throw Exception, we rethrow any unchecked - // exception as well as any exception we declared above. 
- Throwables.throwIfUnchecked(e); - Throwables.throwIfInstanceOf(e, IOException.class); - Throwables.throwIfInstanceOf(e, InterruptedException.class); - throw new RetryException(e, backoff.getRetryAttempts()); + circuitBreaker.recordFailure(); + if (e instanceof RetryException) { + // Support nested retry calls. + e = (Exception) e.getCause(); + } + if (State.TRIAL_CALL.equals(circuitState)) { + throw new CircuitBreakerException("Call failed in circuit breaker half open state.", 0, + e); + } + int attempts = backoff.getRetryAttempts(); + if (!shouldRetry.test(e)) { + throw new RetryException("Call failed with not retriable error.", attempts, e); + } + final long delayMillis = backoff.nextDelayMillis(); + if (delayMillis < 0) { + throw new RetryException( + "Call failed after exhausting retry attempts: " + attempts, attempts, e); + } + sleeper.sleep(delayMillis); } } } - @VisibleForTesting - void sleep(long timeMillis) throws InterruptedException { - Preconditions.checkArgument( - timeMillis >= 0L, "timeMillis must not be negative: %s", timeMillis); - TimeUnit.MILLISECONDS.sleep(timeMillis); - } + //TODO(buchgr): Add executeAsync to be used by ByteStreamUploader + // ListenableFuture executeAsync(AsyncCallable call, ScheduledExecutorService executor) public Backoff newBackoff() { return backoffSupplier.get(); } + + public boolean isRetriable(Exception e) { + return shouldRetry.test(e); + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier2.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier2.java deleted file mode 100644 index e9a938b658..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/Retrier2.java +++ /dev/null @@ -1,260 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import com.google.common.annotations.VisibleForTesting; -import com.google.devtools.build.lib.remote.Retrier2.CircuitBreaker.State; -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.function.Supplier; -import javax.annotation.concurrent.ThreadSafe; - -/** - * Supports retrying the execution of a {@link Callable} in case of failure. - * - *

The errors that are retried are configurable via a {@link Predicate}. The - * delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports - * circuit breaking to stop execution in case of high failure rates. - */ -// TODO(buchgr): Move to a different package and use it for BES code. -@ThreadSafe -class Retrier2 { - - /** - * A backoff strategy. - */ - public interface Backoff { - - /** - * Returns the next delay in milliseconds, or a value less than {@code 0} if we should stop - * retrying. - */ - long nextDelayMillis(); - - /** - * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls - * that returned less than {@code 0}. - */ - int getRetryAttempts(); - } - - /** - * The circuit breaker allows to reject execution when failure rates are high. - * - *

<p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are - * executed and retried in this state. However, if error rates are high a circuit breaker can - * choose to transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with - * a {@link RetryException2} immediately. A circuit breaker in state {@link State#REJECT_CALLS} - * can periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once - * and in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}. - * - *

<p>A circuit breaker implementation must be thread-safe. - * - * @see CircuitBreaker - */ - public interface CircuitBreaker { - - enum State { - /** - * Calls are executed and retried in case of failure. - * - *

<p>The circuit breaker can transition into state {@link State#REJECT_CALLS}. - */ - ACCEPT_CALLS, - - /** - * A call is executed and not retried in case of failure. - * - *

<p>The circuit breaker can transition into any state. - */ - TRIAL_CALL, - - /** - * All calls are rejected. - * - *

The circuit breaker can transition into state {@link State#TRIAL_CALL}. - */ - REJECT_CALLS - } - - /** - * Returns the current {@link State} of the circuit breaker. - */ - State state(); - - /** - * Called after an execution failed. - */ - void recordFailure(); - - /** - * Called after an execution succeeded. - */ - void recordSuccess(); - } - - public interface Sleeper { - void sleep(long millis) throws InterruptedException; - } - - public static class RetryException2 extends IOException { - - private final int attempts; - - public RetryException2(String message, int numRetries, Exception cause) { - super(message, cause); - this.attempts = numRetries + 1; - } - - protected RetryException2(String message) { - super(message); - this.attempts = 0; - } - - /** - * Returns the number of times a {@link Callable} has been executed before this exception - * was thrown. - */ - public int getAttempts() { - return attempts; - } - } - - public static class CircuitBreakerException extends RetryException2 { - - private CircuitBreakerException(String message, int numRetries, Exception cause) { - super(message, numRetries, cause); - } - - private CircuitBreakerException() { - super("Call not executed due to a high failure rate."); - } - } - - public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() { - @Override - public State state() { - return State.ACCEPT_CALLS; - } - - @Override - public void recordFailure() { - } - - @Override - public void recordSuccess() { - } - }; - - public static final Backoff RETRIES_DISABLED = new Backoff() { - @Override - public long nextDelayMillis() { - return -1; - } - - @Override - public int getRetryAttempts() { - return 0; - } - }; - - private final Supplier backoffSupplier; - private final Predicate shouldRetry; - private final CircuitBreaker circuitBreaker; - private final Sleeper sleeper; - - public Retrier2 (Supplier backoffSupplier, Predicate shouldRetry, - CircuitBreaker circuitBreaker) { - this(backoffSupplier, 
shouldRetry, circuitBreaker, TimeUnit.MILLISECONDS::sleep); - } - - @VisibleForTesting - Retrier2 (Supplier backoffSupplier, Predicate shouldRetry, - CircuitBreaker circuitBreaker, Sleeper sleeper) { - this.backoffSupplier = backoffSupplier; - this.shouldRetry = shouldRetry; - this.circuitBreaker = circuitBreaker; - this.sleeper = sleeper; - } - - /** - * Execute a {@link Callable}, retrying execution in case of failure and returning the result in - * case of success. - * - *

{@link InterruptedException} is not retried. - * - * @param call the {@link Callable} to execute. - * @throws RetryException2 if the {@code call} didn't succeed within the framework specified by - * {@code backoffSupplier} and {@code shouldRetry}. - * @throws CircuitBreakerException in case a call was rejected because the circuit breaker - * tripped. - * @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the - * current thread's interrupted flag is set. - */ - public T execute(Callable call) throws RetryException2, InterruptedException { - final Backoff backoff = newBackoff(); - while (true) { - final State circuitState; - circuitState = circuitBreaker.state(); - if (State.REJECT_CALLS.equals(circuitState)) { - throw new CircuitBreakerException(); - } - try { - if (Thread.interrupted()) { - throw new InterruptedException(); - } - T r = call.call(); - circuitBreaker.recordSuccess(); - return r; - } catch (InterruptedException e) { - circuitBreaker.recordFailure(); - throw e; - } catch (Exception e) { - circuitBreaker.recordFailure(); - if (e instanceof RetryException2) { - // Support nested retry calls. 
- e = (Exception) e.getCause(); - } - if (State.TRIAL_CALL.equals(circuitState)) { - throw new CircuitBreakerException("Call failed in circuit breaker half open state.", 0, - e); - } - int attempts = backoff.getRetryAttempts(); - if (!shouldRetry.test(e)) { - throw new RetryException2("Call failed with not retriable error.", attempts, e); - } - final long delayMillis = backoff.nextDelayMillis(); - if (delayMillis < 0) { - throw new RetryException2( - "Call failed after exhausting retry attempts: " + attempts, attempts, e); - } - sleeper.sleep(delayMillis); - } - } - } - - //TODO(buchgr): Add executeAsync to be used by ByteStreamUploader - // ListenableFuture executeAsync(AsyncCallable call, ScheduledExecutorService executor) - - public Backoff newBackoff() { - return backoffSupplier.get(); - } - - public boolean isRetriable(Exception e) { - return shouldRetry.test(e); - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RetryException.java b/src/main/java/com/google/devtools/build/lib/remote/RetryException.java deleted file mode 100644 index 49fa6abf3a..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/RetryException.java +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package com.google.devtools.build.lib.remote; - -import io.grpc.Status.Code; -import io.grpc.StatusException; -import io.grpc.StatusRuntimeException; -import java.io.IOException; - -/** An exception to indicate failed retry attempts. */ -public final class RetryException extends IOException { - private final int attempts; - - RetryException(Throwable cause, int retryAttempts) { - super(String.format("after %d attempts: %s", retryAttempts + 1, cause), cause); - this.attempts = retryAttempts + 1; - } - - public int getAttempts() { - return attempts; - } - - public boolean causedByStatusCode(Code code) { - if (getCause() instanceof StatusRuntimeException) { - return ((StatusRuntimeException) getCause()).getStatus().getCode() == code; - } else if (getCause() instanceof StatusException) { - return ((StatusException) getCause()).getStatus().getCode() == code; - } - return false; - } -} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 0b70f36623..21e4438d6f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.analysis.BlazeVersionInfo; +import com.google.devtools.build.lib.remote.Retrier.RetryException; import com.google.devtools.build.lib.vfs.FileSystem.HashFunction; import com.google.devtools.remoteexecution.v1test.Digest; import com.google.devtools.remoteexecution.v1test.RequestMetadata; @@ -89,8 +90,7 @@ public class ByteStreamUploaderTest { private Channel channel; private Context withEmptyMetadata; - @Mock - private Retrier.Backoff mockBackoff; + @Mock private 
Retrier.Backoff mockBackoff; @Before public final void setUp() throws Exception { @@ -117,7 +117,8 @@ public class ByteStreamUploaderTest { @Test(timeout = 10000) public void singleBlobUploadShouldWork() throws Exception { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -186,7 +187,8 @@ public class ByteStreamUploaderTest { @Test(timeout = 20000) public void multipleBlobsUploadShouldWork() throws Exception { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0), (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -276,7 +278,8 @@ public class ByteStreamUploaderTest { withEmptyMetadata.attach(); // We upload blobs with different context, and retry 3 times for each upload. // We verify that the correct metadata is passed to the server with every blob. - Retrier retrier = new Retrier(() -> new FixedBackoff(3, 0), (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> new FixedBackoff(3, 0), (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -365,7 +368,8 @@ public class ByteStreamUploaderTest { // Test that uploading the same file concurrently triggers only one file upload. 
withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -424,7 +428,8 @@ public class ByteStreamUploaderTest { @Test(timeout = 10000) public void errorsShouldBeReported() throws IOException, InterruptedException { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -444,14 +449,15 @@ public class ByteStreamUploaderTest { fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(e.getAttempts()).isEqualTo(2); - assertThat(e.causedByStatusCode(Code.INTERNAL)).isTrue(); + assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue(); } } @Test(timeout = 10000) public void shutdownShouldCancelOngoingUploads() throws Exception { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -503,7 +509,8 @@ public class ByteStreamUploaderTest { @Test(timeout = 10000) public void failureInRetryExecutorShouldBeHandled() throws Exception { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new 
ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); @@ -534,7 +541,8 @@ public class ByteStreamUploaderTest { @Test(timeout = 10000) public void resourceNameWithoutInstanceName() throws Exception { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true); + RemoteRetrier retrier = + new RemoteRetrier(() -> mockBackoff, (e) -> true, Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService); @@ -571,8 +579,11 @@ public class ByteStreamUploaderTest { @Test(timeout = 10000) public void nonRetryableStatusShouldNotBeRetried() throws Exception { withEmptyMetadata.attach(); - Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0), - /* No Status is retriable. */ (Status s) -> false); + RemoteRetrier retrier = + new RemoteRetrier( + () -> new FixedBackoff(1, 0), + /* No Status is retriable. */ (e) -> false, + Retrier.ALLOW_ALL_CALLS); ByteStreamUploader uploader = new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService); diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java index 5664ea37cc..d2c8d287c4 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -152,7 +152,9 @@ public class GrpcRemoteCacheTest { scratch.resolve(authTlsOptions.authCredentials).getInputStream(), authTlsOptions.authScope); RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); - Retrier retrier = new Retrier(remoteOptions); + RemoteRetrier retrier = + new RemoteRetrier( + remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS); return new GrpcRemoteCache( ClientInterceptors.intercept( 
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index 2e06805539..b6fcd830df 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -232,7 +232,8 @@ public class GrpcRemoteExecutionClientTest { FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); outErr = new FileOutErr(stdout, stderr); RemoteOptions options = Options.getDefaults(RemoteOptions.class); - Retrier retrier = new Retrier(options); + RemoteRetrier retrier = + new RemoteRetrier(options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS); Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(); GrpcRemoteExecutor executor = new GrpcRemoteExecutor(channel, null, options.remoteTimeout, retrier); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java index 058a2dfba6..68ce4543cd 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRetrierTest.java @@ -20,9 +20,9 @@ import static org.mockito.Mockito.when; import com.google.common.collect.Range; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; -import com.google.devtools.build.lib.remote.Retrier2.Backoff; -import com.google.devtools.build.lib.remote.Retrier2.RetryException2; -import com.google.devtools.build.lib.remote.Retrier2.Sleeper; +import com.google.devtools.build.lib.remote.Retrier.Backoff; +import com.google.devtools.build.lib.remote.Retrier.RetryException; +import 
com.google.devtools.build.lib.remote.Retrier.Sleeper; import com.google.devtools.common.options.Options; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -42,7 +42,7 @@ import org.mockito.Mockito; public class RemoteRetrierTest { interface Foo { - public String foo(); + String foo(); } private RemoteRetrierTest.Foo fooMock; @@ -54,7 +54,7 @@ public class RemoteRetrierTest { @Test public void testExponentialBackoff() throws Exception { - Retrier2.Backoff backoff = + Retrier.Backoff backoff = new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0, 6); assertThat(backoff.nextDelayMillis()).isEqualTo(1000); assertThat(backoff.nextDelayMillis()).isEqualTo(2000); @@ -67,7 +67,7 @@ public class RemoteRetrierTest { @Test public void testExponentialBackoffJittered() throws Exception { - Retrier2.Backoff backoff = + Retrier.Backoff backoff = new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0.1, 6); assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(900L, 1100L)); assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(1800L, 2200L)); @@ -82,7 +82,7 @@ public class RemoteRetrierTest { try { retrier.execute(() -> fooMock.foo()); fail(); - } catch (RetryException2 e) { + } catch (RetryException e) { assertThat(e.getAttempts()).isEqualTo(attempts); } } @@ -92,8 +92,8 @@ public class RemoteRetrierTest { RemoteOptions options = Options.getDefaults(RemoteOptions.class); options.experimentalRemoteRetry = false; - RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(options, - RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier2.ALLOW_ALL_CALLS)); + RemoteRetrier retrier = + Mockito.spy(new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS)); when(fooMock.foo()) .thenReturn("bla") .thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); @@ -107,7 +107,7 @@ public class RemoteRetrierTest { Supplier s = () -> new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0, 0.0, 2); 
RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(s, (e) -> false, - Retrier2.ALLOW_ALL_CALLS, Mockito.mock(Sleeper.class))); + Retrier.ALLOW_ALL_CALLS, Mockito.mock(Sleeper.class))); when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); assertThrows(retrier, 1); Mockito.verify(fooMock, Mockito.times(1)).foo(); @@ -119,7 +119,7 @@ public class RemoteRetrierTest { () -> new ExponentialBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 2.0, 0.0, 2); Sleeper sleeper = Mockito.mock(Sleeper.class); RemoteRetrier retrier = Mockito.spy(new RemoteRetrier(s, (e) -> true, - Retrier2.ALLOW_ALL_CALLS, sleeper)); + Retrier.ALLOW_ALL_CALLS, sleeper)); when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); assertThrows(retrier, 3); @@ -135,8 +135,7 @@ public class RemoteRetrierTest { RemoteOptions options = Options.getDefaults(RemoteOptions.class); options.experimentalRemoteRetry = false; - RemoteRetrier retrier = new RemoteRetrier(options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, - Retrier2.ALLOW_ALL_CALLS); + RemoteRetrier retrier = new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS); try { retrier.execute(() -> { throw thrown; @@ -152,8 +151,7 @@ public class RemoteRetrierTest { StatusRuntimeException thrown = Status.Code.UNKNOWN.toStatus().asRuntimeException(); RemoteOptions options = Options.getDefaults(RemoteOptions.class); - RemoteRetrier retrier = new RemoteRetrier(options, RemoteRetrier.RETRIABLE_GRPC_ERRORS, - Retrier2.ALLOW_ALL_CALLS); + RemoteRetrier retrier = new RemoteRetrier(options, (e) -> true, Retrier.ALLOW_ALL_CALLS); AtomicInteger numCalls = new AtomicInteger(); try { @@ -162,7 +160,7 @@ public class RemoteRetrierTest { throw new RemoteRetrier.PassThroughException(thrown); }); fail(); - } catch (RetryException2 expected) { + } catch (RetryException expected) { assertThat(expected).hasCauseThat().isSameAs(thrown); } diff --git 
a/src/test/java/com/google/devtools/build/lib/remote/Retrier2Test.java b/src/test/java/com/google/devtools/build/lib/remote/Retrier2Test.java deleted file mode 100644 index 220c825cda..0000000000 --- a/src/test/java/com/google/devtools/build/lib/remote/Retrier2Test.java +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.devtools.build.lib.remote.Retrier2.Backoff; -import com.google.devtools.build.lib.remote.Retrier2.CircuitBreaker; -import com.google.devtools.build.lib.remote.Retrier2.CircuitBreaker.State; -import com.google.devtools.build.lib.remote.Retrier2.CircuitBreakerException; -import com.google.devtools.build.lib.remote.Retrier2.RetryException2; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; -import java.util.function.Supplier; -import javax.annotation.concurrent.ThreadSafe; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link Retrier2}. 
- */ -@RunWith(JUnit4.class) -public class Retrier2Test { - - @Mock - private CircuitBreaker alwaysOpen; - - private static final Predicate RETRY_ALL = (e) -> true; - private static final Predicate RETRY_NONE = (e) -> false; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - when(alwaysOpen.state()).thenReturn(State.ACCEPT_CALLS); - } - - @Test - public void retryShouldWork_failure() throws Exception { - // Test that a call is retried according to the backoff. - // All calls fail. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, alwaysOpen); - try { - r.execute(() -> { - throw new Exception("call failed"); - }); - fail("exception expected."); - } catch (RetryException2 e) { - assertThat(e.getAttempts()).isEqualTo(3); - } - - verify(alwaysOpen, times(3)).recordFailure(); - verify(alwaysOpen, never()).recordSuccess(); - } - - @Test - public void retryShouldWorkNoRetries_failure() throws Exception { - // Test that a non-retriable error is not retried. - // All calls fail. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); - Retrier2 r = new Retrier2(s, RETRY_NONE, alwaysOpen); - try { - r.execute(() -> { - throw new Exception("call failed"); - }); - fail("exception expected."); - } catch (RetryException2 e) { - assertThat(e.getAttempts()).isEqualTo(1); - } - - verify(alwaysOpen, times(1)).recordFailure(); - verify(alwaysOpen, never()).recordSuccess(); - } - - @Test - public void retryShouldWork_success() throws Exception { - // Test that a call is retried according to the backoff. - // The last call succeeds. 
- - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, alwaysOpen); - AtomicInteger numCalls = new AtomicInteger(); - int val = r.execute(() -> { - numCalls.incrementAndGet(); - if (numCalls.get() == 3) { - return 1; - } - throw new Exception("call failed"); - }); - assertThat(val).isEqualTo(1); - - verify(alwaysOpen, times(2)).recordFailure(); - verify(alwaysOpen, times(1)).recordSuccess(); - } - - @Test - public void nestedRetriesShouldWork() throws Exception { - // Test that nested calls using retries compose as expected. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/1); - Retrier2 r = new Retrier2(s, RETRY_ALL, alwaysOpen); - - AtomicInteger attemptsLvl0 = new AtomicInteger(); - AtomicInteger attemptsLvl1 = new AtomicInteger(); - AtomicInteger attemptsLvl2 = new AtomicInteger(); - try { - r.execute(() -> { - attemptsLvl0.incrementAndGet(); - return r.execute(() -> { - attemptsLvl1.incrementAndGet(); - return r.execute(() -> { - attemptsLvl2.incrementAndGet(); - throw new Exception("call failed"); - }); - }); - }); - } catch (RetryException2 outer) { - assertThat(outer.getAttempts()).isEqualTo(2); - assertThat(outer).hasCauseThat().hasMessageThat().isEqualTo("call failed"); - assertThat(attemptsLvl0.get()).isEqualTo(2); - assertThat(attemptsLvl1.get()).isEqualTo(4); - assertThat(attemptsLvl2.get()).isEqualTo(8); - } - } - - @Test - public void circuitBreakerShouldTrip() throws Exception { - // Test that a circuit breaker can trip. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); - TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, cb); - - try { - r.execute(() -> { - throw new Exception("call failed"); - }); - fail ("exception expected"); - } catch (CircuitBreakerException expected) { - // Intentionally left empty. 
- } - - assertThat(cb.state()).isEqualTo(State.REJECT_CALLS); - assertThat(cb.consecutiveFailures).isEqualTo(2); - } - - @Test - public void circuitBreakerCanRecover() throws Exception { - // Test that a circuit breaker can recover from REJECT_CALLS to ACCEPT_CALLS by - // utilizing the TRIAL_CALL state. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); - TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, cb); - - cb.trialCall(); - - assertThat(cb.state()).isEqualTo(State.TRIAL_CALL); - - int val = r.execute(() -> 10); - assertThat(val).isEqualTo(10); - assertThat(cb.state()).isEqualTo(State.ACCEPT_CALLS); - } - - @Test - public void circuitBreakerHalfOpenIsNotRetried() throws Exception { - // Test that a call executed in TRIAL_CALL state is not retried - // in case of failure. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); - TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, cb); - - cb.trialCall(); - - try { - r.execute(() -> { - throw new Exception("call failed"); - }); - } catch (RetryException2 expected) { - // Intentionally left empty. - } - - assertThat(cb.consecutiveFailures).isEqualTo(1); - } - - @Test - public void interruptsShouldNotBeRetried_flag() throws Exception { - // Test that a call is not executed / retried if the current thread - // is interrupted. - - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); - TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, cb); - - try { - Thread.currentThread().interrupt(); - r.execute(() -> 10); - } catch (InterruptedException expected) { - // Intentionally left empty. - } - } - - @Test - public void interruptsShouldNotBeRetried_exception() throws Exception { - // Test that a call is not retried if an InterruptedException is thrown. 
- - Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); - TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); - Retrier2 r = new Retrier2(s, RETRY_ALL, cb); - - try { - Thread.currentThread().interrupt(); - r.execute(() -> { - throw new InterruptedException(); - }); - } catch (InterruptedException expected) { - // Intentionally left empty. - } - } - - /** - * Simple circuit breaker that trips after N consecutive failures. - */ - @ThreadSafe - private static class TripAfterNCircuitBreaker implements CircuitBreaker { - - private final int maxConsecutiveFailures; - - private State state = State.ACCEPT_CALLS; - private int consecutiveFailures; - - TripAfterNCircuitBreaker(int maxConsecutiveFailures) { - this.maxConsecutiveFailures = maxConsecutiveFailures; - } - - @Override - public synchronized State state() { - return state; - } - - @Override - public synchronized void recordFailure() { - consecutiveFailures++; - if (consecutiveFailures >= maxConsecutiveFailures) { - state = State.REJECT_CALLS; - } - } - - @Override - public synchronized void recordSuccess() { - consecutiveFailures = 0; - state = State.ACCEPT_CALLS; - } - - void trialCall() { - state = State.TRIAL_CALL; - } - } - - private static class ZeroBackoff implements Backoff { - - private final int maxRetries; - private int retries; - - public ZeroBackoff(int maxRetries) { - this.maxRetries = maxRetries; - } - - @Override - public long nextDelayMillis() { - if (retries >= maxRetries) { - return -1; - } - retries++; - return 0; - } - - @Override - public int getRetryAttempts() { - return retries; - } - } -} diff --git a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java index dfce640c06..945c27d66d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RetrierTest.java @@ -1,4 +1,4 @@ -// Copyright 2015 
The Bazel Authors. All rights reserved. +// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,156 +11,297 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Range; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import java.io.IOException; -import java.time.Duration; +import com.google.devtools.build.lib.remote.Retrier.Backoff; +import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker; +import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State; +import com.google.devtools.build.lib.remote.Retrier.CircuitBreakerException; +import com.google.devtools.build.lib.remote.Retrier.RetryException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; -/** Tests for {@link Retrier}. */ +/** + * Tests for {@link Retrier}. 
+ */ @RunWith(JUnit4.class) public class RetrierTest { - interface Foo { - public String foo(); - } + @Mock + private CircuitBreaker alwaysOpen; - private Foo fooMock; + private static final Predicate RETRY_ALL = (e) -> true; + private static final Predicate RETRY_NONE = (e) -> false; @Before - public void setUp() { - fooMock = Mockito.mock(Foo.class); + public void setup() { + MockitoAnnotations.initMocks(this); + when(alwaysOpen.state()).thenReturn(State.ACCEPT_CALLS); } @Test - public void testExponentialBackoff() throws Exception { - Retrier.Backoff backoff = - Retrier.Backoff.exponential( - Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0, 6) - .get(); - assertThat(backoff.nextDelayMillis()).isEqualTo(1000); - assertThat(backoff.nextDelayMillis()).isEqualTo(2000); - assertThat(backoff.nextDelayMillis()).isEqualTo(4000); - assertThat(backoff.nextDelayMillis()).isEqualTo(8000); - assertThat(backoff.nextDelayMillis()).isEqualTo(10000); - assertThat(backoff.nextDelayMillis()).isEqualTo(10000); - assertThat(backoff.nextDelayMillis()).isEqualTo(Retrier.Backoff.STOP); + public void retryShouldWork_failure() throws Exception { + // Test that a call is retried according to the backoff. + // All calls fail. 
+ + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); + Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen); + try { + r.execute(() -> { + throw new Exception("call failed"); + }); + fail("exception expected."); + } catch (RetryException e) { + assertThat(e.getAttempts()).isEqualTo(3); + } + + verify(alwaysOpen, times(3)).recordFailure(); + verify(alwaysOpen, never()).recordSuccess(); } @Test - public void testExponentialBackoffJittered() throws Exception { - Retrier.Backoff backoff = - Retrier.Backoff.exponential( - Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0.1, 6) - .get(); - assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(900L, 1100L)); - assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(1800L, 2200L)); - assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(3600L, 4400L)); - assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(7200L, 8800L)); - assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(9000L, 11000L)); - assertThat(backoff.nextDelayMillis()).isIn(Range.closedOpen(9000L, 11000L)); - assertThat(backoff.nextDelayMillis()).isEqualTo(Retrier.Backoff.STOP); - } + public void retryShouldWorkNoRetries_failure() throws Exception { + // Test that a non-retriable error is not retried. + // All calls fail. 
- void assertThrows(Retrier retrier, int attempts) throws InterruptedException, IOException { + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); + Retrier r = new Retrier(s, RETRY_NONE, alwaysOpen); try { - retrier.execute(() -> fooMock.foo()); - fail(); + r.execute(() -> { + throw new Exception("call failed"); + }); + fail("exception expected."); } catch (RetryException e) { - assertThat(e.getAttempts()).isEqualTo(attempts); + assertThat(e.getAttempts()).isEqualTo(1); } + + verify(alwaysOpen, times(1)).recordFailure(); + verify(alwaysOpen, never()).recordSuccess(); } @Test - public void testNoRetries() throws Exception { - Retrier retrier = Mockito.spy(Retrier.NO_RETRIES); - Mockito.doNothing().when(retrier).sleep(Mockito.anyLong()); - when(fooMock.foo()) - .thenReturn("bla") - .thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); - assertThat(retrier.execute(() -> fooMock.foo())).isEqualTo("bla"); - assertThrows(retrier, 1); - Mockito.verify(fooMock, Mockito.times(2)).foo(); + public void retryShouldWork_success() throws Exception { + // Test that a call is retried according to the backoff. + // The last call succeeds. 
+ + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/2); + Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen); + AtomicInteger numCalls = new AtomicInteger(); + int val = r.execute(() -> { + numCalls.incrementAndGet(); + if (numCalls.get() == 3) { + return 1; + } + throw new Exception("call failed"); + }); + assertThat(val).isEqualTo(1); + + verify(alwaysOpen, times(2)).recordFailure(); + verify(alwaysOpen, times(1)).recordSuccess(); } @Test - public void testNonRetriableError() throws Exception { - Retrier retrier = - Mockito.spy( - new Retrier( - Retrier.Backoff.exponential( - Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0, 2), - Retrier.DEFAULT_IS_RETRIABLE)); - Mockito.doNothing().when(retrier).sleep(Mockito.anyLong()); - when(fooMock.foo()).thenThrow(Status.Code.NOT_FOUND.toStatus().asRuntimeException()); - assertThrows(retrier, 1); - Mockito.verify(fooMock, Mockito.times(1)).foo(); + public void nestedRetriesShouldWork() throws Exception { + // Test that nested calls using retries compose as expected. + + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/1); + Retrier r = new Retrier(s, RETRY_ALL, alwaysOpen); + + AtomicInteger attemptsLvl0 = new AtomicInteger(); + AtomicInteger attemptsLvl1 = new AtomicInteger(); + AtomicInteger attemptsLvl2 = new AtomicInteger(); + try { + r.execute(() -> { + attemptsLvl0.incrementAndGet(); + return r.execute(() -> { + attemptsLvl1.incrementAndGet(); + return r.execute(() -> { + attemptsLvl2.incrementAndGet(); + throw new Exception("call failed"); + }); + }); + }); + } catch (RetryException outer) { + assertThat(outer.getAttempts()).isEqualTo(2); + assertThat(outer).hasCauseThat().hasMessageThat().isEqualTo("call failed"); + assertThat(attemptsLvl0.get()).isEqualTo(2); + assertThat(attemptsLvl1.get()).isEqualTo(4); + assertThat(attemptsLvl2.get()).isEqualTo(8); + } + } + + @Test + public void circuitBreakerShouldTrip() throws Exception { + // Test that a circuit breaker can trip. 
+ + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); + TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); + Retrier r = new Retrier(s, RETRY_ALL, cb); + + try { + r.execute(() -> { + throw new Exception("call failed"); + }); + fail ("exception expected"); + } catch (CircuitBreakerException expected) { + // Intentionally left empty. + } + + assertThat(cb.state()).isEqualTo(State.REJECT_CALLS); + assertThat(cb.consecutiveFailures).isEqualTo(2); } @Test - public void testRepeatedRetriesReset() throws Exception { - Retrier retrier = - Mockito.spy( - new Retrier( - Retrier.Backoff.exponential( - Duration.ofSeconds(1), Duration.ofSeconds(10), 2, 0, 2), - Retrier.RETRY_ALL)); - Mockito.doNothing().when(retrier).sleep(Mockito.anyLong()); - when(fooMock.foo()).thenThrow(Status.Code.UNKNOWN.toStatus().asRuntimeException()); - assertThrows(retrier, 3); - assertThrows(retrier, 3); - Mockito.verify(retrier, Mockito.times(2)).sleep(1000); - Mockito.verify(retrier, Mockito.times(2)).sleep(2000); - Mockito.verify(fooMock, Mockito.times(6)).foo(); + public void circuitBreakerCanRecover() throws Exception { + // Test that a circuit breaker can recover from REJECT_CALLS to ACCEPT_CALLS by + // utilizing the TRIAL_CALL state. + + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); + TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); + Retrier r = new Retrier(s, RETRY_ALL, cb); + + cb.trialCall(); + + assertThat(cb.state()).isEqualTo(State.TRIAL_CALL); + + int val = r.execute(() -> 10); + assertThat(val).isEqualTo(10); + assertThat(cb.state()).isEqualTo(State.ACCEPT_CALLS); } @Test - public void testInterruptedExceptionIsPassedThrough() throws Exception { - InterruptedException thrown = new InterruptedException(); + public void circuitBreakerHalfOpenIsNotRetried() throws Exception { + // Test that a call executed in TRIAL_CALL state is not retried + // in case of failure. 
+ + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); + TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); + Retrier r = new Retrier(s, RETRY_ALL, cb); + + cb.trialCall(); + try { - Retrier.NO_RETRIES.execute(() -> { - throw thrown; + r.execute(() -> { + throw new Exception("call failed"); }); - fail(); - } catch (InterruptedException expected) { - assertThat(expected).isSameAs(thrown); + } catch (RetryException expected) { + // Intentionally left empty. } + + assertThat(cb.consecutiveFailures).isEqualTo(1); } @Test - public void testPassThroughException() throws Exception { - StatusRuntimeException thrown = Status.Code.UNKNOWN.toStatus().asRuntimeException(); + public void interruptsShouldNotBeRetried_flag() throws Exception { + // Test that a call is not executed / retried if the current thread + // is interrupted. + + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); + TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); + Retrier r = new Retrier(s, RETRY_ALL, cb); + try { - Retrier.NO_RETRIES.execute(() -> { - throw new Retrier.PassThroughException(thrown); - }); - fail(); - } catch (StatusRuntimeException expected) { - assertThat(expected).isSameAs(thrown); + Thread.currentThread().interrupt(); + r.execute(() -> 10); + } catch (InterruptedException expected) { + // Intentionally left empty. } } @Test - public void testIOExceptionIsPassedThrough() throws Exception { - IOException thrown = new IOException(); + public void interruptsShouldNotBeRetried_exception() throws Exception { + // Test that a call is not retried if an InterruptedException is thrown. 
+ + Supplier s = () -> new ZeroBackoff(/*maxRetries=*/3); + TripAfterNCircuitBreaker cb = new TripAfterNCircuitBreaker(/*maxConsecutiveFailures=*/2); + Retrier r = new Retrier(s, RETRY_ALL, cb); + try { - Retrier.NO_RETRIES.execute(() -> { - throw thrown; + Thread.currentThread().interrupt(); + r.execute(() -> { + throw new InterruptedException(); }); - fail(); - } catch (IOException expected) { - assertThat(expected).isSameAs(thrown); + } catch (InterruptedException expected) { + // Intentionally left empty. + } + } + + /** + * Simple circuit breaker that trips after N consecutive failures. + */ + @ThreadSafe + private static class TripAfterNCircuitBreaker implements CircuitBreaker { + + private final int maxConsecutiveFailures; + + private State state = State.ACCEPT_CALLS; + private int consecutiveFailures; + + TripAfterNCircuitBreaker(int maxConsecutiveFailures) { + this.maxConsecutiveFailures = maxConsecutiveFailures; + } + + @Override + public synchronized State state() { + return state; + } + + @Override + public synchronized void recordFailure() { + consecutiveFailures++; + if (consecutiveFailures >= maxConsecutiveFailures) { + state = State.REJECT_CALLS; + } + } + + @Override + public synchronized void recordSuccess() { + consecutiveFailures = 0; + state = State.ACCEPT_CALLS; + } + + void trialCall() { + state = State.TRIAL_CALL; + } + } + + private static class ZeroBackoff implements Backoff { + + private final int maxRetries; + private int retries; + + public ZeroBackoff(int maxRetries) { + this.maxRetries = maxRetries; + } + + @Override + public long nextDelayMillis() { + if (retries >= maxRetries) { + return -1; + } + retries++; + return 0; + } + + @Override + public int getRetryAttempts() { + return retries; } } } -- cgit v1.2.3