diff options
author | 2017-06-30 05:07:13 +0200 | |
---|---|---|
committer | 2017-06-30 13:01:12 +0200 | |
commit | fdd66ef1b1dbee1663676cdf4b36ddbe139a35bf (patch) | |
tree | f4abdcf4adc85e1ecb56167bba8eade94cbcd376 /src/main/java/com/google/devtools | |
parent | 5f00cd2b1cde682f77a47439e9dc631349992f9e (diff) |
Implement retry logic for the gRPC calls in remote execution and caching. The
retry strategy may need tuning.
Other behavior changes: swallowing gRPC CANCELLED errors when the thread is interrupted, as these are expected and just make debugging difficult. Also, distinguishing between the gRPC DEADLINE_EXCEEDED caused by the actual command timing out on the server vs. other causes (the former should not be retriable, while the latter should retry).
TESTED=unit tests, remote worker on Bazel
PiperOrigin-RevId: 160605830
Diffstat (limited to 'src/main/java/com/google/devtools')
7 files changed, 701 insertions, 225 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java index 7f0a04812f..23899b5549 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java @@ -21,6 +21,7 @@ import com.google.bytestream.ByteStreamProto.ReadRequest; import com.google.bytestream.ByteStreamProto.ReadResponse; import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; @@ -53,6 +54,7 @@ import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; import com.google.protobuf.ByteString; import io.grpc.Channel; import io.grpc.Status; +import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; @@ -60,7 +62,11 @@ import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -72,66 +78,54 @@ public class GrpcActionCache implements RemoteActionCache { private final RemoteOptions options; private final ChannelOptions channelOptions; private final Channel channel; + private final Retrier retrier; + // All gRPC stubs are reused. + private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub; + private final Supplier<ByteStreamBlockingStub> bsBlockingStub; + private final Supplier<ByteStreamStub> bsStub; + private final Supplier<ActionCacheBlockingStub> acBlockingStub; + @VisibleForTesting public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) { this.options = options; this.channelOptions = channelOptions; this.channel = channel; + this.retrier = new Retrier(options); + casBlockingStub = + Suppliers.memoize( + () -> + ContentAddressableStorageGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); + bsBlockingStub = + Suppliers.memoize( + () -> + ByteStreamGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); + bsStub = + Suppliers.memoize( + () -> + ByteStreamGrpc.newStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); + acBlockingStub = + Suppliers.memoize( + () -> + ActionCacheGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); } @Override public void close() {} - // All gRPC stubs are reused. - private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub = - Suppliers.memoize( - new Supplier<ContentAddressableStorageBlockingStub>() { - @Override - public ContentAddressableStorageBlockingStub get() { - return ContentAddressableStorageGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); - } - }); - - private final Supplier<ByteStreamBlockingStub> bsBlockingStub = - Suppliers.memoize( - new Supplier<ByteStreamBlockingStub>() { - @Override - public ByteStreamBlockingStub get() { - return ByteStreamGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); - } - }); - - private final Supplier<ByteStreamStub> bsStub = - Suppliers.memoize( - new Supplier<ByteStreamStub>() { - @Override - public ByteStreamStub get() { - return ByteStreamGrpc.newStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); - } - }); - - private final Supplier<ActionCacheBlockingStub> acBlockingStub = - Suppliers.memoize( - new Supplier<ActionCacheBlockingStub>() { - @Override - public ActionCacheBlockingStub get() { - return ActionCacheGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); - } - }); - public static boolean isRemoteCacheOptions(RemoteOptions options) { return options.remoteCache != null; } - private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) { + private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) + throws IOException, InterruptedException { FindMissingBlobsRequest.Builder request = FindMissingBlobsRequest.newBuilder() .setInstanceName(options.remoteInstanceName) @@ -139,7 +133,8 @@ public class GrpcActionCache implements RemoteActionCache { if (request.getBlobDigestsCount() == 0) { return ImmutableSet.of(); } - FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build()); + FindMissingBlobsResponse response = + retrier.execute(() -> casBlockingStub.get().findMissingBlobs(request.build())); return ImmutableSet.copyOf(response.getMissingBlobDigestsList()); } @@ -150,7 +145,7 @@ public class GrpcActionCache implements RemoteActionCache { @Override public void ensureInputsPresent( TreeNodeRepository repository, Path execRoot, TreeNode root, Command command) - throws IOException, InterruptedException { + throws IOException, InterruptedException { repository.computeMerkleDigests(root); // TODO(olaola): avoid querying all the digests, only ask for novel subtrees. ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root)); @@ -165,20 +160,23 @@ public class GrpcActionCache implements RemoteActionCache { BatchUpdateBlobsRequest.Builder treeBlobRequest = BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName); for (Directory d : treeNodes) { - final byte[] data = d.toByteArray(); + byte[] data = d.toByteArray(); treeBlobRequest .addRequestsBuilder() .setContentDigest(Digests.computeDigest(data)) .setData(ByteString.copyFrom(data)); } - BatchUpdateBlobsResponse response = - casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build()); - // TODO(olaola): handle retries on transient errors. - for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) { - if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) { - throw StatusProto.toStatusRuntimeException(r.getStatus()); - } - } + retrier.execute( + () -> { + BatchUpdateBlobsResponse response = + casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build()); + for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) { + if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) { + throw StatusProto.toStatusRuntimeException(r.getStatus()); + } + } + return null; + }); } uploadBlob(command.toByteArray()); if (!actionInputs.isEmpty()) { @@ -204,7 +202,7 @@ public class GrpcActionCache implements RemoteActionCache { */ @Override public void download(ActionResult result, Path execRoot, FileOutErr outErr) - throws IOException, CacheNotFoundException { + throws IOException, InterruptedException, CacheNotFoundException { for (OutputFile file : result.getOutputFilesList()) { Path path = execRoot.getRelative(file.getPath()); FileSystemUtils.createDirectoryAndParents(path.getParentDirectory()); @@ -213,15 +211,21 @@ public class GrpcActionCache implements RemoteActionCache { // Handle empty file locally. FileSystemUtils.writeContent(path, new byte[0]); } else { - try (OutputStream stream = path.getOutputStream()) { - if (!file.getContent().isEmpty()) { + if (!file.getContent().isEmpty()) { + try (OutputStream stream = path.getOutputStream()) { file.getContent().writeTo(stream); - } else { - Iterator<ReadResponse> replies = readBlob(digest); - while (replies.hasNext()) { - replies.next().getData().writeTo(stream); - } } + } else { + retrier.execute( + () -> { + try (OutputStream stream = path.getOutputStream()) { + Iterator<ReadResponse> replies = readBlob(digest); + while (replies.hasNext()) { + replies.next().getData().writeTo(stream); + } + return null; + } + }); } } path.setExecutable(file.getIsExecutable()); @@ -234,7 +238,7 @@ public class GrpcActionCache implements RemoteActionCache { } private void downloadOutErr(ActionResult result, FileOutErr outErr) - throws IOException, CacheNotFoundException { + throws IOException, InterruptedException, CacheNotFoundException { if (!result.getStdoutRaw().isEmpty()) { result.getStdoutRaw().writeTo(outErr.getOutputStream()); outErr.getOutputStream().flush(); @@ -272,20 +276,21 @@ public class GrpcActionCache implements RemoteActionCache { } @Override - public void upload( - ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr) - throws IOException, InterruptedException { + public void upload(ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr) + throws IOException, InterruptedException { ActionResult.Builder result = ActionResult.newBuilder(); upload(execRoot, files, outErr, result); try { - acBlockingStub - .get() - .updateActionResult( - UpdateActionResultRequest.newBuilder() - .setInstanceName(options.remoteInstanceName) - .setActionDigest(actionKey.getDigest()) - .setActionResult(result) - .build()); + retrier.execute( + () -> + acBlockingStub + .get() + .updateActionResult( + UpdateActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .setActionResult(result) + .build())); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) { throw e; @@ -294,7 +299,7 @@ public class GrpcActionCache implements RemoteActionCache { } void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result) - throws IOException, InterruptedException { + throws IOException, InterruptedException { ArrayList<Digest> digests = new ArrayList<>(); Chunker.Builder b = new Chunker.Builder(); for (Path file : files) { @@ -324,7 +329,7 @@ public class GrpcActionCache implements RemoteActionCache { .setDigest(digests.get(index++)) .setIsExecutable(file.isExecutable()); } - // TODO(ulfjack): Use the Chunker also for stdout / stderr. + // TODO(olaola): inline small stdout/stderr here. if (outErr.getErrorPath().exists()) { Digest stderr = uploadFileContents(outErr.getErrorPath()); result.setStderrDigest(stderr); @@ -356,8 +361,7 @@ public class GrpcActionCache implements RemoteActionCache { * * @return The key for fetching the file contents blob from cache. */ - Digest uploadFileContents( - ActionInput input, Path execRoot, ActionInputFileCache inputCache) + Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException { Digest digest = Digests.getDigestFromInputCache(input, inputCache); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); @@ -369,80 +373,98 @@ public class GrpcActionCache implements RemoteActionCache { private void uploadChunks(int numItems, Chunker.Builder chunkerBuilder) throws InterruptedException, IOException { - final CountDownLatch finishLatch = new CountDownLatch(numItems); - final AtomicReference<RuntimeException> exception = new AtomicReference<>(null); - StreamObserver<WriteRequest> requestObserver = null; String resourceName = ""; if (!options.remoteInstanceName.isEmpty()) { resourceName += options.remoteInstanceName + "/"; } + Retrier.Backoff backoff = retrier.newBackoff(); Chunker chunker = chunkerBuilder.build(); - while (chunker.hasNext()) { - Chunker.Chunk chunk = chunker.next(); - final Digest digest = chunk.getDigest(); - long offset = chunk.getOffset(); - WriteRequest.Builder request = WriteRequest.newBuilder(); - if (offset == 0) { // Beginning of new upload. - numItems--; - request.setResourceName( - resourceName - + "uploads/" - + UUID.randomUUID() - + "/blobs/" - + digest.getHash() - + "/" - + digest.getSizeBytes()); - // The batches execute simultaneously. - requestObserver = - bsStub - .get() - .write( - new StreamObserver<WriteResponse>() { - private long bytesLeft = digest.getSizeBytes(); + while (true) { // Retry until either uploaded everything or raised an exception. + CountDownLatch finishLatch = new CountDownLatch(numItems); + AtomicReference<IOException> crashException = new AtomicReference<>(null); + List<Status> errors = Collections.synchronizedList(new ArrayList<Status>()); + Set<Digest> failedDigests = Collections.synchronizedSet(new HashSet<Digest>()); + StreamObserver<WriteRequest> requestObserver = null; + while (chunker.hasNext()) { + Chunker.Chunk chunk = chunker.next(); + Digest digest = chunk.getDigest(); + long offset = chunk.getOffset(); + WriteRequest.Builder request = WriteRequest.newBuilder(); + if (offset == 0) { // Beginning of new upload. + numItems--; + request.setResourceName( + String.format( + "%s/uploads/%s/blobs/%s/%d", + resourceName, UUID.randomUUID(), digest.getHash(), digest.getSizeBytes())); + // The batches execute simultaneously. + requestObserver = + bsStub + .get() + .write( + new StreamObserver<WriteResponse>() { + private long bytesLeft = digest.getSizeBytes(); + + @Override + public void onNext(WriteResponse reply) { + bytesLeft -= reply.getCommittedSize(); + } - @Override - public void onNext(WriteResponse reply) { - bytesLeft -= reply.getCommittedSize(); - } + @Override + public void onError(Throwable t) { + // In theory, this can be any error, even though it's supposed to usually + // be only StatusException or StatusRuntimeException. We have to check + // for other errors, in order to not accidentally retry them! + if (!(t instanceof StatusRuntimeException + || t instanceof StatusException)) { + crashException.compareAndSet(null, new IOException(t)); + } - @Override - public void onError(Throwable t) { - exception.compareAndSet( - null, new StatusRuntimeException(Status.fromThrowable(t))); - finishLatch.countDown(); - } + failedDigests.add(digest); + errors.add(Status.fromThrowable(t)); + finishLatch.countDown(); + } - @Override - public void onCompleted() { - if (bytesLeft != 0) { - exception.compareAndSet( - null, new RuntimeException("Server did not commit all data.")); + @Override + public void onCompleted() { + // This can actually happen even if we did not send all the bytes, + // if the server has and is able to reuse parts of the uploaded blob. + finishLatch.countDown(); } - finishLatch.countDown(); - } - }); + }); + } + byte[] data = chunk.getData(); + boolean finishWrite = offset + data.length == digest.getSizeBytes(); + request + .setData(ByteString.copyFrom(data)) + .setWriteOffset(offset) + .setFinishWrite(finishWrite); + requestObserver.onNext(request.build()); + if (finishWrite) { + requestObserver.onCompleted(); + } + if (finishLatch.getCount() <= numItems) { + // Current RPC errored before we finished sending. + if (!finishWrite) { + chunker.advanceInput(); + } + } } - byte[] data = chunk.getData(); - boolean finishWrite = offset + data.length == digest.getSizeBytes(); - request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite); - requestObserver.onNext(request.build()); - if (finishWrite) { - requestObserver.onCompleted(); + finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS); + if (crashException.get() != null) { + throw crashException.get(); // Re-throw the exception that is supposed to never happen. } - if (finishLatch.getCount() <= numItems) { - // Current RPC errored before we finished sending. - if (!finishWrite) { - chunker.advanceInput(); - } + if (failedDigests.isEmpty()) { + return; // Successfully sent everything. } - } - finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS); - if (exception.get() != null) { - throw exception.get(); // Re-throw the first encountered exception. + retrier.onFailures(backoff, errors); // This will throw when out of retries. + // We don't have to synchronize on failedDigests now, because after finishLatch.await we're + // back to single threaded execution. + chunker = chunkerBuilder.onlyUseDigests(failedDigests).build(); + numItems = failedDigests.size(); } } - Digest uploadBlob(byte[] blob) throws InterruptedException { + Digest uploadBlob(byte[] blob) throws IOException, InterruptedException { Digest digest = Digests.computeDigest(blob); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); try { @@ -456,19 +478,24 @@ public class GrpcActionCache implements RemoteActionCache { } } - byte[] downloadBlob(Digest digest) throws CacheNotFoundException { + byte[] downloadBlob(Digest digest) + throws IOException, InterruptedException, CacheNotFoundException { if (digest.getSizeBytes() == 0) { return new byte[0]; } - Iterator<ReadResponse> replies = readBlob(digest); byte[] result = new byte[(int) digest.getSizeBytes()]; - int offset = 0; - while (replies.hasNext()) { - ByteString data = replies.next().getData(); - data.copyTo(result, offset); - offset += data.size(); - } - Preconditions.checkState(digest.getSizeBytes() == offset); + retrier.execute( + () -> { + Iterator<ReadResponse> replies = readBlob(digest); + int offset = 0; + while (replies.hasNext()) { + ByteString data = replies.next().getData(); + data.copyTo(result, offset); + offset += data.size(); + } + Preconditions.checkState(digest.getSizeBytes() == offset); + return null; + }); return result; } @@ -476,17 +503,20 @@ public class GrpcActionCache implements RemoteActionCache { /** Returns a cached result for a given Action digest, or null if not found in cache. */ @Override - public ActionResult getCachedActionResult(ActionKey actionKey) { + public ActionResult getCachedActionResult(ActionKey actionKey) + throws IOException, InterruptedException { try { - return acBlockingStub - .get() - .getActionResult( - GetActionResultRequest.newBuilder() - .setInstanceName(options.remoteInstanceName) - .setActionDigest(actionKey.getDigest()) - .build()); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Status.Code.NOT_FOUND) { + return retrier.execute( + () -> + acBlockingStub + .get() + .getActionResult( + GetActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .build())); + } catch (RetryException e) { + if (e.causedByStatusCode(Status.Code.NOT_FOUND)) { return null; } throw e; diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index 815b9237d0..51bc0b6fa7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -16,6 +16,7 @@ package com.google.devtools.build.lib.remote; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.remoteexecution.v1test.ExecuteRequest; @@ -24,7 +25,6 @@ import com.google.devtools.remoteexecution.v1test.ExecutionGrpc; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub; import com.google.longrunning.Operation; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.Durations; import com.google.rpc.Status; import com.google.watcher.v1.Change; import com.google.watcher.v1.ChangeBatch; @@ -32,7 +32,10 @@ import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc; import com.google.watcher.v1.WatcherGrpc.WatcherBlockingStub; import io.grpc.Channel; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; +import java.io.IOException; import java.util.Iterator; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -43,18 +46,11 @@ public class GrpcRemoteExecutor { private final RemoteOptions options; private final ChannelOptions channelOptions; private final Channel channel; + private final Retrier retrier; - // Reuse the gRPC stub. - private final Supplier<ExecutionBlockingStub> execBlockingStub = - Suppliers.memoize( - new Supplier<ExecutionBlockingStub>() { - @Override - public ExecutionBlockingStub get() { - return ExecutionGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); - } - }); + // Reuse the gRPC stubs. + private final Supplier<ExecutionBlockingStub> execBlockingStub; + private final Supplier<WatcherBlockingStub> watcherBlockingStub; public static boolean isRemoteExecutionOptions(RemoteOptions options) { return options.remoteExecutor != null; @@ -64,68 +60,94 @@ public class GrpcRemoteExecutor { this.options = options; this.channelOptions = channelOptions; this.channel = channel; + this.retrier = new Retrier(options); + execBlockingStub = + Suppliers.memoize( + () -> + ExecutionGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS)); + // Do not set a deadline on this call, because it is hard to estimate in + // advance how much time we should give the server to execute an action + // remotely (scheduling, queing, optional retries, etc.) + // It is the server's responsibility to respect the Action timeout field. + watcherBlockingStub = + Suppliers.memoize( + () -> + WatcherGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials())); } - private @Nullable ExecuteResponse getOperationResponse(Operation op) { + private @Nullable ExecuteResponse getOperationResponse(Operation op) + throws IOException, UserExecException { if (op.getResultCase() == Operation.ResultCase.ERROR) { - throw StatusProto.toStatusRuntimeException(op.getError()); + StatusRuntimeException e = StatusProto.toStatusRuntimeException(op.getError()); + if (e.getStatus().getCode() == Code.DEADLINE_EXCEEDED) { + // This was caused by the command itself exceeding the timeout, + // therefore it is not retriable. + // TODO(olaola): this should propagate a timeout SpawnResult instead of raising. + throw new UserExecException("Remote execution time out", true); + } + throw e; } if (op.getDone()) { Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET); try { return op.getResponse().unpack(ExecuteResponse.class); } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); + throw new IOException(e); } } return null; } - public ExecuteResponse executeRemotely(ExecuteRequest request) { - Operation op = execBlockingStub.get().execute(request); + public ExecuteResponse executeRemotely(ExecuteRequest request) + throws InterruptedException, IOException, UserExecException { + Operation op = retrier.execute(() -> execBlockingStub.get().execute(request)); ExecuteResponse resp = getOperationResponse(op); if (resp != null) { return resp; } - int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout()); - WatcherBlockingStub stub = - WatcherGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS); Request wr = Request.newBuilder().setTarget(op.getName()).build(); - Iterator<ChangeBatch> replies = stub.watch(wr); - while (replies.hasNext()) { - ChangeBatch cb = replies.next(); - for (Change ch : cb.getChangesList()) { - switch (ch.getState()) { - case INITIAL_STATE_SKIPPED: - continue; - case ERROR: - try { - throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - case DOES_NOT_EXIST: - throw new RuntimeException( - String.format("Operation %s lost on the remote server.", op.getName())); - case EXISTS: - try { - op = ch.getData().unpack(Operation.class); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - resp = getOperationResponse(op); - if (resp != null) { - return resp; + return retrier.execute( + () -> { + Iterator<ChangeBatch> replies = watcherBlockingStub.get().watch(wr); + while (replies.hasNext()) { + ChangeBatch cb = replies.next(); + for (Change ch : cb.getChangesList()) { + switch (ch.getState()) { + case INITIAL_STATE_SKIPPED: + continue; + case ERROR: + try { + throw StatusProto.toStatusRuntimeException(ch.getData().unpack(Status.class)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + case DOES_NOT_EXIST: + // TODO(olaola): either make this retriable, or use a different exception. + throw new IOException( + String.format("Operation %s lost on the remote server.", op.getName())); + case EXISTS: + Operation o; + try { + o = ch.getData().unpack(Operation.class); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + ExecuteResponse r = getOperationResponse(o); + if (r != null) { + return r; + } + continue; + default: + // This can only happen if the enum gets unexpectedly extended. + throw new IOException(String.format("Illegal change state: %s", ch.getState())); + } } - continue; - default: - throw new RuntimeException(String.format("Illegal change state: %s", ch.getState())); - } - } - } - throw new RuntimeException( - String.format("Watch request for %s terminated with no result.", op.getName())); + } + throw new IOException( + String.format("Watch request for %s terminated with no result.", op.getName())); + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index 994c23e1e6..f8c53268c5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -58,15 +58,14 @@ interface RemoteActionCache { */ // TODO(olaola): will need to amend to include the TreeNodeRepository for updating. void download(ActionResult result, Path execRoot, FileOutErr outErr) - throws IOException, CacheNotFoundException; - + throws IOException, InterruptedException, CacheNotFoundException; /** * Attempts to look up the given action in the remote cache and return its result, if present. * Returns {@code null} if there is no such entry. Note that a successful result from this method * does not guarantee the availability of the corresponding output files in the remote cache. */ @Nullable - ActionResult getCachedActionResult(ActionKey actionKey); + ActionResult getCachedActionResult(ActionKey actionKey) throws IOException, InterruptedException; /** * Upload the result of a locally executed action to the cache by uploading any necessary files, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java index 07aaff11f7..d472adae09 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java @@ -154,4 +154,52 @@ public final class RemoteOptions extends OptionsBase { help = "Value to pass as instance_name in the remote execution API." ) public String remoteInstanceName; + + @Option( + name = "experimental_remote_retry", + defaultValue = "true", + category = "remote", + help = "Whether to retry transient remote execution/cache errors." + ) + public boolean experimentalRemoteRetry; + + @Option( + name = "experimental_remote_retry_start_delay_millis", + defaultValue = "100", + category = "remote", + help = "The initial delay before retrying a transient error." + ) + public long experimentalRemoteRetryStartDelayMillis; + + @Option( + name = "experimental_remote_retry_max_delay_millis", + defaultValue = "5000", + category = "remote", + help = "The maximum delay before retrying a transient error." + ) + public long experimentalRemoteRetryMaxDelayMillis; + + @Option( + name = "experimental_remote_retry_max_attempts", + defaultValue = "5", + category = "remote", + help = "The maximum number of attempts to retry a transient error." + ) + public int experimentalRemoteRetryMaxAttempts; + + @Option( + name = "experimental_remote_retry_multiplier", + defaultValue = "2", + category = "remote", + help = "The multiplier by which to increase the retry delay on transient errors." + ) + public double experimentalRemoteRetryMultiplier; + + @Option( + name = "experimental_remote_retry_jitter", + defaultValue = "0.1", + category = "remote", + help = "The random factor to apply to retry delays on transient errors." + ) + public double experimentalRemoteRetryJitter; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index c0952c0282..8426ae3864 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -297,13 +297,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd); throw new UserExecException(message + ": Exit " + result.getExitCode()); } - } catch (IOException e) { - throw new UserExecException("Unexpected IO error.", e); - } catch (InterruptedException e) { - eventHandler.handle(Event.warn(mnemonic + " remote work interrupted (" + e + ")")); - Thread.currentThread().interrupt(); - throw e; - } catch (StatusRuntimeException e) { + } catch (RetryException e) { String stackTrace = ""; if (verboseFailures) { stackTrace = "\n" + Throwables.getStackTraceAsString(e); @@ -312,7 +306,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { if (remoteOptions.remoteLocalFallback) { execLocally(spawn, actionExecutionContext, remoteCache, actionKey); } else { - throw new UserExecException(e); + throw new UserExecException(e.getCause()); } } catch (CacheNotFoundException e) { // TODO(olaola): handle this exception by reuploading / reexecuting the action remotely. @@ -322,6 +316,8 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } else { throw new UserExecException(e); } + } catch (IOException e) { + throw new UserExecException("Unexpected IO error.", e); } catch (UnsupportedOperationException e) { eventHandler.handle( Event.warn(mnemonic + " unsupported operation for action cache (" + e + ")")); diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java new file mode 100644 index 0000000000..981af46199 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java @@ -0,0 +1,338 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.util.Preconditions; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +/** + * Supports execution with retries on particular gRPC Statuses. The retrier is ThreadSafe. + * + * <p>Example usage: The simple use-case is to call retrier.execute, e.g: + * + * <pre> + * foo = retrier.execute( + * new Callable<Foo>() { + * @Override + * public Foo call() { + * return grpcStub.getFoo(fooRequest); + * } + * }); + * </pre> + * + * <p>When you need to retry multiple asynchronous calls, you can do: + * + * <pre> + * Retrier.Backoff backoff = retrier.newBackoff(); + * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>()); + * while (true) { + * CountDownLatch finishLatch = new CountDownLatch(items.size()); + * for (Item item : items) { + * requestObserver = myStub.asyncCall( + * request, + * new StreamObserver<Response>() { + * ... + * + * @Override + * public void onError(Throwable t) { + * // Need to handle non Status errors here! + * errors.add(Status.fromThrowable(t)); + * finishLatch.countDown(); + * } + * @Override + * public void onCompleted() { + * finishLatch.countDown(); + * } + * }); + * requestObserver.onNext(i1); + * requestObserver.onNext(i2); + * ... + * requestObserver.onCompleted(); + * } + * finishLatch.await(someTime, TimeUnit.SECONDS); + * if (errors.isEmpty()) { + * return; + * } + * retrier.onFailures(backoff, errors); // Sleep once for the whole batch of failures. + * items = failingItems; // this needs to be collected from the observers as well. + * } + * </pre> + * + * <p>This retries the multiple calls in bulk. Another way to do it is retry each call separately as + * it occurs: + * + * <pre> + * class RetryingObserver extends StreamObserver<Response> { + * private final CountDownLatch finishLatch; + * private final Backoff backoff; + * private final AtomicReference<RuntimeException> exception; + * + * RetryingObserver( + * CountDownLatch finishLatch, Backoff backoff, AtomicReference<RuntimeException> exception) { + * this.finishLatch = finishLatch; + * this.backoff = backoff; + * this.exception = exception; + * } + * + * @Override + * public void onError(Throwable t) { + * // Need to handle non Status errors here first! + * try { + * retrier.onFailure(backoff, Status.fromThrowable(t)); + * + * // This assumes you passed through the relevant info to recreate the original request: + * requestObserver = myStub.asyncCall( + * request, + * new RetryingObserver(finishLatch, backoff)); // Recursion! + * requestObserver.onNext(i1); + * requestObserver.onNext(i2); + * ... + * requestObserver.onCompleted(); + * + * } catch (RetryException e) { + * exception.compareAndSet(null, e); + * finishLatch.countDown(); + * } + * } + * @Override + * public void onCompleted() { + * finishLatch.countDown(); + * } + * } + * + * Retrier.Backoff backoff = retrier.newBackoff(); + * List<Status> errors = Collections.synchronizedList(new ArrayList<Status>()); + * while (true) { + * CountDownLatch finishLatch = new CountDownLatch(items.size()); + * for (Item item : items) { + * requestObserver = myStub.asyncCall( + * request, + * new RetryingObserver(finishLatch, backoff)); + * requestObserver.onNext(i1); + * requestObserver.onNext(i2); + * ... + * requestObserver.onCompleted(); + * } + * finishLatch.await(someTime, TimeUnit.SECONDS); + * if (exception.get() != null) { + * throw exception.get(); // Re-throw the first encountered exception. + * } + * } + * </pre> + * + * In both cases you need to instantiate and keep a Backoff object, and use onFailure(s) to retry. + */ +public class Retrier { + /** + * Backoff is a stateful object providing a sequence of durations that are used to time delays + * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather + * than a static map of attempt number to delay, is to enable using the retrier via the manual + * onFailure(backoff, e) method (see multiple async gRPC calls example above). + */ + public interface Backoff { + + /** Indicates that no more retries should be made for use in {@link #nextDelayMillis()}. */ + static final long STOP = -1L; + + /** Returns the next delay in milliseconds, or < 0 if we should not continue retrying. */ + long nextDelayMillis(); + + /** + * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls + * that returned STOP. + */ + int getRetryAttempts(); + + /** + * Creates a Backoff supplier for a Backoff which does not support any retries. Both the + * Supplier and the Backoff are stateless and thread-safe. + */ + static final Supplier<Backoff> NO_RETRIES = + () -> + new Backoff() { + @Override + public long nextDelayMillis() { + return STOP; + } + + @Override + public int getRetryAttempts() { + return 0; + } + }; + + /** + * Creates a Backoff supplier for an optionally jittered exponential backoff. The supplier is + * ThreadSafe (non-synchronized calls to get() are fine), but the returned Backoff is not. + * + * @param initial The initial backoff duration. + * @param max The maximum backoff duration. + * @param multiplier The amount the backoff should increase in each iteration. Must be >1. + * @param jitter The amount the backoff should be randomly varied (0-1), with 0 providing no + * jitter, and 1 providing a duration that is 0-200% of the non-jittered duration. + * @param maxAttempts Maximal times to attempt a retry 0 means no retries. + */ + static Supplier<Backoff> exponential( + Duration initial, Duration max, double multiplier, double jitter, int maxAttempts) { + Preconditions.checkArgument(multiplier > 1, "multipler must be > 1"); + Preconditions.checkArgument(jitter >= 0 && jitter <= 1, "jitter must be in the range (0, 1)"); + Preconditions.checkArgument(maxAttempts >= 0, "maxAttempts must be >= 0"); + return () -> + new Backoff() { + private final long maxMillis = max.toMillis(); + private long nextDelayMillis = initial.toMillis(); + private int attempts = 0; + + @Override + public long nextDelayMillis() { + if (attempts == maxAttempts) { + return STOP; + } + attempts++; + double jitterRatio = jitter * (ThreadLocalRandom.current().nextDouble(2.0) - 1); + long result = (long) (nextDelayMillis * (1 + jitterRatio)); + // Advance current by the non-jittered result. + nextDelayMillis = (long) (nextDelayMillis * multiplier); + if (nextDelayMillis > maxMillis) { + nextDelayMillis = maxMillis; + } + return result; + } + + @Override + public int getRetryAttempts() { + return attempts; + } + }; + } + } + + public static final Predicate<Status> DEFAULT_IS_RETRIABLE = + st -> { + switch (st.getCode()) { + case CANCELLED: + return !Thread.currentThread().isInterrupted(); + case UNKNOWN: + case DEADLINE_EXCEEDED: + case ABORTED: + case INTERNAL: + case UNAVAILABLE: + case UNAUTHENTICATED: + return true; + default: + return false; + } + }; + + public static final Predicate<Status> RETRY_ALL = Predicates.alwaysTrue(); + public static final Predicate<Status> RETRY_NONE = Predicates.alwaysFalse(); + public static final Retrier NO_RETRIES = new Retrier(Backoff.NO_RETRIES, RETRY_NONE); + + private final Supplier<Backoff> backoffSupplier; + private final Predicate<Status> isRetriable; + + @VisibleForTesting + Retrier(Supplier<Backoff> backoffSupplier, Predicate<Status> isRetriable) { + this.backoffSupplier = backoffSupplier; + this.isRetriable = isRetriable; + } + + public Retrier(RemoteOptions options) { + this( + options.experimentalRemoteRetry + ? Backoff.exponential( + Duration.ofMillis(options.experimentalRemoteRetryStartDelayMillis), + Duration.ofMillis(options.experimentalRemoteRetryMaxDelayMillis), + options.experimentalRemoteRetryMultiplier, + options.experimentalRemoteRetryJitter, + options.experimentalRemoteRetryMaxAttempts) + : Backoff.NO_RETRIES, + DEFAULT_IS_RETRIABLE); + } + + /** + * Executes the given callable in a loop, retrying on retryable errors, as defined by the current + * backoff/retry policy. Will raise the last encountered retriable error, or the first + * non-retriable error. + * + * @param c The callable to execute. + */ + public <T> T execute(Callable<T> c) throws InterruptedException, RetryException { + Backoff backoff = backoffSupplier.get(); + while (true) { + try { + return c.call(); + } catch (StatusException | StatusRuntimeException e) { + onFailure(backoff, Status.fromThrowable(e)); + } catch (Exception e) { + // Generic catch because Callable is declared to throw Exception. + Throwables.throwIfUnchecked(e); + throw new RetryException(e, backoff.getRetryAttempts()); + } + } + } + + @VisibleForTesting + void sleep(long timeMillis) throws InterruptedException { + Preconditions.checkArgument( + timeMillis >= 0L, "timeMillis must not be negative: %s", timeMillis); + TimeUnit.MILLISECONDS.sleep(timeMillis); + } + + public Backoff newBackoff() { + return backoffSupplier.get(); + } + + public void onFailure(Backoff backoff, Status s) throws RetryException, InterruptedException { + onFailures(backoff, ImmutableList.of(s)); + } + + /** + * Handles failures according to the current backoff/retry policy. If any of the errors are not + * retriable, the first such error is thrown. Otherwise, if backoff still allows, this sleeps for + * the specified duration. Otherwise, the first error is thrown. + * + * @param backoff The backoff object to get delays from. + * @param errors The errors that occurred (must be non-empty). + */ + public void onFailures(Backoff backoff, List<Status> errors) + throws InterruptedException, RetryException { + Preconditions.checkArgument(!errors.isEmpty(), "errors must be non-empty"); + long delay = backoff.nextDelayMillis(); + for (Status st : errors) { + if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + if (delay < 0 || !isRetriable.apply(st)) { + throw new RetryException(st.asRuntimeException(), backoff.getRetryAttempts()); + } + } + sleep(delay); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RetryException.java b/src/main/java/com/google/devtools/build/lib/remote/RetryException.java new file mode 100644 index 0000000000..6e9890afd6 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RetryException.java @@ -0,0 +1,43 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import java.io.IOException; + +/** An exception to indicate failed retry attempts. */ +public final class RetryException extends IOException { + private final int attempts; + + RetryException(Throwable cause, int retryAttempts) { + super(cause); + this.attempts = retryAttempts + 1; + } + + public int getAttempts() { + return attempts; + } + + public boolean causedByStatusCode(Code code) { + return getCause() instanceof StatusRuntimeException + && ((StatusRuntimeException) getCause()).getStatus().getCode() == code; + } + + @Override + public String toString() { + return String.format("after %d attempts: %s", attempts, getCause()); + } +} |