diff options
author | buchgr <buchgr@google.com> | 2018-06-02 14:13:43 -0700 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-06-02 14:15:06 -0700 |
commit | ff008f445905bf6f4601a368782b620f7899d322 (patch) | |
tree | 7fbfe2ef3d3e794680d12ee42f5d4e0016b6b736 /src/tools | |
parent | aaf11e91a02a2f42d8bf26cce76df941c8afc8e2 (diff) |
remote: concurrent blob downloads. Fixes #5215
This change introduces concurrent downloads of action outputs
for remote caching/execution. So far, for an action we would
download one output after the other which isn't as bad as it
sounds as we would typically run dozens or hundreds of actions
in parallel. However, for actions with a lot of outputs or graphs
that allow limited parallelism we expect this change to positively
impact performance.
Note, that with this change the AbstractRemoteActionCache will
attempt to always download all outputs concurrently. The actual
parallelism is controlled by the underlying network transport.
The gRPC transport currently enforces no limits on the concurrent
calls, which should be fine given that all calls are multiplexed
on a single network connection. The HTTP/1.1 transport also
enforces no parallelism by default, but I have added the
--remote_max_connections=INT flag which allows to specify an upper
bound on the number of network connections to be open concurrently.
I have introduced this flag as a defensive mechanism for users
who's environment might enforce an upper bound on the number of open
connections, as with this change its possible for the number of
concurrently open connections to dramatically increase (from
NumParallelActions to NumParallelActions * SumParallelActionOutputs).
A side effect of this change is that it puts the infrastructure
for retries and circuit breaking for the HttpBlobStore in place.
RELNOTES: None
PiperOrigin-RevId: 199005510
Diffstat (limited to 'src/tools')
3 files changed, 20 insertions, 3 deletions
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 3fbbd1461a..4010243579 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -14,6 +14,7 @@ package com.google.devtools.build.remote.worker; +import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; @@ -80,7 +81,7 @@ final class ByteStreamServer extends ByteStreamImplBase { try { // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. - Chunker c = new Chunker(cache.downloadBlob(digest), digestUtil); + Chunker c = new Chunker(getFromFuture(cache.downloadBlob(digest)), digestUtil); while (c.hasNext()) { responseObserver.onNext( ReadResponse.newBuilder().setData(c.next().getData()).build()); diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java index 0912b1f375..f9506987f6 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java @@ -14,6 +14,7 @@ package com.google.devtools.build.remote.worker; +import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.SEVERE; @@ -178,7 +179,7 @@ final class ExecutionServer extends ExecutionImplBase { try { command = com.google.devtools.remoteexecution.v1test.Command.parseFrom( - cache.downloadBlob(action.getCommandDigest())); + getFromFuture(cache.downloadBlob(action.getCommandDigest()))); cache.downloadTree(action.getInputRootDigest(), execRoot); } catch (CacheNotFoundException e) { throw StatusUtils.notFoundError(e.getMissingDigest()); diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java index a55d2d9a6a..6f6b2c111e 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java @@ -24,7 +24,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.remote.RemoteOptions; +import com.google.devtools.build.lib.remote.RemoteRetrier; +import com.google.devtools.build.lib.remote.Retrier; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory; import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore; @@ -66,6 +70,7 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -267,18 +272,28 @@ public final class RemoteWorker { blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>()); } + ListeningScheduledExecutorService retryService = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + + RemoteRetrier retrier = + new RemoteRetrier( + remoteOptions, + RemoteRetrier.RETRIABLE_GRPC_ERRORS, + retryService, + Retrier.ALLOW_ALL_CALLS); DigestUtil digestUtil = new DigestUtil(fs.getDigestFunction()); RemoteWorker worker = new RemoteWorker( fs, remoteWorkerOptions, - new SimpleBlobStoreActionCache(remoteOptions, blobStore, digestUtil), + new SimpleBlobStoreActionCache(remoteOptions, blobStore, retrier, digestUtil), sandboxPath, digestUtil); final Server server = worker.startServer(); worker.createPidFile(); server.awaitTermination(); + retryService.shutdownNow(); } private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) { |