author     buchgr <buchgr@google.com>  2018-06-02 14:13:43 -0700
committer  Copybara-Service <copybara-piper@google.com>  2018-06-02 14:15:06 -0700
commit     ff008f445905bf6f4601a368782b620f7899d322 (patch)
tree       7fbfe2ef3d3e794680d12ee42f5d4e0016b6b736 /src/main/java
parent     aaf11e91a02a2f42d8bf26cce76df941c8afc8e2 (diff)
remote: concurrent blob downloads. Fixes #5215
This change introduces concurrent downloads of action outputs for remote caching/execution. Until now, an action's outputs were downloaded one after the other, which isn't as bad as it sounds because we typically run dozens or hundreds of actions in parallel. However, for actions with many outputs, or for build graphs that allow only limited parallelism, we expect this change to noticeably improve performance.

Note that with this change the AbstractRemoteActionCache will always attempt to download all outputs concurrently. The actual parallelism is controlled by the underlying network transport. The gRPC transport currently enforces no limit on concurrent calls, which should be fine given that all calls are multiplexed over a single network connection. The HTTP/1.1 transport also enforces no limit by default, but I have added the --remote_max_connections=INT flag, which allows specifying an upper bound on the number of concurrently open network connections. I introduced this flag as a defensive mechanism for users whose environment might enforce an upper bound on the number of open connections, as with this change the number of concurrently open connections can increase dramatically (from NumParallelActions to NumParallelActions * SumParallelActionOutputs).

A side effect of this change is that it puts the infrastructure for retries and circuit breaking for the HttpBlobStore in place.

RELNOTES: None
PiperOrigin-RevId: 199005510
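For illustration, a hypothetical invocation that caps the HTTP/1.1 transport at 100 concurrent connections; the cache endpoint and build target are placeholders, and --remote_http_cache is the option backing the remoteHttpCache field touched below:

  bazel build //foo:bar \
      --remote_http_cache=https://cache.example.com \
      --remote_max_connections=100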
Diffstat (limited to 'src/main/java')
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java  309
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java  138
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java  124
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java  30
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java  18
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java  36
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java  4
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/Retrier.java  218
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java  68
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java  1
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD  1
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java  34
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java  27
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java  3
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD  1
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java  315
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java  11
-rw-r--r--  src/main/java/com/google/devtools/build/lib/remote/util/Utils.java  43
18 files changed, 912 insertions, 469 deletions
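Before the per-file diffs, a compact standalone sketch of the download pattern this change introduces may help when reading AbstractRemoteActionCache below: start every output download immediately as a future, then block on each one. It is illustrative only; the String digest, the executor, and the download body are placeholders, not code from this commit.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

class ConcurrentDownloadSketch {

  private final ListeningExecutorService transport =
      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

  // Stand-in for a transport-specific downloadBlob(Digest, OutputStream) implementation.
  private ListenableFuture<Void> downloadBlob(String digest) {
    Callable<Void> task =
        () -> {
          // ... fetch the blob and stream it to its destination ...
          return null;
        };
    return transport.submit(task);
  }

  // Kicks off all downloads at once; the effective parallelism is bounded only by the
  // transport (gRPC multiplexing, or the HTTP connection pool capped by
  // --remote_max_connections). Afterwards, wait for every download to finish.
  void downloadOutputs(List<String> digests) throws IOException, InterruptedException {
    List<ListenableFuture<Void>> downloads = new ArrayList<>(digests.size());
    for (String digest : digests) {
      downloads.add(downloadBlob(digest));
    }
    for (ListenableFuture<Void> download : downloads) {
      try {
        download.get();
      } catch (ExecutionException e) {
        throw new IOException("download failed", e.getCause());
      }
    }
  }
}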
diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
index 1cd1ef9922..ef90223d83 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractRemoteActionCache.java
@@ -13,6 +13,14 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.EnvironmentalExecException;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
@@ -35,10 +43,13 @@ import com.google.devtools.remoteexecution.v1test.OutputDirectory;
import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.devtools.remoteexecution.v1test.Tree;
import com.google.protobuf.ByteString;
+import io.grpc.Context;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -48,12 +59,23 @@ import javax.annotation.Nullable;
/** A cache for storing artifacts (input and output) as well as the output of running an action. */
@ThreadSafety.ThreadSafe
public abstract class AbstractRemoteActionCache implements AutoCloseable {
+
+ private static final ListenableFuture<Void> COMPLETED_SUCCESS = SettableFuture.create();
+ private static final ListenableFuture<byte[]> EMPTY_BYTES = SettableFuture.create();
+
+ static {
+ ((SettableFuture<Void>) COMPLETED_SUCCESS).set(null);
+ ((SettableFuture<byte[]>) EMPTY_BYTES).set(new byte[0]);
+ }
+
protected final RemoteOptions options;
protected final DigestUtil digestUtil;
+ private final Retrier retrier;
- public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
+ public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil, Retrier retrier) {
this.options = options;
this.digestUtil = digestUtil;
+ this.retrier = retrier;
}
/**
@@ -101,23 +123,40 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
throws ExecException, IOException, InterruptedException;
/**
- * Download a remote blob to a local destination.
+ * Downloads a blob with a content hash {@code digest} to {@code out}.
*
- * @param digest The digest of the remote blob.
- * @param dest The path to the local file.
- * @throws IOException if download failed.
+ * @return a future that completes after the download completes (succeeds / fails).
*/
- protected abstract void downloadBlob(Digest digest, Path dest)
- throws IOException, InterruptedException;
+ protected abstract ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out);
/**
- * Download a remote blob and store it in memory.
+ * Downloads a blob with content hash {@code digest} and stores its content in memory.
*
- * @param digest The digest of the remote blob.
- * @return The remote blob.
- * @throws IOException if download failed.
+ * @return a future that completes after the download completes (succeeds / fails). If successful,
+ * the content is stored in the future's {@code byte[]}.
*/
- protected abstract byte[] downloadBlob(Digest digest) throws IOException, InterruptedException;
+ public ListenableFuture<byte[]> downloadBlob(Digest digest) {
+ if (digest.getSizeBytes() == 0) {
+ return EMPTY_BYTES;
+ }
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
+ SettableFuture<byte[]> outerF = SettableFuture.create();
+ Futures.addCallback(
+ downloadBlob(digest, bOut),
+ new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void aVoid) {
+ outerF.set(bOut.toByteArray());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ outerF.setException(t);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return outerF;
+ }
/**
* Download the output files and directory trees of a remotely executed action to the local
@@ -132,22 +171,66 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
try {
+ Context ctx = Context.current();
+ List<FuturePathBooleanTuple> fileDownloads =
+ Collections.synchronizedList(
+ new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount()));
for (OutputFile file : result.getOutputFilesList()) {
Path path = execRoot.getRelative(file.getPath());
- downloadFile(path, file.getDigest(), file.getIsExecutable(), file.getContent());
+ ListenableFuture<Void> download =
+ retrier.executeAsync(
+ () -> ctx.call(() -> downloadFile(path, file.getDigest(), file.getContent())));
+ fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable()));
}
+
+ List<ListenableFuture<Void>> dirDownloads =
+ new ArrayList<>(result.getOutputDirectoriesCount());
for (OutputDirectory dir : result.getOutputDirectoriesList()) {
- byte[] b = downloadBlob(dir.getTreeDigest());
- Tree tree = Tree.parseFrom(b);
- Map<Digest, Directory> childrenMap = new HashMap<>();
- for (Directory child : tree.getChildrenList()) {
- childrenMap.put(digestUtil.compute(child), child);
+ SettableFuture<Void> dirDownload = SettableFuture.create();
+ ListenableFuture<byte[]> protoDownload =
+ retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest())));
+ Futures.addCallback(
+ protoDownload,
+ new FutureCallback<byte[]>() {
+ @Override
+ public void onSuccess(byte[] b) {
+ try {
+ Tree tree = Tree.parseFrom(b);
+ Map<Digest, Directory> childrenMap = new HashMap<>();
+ for (Directory child : tree.getChildrenList()) {
+ childrenMap.put(digestUtil.compute(child), child);
+ }
+ Path path = execRoot.getRelative(dir.getPath());
+ fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx));
+ dirDownload.set(null);
+ } catch (IOException e) {
+ dirDownload.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ dirDownload.setException(t);
+ }
+ },
+ MoreExecutors.directExecutor());
+ dirDownloads.add(dirDownload);
+ }
+
+ fileDownloads.addAll(downloadOutErr(result, outErr, ctx));
+
+ for (ListenableFuture<Void> dirDownload : dirDownloads) {
+ // Block on all directory download futures, so that we can be sure that we have discovered
+ // all file downloads and can subsequently safely iterate over the list of file downloads.
+ getFromFuture(dirDownload);
+ }
+
+ for (FuturePathBooleanTuple download : fileDownloads) {
+ getFromFuture(download.getFuture());
+ if (download.getPath() != null) {
+ download.getPath().setExecutable(download.isExecutable());
}
- Path path = execRoot.getRelative(dir.getPath());
- downloadDirectory(path, tree.getRoot(), childrenMap);
}
- // TODO(ulfjack): use same code as above also for stdout / stderr if applicable.
- downloadOutErr(result, outErr);
} catch (IOException downloadException) {
try {
// Delete any (partially) downloaded output files, since any subsequent local execution
@@ -178,19 +261,51 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
}
}
+ /** Tuple of {@code ListenableFuture, Path, boolean}. */
+ private static class FuturePathBooleanTuple {
+ private final ListenableFuture<?> future;
+ private final Path path;
+ private final boolean isExecutable;
+
+ public FuturePathBooleanTuple(ListenableFuture<?> future, Path path, boolean isExecutable) {
+ this.future = future;
+ this.path = path;
+ this.isExecutable = isExecutable;
+ }
+
+ public ListenableFuture<?> getFuture() {
+ return future;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public boolean isExecutable() {
+ return isExecutable;
+ }
+ }
+
/**
* Download a directory recursively. The directory is represented by a {@link Directory} protobuf
* message, and the descendant directories are in {@code childrenMap}, accessible through their
* digest.
*/
- private void downloadDirectory(Path path, Directory dir, Map<Digest, Directory> childrenMap)
- throws IOException, InterruptedException {
+ private List<FuturePathBooleanTuple> downloadDirectory(
+ Path path, Directory dir, Map<Digest, Directory> childrenMap, Context ctx)
+ throws IOException {
// Ensure that the directory is created here even though the directory might be empty
- FileSystemUtils.createDirectoryAndParents(path);
+ path.createDirectoryAndParents();
+ List<FuturePathBooleanTuple> downloads = new ArrayList<>(dir.getFilesCount());
for (FileNode child : dir.getFilesList()) {
Path childPath = path.getRelative(child.getName());
- downloadFile(childPath, child.getDigest(), child.getIsExecutable(), null);
+ downloads.add(
+ new FuturePathBooleanTuple(
+ retrier.executeAsync(
+ () -> ctx.call(() -> downloadFile(childPath, child.getDigest(), null))),
+ childPath,
+ child.getIsExecutable()));
}
for (DirectoryNode child : dir.getDirectoriesList()) {
@@ -207,55 +322,93 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
+ childDigest
+ "not found");
}
- downloadDirectory(childPath, childDir, childrenMap);
+ downloads.addAll(downloadDirectory(childPath, childDir, childrenMap, ctx));
}
+
+ return downloads;
}
/**
* Download a file (that is not a directory). If the {@code content} is not given, the content is
* fetched from the digest.
*/
- public void downloadFile(
- Path path, Digest digest, boolean isExecutable, @Nullable ByteString content)
- throws IOException, InterruptedException {
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
+ public ListenableFuture<Void> downloadFile(Path path, Digest digest, @Nullable ByteString content)
+ throws IOException {
+ Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
FileSystemUtils.writeContent(path, new byte[0]);
- } else {
- if (content != null && !content.isEmpty()) {
- try (OutputStream stream = path.getOutputStream()) {
- content.writeTo(stream);
- }
- } else {
- downloadBlob(digest, path);
- Digest receivedDigest = digestUtil.compute(path);
- if (!receivedDigest.equals(digest)) {
- throw new IOException("Digest does not match " + receivedDigest + " != " + digest);
- }
+ return COMPLETED_SUCCESS;
+ }
+
+ if (content != null && !content.isEmpty()) {
+ try (OutputStream stream = path.getOutputStream()) {
+ content.writeTo(stream);
}
+ return COMPLETED_SUCCESS;
}
- path.setExecutable(isExecutable);
+
+ OutputStream out = new LazyFileOutputStream(path);
+ SettableFuture<Void> outerF = SettableFuture.create();
+ ListenableFuture<Void> f = downloadBlob(digest, out);
+ Futures.addCallback(
+ f,
+ new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ try {
+ out.close();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ outerF.setException(t);
+ try {
+ out.close();
+ } catch (IOException e) {
+ // Intentionally left empty. The download already failed, so we can ignore
+ // the error on close().
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ return outerF;
}
- private void downloadOutErr(ActionResult result, FileOutErr outErr)
- throws IOException, InterruptedException {
+ private List<FuturePathBooleanTuple> downloadOutErr(
+ ActionResult result, FileOutErr outErr, Context ctx) throws IOException {
+ List<FuturePathBooleanTuple> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} else if (result.hasStdoutDigest()) {
- byte[] stdoutBytes = downloadBlob(result.getStdoutDigest());
- outErr.getOutputStream().write(stdoutBytes);
- outErr.getOutputStream().flush();
+ downloads.add(
+ new FuturePathBooleanTuple(
+ retrier.executeAsync(
+ () ->
+ ctx.call(
+ () -> downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()))),
+ null,
+ false));
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} else if (result.hasStderrDigest()) {
- byte[] stderrBytes = downloadBlob(result.getStderrDigest());
- outErr.getErrorStream().write(stderrBytes);
- outErr.getErrorStream().flush();
+ downloads.add(
+ new FuturePathBooleanTuple(
+ retrier.executeAsync(
+ () ->
+ ctx.call(
+ () -> downloadBlob(result.getStderrDigest(), outErr.getErrorStream()))),
+ null,
+ false));
}
+ return downloads;
}
/** UploadManifest adds output metadata to a {@link ActionResult}. */
@@ -291,8 +444,7 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
* <p>Attempting to upload a symlink results in a {@link
* com.google.build.lib.actions.ExecException}, since cachable actions shouldn't emit symlinks.
*/
- public void addFiles(Collection<Path> files)
- throws ExecException, IOException, InterruptedException {
+ public void addFiles(Collection<Path> files) throws ExecException, IOException {
for (Path file : files) {
// TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and
// rely on the local spawn runner to stat the files, instead of statting here.
@@ -398,4 +550,55 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {
/** Release resources associated with the cache. The cache may not be used after calling this. */
@Override
public abstract void close();
+
+ /**
+ * Creates an {@link OutputStream} that isn't actually opened until the first data is written.
+ * This is useful to only have as many open file descriptors as necessary at a time to avoid
+ * running into system limits.
+ */
+ private static class LazyFileOutputStream extends OutputStream {
+
+ private final Path path;
+ private OutputStream out;
+
+ public LazyFileOutputStream(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ ensureOpen();
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ ensureOpen();
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureOpen();
+ out.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ ensureOpen();
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ ensureOpen();
+ out.close();
+ }
+
+ private void ensureOpen() throws IOException {
+ if (out == null) {
+ out = path.getOutputStream();
+ }
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 75d58ad83c..001ba22713 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.bytestream.ByteStreamGrpc;
@@ -28,8 +27,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableScheduledFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
@@ -42,7 +39,6 @@ import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
-import io.grpc.StatusException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -51,7 +47,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -71,7 +66,6 @@ final class ByteStreamUploader {
private final CallCredentials callCredentials;
private final long callTimeoutSecs;
private final RemoteRetrier retrier;
- private final ListeningScheduledExecutorService retryService;
private final Object lock = new Object();
@@ -92,17 +86,13 @@ final class ByteStreamUploader {
* @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be
* complete. The timeout resets between retries
* @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings.
- * @param retryService the executor service to schedule retries on. It's the responsibility of the
- * caller to properly shutdown the service after use. Users should avoid shutting down the
- * service before {@link #shutdown()} has been called
*/
public ByteStreamUploader(
@Nullable String instanceName,
Channel channel,
@Nullable CallCredentials callCredentials,
long callTimeoutSecs,
- RemoteRetrier retrier,
- ListeningScheduledExecutorService retryService) {
+ RemoteRetrier retrier) {
checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
this.instanceName = instanceName;
@@ -110,7 +100,6 @@ final class ByteStreamUploader {
this.callCredentials = callCredentials;
this.callTimeoutSecs = callTimeoutSecs;
this.retrier = retrier;
- this.retryService = retryService;
}
/**
@@ -192,27 +181,29 @@ final class ByteStreamUploader {
}
@VisibleForTesting
- ListenableFuture<Void> uploadBlobAsync(Chunker chunker)
- throws IOException {
+ ListenableFuture<Void> uploadBlobAsync(Chunker chunker) {
Digest digest = checkNotNull(chunker.digest());
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
- ListenableFuture<Void> uploadResult = uploadsInProgress.get(digest);
- if (uploadResult == null) {
- uploadResult = SettableFuture.create();
- uploadResult.addListener(
- () -> {
- synchronized (lock) {
- uploadsInProgress.remove(digest);
- }
- },
- MoreExecutors.directExecutor());
- startAsyncUploadWithRetry(
- chunker, retrier.newBackoff(), (SettableFuture<Void>) uploadResult);
- uploadsInProgress.put(digest, uploadResult);
+ ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
+ if (inProgress != null) {
+ return inProgress;
}
+
+ final SettableFuture<Void> uploadResult = SettableFuture.create();
+ uploadResult.addListener(
+ () -> {
+ synchronized (lock) {
+ uploadsInProgress.remove(digest);
+ }
+ },
+ MoreExecutors.directExecutor());
+ Context ctx = Context.current();
+ retrier.executeAsync(
+ () -> ctx.call(() -> startAsyncUpload(chunker, uploadResult)), uploadResult);
+ uploadsInProgress.put(digest, uploadResult);
return uploadResult;
}
}
@@ -224,77 +215,23 @@ final class ByteStreamUploader {
}
}
- private void startAsyncUploadWithRetry(
- Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) {
-
- AsyncUpload.Listener listener =
- new AsyncUpload.Listener() {
- @Override
- public void success() {
- overallUploadResult.set(null);
- }
-
- @Override
- public void failure(Status status) {
- StatusException cause = status.asException();
- long nextDelayMillis = backoffTimes.nextDelayMillis();
- if (nextDelayMillis < 0 || !retrier.isRetriable(cause)) {
- // Out of retries or status not retriable.
- RetryException error =
- new RetryException(
- "Out of retries or status not retriable.",
- backoffTimes.getRetryAttempts(),
- cause);
- overallUploadResult.setException(error);
- } else {
- retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult);
- }
- }
-
- private void retryAsyncUpload(
- long nextDelayMillis,
- Chunker chunker,
- Retrier.Backoff backoffTimes,
- SettableFuture<Void> overallUploadResult) {
- try {
- ListenableScheduledFuture<?> schedulingResult =
- retryService.schedule(
- Context.current()
- .wrap(
- () ->
- startAsyncUploadWithRetry(
- chunker, backoffTimes, overallUploadResult)),
- nextDelayMillis,
- MILLISECONDS);
- // In case the scheduled execution errors, we need to notify the overallUploadResult.
- schedulingResult.addListener(
- () -> {
- try {
- schedulingResult.get();
- } catch (Exception e) {
- overallUploadResult.setException(
- new RetryException(
- "Scheduled execution errored.", backoffTimes.getRetryAttempts(), e));
- }
- },
- MoreExecutors.directExecutor());
- } catch (RejectedExecutionException e) {
- // May be thrown by .schedule(...) if i.e. the executor is shutdown.
- overallUploadResult.setException(
- new RetryException("Rejected by executor.", backoffTimes.getRetryAttempts(), e));
- }
- }
- };
-
+ /**
+ * Starts a file upload and returns a future representing the upload. The {@code
+ * overallUploadResult} future propagates cancellations from the caller to the upload.
+ */
+ private ListenableFuture<Void> startAsyncUpload(
+ Chunker chunker, ListenableFuture<Void> overallUploadResult) {
+ SettableFuture<Void> currUpload = SettableFuture.create();
try {
chunker.reset();
} catch (IOException e) {
- overallUploadResult.setException(e);
- return;
+ currUpload.setException(e);
+ return currUpload;
}
AsyncUpload newUpload =
- new AsyncUpload(channel, callCredentials, callTimeoutSecs, instanceName, chunker, listener);
+ new AsyncUpload(
+ channel, callCredentials, callTimeoutSecs, instanceName, chunker, currUpload);
overallUploadResult.addListener(
() -> {
if (overallUploadResult.isCancelled()) {
@@ -303,22 +240,17 @@ final class ByteStreamUploader {
},
MoreExecutors.directExecutor());
newUpload.start();
+ return currUpload;
}
private static class AsyncUpload {
- interface Listener {
- void success();
-
- void failure(Status status);
- }
-
private final Channel channel;
private final CallCredentials callCredentials;
private final long callTimeoutSecs;
private final String instanceName;
private final Chunker chunker;
- private final Listener listener;
+ private final SettableFuture<Void> uploadResult;
private ClientCall<WriteRequest, WriteResponse> call;
@@ -328,13 +260,13 @@ final class ByteStreamUploader {
long callTimeoutSecs,
String instanceName,
Chunker chunker,
- Listener listener) {
+ SettableFuture<Void> uploadResult) {
this.channel = channel;
this.callCredentials = callCredentials;
this.callTimeoutSecs = callTimeoutSecs;
this.instanceName = instanceName;
this.chunker = chunker;
- this.listener = listener;
+ this.uploadResult = uploadResult;
}
void start() {
@@ -358,9 +290,9 @@ final class ByteStreamUploader {
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
- listener.success();
+ uploadResult.set(null);
} else {
- listener.failure(status);
+ uploadResult.setException(status.asRuntimeException());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 8f65021709..48bd4c8845 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -15,15 +15,16 @@
package com.google.devtools.build.lib.remote;
import com.google.bytestream.ByteStreamGrpc;
-import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.hash.HashingOutputStream;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
@@ -51,16 +52,14 @@ import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
-import java.io.ByteArrayOutputStream;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@@ -70,8 +69,6 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
private final Channel channel;
private final RemoteRetrier retrier;
private final ByteStreamUploader uploader;
- private final ListeningScheduledExecutorService retryScheduler =
- MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
@VisibleForTesting
public GrpcRemoteCache(
@@ -80,13 +77,14 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
RemoteOptions options,
RemoteRetrier retrier,
DigestUtil digestUtil) {
- super(options, digestUtil);
+ super(options, digestUtil, retrier);
this.credentials = credentials;
this.channel = channel;
this.retrier = retrier;
- uploader = new ByteStreamUploader(options.remoteInstanceName, channel, credentials,
- options.remoteTimeout, retrier, retryScheduler);
+ uploader =
+ new ByteStreamUploader(
+ options.remoteInstanceName, channel, credentials, options.remoteTimeout, retrier);
}
private ContentAddressableStorageBlockingStub casBlockingStub() {
@@ -96,8 +94,8 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
- private ByteStreamBlockingStub bsBlockingStub() {
- return ByteStreamGrpc.newBlockingStub(channel)
+ private ByteStreamStub bsAsyncStub() {
+ return ByteStreamGrpc.newStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
@@ -112,7 +110,6 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
@Override
public void close() {
- retryScheduler.shutdownNow();
uploader.shutdown();
}
@@ -174,62 +171,63 @@ public class GrpcRemoteCache extends AbstractRemoteActionCache {
uploader.uploadBlobs(toUpload);
}
- /**
- * This method can throw {@link StatusRuntimeException}, but the RemoteCache interface does not
- * allow throwing such an exception. Any caller must make sure to catch the
- * {@link StatusRuntimeException}. Note that the retrier implicitly catches it, so if this is used
- * in the context of {@link RemoteRetrier#execute}, that's perfectly safe.
- */
- private void readBlob(Digest digest, OutputStream stream)
- throws IOException, StatusRuntimeException {
+ @Override
+ protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
String resourceName = "";
if (!options.remoteInstanceName.isEmpty()) {
resourceName += options.remoteInstanceName + "/";
}
resourceName += "blobs/" + digestUtil.toString(digest);
- Iterator<ReadResponse> replies = bsBlockingStub()
- .read(ReadRequest.newBuilder().setResourceName(resourceName).build());
- while (replies.hasNext()) {
- replies.next().getData().writeTo(stream);
- }
- }
- @Override
- protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
- try {
- retrier.execute(
- () -> {
- try (OutputStream stream = dest.getOutputStream()) {
- readBlob(digest, stream);
- }
- return null;
- });
- } catch (RetryException e) {
- if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- throw e;
- }
- }
+ HashingOutputStream hashOut = digestUtil.newHashingOutputStream(out);
+ SettableFuture<Void> outerF = SettableFuture.create();
+ bsAsyncStub()
+ .read(
+ ReadRequest.newBuilder().setResourceName(resourceName).build(),
+ new StreamObserver<ReadResponse>() {
+ @Override
+ public void onNext(ReadResponse readResponse) {
+ try {
+ readResponse.getData().writeTo(hashOut);
+ } catch (IOException e) {
+ outerF.setException(e);
+ // Cancel the call.
+ throw new RuntimeException(e);
+ }
+ }
- @Override
- protected byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
- if (digest.getSizeBytes() == 0) {
- return new byte[0];
- }
- try {
- return retrier.execute(
- () -> {
- ByteArrayOutputStream stream = new ByteArrayOutputStream((int) digest.getSizeBytes());
- readBlob(digest, stream);
- return stream.toByteArray();
- });
- } catch (RetryException e) {
- if (RemoteRetrierUtils.causedByStatus(e, Status.Code.NOT_FOUND)) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- throw e;
- }
+ @Override
+ public void onError(Throwable t) {
+ if (t instanceof StatusRuntimeException
+ && ((StatusRuntimeException) t).getStatus().getCode()
+ == Status.NOT_FOUND.getCode()) {
+ outerF.setException(new CacheNotFoundException(digest, digestUtil));
+ } else {
+ outerF.setException(t);
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ String expectedHash = digest.getHash();
+ String actualHash = DigestUtil.hashCodeToString(hashOut.hash());
+ if (!expectedHash.equals(actualHash)) {
+ String msg =
+ String.format(
+ "Expected hash '%s' does not match received hash '%s'.",
+ expectedHash, actualHash);
+ outerF.setException(new IOException(msg));
+ } else {
+ try {
+ out.flush();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
+ }
+ }
+ }
+ });
+ return outerF;
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 11a6119bae..9201c94858 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -16,6 +16,8 @@ package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.authandtls.GoogleAuthUtils;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
@@ -40,6 +42,7 @@ import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import java.io.IOException;
+import java.util.concurrent.Executors;
import java.util.logging.Logger;
/** RemoteModule provides distributed cache and remote execution for Bazel. */
@@ -87,7 +90,8 @@ public final class RemoteModule extends BlazeModule {
}
private final CasPathConverter converter = new CasPathConverter();
-
+ private final ListeningScheduledExecutorService retryScheduler =
+ MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
private RemoteActionContextProvider actionContextProvider;
@Override
@@ -149,11 +153,14 @@ public final class RemoteModule extends BlazeModule {
logger = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock());
}
- RemoteRetrier retrier =
- new RemoteRetrier(
- remoteOptions, RemoteRetrier.RETRIABLE_GRPC_ERRORS, Retrier.ALLOW_ALL_CALLS);
final AbstractRemoteActionCache cache;
if (enableBlobStoreCache) {
+ Retrier retrier =
+ new Retrier(
+ () -> Retrier.RETRIES_DISABLED,
+ (e) -> false,
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS);
cache =
new SimpleBlobStoreActionCache(
remoteOptions,
@@ -161,6 +168,7 @@ public final class RemoteModule extends BlazeModule {
remoteOptions,
GoogleAuthUtils.newCredentials(authAndTlsOptions),
env.getWorkingDirectory()),
+ retrier,
digestUtil);
} else if (enableGrpcCache || remoteOptions.remoteExecutor != null) {
// If a remote executor but no remote cache is specified, assume both at the same target.
@@ -169,6 +177,12 @@ public final class RemoteModule extends BlazeModule {
if (logger != null) {
ch = ClientInterceptors.intercept(ch, logger);
}
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS);
cache =
new GrpcRemoteCache(
ch,
@@ -180,11 +194,15 @@ public final class RemoteModule extends BlazeModule {
cache = null;
}
- // TODO(davido): The naming is wrong here. "Remote"-prefix in RemoteActionCache class has no
- // meaning.
final GrpcRemoteExecutor executor;
if (remoteOptions.remoteExecutor != null) {
Channel ch = GoogleAuthUtils.newChannel(remoteOptions.remoteExecutor, authAndTlsOptions);
+ RemoteRetrier retrier =
+ new RemoteRetrier(
+ remoteOptions,
+ RemoteRetrier.RETRIABLE_GRPC_ERRORS,
+ retryScheduler,
+ Retrier.ALLOW_ALL_CALLS);
if (logger != null) {
ch = ClientInterceptors.intercept(ch, logger);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index ebef95e8b2..f3fa147291 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -36,16 +36,14 @@ public final class RemoteOptions extends OptionsBase {
public String remoteHttpCache;
@Option(
- name = "remote_rest_cache_pool_size",
- defaultValue = "20",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "Size of the HTTP pool for making requests to the REST cache.",
- deprecationWarning =
- "The value will be ignored and the option will be removed in the next "
- + "release. Bazel selects the ideal pool size automatically."
- )
- public int restCachePoolSize;
+ name = "remote_max_connections",
+ defaultValue = "0",
+ documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+ effectTags = {OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
+ help =
+ "The max. number of concurrent network connections to the remote cache/executor. By "
+ + "default Bazel selects the ideal number of connections automatically.")
+ public int remoteMaxConnections;
@Option(
name = "remote_executor",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
index 27de8b7305..11d24811ed 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRetrier.java
@@ -16,6 +16,7 @@ package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
@@ -28,8 +29,7 @@ import java.util.function.Supplier;
/**
* Specific retry logic for remote execution/caching.
*
- * <p>A call can disable retries by throwing a {@link PassThroughException}.
- * <code>
+ * <p>A call can disable retries by throwing a {@link PassThroughException}. <code>
* RemoteRetrier r = ...;
* try {
* r.execute(() -> {
@@ -42,7 +42,7 @@ import java.util.function.Supplier;
* }
* </code>
*/
-class RemoteRetrier extends Retrier {
+public class RemoteRetrier extends Retrier {
/**
* Wraps around an {@link Exception} to make it pass through a single layer of retries.
@@ -75,24 +75,36 @@ class RemoteRetrier extends Retrier {
}
};
- public RemoteRetrier(RemoteOptions options, Predicate<? super Exception> shouldRetry,
+ public RemoteRetrier(
+ RemoteOptions options,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
- this(options.experimentalRemoteRetry
- ? () -> new ExponentialBackoff(options)
- : () -> RETRIES_DISABLED,
+ this(
+ options.experimentalRemoteRetry
+ ? () -> new ExponentialBackoff(options)
+ : () -> RETRIES_DISABLED,
shouldRetry,
+ retryScheduler,
circuitBreaker);
}
- public RemoteRetrier(Supplier<Backoff> backoff, Predicate<? super Exception> shouldRetry,
+ public RemoteRetrier(
+ Supplier<Backoff> backoff,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
- super(backoff, supportPassthrough(shouldRetry), circuitBreaker);
+ super(backoff, supportPassthrough(shouldRetry), retryScheduler, circuitBreaker);
}
@VisibleForTesting
- RemoteRetrier(Supplier<Backoff> backoff, Predicate<? super Exception> shouldRetry,
- CircuitBreaker circuitBreaker, Sleeper sleeper) {
- super(backoff, supportPassthrough(shouldRetry), circuitBreaker, sleeper);
+ RemoteRetrier(
+ Supplier<Backoff> backoff,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
+ CircuitBreaker circuitBreaker,
+ Sleeper sleeper) {
+ super(backoff, supportPassthrough(shouldRetry), retryScheduler, circuitBreaker, sleeper);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index fb28fffe96..f533cf7d16 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -14,6 +14,8 @@
package com.google.devtools.build.lib.remote;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@@ -234,7 +236,7 @@ class RemoteSpawnRunner implements SpawnRunner {
logPath = parent.getRelative(e.getKey());
logCount++;
try {
- remoteCache.downloadFile(logPath, e.getValue().getDigest(), false, null);
+ getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest(), null));
} catch (IOException ex) {
reportOnce(Event.warn("Failed downloading server logs from the remote cache."));
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
index 770098cc5f..a329c0475e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Retrier.java
@@ -15,9 +15,19 @@
package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncCallable;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableScheduledFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import java.io.IOException;
import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -30,13 +40,10 @@ import javax.annotation.concurrent.ThreadSafe;
* delay between executions is specified by a {@link Backoff}. Additionally, the retrier supports
* circuit breaking to stop execution in case of high failure rates.
*/
-// TODO(buchgr): Move to a different package and use it for BES code.
@ThreadSafe
-class Retrier {
+public class Retrier {
- /**
- * A backoff strategy.
- */
+ /** A backoff strategy. */
public interface Backoff {
/**
@@ -55,12 +62,12 @@ class Retrier {
/**
* The circuit breaker allows to reject execution when failure rates are high.
*
- * <p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are
- * executed and retried in this state. However, if error rates are high a circuit breaker can
- * choose to transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with
- * a {@link RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS}
- * can periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once
- * and in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}.
+ * <p>The initial state of a circuit breaker is the {@link State#ACCEPT_CALLS}. Calls are executed
+ * and retried in this state. However, if error rates are high a circuit breaker can choose to
+ * transition into {@link State#REJECT_CALLS}. In this state any calls are rejected with a {@link
+ * RetryException} immediately. A circuit breaker in state {@link State#REJECT_CALLS} can
+ * periodically return a {@code TRIAL_CALL} state, in which case a call will be executed once and
+ * in case of success the circuit breaker may return to state {@code ACCEPT_CALLS}.
*
* <p>A circuit breaker implementation must be thread-safe.
*
@@ -68,6 +75,7 @@ class Retrier {
*/
public interface CircuitBreaker {
+ /** The state of the circuit breaker. */
enum State {
/**
* Calls are executed and retried in case of failure.
@@ -91,26 +99,28 @@ class Retrier {
REJECT_CALLS
}
- /**
- * Returns the current {@link State} of the circuit breaker.
- */
+ /** Returns the current {@link State} of the circuit breaker. */
State state();
- /**
- * Called after an execution failed.
- */
+ /** Called after an execution failed. */
void recordFailure();
- /**
- * Called after an execution succeeded.
- */
+ /** Called after an execution succeeded. */
void recordSuccess();
}
+ /**
+ * {@link Sleeper#sleep(long)} is called to pause between synchronous retries ({@link
+ * #execute(Callable)}).
+ */
public interface Sleeper {
void sleep(long millis) throws InterruptedException;
}
+ /**
+ * Wraps around the actual cause for the retry. Contains information about the number of retry
+ * attempts.
+ */
public static class RetryException extends IOException {
private final int attempts;
@@ -126,14 +136,15 @@ class Retrier {
}
/**
- * Returns the number of times a {@link Callable} has been executed before this exception
- * was thrown.
+ * Returns the number of times a {@link Callable} has been executed before this exception was
+ * thrown.
*/
public int getAttempts() {
return attempts;
}
}
+ /** Thrown if the call was stopped by a circuit breaker. */
public static class CircuitBreakerException extends RetryException {
private CircuitBreakerException(String message, int numRetries, Exception cause) {
@@ -145,48 +156,60 @@ class Retrier {
}
}
- public static final CircuitBreaker ALLOW_ALL_CALLS = new CircuitBreaker() {
- @Override
- public State state() {
- return State.ACCEPT_CALLS;
- }
+ /** Disables circuit breaking. */
+ public static final CircuitBreaker ALLOW_ALL_CALLS =
+ new CircuitBreaker() {
+ @Override
+ public State state() {
+ return State.ACCEPT_CALLS;
+ }
- @Override
- public void recordFailure() {
- }
+ @Override
+ public void recordFailure() {}
- @Override
- public void recordSuccess() {
- }
- };
+ @Override
+ public void recordSuccess() {}
+ };
- public static final Backoff RETRIES_DISABLED = new Backoff() {
- @Override
- public long nextDelayMillis() {
- return -1;
- }
+ /** Disables retries. */
+ public static final Backoff RETRIES_DISABLED =
+ new Backoff() {
+ @Override
+ public long nextDelayMillis() {
+ return -1;
+ }
- @Override
- public int getRetryAttempts() {
- return 0;
- }
- };
+ @Override
+ public int getRetryAttempts() {
+ return 0;
+ }
+ };
private final Supplier<Backoff> backoffSupplier;
private final Predicate<? super Exception> shouldRetry;
private final CircuitBreaker circuitBreaker;
+ private final ListeningScheduledExecutorService retryService;
private final Sleeper sleeper;
- public Retrier(Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
+ public Retrier(
+ Supplier<Backoff> backoffSupplier,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryScheduler,
CircuitBreaker circuitBreaker) {
- this(backoffSupplier, shouldRetry, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
+ this(
+ backoffSupplier, shouldRetry, retryScheduler, circuitBreaker, TimeUnit.MILLISECONDS::sleep);
}
@VisibleForTesting
- Retrier(Supplier<Backoff> backoffSupplier, Predicate<? super Exception> shouldRetry,
- CircuitBreaker circuitBreaker, Sleeper sleeper) {
+ Retrier(
+ Supplier<Backoff> backoffSupplier,
+ Predicate<? super Exception> shouldRetry,
+ ListeningScheduledExecutorService retryService,
+ CircuitBreaker circuitBreaker,
+ Sleeper sleeper) {
this.backoffSupplier = backoffSupplier;
this.shouldRetry = shouldRetry;
+ this.retryService = retryService;
this.circuitBreaker = circuitBreaker;
this.sleeper = sleeper;
}
@@ -197,13 +220,13 @@ class Retrier {
*
* <p>{@link InterruptedException} is not retried.
*
- * @param call the {@link Callable} to execute.
+ * @param call the {@link Callable} to execute.
* @throws RetryException if the {@code call} didn't succeed within the framework specified by
- * {@code backoffSupplier} and {@code shouldRetry}.
- * @throws CircuitBreakerException in case a call was rejected because the circuit breaker
- * tripped.
+ * {@code backoffSupplier} and {@code shouldRetry}.
+ * @throws CircuitBreakerException in case a call was rejected because the circuit breaker
+ * tripped.
* @throws InterruptedException if the {@code call} throws an {@link InterruptedException} or the
- * current thread's interrupted flag is set.
+ * current thread's interrupted flag is set.
*/
public <T> T execute(Callable<T> call) throws RetryException, InterruptedException {
final Backoff backoff = newBackoff();
@@ -230,8 +253,8 @@ class Retrier {
e = (Exception) e.getCause();
}
if (State.TRIAL_CALL.equals(circuitState)) {
- throw new CircuitBreakerException("Call failed in circuit breaker half open state.", 0,
- e);
+ throw new CircuitBreakerException(
+ "Call failed in circuit breaker half open state.", 0, e);
}
int attempts = backoff.getRetryAttempts();
if (!shouldRetry.test(e)) {
@@ -247,8 +270,89 @@ class Retrier {
}
}
- //TODO(buchgr): Add executeAsync to be used by ByteStreamUploader
- // <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call, ScheduledExecutorService executor)
+ /**
+ * Executes an {@link AsyncCallable}, retrying execution in case of failure and returning a {@link
+ * ListenableFuture} pointing to the result/error.
+ */
+ public <T> ListenableFuture<T> executeAsync(AsyncCallable<T> call) {
+ SettableFuture<T> f = SettableFuture.create();
+ executeAsync(call, f);
+ return f;
+ }
+
+ /**
+ * Executes an {@link AsyncCallable}, retrying execution in case of failure and uses the provided
+ * {@code promise} to point to the result/error.
+ */
+ public <T> void executeAsync(AsyncCallable<T> call, SettableFuture<T> promise) {
+ Preconditions.checkNotNull(call);
+ Preconditions.checkNotNull(promise);
+ Backoff backoff = newBackoff();
+ executeAsync(call, promise, backoff);
+ }
+
+ private <T> void executeAsync(AsyncCallable<T> call, SettableFuture<T> outerF, Backoff backoff) {
+ Preconditions.checkState(!outerF.isDone(), "outerF completed already.");
+ try {
+ Futures.addCallback(
+ call.call(),
+ new FutureCallback<T>() {
+ @Override
+ public void onSuccess(T t) {
+ outerF.set(t);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ onExecuteAsyncFailure(t, call, outerF, backoff);
+ }
+ },
+ MoreExecutors.directExecutor());
+ } catch (Exception e) {
+ onExecuteAsyncFailure(e, call, outerF, backoff);
+ }
+ }
+
+ private <T> void onExecuteAsyncFailure(
+ Throwable t, AsyncCallable<T> call, SettableFuture<T> outerF, Backoff backoff) {
+ long waitMillis = backoff.nextDelayMillis();
+ if (waitMillis >= 0 && t instanceof Exception && isRetriable((Exception) t)) {
+ try {
+ ListenableScheduledFuture<?> sf =
+ retryService.schedule(
+ () -> executeAsync(call, outerF, backoff), waitMillis, TimeUnit.MILLISECONDS);
+ Futures.addCallback(
+ sf,
+ new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object o) {
+ // Submitted successfully. Intentionally left empty.
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
+ outerF.setException(
+ new RetryException(
+ "Scheduled execution errored.", backoff.getRetryAttempts(), e));
+ }
+ },
+ MoreExecutors.directExecutor());
+ } catch (RejectedExecutionException e) {
+ // May be thrown by .schedule(...) if i.e. the executor is shutdown.
+ outerF.setException(
+ new RetryException("Rejected by executor.", backoff.getRetryAttempts(), e));
+ }
+ } else {
+ Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
+ String message =
+ waitMillis >= 0
+ ? "Status not retriable."
+ : "Exhaused retry attempts (" + backoff.getRetryAttempts() + ")";
+ RetryException error = new RetryException(message, backoff.getRetryAttempts(), e);
+ outerF.setException(error);
+ }
+ }
public Backoff newBackoff() {
return backoffSupplier.get();
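To make the new API above concrete, here is a minimal, illustrative wiring of executeAsync. It mirrors the Retrier construction shown in RemoteModule; the immediate future stands in for a real transport call such as downloadBlob(digest, out) and is not code from this commit.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.Retrier;
import java.util.concurrent.Executors;

class RetrierUsageSketch {

  static ListenableFuture<Void> downloadWithRetries() {
    ListeningScheduledExecutorService retryScheduler =
        MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
    // Same construction as the blob store path in RemoteModule: retries disabled and a
    // circuit breaker that never trips.
    Retrier retrier =
        new Retrier(
            () -> Retrier.RETRIES_DISABLED, (e) -> false, retryScheduler, Retrier.ALLOW_ALL_CALLS);
    // executeAsync runs the AsyncCallable, re-schedules it on retryScheduler after retriable
    // failures, and returns a future carrying the final result or a RetryException.
    return retrier.executeAsync(() -> Futures.immediateFuture((Void) null));
  }
}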
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 803946cc61..b1b0d604df 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -14,6 +14,13 @@
package com.google.devtools.build.lib.remote;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.MetadataProvider;
@@ -24,7 +31,6 @@ import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.util.io.FileOutErr;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command;
@@ -57,8 +63,8 @@ public final class SimpleBlobStoreActionCache extends AbstractRemoteActionCache
private final SimpleBlobStore blobStore;
public SimpleBlobStoreActionCache(
- RemoteOptions options, SimpleBlobStore blobStore, DigestUtil digestUtil) {
- super(options, digestUtil);
+ RemoteOptions options, SimpleBlobStore blobStore, Retrier retrier, DigestUtil digestUtil) {
+ super(options, digestUtil, retrier);
this.blobStore = blobStore;
}
@@ -79,11 +85,12 @@ public final class SimpleBlobStoreActionCache extends AbstractRemoteActionCache
public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, InterruptedException {
- FileSystemUtils.createDirectoryAndParents(rootLocation);
- Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
+ rootLocation.createDirectoryAndParents();
+ Directory directory = Directory.parseFrom(getFromFuture(downloadBlob(rootDigest)));
for (FileNode file : directory.getFilesList()) {
- downloadFile(
- rootLocation.getRelative(file.getName()), file.getDigest(), file.getIsExecutable(), null);
+ Path dst = rootLocation.getRelative(file.getName());
+ getFromFuture(downloadFile(dst, file.getDigest(), null));
+ dst.setExecutable(file.getIsExecutable());
}
for (DirectoryNode child : directory.getDirectoriesList()) {
downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
@@ -218,26 +225,31 @@ public final class SimpleBlobStoreActionCache extends AbstractRemoteActionCache
}
@Override
- protected void downloadBlob(Digest digest, Path dest) throws IOException, InterruptedException {
- try (OutputStream out = dest.getOutputStream()) {
- boolean success = blobStore.get(digest.getHash(), out);
- if (!success) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- }
- }
-
- @Override
- public byte[] downloadBlob(Digest digest) throws IOException, InterruptedException {
- if (digest.getSizeBytes() == 0) {
- return new byte[0];
- }
- try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- boolean success = blobStore.get(digest.getHash(), out);
- if (!success) {
- throw new CacheNotFoundException(digest, digestUtil);
- }
- return out.toByteArray();
- }
+ protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
+ SettableFuture<Void> outerF = SettableFuture.create();
+ Futures.addCallback(
+ blobStore.get(digest.getHash(), out),
+ new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean found) {
+ if (found) {
+ try {
+ out.flush();
+ outerF.set(null);
+ } catch (IOException e) {
+ outerF.setException(e);
+ }
+ } else {
+ outerF.setException(new CacheNotFoundException(digest, digestUtil));
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ outerF.setException(throwable);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return outerF;
}
}
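A hedged sketch of what the future-returning downloadBlob enables at call sites: all outputs of an action can be requested up front and awaited together, which is the concurrency this change is after. BlobDownloader and downloadOne are illustrative stand-ins for the protected downloadBlob(Digest, OutputStream) overload above; Utils.getFromFuture and Guava's Futures are the only real APIs used.

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

final class ConcurrentDownloadSketch {
  interface BlobDownloader {
    ListenableFuture<Void> downloadOne(String hash, OutputStream out);
  }

  static void downloadAll(BlobDownloader cache, List<String> hashes, List<OutputStream> outs)
      throws IOException, InterruptedException {
    List<ListenableFuture<Void>> downloads = new ArrayList<>();
    for (int i = 0; i < hashes.size(); i++) {
      // Every download is started before any of them is awaited; the transport layer
      // (gRPC multiplexing or the HTTP channel pool) decides the actual parallelism.
      downloads.add(cache.downloadOne(hashes.get(i), outs.get(i)));
    }
    // Block until all downloads have completed, or until one of them fails.
    getFromFuture(Futures.allAsList(downloads));
  }
}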
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
index 47367560e9..7d893fbd4a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
@@ -40,6 +40,7 @@ public final class SimpleBlobStoreFactory {
return new HttpBlobStore(
URI.create(options.remoteHttpCache),
(int) TimeUnit.SECONDS.toMillis(options.remoteTimeout),
+ options.remoteMaxConnections,
creds);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
index 247315bc08..cd0b3291bf 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD
@@ -11,6 +11,7 @@ java_library(
srcs = glob(["*.java"]),
tags = ["bazel"],
deps = [
+ "//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//third_party:guava",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
index 28088d28e2..85cbc8779c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
@@ -15,10 +15,13 @@ package com.google.devtools.build.lib.remote.blobstore;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
public final class ConcurrentMapBlobStore implements SimpleBlobStore {
@@ -34,19 +37,26 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore {
}
@Override
- public boolean get(String key, OutputStream out) throws IOException {
+ public ListenableFuture<Boolean> get(String key, OutputStream out) {
byte[] data = map.get(key);
+ SettableFuture<Boolean> f = SettableFuture.create();
if (data == null) {
- return false;
+ f.set(false);
+ } else {
+ try {
+ out.write(data);
+ f.set(true);
+ } catch (IOException e) {
+ f.setException(e);
+ }
}
- out.write(data);
- return true;
+ return f;
}
@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
- return get(key, out);
+ return getFromFuture(get(key, out));
}
@Override
@@ -57,10 +67,22 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore {
}
@Override
- public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ public void putActionResult(String key, byte[] in) {
map.put(key, in);
}
@Override
public void close() {}
+
+ private static <T> T getFromFuture(ListenableFuture<T> f)
+ throws IOException, InterruptedException {
+ try {
+ return f.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new IOException(e.getCause());
+ }
+ }
}
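A small usage sketch for the in-memory store above, assuming its constructor takes the backing ConcurrentMap (the constructor itself is not part of this hunk): write an entry, then read it back through the new future-returning get(). For this store the future is always already completed, so .get() returns immediately.

import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
import java.io.ByteArrayOutputStream;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentMapStoreUsage {
  public static void main(String[] args) throws Exception {
    ConcurrentMapBlobStore store = new ConcurrentMapBlobStore(new ConcurrentHashMap<>());
    store.putActionResult("ac-key", new byte[] {1, 2, 3});
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // get() hands back a SettableFuture that was completed synchronously; .get() unwraps it.
    boolean found = store.get("ac-key", out).get();
    System.out.println(found + " " + out.size()); // prints: true 3
  }
}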
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
index 011431013b..6556746ffc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
@@ -13,7 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.vfs.Path;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -35,21 +39,26 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
}
@Override
- public boolean get(String key, OutputStream out) throws IOException {
- Path f = toPath(key);
- if (!f.exists()) {
- return false;
- }
- try (InputStream in = f.getInputStream()) {
- ByteStreams.copy(in, out);
+ public ListenableFuture<Boolean> get(String key, OutputStream out) {
+ SettableFuture<Boolean> f = SettableFuture.create();
+ Path p = toPath(key);
+ if (!p.exists()) {
+ f.set(false);
+ } else {
+ try (InputStream in = p.getInputStream()) {
+ ByteStreams.copy(in, out);
+ f.set(true);
+ } catch (IOException e) {
+ f.setException(e);
+ }
}
- return true;
+ return f;
}
@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
- return get(key, out);
+ return getFromFuture(get(key, out));
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
index b7e4db2c3a..3bf67460fa 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.blobstore;
+import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -36,7 +37,7 @@ public interface SimpleBlobStore {
*
* @return {@code true} if the {@code key} was found. {@code false} otherwise.
*/
- boolean get(String key, OutputStream out) throws IOException, InterruptedException;
+ ListenableFuture<Boolean> get(String key, OutputStream out);
/**
* Fetches the BLOB associated with the {@code key} from the Action Cache and writes it to {@code
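One hedged way for an implementation to satisfy the new non-blocking contract when the underlying storage is blocking is to run the copy on a dedicated executor and hand back its ListenableFuture; a checked IOException then surfaces through the future and is unwrapped again by Utils.getFromFuture. BlockingStore and ioPool are illustrative names, not part of this change.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executors;

final class AsyncAdapterSketch {
  interface BlockingStore {
    boolean get(String key, OutputStream out) throws IOException;
  }

  private final BlockingStore delegate;
  private final ListeningExecutorService ioPool =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8));

  AsyncAdapterSketch(BlockingStore delegate) {
    this.delegate = delegate;
  }

  ListenableFuture<Boolean> get(String key, OutputStream out) {
    // The callable's IOException is captured by the future and later rethrown as an
    // IOException by Utils.getFromFuture at synchronous call sites.
    return ioPool.submit(() -> delegate.get(key, out));
  }
}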
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD
index e532922182..8ba1f8813e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD
@@ -16,6 +16,7 @@ java_library(
],
deps = [
"//src/main/java/com/google/devtools/build/lib/remote/blobstore",
+ "//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/common/options",
"//third_party:auth",
"//third_party:guava",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
index 857dfe4f1d..a017d2a9c5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
@@ -13,7 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore.http;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
import com.google.auth.Credentials;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@@ -23,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
@@ -39,6 +44,8 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
@@ -86,9 +93,9 @@ public final class HttpBlobStore implements SimpleBlobStore {
Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?");
private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(2 /* number of threads */);
- private final ChannelPool downloadChannels;
- private final ChannelPool uploadChannels;
+ private final ChannelPool channelPool;
private final URI uri;
+ private final int timeoutMillis;
private final Object credentialsLock = new Object();
@@ -98,7 +105,9 @@ public final class HttpBlobStore implements SimpleBlobStore {
@GuardedBy("credentialsLock")
private long lastRefreshTime;
- public HttpBlobStore(URI uri, int timeoutMillis, @Nullable final Credentials creds)
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public HttpBlobStore(
+ URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds)
throws Exception {
boolean useTls = uri.getScheme().equals("https");
if (uri.getPort() == -1) {
@@ -129,52 +138,48 @@ public final class HttpBlobStore implements SimpleBlobStore {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.group(eventLoop)
.remoteAddress(uri.getHost(), uri.getPort());
- downloadChannels =
- new SimpleChannelPool(
- clientBootstrap,
- new ChannelPoolHandler() {
- @Override
- public void channelReleased(Channel ch) {
- ch.pipeline().remove("read-timeout-handler");
- }
+ ChannelPoolHandler channelPoolHandler =
+ new ChannelPoolHandler() {
+ @Override
+ public void channelReleased(Channel ch) {}
- @Override
- public void channelAcquired(Channel ch) {
- ch.pipeline()
- .addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
- }
+ @Override
+ public void channelAcquired(Channel ch) {}
- @Override
- public void channelCreated(Channel ch) {
- ChannelPipeline p = ch.pipeline();
- p.addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
- if (sslCtx != null) {
- SSLEngine engine = sslCtx.newEngine(ch.alloc());
- engine.setUseClientMode(true);
- p.addFirst(new SslHandler(engine));
- }
- p.addLast(new HttpClientCodec());
- p.addLast(new HttpDownloadHandler(creds));
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline p = ch.pipeline();
+ if (sslCtx != null) {
+ SSLEngine engine = sslCtx.newEngine(ch.alloc());
+ engine.setUseClientMode(true);
+ p.addFirst("ssl-handler", new SslHandler(engine));
+ }
+ }
+ };
+ if (remoteMaxConnections > 0) {
+ channelPool = new FixedChannelPool(clientBootstrap, channelPoolHandler, remoteMaxConnections);
+ } else {
+ channelPool = new SimpleChannelPool(clientBootstrap, channelPoolHandler);
+ }
+ this.creds = creds;
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private Channel acquireUploadChannel() throws InterruptedException {
+ Promise<Channel> channelReady = eventLoop.next().newPromise();
+ channelPool
+ .acquire()
+ .addListener(
+ (Future<Channel> channelAcquired) -> {
+ if (!channelAcquired.isSuccess()) {
+ channelReady.setFailure(channelAcquired.cause());
+ return;
}
- });
- uploadChannels =
- new SimpleChannelPool(
- clientBootstrap,
- new ChannelPoolHandler() {
- @Override
- public void channelReleased(Channel ch) {}
-
- @Override
- public void channelAcquired(Channel ch) {}
-
- @Override
- public void channelCreated(Channel ch) {
+
+ try {
+ Channel ch = channelAcquired.getNow();
ChannelPipeline p = ch.pipeline();
- if (sslCtx != null) {
- SSLEngine engine = sslCtx.newEngine(ch.alloc());
- engine.setUseClientMode(true);
- p.addFirst(new SslHandler(engine));
- }
p.addLast(new HttpResponseDecoder());
// The 10KiB limit was chosen at random. We only expect HTTP servers to respond with
// an error message in the body and that should always be less than 10KiB.
@@ -182,26 +187,87 @@ public final class HttpBlobStore implements SimpleBlobStore {
p.addLast(new HttpRequestEncoder());
p.addLast(new ChunkedWriteHandler());
p.addLast(new HttpUploadHandler(creds));
+
+ channelReady.setSuccess(ch);
+ } catch (Throwable t) {
+ channelReady.setFailure(t);
}
});
- this.creds = creds;
+
+ try {
+ return channelReady.get();
+ } catch (ExecutionException e) {
+ PlatformDependent.throwException(e.getCause());
+ return null;
+ }
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void releaseUploadChannel(Channel ch) {
+ if (ch.isOpen()) {
+ ch.pipeline().remove(HttpResponseDecoder.class);
+ ch.pipeline().remove(HttpObjectAggregator.class);
+ ch.pipeline().remove(HttpRequestEncoder.class);
+ ch.pipeline().remove(ChunkedWriteHandler.class);
+ ch.pipeline().remove(HttpUploadHandler.class);
+ }
+ channelPool.release(ch);
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private Future<Channel> acquireDownloadChannel() {
+ Promise<Channel> channelReady = eventLoop.next().newPromise();
+ channelPool
+ .acquire()
+ .addListener(
+ (Future<Channel> channelAcquired) -> {
+ if (!channelAcquired.isSuccess()) {
+ channelReady.setFailure(channelAcquired.cause());
+ return;
+ }
+
+ try {
+ Channel ch = channelAcquired.getNow();
+ ChannelPipeline p = ch.pipeline();
+ ch.pipeline()
+ .addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
+ p.addLast(new HttpClientCodec());
+ p.addLast(new HttpDownloadHandler(creds));
+
+ channelReady.setSuccess(ch);
+ } catch (Throwable t) {
+ channelReady.setFailure(t);
+ }
+ });
+
+ return channelReady;
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void releaseDownloadChannel(Channel ch) {
+ if (ch.isOpen()) {
+ // The channel might have been closed due to an error, in which case its pipeline
+ // has already been cleared. Closed channels can't be reused.
+ ch.pipeline().remove(ReadTimeoutHandler.class);
+ ch.pipeline().remove(HttpClientCodec.class);
+ ch.pipeline().remove(HttpDownloadHandler.class);
+ }
+ channelPool.release(ch);
}
@Override
- public boolean containsKey(String key) throws IOException, InterruptedException {
+ public boolean containsKey(String key) {
throw new UnsupportedOperationException("HTTP Caching does not use this method.");
}
@Override
- public boolean get(String key, OutputStream out) throws IOException, InterruptedException {
+ public ListenableFuture<Boolean> get(String key, OutputStream out) {
return get(key, out, true);
}
@SuppressWarnings("FutureReturnValueIgnored")
- private boolean get(String key, final OutputStream out, boolean casDownload)
- throws IOException, InterruptedException {
+ private ListenableFuture<Boolean> get(String key, final OutputStream out, boolean casDownload) {
final AtomicBoolean dataWritten = new AtomicBoolean();
-
OutputStream wrappedOut =
new OutputStream() {
// OutputStream.close() does nothing, which is what we want to ensure that the
@@ -226,62 +292,90 @@ public final class HttpBlobStore implements SimpleBlobStore {
}
};
DownloadCommand download = new DownloadCommand(uri, casDownload, key, wrappedOut);
+ SettableFuture<Boolean> outerF = SettableFuture.create();
+ acquireDownloadChannel()
+ .addListener(
+ (Future<Channel> chP) -> {
+ if (!chP.isSuccess()) {
+ outerF.setException(chP.cause());
+ return;
+ }
- Channel ch = null;
- try {
- ch = acquireDownloadChannel();
- ChannelFuture downloadFuture = ch.writeAndFlush(download);
- downloadFuture.sync();
- return true;
- } catch (Exception e) {
- // e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a
- // checked exception that hasn't been declared in the method signature.
- if (e instanceof HttpException) {
- HttpResponse response = ((HttpException) e).response();
- if (!dataWritten.get() && authTokenExpired(response)) {
- // The error is due to an auth token having expired. Let's try again.
- refreshCredentials();
- return getAfterCredentialRefresh(download);
- }
- if (cacheMiss(response.status())) {
- return false;
- }
- }
- throw e;
- } finally {
- if (ch != null) {
- downloadChannels.release(ch);
- }
- }
+ Channel ch = chP.getNow();
+ ch.writeAndFlush(download)
+ .addListener(
+ (f) -> {
+ try {
+ if (f.isSuccess()) {
+ outerF.set(true);
+ } else {
+ Throwable cause = f.cause();
+ // cause can be of type HttpException, because Netty uses Unsafe.throwException
+ // to re-throw a checked exception that hasn't been declared in the method signature.
+ if (cause instanceof HttpException) {
+ HttpResponse response = ((HttpException) cause).response();
+ if (!dataWritten.get() && authTokenExpired(response)) {
+ // The error is due to an auth token having expired. Let's try
+ // again.
+ refreshCredentials();
+ getAfterCredentialRefresh(download, outerF);
+ return;
+ } else if (cacheMiss(response.status())) {
+ outerF.set(false);
+ return;
+ }
+ }
+ outerF.setException(cause);
+ }
+ } finally {
+ releaseDownloadChannel(ch);
+ }
+ });
+ });
+ return outerF;
}
@SuppressWarnings("FutureReturnValueIgnored")
- private boolean getAfterCredentialRefresh(DownloadCommand cmd) throws InterruptedException {
- Channel ch = null;
- try {
- ch = acquireDownloadChannel();
- ChannelFuture downloadFuture = ch.writeAndFlush(cmd);
- downloadFuture.sync();
- return true;
- } catch (Exception e) {
- if (e instanceof HttpException) {
- HttpResponse response = ((HttpException) e).response();
- if (cacheMiss(response.status())) {
- return false;
- }
- }
- throw e;
- } finally {
- if (ch != null) {
- downloadChannels.release(ch);
- }
- }
+ private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture<Boolean> outerF) {
+ acquireDownloadChannel()
+ .addListener(
+ (Future<Channel> chP) -> {
+ if (!chP.isSuccess()) {
+ outerF.setException(chP.cause());
+ return;
+ }
+
+ Channel ch = chP.getNow();
+ ch.writeAndFlush(cmd)
+ .addListener(
+ (f) -> {
+ try {
+ if (f.isSuccess()) {
+ outerF.set(true);
+ } else {
+ Throwable cause = f.cause();
+ if (cause instanceof HttpException) {
+ HttpResponse response = ((HttpException) cause).response();
+ if (cacheMiss(response.status())) {
+ outerF.set(false);
+ return;
+ }
+ }
+ outerF.setException(cause);
+ }
+ } finally {
+ releaseDownloadChannel(ch);
+ }
+ });
+ });
}
@Override
public boolean getActionResult(String actionKey, OutputStream out)
throws IOException, InterruptedException {
- return get(actionKey, out, false);
+ return getFromFuture(get(actionKey, out, false));
}
@Override
@@ -329,7 +423,7 @@ public final class HttpBlobStore implements SimpleBlobStore {
} finally {
in.close();
if (ch != null) {
- uploadChannels.release(ch);
+ releaseUploadChannel(ch);
}
}
}
@@ -343,7 +437,7 @@ public final class HttpBlobStore implements SimpleBlobStore {
uploadFuture.sync();
} finally {
if (ch != null) {
- uploadChannels.release(ch);
+ releaseUploadChannel(ch);
}
}
}
@@ -376,8 +470,7 @@ public final class HttpBlobStore implements SimpleBlobStore {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void close() {
- downloadChannels.close();
- uploadChannels.close();
+ channelPool.close();
eventLoop.shutdownGracefully();
}
@@ -403,24 +496,6 @@ public final class HttpBlobStore implements SimpleBlobStore {
}
}
- private Channel acquireDownloadChannel() throws InterruptedException {
- try {
- return downloadChannels.acquire().get();
- } catch (ExecutionException e) {
- PlatformDependent.throwException(e.getCause());
- return null;
- }
- }
-
- private Channel acquireUploadChannel() throws InterruptedException {
- try {
- return uploadChannels.acquire().get();
- } catch (ExecutionException e) {
- PlatformDependent.throwException(e.getCause());
- return null;
- }
- }
-
private void refreshCredentials() throws IOException {
synchronized (credentialsLock) {
long now = System.currentTimeMillis();
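The construction of the single shared channel pool is spread across the hunk above; condensed into a hedged sketch, the choice boils down to: a positive --remote_max_connections selects Netty's FixedChannelPool, which bounds the number of concurrently open connections, otherwise the unbounded SimpleChannelPool is used. The Bootstrap configuration and the per-acquire handler setup are elided here.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;

final class ChannelPoolSketch {
  static ChannelPool newChannelPool(Bootstrap clientBootstrap, int remoteMaxConnections) {
    ChannelPoolHandler handler =
        new AbstractChannelPoolHandler() {
          @Override
          public void channelCreated(Channel ch) {
            // Per-connection setup (e.g. the SSL handler) goes here; request/response
            // handlers are added per acquire so one pool can serve uploads and downloads.
          }
        };
    return remoteMaxConnections > 0
        ? new FixedChannelPool(clientBootstrap, handler, remoteMaxConnections)
        : new SimpleChannelPool(clientBootstrap, handler);
  }
}

Keeping the codec and upload/download handlers out of channelCreated and adding them in acquireUploadChannel/acquireDownloadChannel is what allows a single pool to replace the former separate upload and download pools.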
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
index e4dc92a46f..ae8b109d55 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java
@@ -17,6 +17,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
+import com.google.common.hash.HashingOutputStream;
+import com.google.common.io.BaseEncoding;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.cache.DigestUtils;
@@ -29,6 +31,7 @@ import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
/** Utility methods to work with {@link Digest}. */
public class DigestUtil {
@@ -106,6 +109,14 @@ public class DigestUtil {
return Digest.newBuilder().setHash(hexHash).setSizeBytes(size).build();
}
+ public static String hashCodeToString(HashCode hash) {
+ return BaseEncoding.base16().lowerCase().encode(hash.asBytes());
+ }
+
+ public HashingOutputStream newHashingOutputStream(OutputStream out) {
+ return new HashingOutputStream(hashFn.getHash(), out);
+ }
+
public String toString(Digest digest) {
return digest.getHash() + "/" + digest.getSizeBytes();
}
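A hedged illustration of how the new DigestUtil helpers are meant to be used: wrap the destination stream in a HashingOutputStream, let the download write through it, and compare the resulting hash with the expected digest before trusting the file. SHA-256 is assumed here for the hash function; Bazel derives the actual function from its digest configuration.

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.hash.HashingOutputStream;
import com.google.common.io.BaseEncoding;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class DigestCheckSketch {
  public static void main(String[] args) throws IOException {
    byte[] blob = "hello".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    HashingOutputStream hashingOut = new HashingOutputStream(Hashing.sha256(), sink);
    hashingOut.write(blob); // in the real code this is the streamed download
    hashingOut.flush();
    HashCode actual = hashingOut.hash();
    // Equivalent of hashCodeToString(actual): lower-case hex, comparable to Digest.getHash().
    String hex = BaseEncoding.base16().lowerCase().encode(actual.asBytes());
    System.out.println(hex);
  }
}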
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
new file mode 100644
index 0000000000..95a02abfdd
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -0,0 +1,43 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote.util;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+/** Utility methods for the remote package. */
+public class Utils {
+
+ private Utils() {}
+
+ /**
+ * Returns the result of a {@link ListenableFuture} if it completed successfully. Otherwise
+ * rethrows the cause directly if it is an {@link IOException} or a {@link RuntimeException},
+ * and wraps any other cause in an {@link IOException}.
+ */
+ public static <T> T getFromFuture(ListenableFuture<T> f)
+ throws IOException, InterruptedException {
+ try {
+ return f.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ throw new IOException(e.getCause());
+ }
+ }
+}
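Usage sketch: synchronous call sites such as getActionResult above adapt the new asynchronous get() by unwrapping the future with getFromFuture, so IOExceptions keep propagating exactly as they did with the old blocking signature. The helper method and variable names below are illustrative.

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

final class GetFromFutureUsage {
  /** Reads a blob synchronously from any SimpleBlobStore implementation. */
  static byte[] readBlobOrNull(SimpleBlobStore store, String key)
      throws IOException, InterruptedException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // get() returns a ListenableFuture<Boolean>; getFromFuture blocks and rethrows an
    // IOException cause directly, or wraps any other cause in an IOException.
    boolean found = getFromFuture(store.get(key, out));
    return found ? out.toByteArray() : null;
  }
}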