aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java 43
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java 11
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java 120
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java 4
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java 12
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java 33
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java 12
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/Retrier.java 363
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/Retrier2.java 260
-rw-r--r-- src/main/java/com/google/devtools/build/lib/remote/RetryException.java 43
10 files changed, 331 insertions, 570 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 0bef2ed1ae..609ee3c669 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
@@ -68,7 +69,7 @@ final class ByteStreamUploader {
private final Channel channel;
private final CallCredentials callCredentials;
private final long callTimeoutSecs;
- private final Retrier retrier;
+ private final RemoteRetrier retrier;
private final ListeningScheduledExecutorService retryService;
private final Object lock = new Object();
@@ -89,7 +90,7 @@ final class ByteStreamUploader {
* case no authentication is performed
* @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be
* complete. The timeout resets between retries
- * @param retrier the {@link Retrier} whose backoff strategy to use for retry timings.
+ * @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings.
* @param retryService the executor service to schedule retries on. It's the responsibility of the
* caller to properly shutdown the service after use. Users should avoid shutting down the
* service before {@link #shutdown()} has been called
@@ -99,7 +100,7 @@ final class ByteStreamUploader {
Channel channel,
@Nullable CallCredentials callCredentials,
long callTimeoutSecs,
- Retrier retrier,
+ RemoteRetrier retrier,
ListeningScheduledExecutorService retryService) {
checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
@@ -112,12 +113,11 @@ final class ByteStreamUploader {
}
/**
- * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code
- * ByteStream} service. The call blocks until the upload is complete, or throws an {@link
- * Exception} in case of error.
+ * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
+ * The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
*
- * <p>Uploads are retried according to the specified {@link Retrier}. Retrying is transparent to
- * the user of this API.
+ * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
+ * transparent to the user of this API.
*
* <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
* performed. This is transparent to the user of this API.
@@ -125,8 +125,7 @@ final class ByteStreamUploader {
* @throws IOException when reading of the {@link Chunker}'s input source fails
* @throws RetryException when the upload failed after a retry
*/
- public void uploadBlob(Chunker chunker)
- throws IOException, InterruptedException {
+ public void uploadBlob(Chunker chunker) throws IOException, InterruptedException {
uploadBlobs(singletonList(chunker));
}
@@ -136,8 +135,8 @@ final class ByteStreamUploader {
* upload failed. Any other uploads will continue uploading in the background, until they complete
* or the {@link #shutdown()} method is called. Errors encountered by these uploads are swallowed.
*
- * <p>Uploads are retried according to the specified {@link Retrier}. Retrying is transparent to
- * the user of this API.
+ * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
+ * transparent to the user of this API.
*
* <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
* performed. This is transparent to the user of this API.
@@ -145,8 +144,7 @@ final class ByteStreamUploader {
* @throws IOException when reading of the {@link Chunker}'s input source fails
* @throws RetryException when the upload failed after a retry
*/
- public void uploadBlobs(Iterable<Chunker> chunkers)
- throws IOException, InterruptedException {
+ public void uploadBlobs(Iterable<Chunker> chunkers) throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();
for (Chunker chunker : chunkers) {
@@ -226,9 +224,7 @@ final class ByteStreamUploader {
}
private void startAsyncUploadWithRetry(
- Chunker chunker,
- Retrier.Backoff backoffTimes,
- SettableFuture<Void> overallUploadResult) {
+ Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) {
AsyncUpload.Listener listener =
new AsyncUpload.Listener() {
@@ -241,9 +237,13 @@ final class ByteStreamUploader {
public void failure(Status status) {
StatusException cause = status.asException();
long nextDelayMillis = backoffTimes.nextDelayMillis();
- if (nextDelayMillis < 0 || !retrier.isRetriable(status)) {
+ if (nextDelayMillis < 0 || !retrier.isRetriable(cause)) {
// Out of retries or status not retriable.
- RetryException error = new RetryException(cause, backoffTimes.getRetryAttempts());
+ RetryException error =
+ new RetryException(
+ "Out of retries or status not retriable.",
+ backoffTimes.getRetryAttempts(),
+ cause);
overallUploadResult.setException(error);
} else {
retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult);
@@ -272,14 +272,15 @@ final class ByteStreamUploader {
schedulingResult.get();
} catch (Exception e) {
overallUploadResult.setException(
- new RetryException(e, backoffTimes.getRetryAttempts()));
+ new RetryException(
+ "Scheduled execution errored.", backoffTimes.getRetryAttempts(), e));
}
},
MoreExecutors.directExecutor());
} catch (RejectedExecutionException e) {
// May be thrown by .schedule(...) if, e.g., the executor is shut down.
overallUploadResult.setException(
- new RetryException(e, backoffTimes.getRetryAttempts()));
+ new RetryException("Rejected by executor.", backoffTimes.getRetryAttempts(), e));
}
}
};
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 5c3722e727..908fb9b0cc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -30,6 +30,7 @@ import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.DigestUtil.ActionKey;
+import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -70,7 +71,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
private final RemoteOptions options;
private final CallCredentials credentials;
private final Channel channel;
- private final Retrier retrier;
+ private final RemoteRetrier retrier;
private final ByteStreamUploader uploader;
private final DigestUtil digestUtil;
private final ListeningScheduledExecutorService retryScheduler =
@@ -81,7 +82,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
Channel channel,
CallCredentials credentials,
RemoteOptions options,
- Retrier retrier,
+ RemoteRetrier retrier,
DigestUtil digestUtil) {
this.options = options;
this.credentials = credentials;
@@ -266,7 +267,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
* This method can throw {@link StatusRuntimeException}, but the RemoteCache interface does not
* allow throwing such an exception. Any caller must make sure to catch the
* {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used
- * in the context of {@link Retrier#execute}, that's perfectly safe.
+ * in the context of {@link RemoteRetrier#execute}, that's perfectly safe.
*
* <p>This method also converts any NOT_FOUND code returned from the server into a
* {@link CacheNotFoundException}. TODO(olaola): this is not enough. NOT_FOUND can also be raised
@@ -318,7 +319,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
.setActionResult(result)
.build()));
} catch (RetryException e) {
- if (e.causedByStatusCode(Status.Code.UNIMPLEMENTED)) {
+ if (RemoteRetrierUtils.causedByStatus(e, Status.Code.UNIMPLEMENTED)) {
// Silently return without upload.
return;
}
@@ -443,7 +444,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
.setActionDigest(actionKey.getDigest())
.build()));
} catch (RetryException e) {
- if (e.causedByStatusCode(Status.Code.NOT_FOUND)) {
+ if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
// Return null to indicate that it was a cache miss.
return null;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index e87fa4b305..dbff8e7e88 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -45,10 +45,13 @@ class GrpcRemoteExecutor {
private final Channel channel;
private final CallCredentials callCredentials;
private final int callTimeoutSecs;
- private final Retrier retrier;
+ private final RemoteRetrier retrier;
- public GrpcRemoteExecutor(Channel channel, @Nullable CallCredentials callCredentials,
- int callTimeoutSecs, Retrier retrier) {
+ public GrpcRemoteExecutor(
+ Channel channel,
+ @Nullable CallCredentials callCredentials,
+ int callTimeoutSecs,
+ RemoteRetrier retrier) {
Preconditions.checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
this.channel = channel;
this.callCredentials = callCredentials;
@@ -123,61 +126,64 @@ class GrpcRemoteExecutor {
throws IOException, InterruptedException {
// The only errors retried here are transient failures of the Action itself on the server, not
// any gRPC errors that occurred during the call.
- return retrier.execute(() -> {
- // Here all transient gRPC errors will be retried.
- Operation op = retrier.execute(() -> execBlockingStub().execute(request));
- ExecuteResponse resp = getOperationResponse(op);
- if (resp != null) {
- return resp;
- }
- Request wr = Request.newBuilder().setTarget(op.getName()).build();
- // Here all transient gRPC errors will be retried, while transient failures of the Action
- // itself will be propagated.
- return retrier.execute(
- () -> {
- Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
- while (replies.hasNext()) {
- ChangeBatch cb = replies.next();
- for (Change ch : cb.getChangesList()) {
- switch (ch.getState()) {
- case INITIAL_STATE_SKIPPED:
- continue;
- case ERROR:
- try {
- throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class));
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- case DOES_NOT_EXIST:
- // TODO(olaola): either make this retriable, or use a different exception.
- throw new IOException(
- String.format("Operation %s lost on the remote server.", op.getName()));
- case EXISTS:
- Operation o;
- try {
- o = ch.getData().unpack(Operation.class);
- } catch (InvalidProtocolBufferException e) {
- throw new IOException(e);
- }
- try {
- ExecuteResponse r = getOperationResponse(o);
- if (r != null) {
- return r;
- }
- } catch (StatusRuntimeException e) {
- // Pass through the Watch retry and retry the whole execute+watch call.
- throw new Retrier.PassThroughException(e);
+ return retrier.execute(
+ () -> {
+ // Here all transient gRPC errors will be retried.
+ Operation op = retrier.execute(() -> execBlockingStub().execute(request));
+ ExecuteResponse resp = getOperationResponse(op);
+ if (resp != null) {
+ return resp;
+ }
+ Request wr = Request.newBuilder().setTarget(op.getName()).build();
+ // Here all transient gRPC errors will be retried, while transient failures of the Action
+ // itself will be propagated.
+ return retrier.execute(
+ () -> {
+ Iterator<ChangeBatch> replies = watcherBlockingStub().watch(wr);
+ while (replies.hasNext()) {
+ ChangeBatch cb = replies.next();
+ for (Change ch : cb.getChangesList()) {
+ switch (ch.getState()) {
+ case INITIAL_STATE_SKIPPED:
+ continue;
+ case ERROR:
+ try {
+ throw StatusProto.toStatusRuntimeException(
+ ch.getData().unpack(Status.class));
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(e);
+ }
+ case DOES_NOT_EXIST:
+ // TODO(olaola): either make this retriable, or use a different exception.
+ throw new IOException(
+ String.format("Operation %s lost on the remote server.", op.getName()));
+ case EXISTS:
+ Operation o;
+ try {
+ o = ch.getData().unpack(Operation.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IOException(e);
+ }
+ try {
+ ExecuteResponse r = getOperationResponse(o);
+ if (r != null) {
+ return r;
+ }
+ } catch (StatusRuntimeException e) {
+ // Pass through the Watch retry and retry the whole execute+watch call.
+ throw new RemoteRetrier.PassThroughException(e);
+ }
+ continue;
+ default:
+ // This can only happen if the enum gets unexpectedly extended.
+ throw new IOException(
+ String.format("Illegal change state: %s", ch.getState()));
}
- continue;
- default:
- // This can only happen if the enum gets unexpectedly extended.
- throw new IOException(String.format("Illegal change state: %s", ch.getState()));
+ }
}
- }
- }
- throw new IOException(
- String.format("Watch request for %s terminated with no result.", op.getName()));
- });
- });
+ throw new IOException(
+ String.format("Watch request for %s terminated with no result.", op.getName()));
+ });
+ });
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index c923a4d811..8928142626 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -114,7 +114,9 @@ public final class RemoteModule extends BlazeModule {
boolean remoteOrLocalCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions);
boolean grpcCache = GrpcRemoteCache.isRemoteCacheOptions(remoteOptions);
- Retrier retrier = new Retrier(remoteOptions);
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
CallCredentials creds = GrpcUtils.newCallCredentials(authAndTlsOptions);
// TODO(davido): The naming is wrong here. "Remote"-prefix in RemoteActionCache class has no
// meaning.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
index 8359f4de99..27de8b7305 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -36,13 +36,13 @@ import java.util.function.Supplier;
* // Not retried.
* throw new PassThroughException(new IOException("fail"));
* }
- * } catch (RetryException2 e) {
+ * } catch (RetryException e) {
* // e.getCause() is the IOException
* System.out.println(e.getCause());
* }
* </code>
*/
-class RemoteRetrier extends Retrier2 {
+class RemoteRetrier extends Retrier {
/**
* Wraps around an {@link Exception} to make it pass through a single layer of retries.
@@ -96,13 +96,13 @@ class RemoteRetrier extends Retrier2 {
}
@Override
- public <T> T execute(Callable<T> call) throws RetryException2, InterruptedException {
+ public <T> T execute(Callable<T> call) throws RetryException, InterruptedException {
try {
return super.execute(call);
- } catch (RetryException2 e) {
+ } catch (RetryException e) {
if (e.getCause() instanceof PassThroughException) {
PassThroughException passThrough = (PassThroughException) e.getCause();
- throw new RetryException2("Retries aborted because of PassThroughException",
+ throw new RetryException("Retries aborted because of PassThroughException",
e.getAttempts(), (Exception) passThrough.getCause());
}
throw e;
@@ -116,7 +116,7 @@ class RemoteRetrier extends Retrier2 {
return e -> !(e instanceof PassThroughException) && delegate.test(e);
}
- static class ExponentialBackoff implements Retrier2.Backoff {
+ static class ExponentialBackoff implements Retrier.Backoff {
private final long maxMillis;
private long nextDelayMillis;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java
new file mode 100644
index 0000000000..f518b43682
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrierUtils.java
@@ -0,0 +1,33 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.remote.Retrier.RetryException;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+
+/** Methods useful when using the {@link RemoteRetrier}. */
+public final class RemoteRetrierUtils {
+ /** Returns whether {@code e.getCause()} is a gRPC status exception with code {@code expected}. */
+ public static boolean causedByStatus(RetryException e, Status.Code expected) {
+ if (e.getCause() instanceof StatusRuntimeException) {
+ return ((StatusRuntimeException) e.getCause()).getStatus().getCode() == expected;
+ } else if (e.getCause() instanceof StatusException) {
+ return ((StatusException) e.getCause()).getStatus().getCode() == expected;
+ }
+ return false;
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 3a1b474000..c67ef489a6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -33,6 +33,7 @@ import com.google.devtools.build.lib.exec.SpawnExecException;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
import com.google.devtools.build.lib.exec.SpawnRunner;
import com.google.devtools.build.lib.remote.DigestUtil.ActionKey;
+import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.io.FileOutErr;
@@ -223,9 +224,10 @@ class RemoteSpawnRunner implements SpawnRunner {
return handleError(cause, policy.getFileOutErr());
}
- private SpawnResult handleError(IOException cause, FileOutErr outErr) throws IOException,
+ private SpawnResult handleError(IOException exception, FileOutErr outErr) throws IOException,
ExecException {
- if (cause instanceof TimeoutException) {
+ final Throwable cause = exception.getCause();
+ if (exception instanceof TimeoutException || cause instanceof TimeoutException) {
// TODO(buchgr): provide stdout/stderr from the action that timed out.
// Remove the unsupported message once remote execution tests no longer check for it.
try (OutputStream out = outErr.getOutputStream()) {
@@ -238,8 +240,8 @@ class RemoteSpawnRunner implements SpawnRunner {
.build();
}
final Status status;
- if (cause instanceof RetryException
- && ((RetryException) cause).causedByStatusCode(Code.UNAVAILABLE)) {
+ if (exception instanceof RetryException
+ && RemoteRetrierUtils.causedByStatus((RetryException) exception, Code.UNAVAILABLE)) {
status = Status.EXECUTION_FAILED_CATASTROPHICALLY;
} else if (cause instanceof CacheNotFoundException) {
status = Status.REMOTE_CACHE_FAILED;
@@ -247,7 +249,7 @@ class RemoteSpawnRunner implements SpawnRunner {
status = Status.EXECUTION_FAILED;
}
throw new SpawnExecException(
- Throwables.getStackTraceAsString(cause),
+ Throwables.getStackTraceAsString(exception),
new SpawnResult.Builder()
.setStatus(status)
.setExitCode(ExitCode.REMOTE_ERROR.getNumericExitCode())
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
index e529109121..b675e51e67 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
@@ -1,4 +1,4 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
+// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,227 +15,246 @@
package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
+import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import java.io.IOException;
-import java.time.Duration;
import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
/**
- * Supports execution with retries on particular gRPC Statuses. The retrier is ThreadSafe.
+ * Supports retrying the execution of a {@link Callable} in case of failure.
*
- * <p>Example usage: The simple use-case is to call retrier.execute, e.g:
- *
- * <pre>
- * foo = retrier.execute(
- * new Callable<Foo>() {
- * @Override
- * public Foo call() {
- * return grpcStub.getFoo(fooRequest);
- * }
- * });
- * </pre>
+ * <p>The errors that are retried are configurable via a {@code Predicate<? super Exception>}. The
+ * delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports
+ * circuit breaking to stop execution in case of high failure rates.
*/
-public class Retrier {
- /** Wraps around a StatusRuntimeException to make it pass through a single layer of retries. */
- public static class PassThroughException extends Exception {
- public PassThroughException(StatusRuntimeException e) {
- super(e);
- }
- }
+// TODO(buchgr): Move to a different package and use it for BES code.
+@ThreadSafe
+class Retrier {
/**
- * Backoff is a stateful object providing a sequence of durations that are used to time delays
- * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather
- * than a static map of attempt number to delay, is to enable using the retrier via the manual
- * calling isRetriable and nextDelayMillis manually (see ByteStreamUploader example).
+ * A backoff strategy.
*/
public interface Backoff {
- /** Indicates that no more retries should be made for use in {@link #nextDelayMillis()}. */
- static final long STOP = -1L;
-
- /** Returns the next delay in milliseconds, or < 0 if we should not continue retrying. */
+ /**
+ * Returns the next delay in milliseconds, or a value less than {@code 0} if we should stop
+ * retrying.
+ */
long nextDelayMillis();
/**
* Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls
- * that returned STOP.
+ * that returned less than {@code 0}.
*/
int getRetryAttempts();
+ }
+
+ /**
+ * The circuit breaker allows rejecting execution when failure rates are high.
+ *
+ * <p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are
+ * executed and retried in this state. However, if error rates are high a circuit breaker can
+ * choose to transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with
+ * a {@link RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS}
+ * can periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once
+ * and in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}.
+ *
+ * <p>A circuit breaker implementation must be thread-safe.
+ *
+ * @see <a href = "https://martinfowler.com/bliki/CircuitBreaker.html">CircuitBreaker</a>
+ */
+ public interface CircuitBreaker {
+
+ enum State {
+ /**
+ * Calls are executed and retried in case of failure.
+ *
+ * <p>The circuit breaker can transition into state {@link State#REJECT_CALLS}.
+ */
+ ACCEPT_CALLS,
+
+ /**
+ * A call is executed and not retried in case of failure.
+ *
+ * <p>The circuit breaker can transition into any state.
+ */
+ TRIAL_CALL,
+
+ /**
+ * All calls are rejected.
+ *
+ * <p>The circuit breaker can transition into state {@link State#TRIAL_CALL}.
+ */
+ REJECT_CALLS
+ }
/**
- * Creates a Backoff supplier for a Backoff which does not support any retries. Both the
- * Supplier and the Backoff are stateless and thread-safe.
+ * Returns the current {@link State} of the circuit breaker.
*/
- static final Supplier<Backoff> NO_RETRIES =
- () ->
- new Backoff() {
- @Override
- public long nextDelayMillis() {
- return STOP;
- }
-
- @Override
- public int getRetryAttempts() {
- return 0;
- }
- };
+ State state();
/**
- * Creates a Backoff supplier for an optionally jittered exponential backoff. The supplier is
- * ThreadSafe (non-synchronized calls to get() are fine), but the returned Backoff is not.
- *
- * @param initial The initial backoff duration.
- * @param max The maximum backoff duration.
- * @param multiplier The amount the backoff should increase in each iteration. Must be >1.
- * @param jitter The amount the backoff should be randomly varied (0-1), with 0 providing no
- * jitter, and 1 providing a duration that is 0-200% of the non-jittered duration.
- * @param maxAttempts Maximal times to attempt a retry 0 means no retries.
+ * Called after an execution failed.
*/
- static Supplier<Backoff> exponential(
- Duration initial, Duration max, double multiplier, double jitter, int maxAttempts) {
- Preconditions.checkArgument(multiplier > 1, "multipler must be > 1");
- Preconditions.checkArgument(jitter >= 0 && jitter <= 1, "jitter must be in the range (0, 1)");
- Preconditions.checkArgument(maxAttempts >= 0, "maxAttempts must be >= 0");
- return () ->
- new Backoff() {
- private final long maxMillis = max.toMillis();
- private long nextDelayMillis = initial.toMillis();
- private int attempts = 0;
-
- @Override
- public long nextDelayMillis() {
- if (attempts == maxAttempts) {
- return STOP;
- }
- attempts++;
- double jitterRatio = jitter * (ThreadLocalRandom.current().nextDouble(2.0) - 1);
- long result = (long) (nextDelayMillis * (1 + jitterRatio));
- // Advance current by the non-jittered result.
- nextDelayMillis = (long) (nextDelayMillis * multiplier);
- if (nextDelayMillis > maxMillis) {
- nextDelayMillis = maxMillis;
- }
- return result;
- }
-
- @Override
- public int getRetryAttempts() {
- return attempts;
- }
- };
- }
+ void recordFailure();
+
+ /**
+ * Called after an execution succeeded.
+ */
+ void recordSuccess();
}
- public static final Predicate<Status> DEFAULT_IS_RETRIABLE =
- st -> {
- switch (st.getCode()) {
- case CANCELLED:
- return !Thread.currentThread().isInterrupted();
- case UNKNOWN:
- case DEADLINE_EXCEEDED:
- case ABORTED:
- case INTERNAL:
- case UNAVAILABLE:
- case UNAUTHENTICATED:
- case RESOURCE_EXHAUSTED:
- return true;
- default:
- return false;
- }
- };
+ public interface Sleeper {
+ void sleep(long millis) throws InterruptedException;
+ }
- public static final Predicate<Status> RETRY_ALL = Predicates.alwaysTrue();
- public static final Predicate<Status> RETRY_NONE = Predicates.alwaysFalse();
- public static final Retrier NO_RETRIES = new Retrier(Backoff.NO_RETRIES, RETRY_NONE);
+ public static class RetryException extends IOException {
- private final Supplier<Backoff> backoffSupplier;
- private final Predicate<Status> isRetriable;
+ private final int attempts;
- @VisibleForTesting
- Retrier(Supplier<Backoff> backoffSupplier, Predicate<Status> isRetriable) {
- this.backoffSupplier = backoffSupplier;
- this.isRetriable = isRetriable;
+ public RetryException(String message, int numRetries, Exception cause) {
+ super(message, cause);
+ this.attempts = numRetries + 1;
+ }
+
+ protected RetryException(String message) {
+ super(message);
+ this.attempts = 0;
+ }
+
+ /**
+ * Returns the number of times a {@link Callable} has been executed before this exception
+ * was thrown.
+ */
+ public int getAttempts() {
+ return attempts;
+ }
}
- public Retrier(RemoteOptions options) {
- this(
- options.experimentalRemoteRetry
- ? Backoff.exponential(
- Duration.ofMillis(options.experimentalRemoteRetryStartDelayMillis),
- Duration.ofMillis(options.experimentalRemoteRetryMaxDelayMillis),
- options.experimentalRemoteRetryMultiplier,
- options.experimentalRemoteRetryJitter,
- options.experimentalRemoteRetryMaxAttempts)
- : Backoff.NO_RETRIES,
- DEFAULT_IS_RETRIABLE);
+ public static class CircuitBreakerException extends RetryException {
+
+ private CircuitBreakerException(String message, int numRetries, Exception cause) {
+ super(message, numRetries, cause);
+ }
+
+ private CircuitBreakerException() {
+ super("Call not executed due to a high failure rate.");
+ }
}
- /**
- * Returns {@code true} if the {@link Status} is retriable.
- */
- public boolean isRetriable(Status s) {
- return isRetriable.apply(s);
+ public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() {
+ @Override
+ public State state() {
+ return State.ACCEPT_CALLS;
+ }
+
+ @Override
+ public void recordFailure() {
+ }
+
+ @Override
+ public void recordSuccess() {
+ }
+ };
+
+ public static final Backoff RETRIES_DISABLED = new Backoff() {
+ @Override
+ public long nextDelayMillis() {
+ return -1;
+ }
+
+ @Override
+ public int getRetryAttempts() {
+ return 0;
+ }
+ };
+
+ private final Supplier<Backoff> backoffSupplier;
+ private final Predicate<? super Exception> shouldRetry;
+ private final CircuitBreaker circuitBreaker;
+ private final Sleeper sleeper;
+
+ public Retrier(Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
+ CircuitBreaker circuitBreaker) {
+ this(backoffSupplier, shouldRetry, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
+ }
+
+ @VisibleForTesting
+ Retrier(Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
+ CircuitBreaker circuitBreaker, Sleeper sleeper) {
+ this.backoffSupplier = backoffSupplier;
+ this.shouldRetry = shouldRetry;
+ this.circuitBreaker = circuitBreaker;
+ this.sleeper = sleeper;
}
/**
- * Executes the given callable in a loop, retrying on retryable errors, as defined by the current
- * backoff/retry policy. Will raise the last encountered retriable error, or the first
- * non-retriable error.
+ * Execute a {@link Callable}, retrying execution in case of failure and returning the result in
+ * case of success.
*
- * <p>This method never throws {@link StatusRuntimeException} even if the passed-in Callable does.
+ * <p>{@link InterruptedException} is not retried.
*
- * @param c The callable to execute.
+ * @param call the {@link Callable} to execute.
+ * @throws RetryException if the {@code call} didn't succeed within the framework specified by
+ * {@code backoffSupplier} and {@code shouldRetry}.
+ * @throws CircuitBreakerException in case a call was rejected because the circuit breaker
+ * tripped.
+ * @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the
+ * current thread's interrupted flag is set.
*/
- public <T> T execute(Callable<T> c) throws InterruptedException, IOException {
- Backoff backoff = backoffSupplier.get();
+ public <T> T execute(Callable<T> call) throws RetryException, InterruptedException {
+ final Backoff backoff = newBackoff();
while (true) {
+ final State circuitState;
+ circuitState = circuitBreaker.state();
+ if (State.REJECT_CALLS.equals(circuitState)) {
+ throw new CircuitBreakerException();
+ }
try {
- return c.call();
- } catch (PassThroughException e) {
- throw (StatusRuntimeException) e.getCause();
- } catch (RetryException e) {
- throw e; // Nested retries are always pass-through.
- } catch (StatusException | StatusRuntimeException e) {
- Status st = Status.fromThrowable(e);
- int attempts = backoff.getRetryAttempts();
- long delay = backoff.nextDelayMillis();
- if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) {
- Thread.currentThread().interrupt();
+ if (Thread.interrupted()) {
throw new InterruptedException();
}
- if (delay < 0 || !isRetriable.apply(st)) {
- throw new RetryException(st.asRuntimeException(), attempts);
- }
- sleep(delay);
+ T r = call.call();
+ circuitBreaker.recordSuccess();
+ return r;
+ } catch (InterruptedException e) {
+ circuitBreaker.recordFailure();
+ throw e;
} catch (Exception e) {
- // Generic catch because Callable is declared to throw Exception, we rethrow any unchecked
- // exception as well as any exception we declared above.
- Throwables.throwIfUnchecked(e);
- Throwables.throwIfInstanceOf(e, IOException.class);
- Throwables.throwIfInstanceOf(e, InterruptedException.class);
- throw new RetryException(e, backoff.getRetryAttempts());
+ circuitBreaker.recordFailure();
+ if (e instanceof RetryException) {
+ // Support nested retry calls.
+ e = (Exception) e.getCause();
+ }
+ if (State.TRIAL_CALL.equals(circuitState)) {
+ throw new CircuitBreakerException("Call failed in circuit breaker half open state.", 0,
+ e);
+ }
+ int attempts = backoff.getRetryAttempts();
+ if (!shouldRetry.test(e)) {
+ throw new RetryException("Call failed with not retriable error.", attempts, e);
+ }
+ final long delayMillis = backoff.nextDelayMillis();
+ if (delayMillis < 0) {
+ throw new RetryException(
+ "Call failed after exhausting retry attempts: " + attempts, attempts, e);
+ }
+ sleeper.sleep(delayMillis);
}
}
}
- @VisibleForTesting
- void sleep(long timeMillis) throws InterruptedException {
- Preconditions.checkArgument(
- timeMillis >= 0L, "timeMillis must not be negative: %s", timeMillis);
- TimeUnit.MILLISECONDS.sleep(timeMillis);
- }
+ //TODO(buchgr): Add executeAsync to be used by ByteStreamUploader
+ // <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, ScheduledExecutorService executor)
public Backoff newBackoff() {
return backoffSupplier.get();
}
+
+ public boolean isRetriable(Exception e) {
+ return shouldRetry.test(e);
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier2.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier2.java
deleted file mode 100644
index e9a938b658..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/Retrier2.java
+++ /dev/null
@@ -1,260 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.devtools.build.lib.remote.Retrier2.CircuitBreaker.State;
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Supports retrying the execution of a {@link Callable} in case of failure.
- *
- * <p>The errors that are retried are configurable via a {@link Predicate<? super Exception>}. The
- * delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports
- * circuit breaking to stop execution in case of high failure rates.
- */
-// TODO(buchgr): Move to a different package and use it for BES code.
-@ThreadSafe
-class Retrier2 {
-
- /**
- * A backoff strategy.
- */
- public interface Backoff {
-
- /**
- * Returns the next delay in milliseconds, or a value less than {@code 0} if we should stop
- * retrying.
- */
- long nextDelayMillis();
-
- /**
- * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls
- * that returned less than {@code 0}.
- */
- int getRetryAttempts();
- }
-
- /**
- * The circuit breaker allows to reject execution when failure rates are high.
- *
- * <p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are
- * executed and retried in this state. However, if error rates are high a circuit breaker can
- * choose to transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with
- * a {@link RetryException2} immediately. A circuit breaker in state {@link State#REJECT_CALLS}
- * can periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once
- * and in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}.
- *
- * <p>A circuit breaker implementation must be thread-safe.
- *
- * @see <a href = "https://martinfowler.com/bliki/CircuitBreaker.html">CircuitBreaker</a>
- */
- public interface CircuitBreaker {
-
- enum State {
- /**
- * Calls are executed and retried in case of failure.
- *
- * <p>The circuit breaker can transition into state {@link State#REJECT_CALLS}.
- */
- ACCEPT_CALLS,
-
- /**
- * A call is executed and not retried in case of failure.
- *
- * <p>The circuit breaker can transition into any state.
- */
- TRIAL_CALL,
-
- /**
- * All calls are rejected.
- *
- * <p>The circuit breaker can transition into state {@link State#TRIAL_CALL}.
- */
- REJECT_CALLS
- }
-
- /**
- * Returns the current {@link State} of the circuit breaker.
- */
- State state();
-
- /**
- * Called after an execution failed.
- */
- void recordFailure();
-
- /**
- * Called after an execution succeeded.
- */
- void recordSuccess();
- }
-
- public interface Sleeper {
- void sleep(long millis) throws InterruptedException;
- }
-
- public static class RetryException2 extends IOException {
-
- private final int attempts;
-
- public RetryException2(String message, int numRetries, Exception cause) {
- super(message, cause);
- this.attempts = numRetries + 1;
- }
-
- protected RetryException2(String message) {
- super(message);
- this.attempts = 0;
- }
-
- /**
- * Returns the number of times a {@link Callable} has been executed before this exception
- * was thrown.
- */
- public int getAttempts() {
- return attempts;
- }
- }
-
- public static class CircuitBreakerException extends RetryException2 {
-
- private CircuitBreakerException(String message, int numRetries, Exception cause) {
- super(message, numRetries, cause);
- }
-
- private CircuitBreakerException() {
- super("Call not executed due to a high failure rate.");
- }
- }
-
- public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() {
- @Override
- public State state() {
- return State.ACCEPT_CALLS;
- }
-
- @Override
- public void recordFailure() {
- }
-
- @Override
- public void recordSuccess() {
- }
- };
-
- public static final Backoff RETRIES_DISABLED = new Backoff() {
- @Override
- public long nextDelayMillis() {
- return -1;
- }
-
- @Override
- public int getRetryAttempts() {
- return 0;
- }
- };
-
- private final Supplier<Backoff> backoffSupplier;
- private final Predicate<? super Exception> shouldRetry;
- private final CircuitBreaker circuitBreaker;
- private final Sleeper sleeper;
-
- public Retrier2 (Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
- CircuitBreaker circuitBreaker) {
- this(backoffSupplier, shouldRetry, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
- }
-
- @VisibleForTesting
- Retrier2 (Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
- CircuitBreaker circuitBreaker, Sleeper sleeper) {
- this.backoffSupplier = backoffSupplier;
- this.shouldRetry = shouldRetry;
- this.circuitBreaker = circuitBreaker;
- this.sleeper = sleeper;
- }
-
- /**
- * Execute a {@link Callable}, retrying execution in case of failure and returning the result in
- * case of success.
- *
- * <p>{@link InterruptedException} is not retried.
- *
- * @param call the {@link Callable} to execute.
- * @throws RetryException2 if the {@code call} didn't succeed within the framework specified by
- * {@code backoffSupplier} and {@code shouldRetry}.
- * @throws CircuitBreakerException in case a call was rejected because the circuit breaker
- * tripped.
- * @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the
- * current thread's interrupted flag is set.
- */
- public <T> T execute(Callable<T> call) throws RetryException2, InterruptedException {
- final Backoff backoff = newBackoff();
- while (true) {
- final State circuitState;
- circuitState = circuitBreaker.state();
- if (State.REJECT_CALLS.equals(circuitState)) {
- throw new CircuitBreakerException();
- }
- try {
- if (Thread.interrupted()) {
- throw new InterruptedException();
- }
- T r = call.call();
- circuitBreaker.recordSuccess();
- return r;
- } catch (InterruptedException e) {
- circuitBreaker.recordFailure();
- throw e;
- } catch (Exception e) {
- circuitBreaker.recordFailure();
- if (e instanceof RetryException2) {
- // Support nested retry calls.
- e = (Exception) e.getCause();
- }
- if (State.TRIAL_CALL.equals(circuitState)) {
- throw new CircuitBreakerException("Call failed in circuit breaker half open state.", 0,
- e);
- }
- int attempts = backoff.getRetryAttempts();
- if (!shouldRetry.test(e)) {
- throw new RetryException2("Call failed with not retriable error.", attempts, e);
- }
- final long delayMillis = backoff.nextDelayMillis();
- if (delayMillis < 0) {
- throw new RetryException2(
- "Call failed after exhausting retry attempts: " + attempts, attempts, e);
- }
- sleeper.sleep(delayMillis);
- }
- }
- }
-
- //TODO(buchgr): Add executeAsync to be used by ByteStreamUploader
- // <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, ScheduledExecutorService executor)
-
- public Backoff newBackoff() {
- return backoffSupplier.get();
- }
-
- public boolean isRetriable(Exception e) {
- return shouldRetry.test(e);
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RetryException.java b/src/main/java/com/google/devtools/build/lib/remote/RetryException.java
deleted file mode 100644
index 49fa6abf3a..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/RetryException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import io.grpc.Status.Code;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import java.io.IOException;
-
-/** An exception to indicate failed retry attempts. */
-public final class RetryException extends IOException {
- private final int attempts;
-
- RetryException(Throwable cause, int retryAttempts) {
- super(String.format("after %d attempts: %s", retryAttempts + 1, cause), cause);
- this.attempts = retryAttempts + 1;
- }
-
- public int getAttempts() {
- return attempts;
- }
-
- public boolean causedByStatusCode(Code code) {
- if (getCause() instanceof StatusRuntimeException) {
- return ((StatusRuntimeException) getCause()).getStatus().getCode() == code;
- } else if (getCause() instanceof StatusException) {
- return ((StatusException) getCause()).getStatus().getCode() == code;
- }
- return false;
- }
-}