aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java18
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java6
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java8
-rwxr-xr-xsrc/test/shell/bazel/remote_execution_test.sh34
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java33
5 files changed, 81 insertions, 18 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
index 45d76fc832..a6b992b866 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import io.grpc.CallCredentials;
import io.grpc.auth.MoreCallCredentials;
+import io.grpc.internal.GrpcUtil;
import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.SslContext;
import java.io.File;
@@ -32,20 +33,24 @@ import javax.net.ssl.SSLException;
/** Instantiate all authentication helpers from build options. */
@ThreadSafe
public final class ChannelOptions {
+ private final int maxMessageSize;
private final boolean tlsEnabled;
private final SslContext sslContext;
private final String tlsAuthorityOverride;
private final CallCredentials credentials;
+ private static final int CHUNK_MESSAGE_OVERHEAD = 1024;
private ChannelOptions(
boolean tlsEnabled,
SslContext sslContext,
String tlsAuthorityOverride,
- CallCredentials credentials) {
+ CallCredentials credentials,
+ int maxMessageSize) {
this.tlsEnabled = tlsEnabled;
this.sslContext = sslContext;
this.tlsAuthorityOverride = tlsAuthorityOverride;
this.credentials = credentials;
+ this.maxMessageSize = maxMessageSize;
}
public boolean tlsEnabled() {
@@ -64,6 +69,10 @@ public final class ChannelOptions {
return sslContext;
}
+ public int maxMessageSize() {
+ return maxMessageSize;
+ }
+
public static ChannelOptions create(RemoteOptions options) {
try {
return create(
@@ -107,6 +116,11 @@ public final class ChannelOptions {
"Failed initializing auth credentials for remote cache/execution " + e);
}
}
- return new ChannelOptions(tlsEnabled, sslContext, tlsAuthorityOverride, credentials);
+ final int maxMessageSize =
+ Math.max(
+ GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE,
+ options.grpcMaxChunkSizeBytes + CHUNK_MESSAGE_OVERHEAD);
+ return new ChannelOptions(
+ tlsEnabled, sslContext, tlsAuthorityOverride, credentials, maxMessageSize);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index fa3647aba0..0c76aa3106 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -73,9 +73,9 @@ public final class RemoteOptions extends OptionsBase {
@Option(
name = "grpc_max_chunk_size_bytes",
- defaultValue = "400000", // <4MB. Bounded by the gRPC size limit on the overall message.
+ defaultValue = "16000",
category = "remote",
- help = "The maximal number of bytes to be sent in a single message."
+ help = "The maximal number of data bytes to be sent in a single message."
)
public int grpcMaxChunkSizeBytes;
@@ -83,7 +83,7 @@ public final class RemoteOptions extends OptionsBase {
name = "grpc_max_batch_inputs",
defaultValue = "100",
category = "remote",
- help = "The maximal number of input file to be sent in a single batch."
+ help = "The maximal number of input files to be sent in a single batch."
)
public int grpcMaxBatchInputs;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
index d890829142..845f5411fe 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
@@ -23,9 +23,11 @@ import io.grpc.netty.NettyChannelBuilder;
@ThreadSafe
public final class RemoteUtils {
public static ManagedChannel createChannel(String target, ChannelOptions channelOptions) {
- NettyChannelBuilder builder = NettyChannelBuilder.forTarget(target);
- builder.negotiationType(
- channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
+ NettyChannelBuilder builder =
+ NettyChannelBuilder.forTarget(target)
+ .negotiationType(
+ channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT)
+ .maxMessageSize(channelOptions.maxMessageSize());
if (channelOptions.getSslContext() != null) {
builder.sslContext(channelOptions.getSslContext());
if (channelOptions.getTlsAuthorityOverride() != null) {
diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh
index 7dd6949b64..f3514d2593 100755
--- a/src/test/shell/bazel/remote_execution_test.sh
+++ b/src/test/shell/bazel/remote_execution_test.sh
@@ -30,6 +30,7 @@ function set_up() {
${bazel_data}/src/tools/remote_worker/remote_worker \
--work_path=${work_path} \
--listen_port=${worker_port} \
+ --grpc_max_chunk_size_bytes=120000000 \
--hazelcast_standalone_listen_port=${hazelcast_port} \
--pid_file=${pid_file} >& $TEST_log &
local wait_seconds=0
@@ -128,6 +129,39 @@ EOF
|| fail "Remote cache generated different result"
}
+# Tests that the remote worker can return a 200MB blob that requires chunking.
+# Blob has to be that large in order to exceed the grpc default max message size.
+function test_genrule_large_output_chunking() {
+ mkdir -p a
+ cat > a/BUILD <<EOF
+package(default_visibility = ["//visibility:public"])
+genrule(
+name = "large_output",
+srcs = ["small_blob.txt"],
+outs = ["large_blob.txt"],
+cmd = "cp \$(location small_blob.txt) tmp.txt; " +
+"(for i in {1..22} ; do cat tmp.txt >> \$@; cp \$@ tmp.txt; done)",
+)
+EOF
+ cat > a/small_blob.txt <<EOF
+0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
+EOF
+ bazel build //a:large_output >& $TEST_log \
+ || fail "Failed to build //a:large_output without remote execution"
+ cp -f bazel-genfiles/a/large_blob.txt ${TEST_TMPDIR}/large_blob_expected.txt
+
+ bazel clean --expunge
+ bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
+ --spawn_strategy=remote \
+ --grpc_max_chunk_size_bytes=120000000 \
+ --remote_worker=localhost:${worker_port} \
+ --remote_cache=localhost:${worker_port} \
+ //a:large_output >& $TEST_log \
+ || fail "Failed to build //a:large_output with remote execution"
+ diff bazel-genfiles/a/large_blob.txt ${TEST_TMPDIR}/large_blob_expected.txt \
+ || fail "Remote execution generated different result"
+}
+
function test_cc_binary_rest_cache() {
mkdir -p a
cat > a/BUILD <<EOF
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 8d43da2ef1..b8a1447589 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -17,6 +17,8 @@ package com.google.devtools.build.remote;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.remote.CacheNotFoundException;
import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
+import com.google.devtools.build.lib.remote.ChannelOptions;
+import com.google.devtools.build.lib.remote.Chunker;
import com.google.devtools.build.lib.remote.ContentDigests;
import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
@@ -63,7 +65,6 @@ import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.common.options.OptionsParser;
-import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.Server;
@@ -96,11 +97,16 @@ public class RemoteWorker {
private final ExecutionCacheServiceImplBase execCacheServer;
private final SimpleBlobStoreActionCache cache;
private final RemoteWorkerOptions workerOptions;
+ private final RemoteOptions remoteOptions;
- public RemoteWorker(RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache)
+ public RemoteWorker(
+ RemoteWorkerOptions workerOptions,
+ RemoteOptions remoteOptions,
+ SimpleBlobStoreActionCache cache)
throws IOException {
this.cache = cache;
this.workerOptions = workerOptions;
+ this.remoteOptions = remoteOptions;
if (workerOptions.workPath != null) {
Path workPath = getFileSystem().getPath(workerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
@@ -115,13 +121,13 @@ public class RemoteWorker {
public Server startServer() throws IOException {
NettyServerBuilder b =
NettyServerBuilder.forPort(workerOptions.listenPort)
+ .maxMessageSize(ChannelOptions.create(remoteOptions).maxMessageSize())
.addService(casServer)
.addService(execCacheServer);
if (execServer != null) {
b.addService(execServer);
} else {
- System.out.println(
- "*** Execution disabled, only serving cache requests.");
+ System.out.println("*** Execution disabled, only serving cache requests.");
}
Server server = b.build();
System.out.println(
@@ -199,17 +205,23 @@ public class RemoteWorker {
}
status.setSucceeded(true);
try {
+ // This still relies on the total blob size to be small enough to fit in memory
+ // simultaneously! TODO(olaola): refactor to fix this if the need arises.
+ Chunker.Builder b = new Chunker.Builder().chunkSize(remoteOptions.grpcMaxChunkSizeBytes);
for (ContentDigest digest : request.getDigestList()) {
- reply.setData(
- BlobChunk.newBuilder()
- .setDigest(digest)
- .setData(ByteString.copyFrom(cache.downloadBlob(digest)))
- .build());
+ b.addInput(cache.downloadBlob(digest));
+ }
+ Chunker c = b.build();
+ while (c.hasNext()) {
+ reply.setData(c.next());
responseObserver.onNext(reply.build());
if (reply.hasStatus()) {
reply.clearStatus(); // Only send status on first chunk.
}
}
+ } catch (IOException e) {
+ // This cannot happen, as we are chunking in-memory blobs.
+ throw new RuntimeException("Internal error: " + e);
} catch (CacheNotFoundException e) {
// This can only happen if an item gets evicted right after we check.
reply.clearData();
@@ -604,7 +616,8 @@ public class RemoteWorker {
new ConcurrentHashMap<String, byte[]>());
RemoteWorker worker =
- new RemoteWorker(remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore));
+ new RemoteWorker(
+ remoteWorkerOptions, remoteOptions, new SimpleBlobStoreActionCache(blobStore));
final Server server = worker.startServer();
final Path pidFile;