diff options
5 files changed, 81 insertions, 18 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java index 45d76fc832..a6b992b866 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import io.grpc.CallCredentials; import io.grpc.auth.MoreCallCredentials; +import io.grpc.internal.GrpcUtil; import io.grpc.netty.GrpcSslContexts; import io.netty.handler.ssl.SslContext; import java.io.File; @@ -32,20 +33,24 @@ import javax.net.ssl.SSLException; /** Instantiate all authentication helpers from build options. */ @ThreadSafe public final class ChannelOptions { + private final int maxMessageSize; private final boolean tlsEnabled; private final SslContext sslContext; private final String tlsAuthorityOverride; private final CallCredentials credentials; + private static final int CHUNK_MESSAGE_OVERHEAD = 1024; private ChannelOptions( boolean tlsEnabled, SslContext sslContext, String tlsAuthorityOverride, - CallCredentials credentials) { + CallCredentials credentials, + int maxMessageSize) { this.tlsEnabled = tlsEnabled; this.sslContext = sslContext; this.tlsAuthorityOverride = tlsAuthorityOverride; this.credentials = credentials; + this.maxMessageSize = maxMessageSize; } public boolean tlsEnabled() { @@ -64,6 +69,10 @@ public final class ChannelOptions { return sslContext; } + public int maxMessageSize() { + return maxMessageSize; + } + public static ChannelOptions create(RemoteOptions options) { try { return create( @@ -107,6 +116,11 @@ public final class ChannelOptions { "Failed initializing auth credentials for remote cache/execution " + e); } } - return new ChannelOptions(tlsEnabled, sslContext, tlsAuthorityOverride, credentials); + final int maxMessageSize = + Math.max( + GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE, + options.grpcMaxChunkSizeBytes + CHUNK_MESSAGE_OVERHEAD); + return new ChannelOptions( + tlsEnabled, sslContext, tlsAuthorityOverride, credentials, maxMessageSize); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java index fa3647aba0..0c76aa3106 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java @@ -73,9 +73,9 @@ public final class RemoteOptions extends OptionsBase { @Option( name = "grpc_max_chunk_size_bytes", - defaultValue = "400000", // <4MB. Bounded by the gRPC size limit on the overall message. + defaultValue = "16000", category = "remote", - help = "The maximal number of bytes to be sent in a single message." + help = "The maximal number of data bytes to be sent in a single message." ) public int grpcMaxChunkSizeBytes; @@ -83,7 +83,7 @@ public final class RemoteOptions extends OptionsBase { name = "grpc_max_batch_inputs", defaultValue = "100", category = "remote", - help = "The maximal number of input file to be sent in a single batch." + help = "The maximal number of input files to be sent in a single batch." ) public int grpcMaxBatchInputs; diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java index d890829142..845f5411fe 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java @@ -23,9 +23,11 @@ import io.grpc.netty.NettyChannelBuilder; @ThreadSafe public final class RemoteUtils { public static ManagedChannel createChannel(String target, ChannelOptions channelOptions) { - NettyChannelBuilder builder = NettyChannelBuilder.forTarget(target); - builder.negotiationType( - channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT); + NettyChannelBuilder builder = + NettyChannelBuilder.forTarget(target) + .negotiationType( + channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT) + .maxMessageSize(channelOptions.maxMessageSize()); if (channelOptions.getSslContext() != null) { builder.sslContext(channelOptions.getSslContext()); if (channelOptions.getTlsAuthorityOverride() != null) { diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh index 7dd6949b64..f3514d2593 100755 --- a/src/test/shell/bazel/remote_execution_test.sh +++ b/src/test/shell/bazel/remote_execution_test.sh @@ -30,6 +30,7 @@ function set_up() { ${bazel_data}/src/tools/remote_worker/remote_worker \ --work_path=${work_path} \ --listen_port=${worker_port} \ + --grpc_max_chunk_size_bytes=120000000 \ --hazelcast_standalone_listen_port=${hazelcast_port} \ --pid_file=${pid_file} >& $TEST_log & local wait_seconds=0 @@ -128,6 +129,39 @@ EOF || fail "Remote cache generated different result" } +# Tests that the remote worker can return a 200MB blob that requires chunking. +# Blob has to be that large in order to exceed the grpc default max message size. +function test_genrule_large_output_chunking() { + mkdir -p a + cat > a/BUILD <<EOF +package(default_visibility = ["//visibility:public"]) +genrule( +name = "large_output", +srcs = ["small_blob.txt"], +outs = ["large_blob.txt"], +cmd = "cp \$(location small_blob.txt) tmp.txt; " + +"(for i in {1..22} ; do cat tmp.txt >> \$@; cp \$@ tmp.txt; done)", +) +EOF + cat > a/small_blob.txt <<EOF +0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 +EOF + bazel build //a:large_output >& $TEST_log \ + || fail "Failed to build //a:large_output without remote execution" + cp -f bazel-genfiles/a/large_blob.txt ${TEST_TMPDIR}/large_blob_expected.txt + + bazel clean --expunge + bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \ + --spawn_strategy=remote \ + --grpc_max_chunk_size_bytes=120000000 \ + --remote_worker=localhost:${worker_port} \ + --remote_cache=localhost:${worker_port} \ + //a:large_output >& $TEST_log \ + || fail "Failed to build //a:large_output with remote execution" + diff bazel-genfiles/a/large_blob.txt ${TEST_TMPDIR}/large_blob_expected.txt \ + || fail "Remote execution generated different result" +} + function test_cc_binary_rest_cache() { mkdir -p a cat > a/BUILD <<EOF diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index 8d43da2ef1..b8a1447589 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -17,6 +17,8 @@ package com.google.devtools.build.remote; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.remote.CacheNotFoundException; import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase; +import com.google.devtools.build.lib.remote.ChannelOptions; +import com.google.devtools.build.lib.remote.Chunker; import com.google.devtools.build.lib.remote.ContentDigests; import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase; @@ -63,7 +65,6 @@ import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.JavaIoFileSystem; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.common.options.OptionsParser; -import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; import io.grpc.Server; @@ -96,11 +97,16 @@ public class RemoteWorker { private final ExecutionCacheServiceImplBase execCacheServer; private final SimpleBlobStoreActionCache cache; private final RemoteWorkerOptions workerOptions; + private final RemoteOptions remoteOptions; - public RemoteWorker(RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache) + public RemoteWorker( + RemoteWorkerOptions workerOptions, + RemoteOptions remoteOptions, + SimpleBlobStoreActionCache cache) throws IOException { this.cache = cache; this.workerOptions = workerOptions; + this.remoteOptions = remoteOptions; if (workerOptions.workPath != null) { Path workPath = getFileSystem().getPath(workerOptions.workPath); FileSystemUtils.createDirectoryAndParents(workPath); @@ -115,13 +121,13 @@ public class RemoteWorker { public Server startServer() throws IOException { NettyServerBuilder b = NettyServerBuilder.forPort(workerOptions.listenPort) + .maxMessageSize(ChannelOptions.create(remoteOptions).maxMessageSize()) .addService(casServer) .addService(execCacheServer); if (execServer != null) { b.addService(execServer); } else { - System.out.println( - "*** Execution disabled, only serving cache requests."); + System.out.println("*** Execution disabled, only serving cache requests."); } Server server = b.build(); System.out.println( @@ -199,17 +205,23 @@ public class RemoteWorker { } status.setSucceeded(true); try { + // This still relies on the total blob size to be small enough to fit in memory + // simultaneously! TODO(olaola): refactor to fix this if the need arises. + Chunker.Builder b = new Chunker.Builder().chunkSize(remoteOptions.grpcMaxChunkSizeBytes); for (ContentDigest digest : request.getDigestList()) { - reply.setData( - BlobChunk.newBuilder() - .setDigest(digest) - .setData(ByteString.copyFrom(cache.downloadBlob(digest))) - .build()); + b.addInput(cache.downloadBlob(digest)); + } + Chunker c = b.build(); + while (c.hasNext()) { + reply.setData(c.next()); responseObserver.onNext(reply.build()); if (reply.hasStatus()) { reply.clearStatus(); // Only send status on first chunk. } } + } catch (IOException e) { + // This cannot happen, as we are chunking in-memory blobs. + throw new RuntimeException("Internal error: " + e); } catch (CacheNotFoundException e) { // This can only happen if an item gets evicted right after we check. reply.clearData(); @@ -604,7 +616,8 @@ public class RemoteWorker { new ConcurrentHashMap<String, byte[]>()); RemoteWorker worker = - new RemoteWorker(remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore)); + new RemoteWorker( + remoteWorkerOptions, remoteOptions, new SimpleBlobStoreActionCache(blobStore)); final Server server = worker.startServer(); final Path pidFile; |