author     olaola <olaola@google.com>          2017-06-30 05:07:13 +0200
committer  Marcel Hlopko <hlopko@google.com>   2017-06-30 13:01:12 +0200
commit     fdd66ef1b1dbee1663676cdf4b36ddbe139a35bf (patch)
tree       f4abdcf4adc85e1ecb56167bba8eade94cbcd376 /src/main/java/com/google/devtools
parent     5f00cd2b1cde682f77a47439e9dc631349992f9e (diff)
Implement retry logic for the gRPC calls in remote execution and caching. The
retry strategy may need tuning.

Other behavior changes:
- Swallow gRPC CANCELLED errors when the thread is interrupted, as these are
  expected and only make debugging harder.
- Distinguish between a gRPC DEADLINE_EXCEEDED caused by the command actually
  timing out on the server and one caused by other issues: the former should
  not be retried, while the latter should be.

TESTED=unit tests, remote worker on Bazel
PiperOrigin-RevId: 160605830
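For orientation, the calling pattern this change introduces is sketched below. The names mirror the GrpcActionCache diff that follows (Retrier, RemoteOptions, the memoized acBlockingStub); treat it as a minimal fragment, not a verbatim excerpt from the change.

    // Sketch: wrap each blocking gRPC call in Retrier.execute, which replays the
    // Callable on retriable Status codes and sleeps according to the configured backoff.
    Retrier retrier = new Retrier(options);  // options: the RemoteOptions in effect
    ActionResult cachedResult =
        retrier.execute(
            () ->
                acBlockingStub
                    .get()
                    .getActionResult(
                        GetActionResultRequest.newBuilder()
                            .setInstanceName(options.remoteInstanceName)
                            .setActionDigest(actionKey.getDigest())
                            .build()));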
Diffstat (limited to 'src/main/java/com/google/devtools')
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java | 348
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java | 132
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java | 5
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java | 48
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java | 12
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/Retrier.java | 338
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RetryException.java | 43
7 files changed, 701 insertions(+), 225 deletions(-)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index 7f0a04812f..23899b5549 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -21,6 +21,7 @@ import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
@@ -53,6 +54,7 @@ import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Status;
+import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
@@ -60,7 +62,11 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -72,66 +78,54 @@ public class GrpcActionCache implements RemoteActionCache {
private final RemoteOptions options;
private final ChannelOptions channelOptions;
private final Channel channel;
+ private final Retrier retrier;
+ // All gRPC stubs are reused.
+ private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub;
+ private final Supplier<ByteStreamBlockingStub> bsBlockingStub;
+ private final Supplier<ByteStreamStub> bsStub;
+ private final Supplier<ActionCacheBlockingStub> acBlockingStub;
+ @VisibleForTesting
public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
this.options = options;
this.channelOptions = channelOptions;
this.channel = channel;
+ this.retrier = new Retrier(options);
+ casBlockingStub =
+ Suppliers.memoize(
+ () ->
+ ContentAddressableStorageGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
+ bsBlockingStub =
+ Suppliers.memoize(
+ () ->
+ ByteStreamGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
+ bsStub =
+ Suppliers.memoize(
+ () ->
+ ByteStreamGrpc.newStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
+ acBlockingStub =
+ Suppliers.memoize(
+ () ->
+ ActionCacheGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
}
@Override
public void close() {}
- // All gRPC stubs are reused.
- private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub =
- Suppliers.memoize(
- new Supplier<ContentAddressableStorageBlockingStub>() {
- @Override
- public ContentAddressableStorageBlockingStub get() {
- return ContentAddressableStorageGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
- }
- });
-
- private final Supplier<ByteStreamBlockingStub> bsBlockingStub =
- Suppliers.memoize(
- new Supplier<ByteStreamBlockingStub>() {
- @Override
- public ByteStreamBlockingStub get() {
- return ByteStreamGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
- }
- });
-
- private final Supplier<ByteStreamStub> bsStub =
- Suppliers.memoize(
- new Supplier<ByteStreamStub>() {
- @Override
- public ByteStreamStub get() {
- return ByteStreamGrpc.newStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
- }
- });
-
- private final Supplier<ActionCacheBlockingStub> acBlockingStub =
- Suppliers.memoize(
- new Supplier<ActionCacheBlockingStub>() {
- @Override
- public ActionCacheBlockingStub get() {
- return ActionCacheGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
- }
- });
-
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
- private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) {
+ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests)
+ throws IOException, InterruptedException {
FindMissingBlobsRequest.Builder request =
FindMissingBlobsRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
@@ -139,7 +133,8 @@ public class GrpcActionCache implements RemoteActionCache {
if (request.getBlobDigestsCount() == 0) {
return ImmutableSet.of();
}
- FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build());
+ FindMissingBlobsResponse response =
+ retrier.execute(() -> casBlockingStub.get().findMissingBlobs(request.build()));
return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}
@@ -150,7 +145,7 @@ public class GrpcActionCache implements RemoteActionCache {
@Override
public void ensureInputsPresent(
TreeNodeRepository repository, Path execRoot, TreeNode root, Command command)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
@@ -165,20 +160,23 @@ public class GrpcActionCache implements RemoteActionCache {
BatchUpdateBlobsRequest.Builder treeBlobRequest =
BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
for (Directory d : treeNodes) {
- final byte[] data = d.toByteArray();
+ byte[] data = d.toByteArray();
treeBlobRequest
.addRequestsBuilder()
.setContentDigest(Digests.computeDigest(data))
.setData(ByteString.copyFrom(data));
}
- BatchUpdateBlobsResponse response =
- casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
- // TODO(olaola): handle retries on transient errors.
- for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
- if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
- throw StatusProto.toStatusRuntimeException(r.getStatus());
- }
- }
+ retrier.execute(
+ () -> {
+ BatchUpdateBlobsResponse response =
+ casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
+ for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
+ if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
+ throw StatusProto.toStatusRuntimeException(r.getStatus());
+ }
+ }
+ return null;
+ });
}
uploadBlob(command.toByteArray());
if (!actionInputs.isEmpty()) {
@@ -204,7 +202,7 @@ public class GrpcActionCache implements RemoteActionCache {
*/
@Override
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
- throws IOException, CacheNotFoundException {
+ throws IOException, InterruptedException, CacheNotFoundException {
for (OutputFile file : result.getOutputFilesList()) {
Path path = execRoot.getRelative(file.getPath());
FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
@@ -213,15 +211,21 @@ public class GrpcActionCache implements RemoteActionCache {
// Handle empty file locally.
FileSystemUtils.writeContent(path, new byte[0]);
} else {
- try (OutputStream stream = path.getOutputStream()) {
- if (!file.getContent().isEmpty()) {
+ if (!file.getContent().isEmpty()) {
+ try (OutputStream stream = path.getOutputStream()) {
file.getContent().writeTo(stream);
- } else {
- Iterator<ReadResponse> replies = readBlob(digest);
- while (replies.hasNext()) {
- replies.next().getData().writeTo(stream);
- }
}
+ } else {
+ retrier.execute(
+ () -> {
+ try (OutputStream stream = path.getOutputStream()) {
+ Iterator<ReadResponse> replies = readBlob(digest);
+ while (replies.hasNext()) {
+ replies.next().getData().writeTo(stream);
+ }
+ return null;
+ }
+ });
}
}
path.setExecutable(file.getIsExecutable());
@@ -234,7 +238,7 @@ public class GrpcActionCache implements RemoteActionCache {
}
private void downloadOutErr(ActionResult result, FileOutErr outErr)
- throws IOException, CacheNotFoundException {
+ throws IOException, InterruptedException, CacheNotFoundException {
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
@@ -272,20 +276,21 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public void upload(
- ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr)
- throws IOException, InterruptedException {
+ public void upload(ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr)
+ throws IOException, InterruptedException {
ActionResult.Builder result = ActionResult.newBuilder();
upload(execRoot, files, outErr, result);
try {
- acBlockingStub
- .get()
- .updateActionResult(
- UpdateActionResultRequest.newBuilder()
- .setInstanceName(options.remoteInstanceName)
- .setActionDigest(actionKey.getDigest())
- .setActionResult(result)
- .build());
+ retrier.execute(
+ () ->
+ acBlockingStub
+ .get()
+ .updateActionResult(
+ UpdateActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .setActionResult(result)
+ .build()));
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
throw e;
@@ -294,7 +299,7 @@ public class GrpcActionCache implements RemoteActionCache {
}
void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
ArrayList<Digest> digests = new ArrayList<>();
Chunker.Builder b = new Chunker.Builder();
for (Path file : files) {
@@ -324,7 +329,7 @@ public class GrpcActionCache implements RemoteActionCache {
.setDigest(digests.get(index++))
.setIsExecutable(file.isExecutable());
}
- // TODO(ulfjack): Use the Chunker also for stdout / stderr.
+ // TODO(olaola): inline small stdout/stderr here.
if (outErr.getErrorPath().exists()) {
Digest stderr = uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
@@ -356,8 +361,7 @@ public class GrpcActionCache implements RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- Digest uploadFileContents(
- ActionInput input, Path execRoot, ActionInputFileCache inputCache)
+ Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
Digest digest = Digests.getDigestFromInputCache(input, inputCache);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
@@ -369,80 +373,98 @@ public class GrpcActionCache implements RemoteActionCache {
private void uploadChunks(int numItems, Chunker.Builder chunkerBuilder)
throws InterruptedException, IOException {
- final CountDownLatch finishLatch = new CountDownLatch(numItems);
- final AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
- StreamObserver<WriteRequest> requestObserver = null;
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
+ Retrier.Backoff backoff = retrier.newBackoff();
Chunker chunker = chunkerBuilder.build();
- while (chunker.hasNext()) {
- Chunker.Chunk chunk = chunker.next();
- final Digest digest = chunk.getDigest();
- long offset = chunk.getOffset();
- WriteRequest.Builder request = WriteRequest.newBuilder();
- if (offset == 0) { // Beginning of new upload.
- numItems--;
- request.setResourceName(
- resourceName
- + "uploads/"
- + UUID.randomUUID()
- + "/blobs/"
- + digest.getHash()
- + "/"
- + digest.getSizeBytes());
- // The batches execute simultaneously.
- requestObserver =
- bsStub
- .get()
- .write(
- new StreamObserver<WriteResponse>() {
- private long bytesLeft = digest.getSizeBytes();
+ while (true) { // Retry until either uploaded everything or raised an exception.
+ CountDownLatch finishLatch = new CountDownLatch(numItems);
+ AtomicReference<IOException> crashException = new AtomicReference<>(null);
+ List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
+ Set<Digest> failedDigests = Collections.synchronizedSet(new HashSet<Digest>());
+ StreamObserver<WriteRequest> requestObserver = null;
+ while (chunker.hasNext()) {
+ Chunker.Chunk chunk = chunker.next();
+ Digest digest = chunk.getDigest();
+ long offset = chunk.getOffset();
+ WriteRequest.Builder request = WriteRequest.newBuilder();
+ if (offset == 0) { // Beginning of new upload.
+ numItems--;
+ request.setResourceName(
+ String.format(
+ "%suploads/%s/blobs/%s/%d",
+ resourceName, UUID.randomUUID(), digest.getHash(), digest.getSizeBytes()));
+ // The batches execute simultaneously.
+ requestObserver =
+ bsStub
+ .get()
+ .write(
+ new StreamObserver<WriteResponse>() {
+ private long bytesLeft = digest.getSizeBytes();
+
+ @Override
+ public void onNext(WriteResponse reply) {
+ bytesLeft -= reply.getCommittedSize();
+ }
- @Override
- public void onNext(WriteResponse reply) {
- bytesLeft -= reply.getCommittedSize();
- }
+ @Override
+ public void onError(Throwable t) {
+ // In theory, this can be any error, even though it's supposed to usually
+ // be only StatusException or StatusRuntimeException. We have to check
+ // for other errors, in order to not accidentally retry them!
+ if (!(t instanceof StatusRuntimeException
+ || t instanceof StatusException)) {
+ crashException.compareAndSet(null, new IOException(t));
+ }
- @Override
- public void onError(Throwable t) {
- exception.compareAndSet(
- null, new StatusRuntimeException(Status.fromThrowable(t)));
- finishLatch.countDown();
- }
+ failedDigests.add(digest);
+ errors.add(Status.fromThrowable(t));
+ finishLatch.countDown();
+ }
- @Override
- public void onCompleted() {
- if (bytesLeft != 0) {
- exception.compareAndSet(
- null, new RuntimeException("Server did not commit all data."));
+ @Override
+ public void onCompleted() {
+ // This can actually happen even if we did not send all the bytes,
+ // if the server already has parts of the uploaded blob and can reuse them.
+ finishLatch.countDown();
}
- finishLatch.countDown();
- }
- });
+ });
+ }
+ byte[] data = chunk.getData();
+ boolean finishWrite = offset + data.length == digest.getSizeBytes();
+ request
+ .setData(ByteString.copyFrom(data))
+ .setWriteOffset(offset)
+ .setFinishWrite(finishWrite);
+ requestObserver.onNext(request.build());
+ if (finishWrite) {
+ requestObserver.onCompleted();
+ }
+ if (finishLatch.getCount() <= numItems) {
+ // Current RPC errored before we finished sending.
+ if (!finishWrite) {
+ chunker.advanceInput();
+ }
+ }
}
- byte[] data = chunk.getData();
- boolean finishWrite = offset + data.length == digest.getSizeBytes();
- request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite);
- requestObserver.onNext(request.build());
- if (finishWrite) {
- requestObserver.onCompleted();
+ finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS);
+ if (crashException.get() != null) {
+ throw crashException.get(); // Re-throw the exception that is supposed to never happen.
}
- if (finishLatch.getCount() <= numItems) {
- // Current RPC errored before we finished sending.
- if (!finishWrite) {
- chunker.advanceInput();
- }
+ if (failedDigests.isEmpty()) {
+ return; // Successfully sent everything.
}
- }
- finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS);
- if (exception.get() != null) {
- throw exception.get(); // Re-throw the first encountered exception.
+ retrier.onFailures(backoff, errors); // This will throw when out of retries.
+ // We don't have to synchronize on failedDigests now, because after finishLatch.await we're
+ // back to single threaded execution.
+ chunker = chunkerBuilder.onlyUseDigests(failedDigests).build();
+ numItems = failedDigests.size();
}
}
- Digest uploadBlob(byte[] blob) throws InterruptedException {
+ Digest uploadBlob(byte[] blob) throws IOException, InterruptedException {
Digest digest = Digests.computeDigest(blob);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
try {
@@ -456,19 +478,24 @@ public class GrpcActionCache implements RemoteActionCache {
}
}
- byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
+ byte[] downloadBlob(Digest digest)
+ throws IOException, InterruptedException, CacheNotFoundException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
- Iterator<ReadResponse> replies = readBlob(digest);
byte[] result = new byte[(int) digest.getSizeBytes()];
- int offset = 0;
- while (replies.hasNext()) {
- ByteString data = replies.next().getData();
- data.copyTo(result, offset);
- offset += data.size();
- }
- Preconditions.checkState(digest.getSizeBytes() == offset);
+ retrier.execute(
+ () -> {
+ Iterator<ReadResponse> replies = readBlob(digest);
+ int offset = 0;
+ while (replies.hasNext()) {
+ ByteString data = replies.next().getData();
+ data.copyTo(result, offset);
+ offset += data.size();
+ }
+ Preconditions.checkState(digest.getSizeBytes() == offset);
+ return null;
+ });
return result;
}
@@ -476,17 +503,20 @@ public class GrpcActionCache implements RemoteActionCache {
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
- public ActionResult getCachedActionResult(ActionKey actionKey) {
+ public ActionResult getCachedActionResult(ActionKey actionKey)
+ throws IOException, InterruptedException {
try {
- return acBlockingStub
- .get()
- .getActionResult(
- GetActionResultRequest.newBuilder()
- .setInstanceName(options.remoteInstanceName)
- .setActionDigest(actionKey.getDigest())
- .build());
- } catch (StatusRuntimeException e) {
- if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ return retrier.execute(
+ () ->
+ acBlockingStub
+ .get()
+ .getActionResult(
+ GetActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .build()));
+ } catch (RetryException e) {
+ if (e.causedByStatusCode(Status.Code.NOT_FOUND)) {
return null;
}
throw e;
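The most intricate part of the GrpcActionCache change is the retry loop in uploadChunks. A condensed restatement of that loop is sketched below, with the per-chunk streaming folded into a hypothetical streamAllChunks helper; errors and failedDigests are filled in by the StreamObserver callbacks exactly as in the diff above.

    // Condensed sketch of the uploadChunks retry loop (streamAllChunks is hypothetical).
    Retrier.Backoff backoff = retrier.newBackoff();
    Chunker chunker = chunkerBuilder.build();
    while (true) {
      List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
      Set<Digest> failedDigests = Collections.synchronizedSet(new HashSet<Digest>());
      streamAllChunks(chunker, errors, failedDigests);  // issues the ByteStream writes
      if (failedDigests.isEmpty()) {
        return;  // everything uploaded successfully
      }
      retrier.onFailures(backoff, errors);  // sleeps once per pass; throws when out of retries
      // Only the blobs that failed are re-chunked and re-sent on the next pass.
      chunker = chunkerBuilder.onlyUseDigests(failedDigests).build();
    }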
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index 815b9237d0..51bc0b6fa7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -16,6 +16,7 @@ package com.google.devtools.build.lib.remote;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
@@ -24,7 +25,6 @@ import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
import com.google.longrunning.Operation;
import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.util.Durations;
import com.google.rpc.Status;
import com.google.watcher.v1.Change;
import com.google.watcher.v1.ChangeBatch;
@@ -32,7 +32,10 @@ import com.google.watcher.v1.Request;
import com.google.watcher.v1.WatcherGrpc;
import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub;
import io.grpc.Channel;
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
+import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -43,18 +46,11 @@ public class GrpcRemoteExecutor {
private final RemoteOptions options;
private final ChannelOptions channelOptions;
private final Channel channel;
+ private final Retrier retrier;
- // Reuse the gRPC stub.
- private final Supplier<ExecutionBlockingStub> execBlockingStub =
- Suppliers.memoize(
- new Supplier<ExecutionBlockingStub>() {
- @Override
- public ExecutionBlockingStub get() {
- return ExecutionGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
- }
- });
+ // Reuse the gRPC stubs.
+ private final Supplier<ExecutionBlockingStub> execBlockingStub;
+ private final Supplier<WatcherBlockingStub> watcherBlockingStub;
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
return options.remoteExecutor != null;
@@ -64,68 +60,94 @@ public class GrpcRemoteExecutor {
this.options = options;
this.channelOptions = channelOptions;
this.channel = channel;
+ this.retrier = new Retrier(options);
+ execBlockingStub =
+ Suppliers.memoize(
+ () ->
+ ExecutionGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS));
+ // Do not set a deadline on this call, because it is hard to estimate in
+ // advance how much time we should give the server to execute an action
+ // remotely (scheduling, queuing, optional retries, etc.)
+ // It is the server's responsibility to respect the Action timeout field.
+ watcherBlockingStub =
+ Suppliers.memoize(
+ () ->
+ WatcherGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials()));
}
- private @Nullable ExecuteResponse getOperationResponse(Operation op) {
+ private @Nullable ExecuteResponse getOperationResponse(Operation op)
+ throws IOException, UserExecException {
if (op.getResultCase() == Operation.ResultCase.ERROR) {
- throw StatusProto.toStatusRuntimeException(op.getError());
+ StatusRuntimeException e = StatusProto.toStatusRuntimeException(op.getError());
+ if (e.getStatus().getCode() == Code.DEADLINE_EXCEEDED) {
+ // This was caused by the command itself exceeding the timeout,
+ // therefore it is not retriable.
+ // TODO(olaola): this should propagate a timeout SpawnResult instead of raising.
+ throw new UserExecException("Remote execution timed out", true);
+ }
+ throw e;
}
if (op.getDone()) {
Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);
try {
return op.getResponse().unpack(ExecuteResponse.class);
} catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
+ throw new IOException(e);
}
}
return null;
}
- public ExecuteResponse executeRemotely(ExecuteRequest request) {
- Operation op = execBlockingStub.get().execute(request);
+ public ExecuteResponse executeRemotely(ExecuteRequest request)
+ throws InterruptedException, IOException, UserExecException {
+ Operation op = retrier.execute(() -> execBlockingStub.get().execute(request));
ExecuteResponse resp = getOperationResponse(op);
if (resp != null) {
return resp;
}
- int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout());
- WatcherBlockingStub stub =
- WatcherGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS);
Request wr = Request.newBuilder().setTarget(op.getName()).build();
- Iterator<ChangeBatch> replies = stub.watch(wr);
- while (replies.hasNext()) {
- ChangeBatch cb = replies.next();
- for (Change ch : cb.getChangesList()) {
- switch (ch.getState()) {
- case INITIAL_STATE_SKIPPED:
- continue;
- case ERROR:
- try {
- throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class));
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- case DOES_NOT_EXIST:
- throw new RuntimeException(
- String.format("Operation %s lost on the remote server.", op.getName()));
- case EXISTS:
- try {
- op = ch.getData().unpack(Operation.class);
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- resp = getOperationResponse(op);
- if (resp != null) {
- return resp;
+ return retrier.execute(
+ () -> {
+ Iterator<ChangeBatch> replies = watcherBlockingStub.get().watch(wr);
+ while (replies.hasNext()) {
+ ChangeBatch cb = replies.next();
+ for (Change ch : cb.getChangesList()) {
+ switch (ch.getState()) {
+ case INITIAL_STATE_SKIPPED:
+ continue;
+ case ERROR:
+ try {
+ throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ case DOES_NOT_EXIST:
+ // TODO(olaola): either make this retriable, or use a different exception.
+ throw new IOException(
+ String.format("Operation %s lost on the remote server.", op.getName()));
+ case EXISTS:
+ Operation o;
+ try {
+ o = ch.getData().unpack(Operation.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ ExecuteResponse r = getOperationResponse(o);
+ if (r != null) {
+ return r;
+ }
+ continue;
+ default:
+ // This can only happen if the enum gets unexpectedly extended.
+ throw new IOException(String.format("Illegal change state: %s", ch.getState()));
+ }
}
- continue;
- default:
- throw new RuntimeException(String.format("Illegal change state: %s", ch.getState()));
- }
- }
- }
- throw new RuntimeException(
- String.format("Watch request for %s terminated with no result.", op.getName()));
+ }
+ throw new IOException(
+ String.format("Watch request for %s terminated with no result.", op.getName()));
+ });
}
}
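The commit message's note about DEADLINE_EXCEEDED is implemented in getOperationResponse above. A compressed view of the distinction, mirroring the diff with the reasoning spelled out in comments:

    // A DEADLINE_EXCEEDED raised by the execute/watch RPC itself is a transport-level
    // timeout and is retried (DEFAULT_IS_RETRIABLE in Retrier.java includes it). A
    // DEADLINE_EXCEEDED reported inside the Operation means the action exceeded its own
    // timeout on the server, so it is surfaced as a UserExecException and never retried.
    if (op.getResultCase() == Operation.ResultCase.ERROR) {
      StatusRuntimeException e = StatusProto.toStatusRuntimeException(op.getError());
      if (e.getStatus().getCode() == Code.DEADLINE_EXCEEDED) {
        throw new UserExecException("Remote execution timed out", true);  // not retriable
      }
      throw e;  // other statuses bubble up to the surrounding retrier.execute
    }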
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
index 994c23e1e6..f8c53268c5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
@@ -58,15 +58,14 @@ interface RemoteActionCache {
*/
// TODO(olaola): will need to amend to include the TreeNodeRepository for updating.
void download(ActionResult result, Path execRoot, FileOutErr outErr)
- throws IOException, CacheNotFoundException;
-
+ throws IOException, InterruptedException, CacheNotFoundException;
/**
* Attempts to look up the given action in the remote cache and return its result, if present.
* Returns {@code null} if there is no such entry. Note that a successful result from this method
* does not guarantee the availability of the corresponding output files in the remote cache.
*/
@Nullable
- ActionResult getCachedActionResult(ActionKey actionKey);
+ ActionResult getCachedActionResult(ActionKey actionKey) throws IOException, InterruptedException;
/**
* Upload the result of a locally executed action to the cache by uploading any necessary files,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index 07aaff11f7..d472adae09 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -154,4 +154,52 @@ public final class RemoteOptions extends OptionsBase {
help = "Value to pass as instance_name in the remote execution API."
)
public String remoteInstanceName;
+
+ @Option(
+ name = "experimental_remote_retry",
+ defaultValue = "true",
+ category = "remote",
+ help = "Whether to retry transient remote execution/cache errors."
+ )
+ public boolean experimentalRemoteRetry;
+
+ @Option(
+ name = "experimental_remote_retry_start_delay_millis",
+ defaultValue = "100",
+ category = "remote",
+ help = "The initial delay before retrying a transient error."
+ )
+ public long experimentalRemoteRetryStartDelayMillis;
+
+ @Option(
+ name = "experimental_remote_retry_max_delay_millis",
+ defaultValue = "5000",
+ category = "remote",
+ help = "The maximum delay before retrying a transient error."
+ )
+ public long experimentalRemoteRetryMaxDelayMillis;
+
+ @Option(
+ name = "experimental_remote_retry_max_attempts",
+ defaultValue = "5",
+ category = "remote",
+ help = "The maximum number of attempts to retry a transient error."
+ )
+ public int experimentalRemoteRetryMaxAttempts;
+
+ @Option(
+ name = "experimental_remote_retry_multiplier",
+ defaultValue = "2",
+ category = "remote",
+ help = "The multiplier by which to increase the retry delay on transient errors."
+ )
+ public double experimentalRemoteRetryMultiplier;
+
+ @Option(
+ name = "experimental_remote_retry_jitter",
+ defaultValue = "0.1",
+ category = "remote",
+ help = "The random factor to apply to retry delays on transient errors."
+ )
+ public double experimentalRemoteRetryJitter;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index c0952c0282..8426ae3864 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -297,13 +297,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
throw new UserExecException(message + ": Exit " + result.getExitCode());
}
- } catch (IOException e) {
- throw new UserExecException("Unexpected IO error.", e);
- } catch (InterruptedException e) {
- eventHandler.handle(Event.warn(mnemonic + " remote work interrupted (" + e + ")"));
- Thread.currentThread().interrupt();
- throw e;
- } catch (StatusRuntimeException e) {
+ } catch (RetryException e) {
String stackTrace = "";
if (verboseFailures) {
stackTrace = "\n" + Throwables.getStackTraceAsString(e);
@@ -312,7 +306,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
if (remoteOptions.remoteLocalFallback) {
execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
- throw new UserExecException(e);
+ throw new UserExecException(e.getCause());
}
} catch (CacheNotFoundException e) {
// TODO(olaola): handle this exception by reuploading / reexecuting the action remotely.
@@ -322,6 +316,8 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
} else {
throw new UserExecException(e);
}
+ } catch (IOException e) {
+ throw new UserExecException("Unexpected IO error.", e);
} catch (UnsupportedOperationException e) {
eventHandler.handle(
Event.warn(mnemonic + " unsupported operation for action cache (" + e + ")"));
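One subtlety in the RemoteSpawnStrategy hunk above: RetryException extends IOException (see RetryException.java below), so the generic IOException catch has to move after the RetryException catch or it would shadow it. A minimal, hypothetical sketch of the required ordering, with remoteCall standing in for the remote execution path:

    try {
      remoteCall();  // hypothetical stand-in for the remote cache/execution calls
    } catch (RetryException e) {
      // Retries exhausted or a non-retriable gRPC status; may fall back to local execution.
    } catch (IOException e) {
      // Any other unexpected I/O error.
    }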
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
new file mode 100644
index 0000000000..981af46199
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
@@ -0,0 +1,338 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.util.Preconditions;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Supports execution with retries on particular gRPC Statuses. The retrier is ThreadSafe.
+ *
+ * <p>Example usage: The simple use case is to call retrier.execute, e.g.:
+ *
+ * <pre>
+ * foo = retrier.execute(
+ * new Callable<Foo>() {
+ * @Override
+ * public Foo call() {
+ * return grpcStub.getFoo(fooRequest);
+ * }
+ * });
+ * </pre>
+ *
+ * <p>When you need to retry multiple asynchronous calls, you can do:
+ *
+ * <pre>
+ * Retrier.Backoff backoff = retrier.newBackoff();
+ * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
+ * while (true) {
+ * CountDownLatch finishLatch = new CountDownLatch(items.size());
+ * for (Item item : items) {
+ * requestObserver = myStub.asyncCall(
+ * request,
+ * new StreamObserver<Response>() {
+ * ...
+ *
+ * @Override
+ * public void onError(Throwable t) {
+ * // Need to handle non Status errors here!
+ * errors.add(Status.fromThrowable(t));
+ * finishLatch.countDown();
+ * }
+ * @Override
+ * public void onCompleted() {
+ * finishLatch.countDown();
+ * }
+ * });
+ * requestObserver.onNext(i1);
+ * requestObserver.onNext(i2);
+ * ...
+ * requestObserver.onCompleted();
+ * }
+ * finishLatch.await(someTime, TimeUnit.SECONDS);
+ * if (errors.isEmpty()) {
+ * return;
+ * }
+ * retrier.onFailures(backoff, errors); // Sleep once for the whole batch of failures.
+ * items = failingItems; // this needs to be collected from the observers as well.
+ * }
+ * </pre>
+ *
+ * <p>This retries the multiple calls in bulk. Another way to do it is retry each call separately as
+ * it occurs:
+ *
+ * <pre>
+ * class RetryingObserver extends StreamObserver<Response> {
+ * private final CountDownLatch finishLatch;
+ * private final Backoff backoff;
+ * private final AtomicReference<RuntimeException> exception;
+ *
+ * RetryingObserver(
+ * CountDownLatch finishLatch, Backoff backoff, AtomicReference<RuntimeException> exception) {
+ * this.finishLatch = finishLatch;
+ * this.backoff = backoff;
+ * this.exception = exception;
+ * }
+ *
+ * @Override
+ * public void onError(Throwable t) {
+ * // Need to handle non Status errors here first!
+ * try {
+ * retrier.onFailure(backoff, Status.fromThrowable(t));
+ *
+ * // This assumes you passed through the relevant info to recreate the original request:
+ * requestObserver = myStub.asyncCall(
+ * request,
+ * new RetryingObserver(finishLatch, backoff)); // Recursion!
+ * requestObserver.onNext(i1);
+ * requestObserver.onNext(i2);
+ * ...
+ * requestObserver.onCompleted();
+ *
+ * } catch (RetryException e) {
+ * exception.compareAndSet(null, e);
+ * finishLatch.countDown();
+ * }
+ * }
+ * @Override
+ * public void onCompleted() {
+ * finishLatch.countDown();
+ * }
+ * }
+ *
+ * Retrier.Backoff backoff = retrier.newBackoff();
+ * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>());
+ * while (true) {
+ * CountDownLatch finishLatch = new CountDownLatch(items.size());
+ * for (Item item : items) {
+ * requestObserver = myStub.asyncCall(
+ * request,
+ * new RetryingObserver(finishLatch, backoff));
+ * requestObserver.onNext(i1);
+ * requestObserver.onNext(i2);
+ * ...
+ * requestObserver.onCompleted();
+ * }
+ * finishLatch.await(someTime, TimeUnit.SECONDS);
+ * if (exception.get() != null) {
+ * throw exception.get(); // Re-throw the first encountered exception.
+ * }
+ * }
+ * </pre>
+ *
+ * In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry.
+ */
+public class Retrier {
+ /**
+ * Backoff is a stateful object providing a sequence of durations that are used to time delays
+ * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather
+ * than a static map of attempt number to delay, is to enable using the retrier via the manual
+ * onFailure(backoff, e) method (see multiple async gRPC calls example above).
+ */
+ public interface Backoff {
+
+ /** Value returned by {@link #nextDelayMillis()} to indicate that no more retries should be made. */
+ static final long STOP = -1L;
+
+ /** Returns the next delay in milliseconds, or < 0 if we should not continue retrying. */
+ long nextDelayMillis();
+
+ /**
+ * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls
+ * that returned STOP.
+ */
+ int getRetryAttempts();
+
+ /**
+ * Creates a Backoff supplier for a Backoff which does not support any retries. Both the
+ * Supplier and the Backoff are stateless and thread-safe.
+ */
+ static final Supplier<Backoff> NO_RETRIES =
+ () ->
+ new Backoff() {
+ @Override
+ public long nextDelayMillis() {
+ return STOP;
+ }
+
+ @Override
+ public int getRetryAttempts() {
+ return 0;
+ }
+ };
+
+ /**
+ * Creates a Backoff supplier for an optionally jittered exponential backoff. The supplier is
+ * ThreadSafe (non-synchronized calls to get() are fine), but the returned Backoff is not.
+ *
+ * @param initial The initial backoff duration.
+ * @param max The maximum backoff duration.
+ * @param multiplier The amount the backoff should increase in each iteration. Must be >1.
+ * @param jitter The amount the backoff should be randomly varied (0-1), with 0 providing no
+ * jitter, and 1 providing a duration that is 0-200% of the non-jittered duration.
+ * @param maxAttempts The maximal number of retry attempts; 0 means no retries.
+ */
+ static Supplier<Backoff> exponential(
+ Duration initial, Duration max, double multiplier, double jitter, int maxAttempts) {
+ Preconditions.checkArgument(multiplier > 1, "multiplier must be > 1");
+ Preconditions.checkArgument(jitter >= 0 && jitter <= 1, "jitter must be in the range [0, 1]");
+ Preconditions.checkArgument(maxAttempts >= 0, "maxAttempts must be >= 0");
+ return () ->
+ new Backoff() {
+ private final long maxMillis = max.toMillis();
+ private long nextDelayMillis = initial.toMillis();
+ private int attempts = 0;
+
+ @Override
+ public long nextDelayMillis() {
+ if (attempts == maxAttempts) {
+ return STOP;
+ }
+ attempts++;
+ double jitterRatio = jitter * (ThreadLocalRandom.current().nextDouble(2.0) - 1);
+ long result = (long) (nextDelayMillis * (1 + jitterRatio));
+ // Advance current by the non-jittered result.
+ nextDelayMillis = (long) (nextDelayMillis * multiplier);
+ if (nextDelayMillis > maxMillis) {
+ nextDelayMillis = maxMillis;
+ }
+ return result;
+ }
+
+ @Override
+ public int getRetryAttempts() {
+ return attempts;
+ }
+ };
+ }
+ }
+
+ public static final Predicate<Status> DEFAULT_IS_RETRIABLE =
+ st -> {
+ switch (st.getCode()) {
+ case CANCELLED:
+ return !Thread.currentThread().isInterrupted();
+ case UNKNOWN:
+ case DEADLINE_EXCEEDED:
+ case ABORTED:
+ case INTERNAL:
+ case UNAVAILABLE:
+ case UNAUTHENTICATED:
+ return true;
+ default:
+ return false;
+ }
+ };
+
+ public static final Predicate<Status> RETRY_ALL = Predicates.alwaysTrue();
+ public static final Predicate<Status> RETRY_NONE = Predicates.alwaysFalse();
+ public static final Retrier NO_RETRIES = new Retrier(Backoff.NO_RETRIES, RETRY_NONE);
+
+ private final Supplier<Backoff> backoffSupplier;
+ private final Predicate<Status> isRetriable;
+
+ @VisibleForTesting
+ Retrier(Supplier<Backoff> backoffSupplier, Predicate<Status> isRetriable) {
+ this.backoffSupplier = backoffSupplier;
+ this.isRetriable = isRetriable;
+ }
+
+ public Retrier(RemoteOptions options) {
+ this(
+ options.experimentalRemoteRetry
+ ? Backoff.exponential(
+ Duration.ofMillis(options.experimentalRemoteRetryStartDelayMillis),
+ Duration.ofMillis(options.experimentalRemoteRetryMaxDelayMillis),
+ options.experimentalRemoteRetryMultiplier,
+ options.experimentalRemoteRetryJitter,
+ options.experimentalRemoteRetryMaxAttempts)
+ : Backoff.NO_RETRIES,
+ DEFAULT_IS_RETRIABLE);
+ }
+
+ /**
+ * Executes the given callable in a loop, retrying on retryable errors, as defined by the current
+ * backoff/retry policy. Will raise the last encountered retriable error, or the first
+ * non-retriable error.
+ *
+ * @param c The callable to execute.
+ */
+ public <T> T execute(Callable<T> c) throws InterruptedException, RetryException {
+ Backoff backoff = backoffSupplier.get();
+ while (true) {
+ try {
+ return c.call();
+ } catch (StatusException | StatusRuntimeException e) {
+ onFailure(backoff, Status.fromThrowable(e));
+ } catch (Exception e) {
+ // Generic catch because Callable is declared to throw Exception.
+ Throwables.throwIfUnchecked(e);
+ throw new RetryException(e, backoff.getRetryAttempts());
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void sleep(long timeMillis) throws InterruptedException {
+ Preconditions.checkArgument(
+ timeMillis >= 0L, "timeMillis must not be negative: %s", timeMillis);
+ TimeUnit.MILLISECONDS.sleep(timeMillis);
+ }
+
+ public Backoff newBackoff() {
+ return backoffSupplier.get();
+ }
+
+ public void onFailure(Backoff backoff, Status s) throws RetryException, InterruptedException {
+ onFailures(backoff, ImmutableList.of(s));
+ }
+
+ /**
+ * Handles failures according to the current backoff/retry policy. If any of the errors are not
+ * retriable, the first such error is thrown. Otherwise, if backoff still allows, this sleeps for
+ * the specified duration. Otherwise, the first error is thrown.
+ *
+ * @param backoff The backoff object to get delays from.
+ * @param errors The errors that occurred (must be non-empty).
+ */
+ public void onFailures(Backoff backoff, List<Status> errors)
+ throws InterruptedException, RetryException {
+ Preconditions.checkArgument(!errors.isEmpty(), "errors must be non-empty");
+ long delay = backoff.nextDelayMillis();
+ for (Status st : errors) {
+ if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ if (delay < 0 || !isRetriable.apply(st)) {
+ throw new RetryException(st.asRuntimeException(), backoff.getRetryAttempts());
+ }
+ }
+ sleep(delay);
+ }
+}
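To make the retry flags concrete: with the defaults added in RemoteOptions above (start delay 100 ms, multiplier 2, max delay 5000 ms, 5 attempts), Backoff.exponential yields non-jittered delays of roughly 100, 200, 400, 800, and 1600 ms before giving up. The standalone sketch below reproduces that sequence; it is an illustration, not code from the change.

    // Standalone illustration of the default backoff schedule (jitter omitted).
    public class BackoffDemo {
      public static void main(String[] args) {
        long delayMillis = 100;   // experimental_remote_retry_start_delay_millis
        long maxMillis = 5000;    // experimental_remote_retry_max_delay_millis
        double multiplier = 2.0;  // experimental_remote_retry_multiplier
        int maxAttempts = 5;      // experimental_remote_retry_max_attempts
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          System.out.println("retry " + attempt + ": sleep ~" + delayMillis + " ms");
          delayMillis = Math.min((long) (delayMillis * multiplier), maxMillis);
        }
        // Prints ~100, 200, 400, 800, 1600 ms; jitter=0.1 varies each delay by up to 10%.
      }
    }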
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RetryException.java b/src/main/java/com/google/devtools/build/lib/remote/RetryException.java
new file mode 100644
index 0000000000..6e9890afd6
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RetryException.java
@@ -0,0 +1,43 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+
+/** An exception to indicate failed retry attempts. */
+public final class RetryException extends IOException {
+ private final int attempts;
+
+ RetryException(Throwable cause, int retryAttempts) {
+ super(cause);
+ this.attempts = retryAttempts + 1;
+ }
+
+ public int getAttempts() {
+ return attempts;
+ }
+
+ public boolean causedByStatusCode(Code code) {
+ return getCause() instanceof StatusRuntimeException
+ && ((StatusRuntimeException) getCause()).getStatus().getCode() == code;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("after %d attempts: %s", attempts, getCause());
+ }
+}