diff options
7 files changed, 417 insertions, 482 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java index 6867c222e5..6c3fbffbc2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java @@ -237,7 +237,9 @@ public final class ConcurrentMapFactory { } private static boolean isHazelcastOptions(RemoteOptions options) { - return options.hazelcastNode != null || options.hazelcastClientConfig != null; + return options.hazelcastNode != null + || options.hazelcastClientConfig != null + || options.hazelcastStandaloneListenPort != 0; } private static boolean isRestUrlOptions(RemoteOptions options) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md index a62fc38df2..5040732ffe 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/README.md +++ b/src/main/java/com/google/devtools/build/lib/remote/README.md @@ -125,27 +125,26 @@ startup --host_jvm_args=-Dbazel.DigestFunction=SHA1 #### Running the sample gRPC cache server -Bazel currently provides a sample gRPC CAS implementation with Hazelcast as caching backend. +Bazel currently provides a sample gRPC CAS implementation with a ConcurrentHashMap or Hazelcast as caching backend. To use it you need to clone from [Bazel](https://github.com/bazelbuild/bazel) and then build it. ``` -bazel build //src/tools/remote_worker:remote_cache +bazel build //src/tools/remote_worker:all ``` -The following command will then start the cache server listening on port 8081 with the default -Hazelcast settings. +The following command will then start the cache server listening on port 8080 using a local in-memory cache. 
``` -bazel-bin/src/tools/remote_worker/remote_cache --listen_port 8081 +bazel-bin/src/tools/remote_worker/remote_worker --listen_port=8080 ``` -To run everything in a single command. +To connect to a running instance of Hazelcast instead, use: ``` -bazel run //src/tools/remote_worker:remote_cache -- --listen_port 8081 +bazel run //src/tools/remote_worker:remote_worker -- --listen_port=8080 --hazelcast_node=address:port ``` If you want to change Hazelcast settings to enable distributed memory cache you can provide your own hazelcast.xml with the following command. ``` -bazel-bin/src/tools/remote_worker/remote_cache --jvm_flags=-Dhazelcast.config=/path/to/hz.xml --listen_port 8081 +bazel-bin/src/tools/remote_worker/remote_worker --jvm_flags=-Dhazelcast.config=/path/to/hz.xml --listen_port 8080 ``` You can copy and edit the [default](https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/main/resources/hazelcast-default.xml) Hazelcast configuration. Refer to Hazelcast [manual](http://docs.hazelcast.org/docs/3.6/manual/html-single/index.html#checking-configuration) for more details. @@ -153,10 +152,10 @@ for more details. #### Using the gRPC CAS endpoint Use the following build options to use the gRPC CAS endpoint for sharing build artifacts. Change -`address:8081` to the correct server address and port number. +`address:8080` to the correct server address and port number. 
``` -build --spawn_strategy=remote --remote_cache=address:8081 +build --spawn_strategy=remote --remote_cache=address:8080 ``` ### Distributed caching with Hazelcast (TO BE REMOVED) @@ -201,19 +200,13 @@ following line: startup --host_jvm_args=-Dbazel.DigestFunction=SHA1 ``` -## Running the sample gRPC cache server -``` -bazel build //src/tools/remote_worker:remote_cache -bazel-bin/src/tools/remote_worker/remote_cache --listen_port 8081 -``` - -## Running the sample gRPC remote worker +## Running the sample gRPC remote worker / cache server ``` bazel build //src/tools/remote_worker:remote_worker -bazel-bin/src/tools/remote_worker/remote_cache --work_path=/tmp --listen_port 8080 +bazel-bin/src/tools/remote_worker/remote_worker --work_path=/tmp --listen_port 8080 ``` -The sample gRPC cache server and gRPC remote worker both use Hazelcast and shares the **same +The sample gRPC cache server and gRPC remote worker share the **same distributed memory cluster** for storing and accessing CAS objects. It is important the CAS objects are shared between the two server processes. @@ -225,5 +218,5 @@ memory cluster. Use the following build options. 
``` -build --spawn_strategy=remote --remote_worker=localhost:8080 --remote_cache=localhost:8081 +build --spawn_strategy=remote --remote_worker=localhost:8080 --remote_cache=localhost:8080 ``` diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh index 897bdbb0e6..a33805037f 100755 --- a/src/test/shell/bazel/remote_execution_test.sh +++ b/src/test/shell/bazel/remote_execution_test.sh @@ -71,9 +71,9 @@ function test_cc_binary() { bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \ --spawn_strategy=remote \ - --hazelcast_node=localhost:${hazelcast_port} \ --remote_worker=localhost:${worker_port} \ - //a:test >& $TEST_log \ + --remote_cache=localhost:${worker_port} \ + //a:test >& $TEST_log \ || fail "Failed to build //a:test with remote execution" diff bazel-bin/a/test ${TEST_TMPDIR}/test_expected \ || fail "Remote execution generated different result" diff --git a/src/tools/remote_worker/BUILD b/src/tools/remote_worker/BUILD index a319119601..b8ed480ae1 100644 --- a/src/tools/remote_worker/BUILD +++ b/src/tools/remote_worker/BUILD @@ -10,10 +10,3 @@ java_binary( visibility = ["//visibility:public"], runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"], ) - -java_binary( - name = "remote_cache", - main_class = "com.google.devtools.build.remote.RemoteCache", - visibility = ["//visibility:public"], - runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"], -) diff --git a/src/tools/remote_worker/README.md b/src/tools/remote_worker/README.md index 7664d3345a..088ea7849e 100644 --- a/src/tools/remote_worker/README.md +++ b/src/tools/remote_worker/README.md @@ -1,5 +1,6 @@ This program implements a remote execution worker that uses gRPC to accept work -requests. It also serves as a Hazelcast server for distributed caching. +requests. It can work as a remote execution worker, a cache worker, or both. 
+The simplest setup is as follows: - First build remote_worker and run it. @@ -9,9 +10,13 @@ requests. It also serves as a Hazelcast server for distributed caching. - Then you run Bazel pointing to the remote_worker instance. - bazel build --hazelcast_node=127.0.0.1:5701 --spawn_strategy=remote \ - --remote_worker=127.0.0.1:8080 src/tools/generate_workspace:all + bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \ + --spawn_strategy=remote --remote_cache=localhost:8080 \ + --remote_worker=localhost:8080 src/tools/generate_workspace:all The above command will build generate_workspace with remote spawn strategy that -uses Hazelcast as the distributed caching backend and executes work remotely on -the localhost remote_worker. +uses the local worker as the distributed caching and execution backend. + +You can also use a Hazelcast server for the distributed cache as follows: +Suppose your Hazelcast server is listening on address:port. Then, run the +remote worker with --hazelcast_node=address:port. diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java deleted file mode 100644 index 023145f230..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java +++ /dev/null @@ -1,315 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import com.google.devtools.build.lib.remote.CacheNotFoundException; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase; -import com.google.devtools.build.lib.remote.ConcurrentMapActionCache; -import com.google.devtools.build.lib.remote.ConcurrentMapFactory; -import com.google.devtools.build.lib.remote.ContentDigests; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase; -import com.google.devtools.build.lib.remote.RemoteOptions; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; -import 
com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; -import com.google.devtools.build.lib.util.Preconditions; -import com.google.devtools.common.options.OptionsParser; -import com.google.protobuf.ByteString; -import io.grpc.Server; -import io.grpc.ServerBuilder; -import io.grpc.stub.StreamObserver; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** A server which acts as a gRPC wrapper around concurrent map based remote cache. */ -public class RemoteCache { - private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName()); - private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER); - private static final int MAX_MEMORY_KBYTES = 512 * 1024; - private final ConcurrentMapActionCache cache; - private final CasServiceImplBase casServer; - private final ExecutionCacheServiceImplBase execCacheServer; - - public RemoteCache(ConcurrentMapActionCache cache) { - this.cache = cache; - casServer = new CasServer(); - execCacheServer = new ExecutionCacheServer(); - } - - public CasServiceImplBase getCasServer() { - return casServer; - } - - public ExecutionCacheServiceImplBase getExecCacheServer() { - return execCacheServer; - } - - class CasServer extends CasServiceImplBase { - @Override - public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) { - CasLookupReply.Builder reply = CasLookupReply.newBuilder(); - CasStatus.Builder status = reply.getStatusBuilder(); - for (ContentDigest digest : request.getDigestList()) { - if (!cache.containsKey(digest)) { - status.addMissingDigest(digest); - } - } - if (status.getMissingDigestCount() > 0) { - status.setSucceeded(false); - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - } else { - status.setSucceeded(true); - } - responseObserver.onNext(reply.build()); - 
responseObserver.onCompleted(); - } - - @Override - public void uploadTreeMetadata( - CasUploadTreeMetadataRequest request, - StreamObserver<CasUploadTreeMetadataReply> responseObserver) { - try { - for (FileNode treeNode : request.getTreeNodeList()) { - cache.uploadBlob(treeNode.toByteArray()); - } - responseObserver.onNext( - CasUploadTreeMetadataReply.newBuilder() - .setStatus(CasStatus.newBuilder().setSucceeded(true)) - .build()); - } catch (Exception e) { - LOG.warning("Request failed: " + e.toString()); - CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError(CasStatus.ErrorCode.UNKNOWN) - .setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); - } finally { - responseObserver.onCompleted(); - } - } - - @Override - public void downloadBlob( - CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) { - CasDownloadReply.Builder reply = CasDownloadReply.newBuilder(); - CasStatus.Builder status = reply.getStatusBuilder(); - for (ContentDigest digest : request.getDigestList()) { - if (!cache.containsKey(digest)) { - status.addMissingDigest(digest); - } - } - if (status.getMissingDigestCount() > 0) { - status.setSucceeded(false); - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - responseObserver.onNext(reply.build()); - responseObserver.onCompleted(); - return; - } - status.setSucceeded(true); - try { - for (ContentDigest digest : request.getDigestList()) { - reply.setData( - BlobChunk.newBuilder() - .setDigest(digest) - .setData(ByteString.copyFrom(cache.downloadBlob(digest))) - .build()); - responseObserver.onNext(reply.build()); - if (reply.hasStatus()) { - reply.clearStatus(); // Only send status on first chunk. - } - } - } catch (CacheNotFoundException e) { - // This can only happen if an item gets evicted right after we check. 
- reply.clearData(); - status.setSucceeded(false); - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - status.addMissingDigest(e.getMissingDigest()); - responseObserver.onNext(reply.build()); - } finally { - responseObserver.onCompleted(); - } - } - - @Override - public StreamObserver<CasUploadBlobRequest> uploadBlob( - final StreamObserver<CasUploadBlobReply> responseObserver) { - return new StreamObserver<CasUploadBlobRequest>() { - byte[] blob = null; - ContentDigest digest = null; - long offset = 0; - - @Override - public void onNext(CasUploadBlobRequest request) { - BlobChunk chunk = request.getData(); - try { - if (chunk.hasDigest()) { - // Check if the previous chunk was really done. - Preconditions.checkArgument( - digest == null || offset == 0, - "Missing input chunk for digest %s", - digest == null ? "" : ContentDigests.toString(digest)); - digest = chunk.getDigest(); - // This unconditionally downloads the whole blob into memory! - Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES); - blob = new byte[(int) digest.getSizeBytes()]; - } - Preconditions.checkArgument(digest != null, "First chunk contains no digest"); - Preconditions.checkArgument( - offset == chunk.getOffset(), - "Missing input chunk for digest %s", - ContentDigests.toString(digest)); - if (digest.getSizeBytes() > 0) { - chunk.getData().copyTo(blob, (int) offset); - offset = (offset + chunk.getData().size()) % digest.getSizeBytes(); - } - if (offset == 0) { - ContentDigest uploadedDigest = cache.uploadBlob(blob); - Preconditions.checkArgument( - uploadedDigest.equals(digest), - "Digest mismatch: client sent %s, server computed %s", - ContentDigests.toString(digest), - ContentDigests.toString(uploadedDigest)); - } - } catch (Exception e) { - LOG.warning("Request failed: " + e.toString()); - CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError( - e instanceof 
IllegalArgumentException - ? CasStatus.ErrorCode.INVALID_ARGUMENT - : CasStatus.ErrorCode.UNKNOWN) - .setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); - } - } - - @Override - public void onError(Throwable t) { - LOG.warning("Request errored remotely: " + t); - } - - @Override - public void onCompleted() { - responseObserver.onCompleted(); - } - }; - } - } - - class ExecutionCacheServer extends ExecutionCacheServiceImplBase { - @Override - public void getCachedResult( - ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) { - try { - ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest()); - ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder(); - ActionResult result = cache.getCachedActionResult(actionKey); - if (result != null) { - reply.setResult(result); - } - reply.getStatusBuilder().setSucceeded(true); - responseObserver.onNext(reply.build()); - } catch (Exception e) { - LOG.warning("getCachedActionResult request failed: " + e.toString()); - ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN); - responseObserver.onNext(reply.build()); - } finally { - responseObserver.onCompleted(); - } - } - - @Override - public void setCachedResult( - ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) { - try { - ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest()); - cache.setCachedActionResult(actionKey, request.getResult()); - ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder(); - reply.getStatusBuilder().setSucceeded(true); - responseObserver.onNext(reply.build()); - } catch (Exception e) { - LOG.warning("setCachedActionResult request failed: " + e.toString()); - ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder(); - reply - 
.getStatusBuilder() - .setSucceeded(false) - .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN); - responseObserver.onNext(reply.build()); - } finally { - responseObserver.onCompleted(); - } - } - } - - public static void main(String[] args) throws Exception { - OptionsParser parser = - OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class); - parser.parseAndExitUponError(args); - RemoteOptions remoteOptions = parser.getOptions(RemoteOptions.class); - RemoteWorkerOptions remoteWorkerOptions = parser.getOptions(RemoteWorkerOptions.class); - - System.out.println("*** Starting Hazelcast server."); - ConcurrentMapActionCache cache = - new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions)); - - System.out.println( - "*** Starting grpc server on all locally bound IPs on port " - + remoteWorkerOptions.listenPort - + "."); - RemoteCache worker = new RemoteCache(cache); - final Server server = - ServerBuilder.forPort(remoteWorkerOptions.listenPort) - .addService(worker.getCasServer()) - .addService(worker.getExecCacheServer()) - .build(); - server.start(); - - Runtime.getRuntime() - .addShutdownHook( - new Thread() { - @Override - public void run() { - System.err.println("*** Shutting down grpc server."); - server.shutdown(); - System.err.println("*** Server shut down."); - } - }); - server.awaitTermination(); - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index cca971a93d..3e2ef4aeb0 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -16,31 +16,51 @@ package com.google.devtools.build.remote; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.remote.CacheNotFoundException; +import 
com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase; import com.google.devtools.build.lib.remote.ConcurrentMapActionCache; import com.google.devtools.build.lib.remote.ConcurrentMapFactory; import com.google.devtools.build.lib.remote.ContentDigests; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase; +import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase; import com.google.devtools.build.lib.remote.RemoteOptions; import com.google.devtools.build.lib.remote.RemoteProtocol; import com.google.devtools.build.lib.remote.RemoteProtocol.Action; import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry; import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; +import 
com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus; import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; import com.google.devtools.build.lib.shell.AbnormalTerminationException; import com.google.devtools.build.lib.shell.Command; import com.google.devtools.build.lib.shell.CommandException; import com.google.devtools.build.lib.unix.UnixFileSystem; import com.google.devtools.build.lib.util.OS; +import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.ProcessUtils; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.JavaIoFileSystem; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.common.options.OptionsParser; +import com.google.protobuf.ByteString; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; @@ -55,6 +75,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -62,152 +84,380 @@ import java.util.logging.Logger; * Implements a remote worker that accepts work items as protobufs. The server implementation is * based on grpc. 
*/ -public class RemoteWorker extends ExecuteServiceImplBase { +public class RemoteWorker { private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName()); private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER); - private final Path workPath; - private final RemoteOptions remoteOptions; - private final RemoteWorkerOptions options; private final ConcurrentMapActionCache cache; + private final CasServiceImplBase casServer; + private final ExecuteServiceImplBase execServer; + private final ExecutionCacheServiceImplBase execCacheServer; - public RemoteWorker( - Path workPath, - RemoteOptions remoteOptions, - RemoteWorkerOptions options, - ConcurrentMapActionCache cache) { - this.workPath = workPath; - this.remoteOptions = remoteOptions; - this.options = options; + public RemoteWorker(Path workPath, RemoteWorkerOptions options, ConcurrentMapActionCache cache) { this.cache = cache; + casServer = new CasServer(); + execServer = new ExecutionServer(workPath, options); + execCacheServer = new ExecutionCacheServer(); } - private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) { - HashMap<String, String> result = new HashMap<>(); - for (EnvironmentEntry entry : command.getEnvironmentList()) { - result.put(entry.getVariable(), entry.getValue()); + public CasServiceImplBase getCasServer() { + return casServer; + } + + public ExecuteServiceImplBase getExecutionServer() { + return execServer; + } + + public ExecutionCacheServiceImplBase getExecCacheServer() { + return execCacheServer; + } + + class CasServer extends CasServiceImplBase { + private static final int MAX_MEMORY_KBYTES = 512 * 1024; + + @Override + public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) { + CasLookupReply.Builder reply = CasLookupReply.newBuilder(); + CasStatus.Builder status = reply.getStatusBuilder(); + for (ContentDigest digest : request.getDigestList()) { + if (!cache.containsKey(digest)) { + 
status.addMissingDigest(digest); + } + } + if (status.getMissingDigestCount() > 0) { + status.setSucceeded(false); + status.setError(CasStatus.ErrorCode.MISSING_DIGEST); + } else { + status.setSucceeded(true); + } + responseObserver.onNext(reply.build()); + responseObserver.onCompleted(); + } + + @Override + public void uploadTreeMetadata( + CasUploadTreeMetadataRequest request, + StreamObserver<CasUploadTreeMetadataReply> responseObserver) { + try { + for (FileNode treeNode : request.getTreeNodeList()) { + cache.uploadBlob(treeNode.toByteArray()); + } + responseObserver.onNext( + CasUploadTreeMetadataReply.newBuilder() + .setStatus(CasStatus.newBuilder().setSucceeded(true)) + .build()); + } catch (Exception e) { + LOG.warning("Request failed: " + e.toString()); + CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder(); + reply + .getStatusBuilder() + .setSucceeded(false) + .setError(CasStatus.ErrorCode.UNKNOWN) + .setErrorDetail(e.toString()); + responseObserver.onNext(reply.build()); + } finally { + responseObserver.onCompleted(); + } + } + + @Override + public void downloadBlob( + CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) { + CasDownloadReply.Builder reply = CasDownloadReply.newBuilder(); + CasStatus.Builder status = reply.getStatusBuilder(); + for (ContentDigest digest : request.getDigestList()) { + if (!cache.containsKey(digest)) { + status.addMissingDigest(digest); + } + } + if (status.getMissingDigestCount() > 0) { + status.setSucceeded(false); + status.setError(CasStatus.ErrorCode.MISSING_DIGEST); + responseObserver.onNext(reply.build()); + responseObserver.onCompleted(); + return; + } + status.setSucceeded(true); + try { + for (ContentDigest digest : request.getDigestList()) { + reply.setData( + BlobChunk.newBuilder() + .setDigest(digest) + .setData(ByteString.copyFrom(cache.downloadBlob(digest))) + .build()); + responseObserver.onNext(reply.build()); + if (reply.hasStatus()) { + 
reply.clearStatus(); // Only send status on first chunk. + } + } + } catch (CacheNotFoundException e) { + // This can only happen if an item gets evicted right after we check. + reply.clearData(); + status.setSucceeded(false); + status.setError(CasStatus.ErrorCode.MISSING_DIGEST); + status.addMissingDigest(e.getMissingDigest()); + responseObserver.onNext(reply.build()); + } finally { + responseObserver.onCompleted(); + } + } + + @Override + public StreamObserver<CasUploadBlobRequest> uploadBlob( + final StreamObserver<CasUploadBlobReply> responseObserver) { + return new StreamObserver<CasUploadBlobRequest>() { + byte[] blob = null; + ContentDigest digest = null; + long offset = 0; + + @Override + public void onNext(CasUploadBlobRequest request) { + BlobChunk chunk = request.getData(); + try { + if (chunk.hasDigest()) { + // Check if the previous chunk was really done. + Preconditions.checkArgument( + digest == null || offset == 0, + "Missing input chunk for digest %s", + digest == null ? "" : ContentDigests.toString(digest)); + digest = chunk.getDigest(); + // This unconditionally downloads the whole blob into memory! 
+ Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES); + blob = new byte[(int) digest.getSizeBytes()]; + } + Preconditions.checkArgument(digest != null, "First chunk contains no digest"); + Preconditions.checkArgument( + offset == chunk.getOffset(), + "Missing input chunk for digest %s", + ContentDigests.toString(digest)); + if (digest.getSizeBytes() > 0) { + chunk.getData().copyTo(blob, (int) offset); + offset = (offset + chunk.getData().size()) % digest.getSizeBytes(); + } + if (offset == 0) { + ContentDigest uploadedDigest = cache.uploadBlob(blob); + Preconditions.checkArgument( + uploadedDigest.equals(digest), + "Digest mismatch: client sent %s, server computed %s", + ContentDigests.toString(digest), + ContentDigests.toString(uploadedDigest)); + } + } catch (Exception e) { + LOG.warning("Request failed: " + e.toString()); + CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder(); + reply + .getStatusBuilder() + .setSucceeded(false) + .setError( + e instanceof IllegalArgumentException + ? 
CasStatus.ErrorCode.INVALID_ARGUMENT + : CasStatus.ErrorCode.UNKNOWN) + .setErrorDetail(e.toString()); + responseObserver.onNext(reply.build()); + } + } + + @Override + public void onError(Throwable t) { + LOG.warning("Request errored remotely: " + t); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; } - return result; } - public ExecuteReply execute(Action action, Path execRoot) - throws IOException, InterruptedException { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - try { - RemoteProtocol.Command command = - RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest())); - cache.downloadTree(action.getInputRootDigest(), execRoot); - - List<Path> outputs = new ArrayList<>(action.getOutputPathList().size()); - for (String output : action.getOutputPathList()) { - Path file = execRoot.getRelative(output); - if (file.exists()) { - throw new FileAlreadyExistsException("Output file already exists: " + file); + class ExecutionCacheServer extends ExecutionCacheServiceImplBase { + @Override + public void getCachedResult( + ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) { + try { + ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest()); + ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder(); + ActionResult result = cache.getCachedActionResult(actionKey); + if (result != null) { + reply.setResult(result); } - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - outputs.add(file); + reply.getStatusBuilder().setSucceeded(true); + responseObserver.onNext(reply.build()); + } catch (Exception e) { + LOG.warning("getCachedActionResult request failed: " + e.toString()); + ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder(); + reply + .getStatusBuilder() + .setSucceeded(false) + 
.setError(ExecutionCacheStatus.ErrorCode.UNKNOWN); + responseObserver.onNext(reply.build()); + } finally { + responseObserver.onCompleted(); } + } - // TODO(olaola): time out after specified server-side deadline. - Command cmd = - new Command( - command.getArgvList().toArray(new String[] {}), - getEnvironmentVariables(command), - new File(execRoot.getPathString())); - cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); - - // Execute throws a CommandException on non-zero return values, so action has succeeded. - ImmutableList<ContentDigest> outErrDigests = - cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); - ActionResult.Builder result = - ActionResult.newBuilder() - .setReturnCode(0) - .setStdoutDigest(outErrDigests.get(0)) - .setStderrDigest(outErrDigests.get(1)); - cache.uploadAllResults(execRoot, outputs, result); - cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build()); - return ExecuteReply.newBuilder() - .setResult(result) - .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true)) - .build(); - } catch (CommandException e) { - ImmutableList<ContentDigest> outErrDigests = - cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); - final int returnCode = - e instanceof AbnormalTerminationException - ? 
((AbnormalTerminationException) e).getResult().getTerminationStatus().getExitCode() - : -1; - return ExecuteReply.newBuilder() - .setResult( - ActionResult.newBuilder() - .setReturnCode(returnCode) - .setStdoutDigest(outErrDigests.get(0)) - .setStderrDigest(outErrDigests.get(1))) - .setStatus( - ExecutionStatus.newBuilder() - .setExecuted(true) - .setSucceeded(false) - .setError(ExecutionStatus.ErrorCode.EXEC_FAILED) - .setErrorDetail(e.toString())) - .build(); - } catch (CacheNotFoundException e) { - LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest())); - return ExecuteReply.newBuilder() - .setCasError( - CasStatus.newBuilder() - .setSucceeded(false) - .addMissingDigest(e.getMissingDigest()) - .setError(CasStatus.ErrorCode.MISSING_DIGEST) - .setErrorDetail(e.toString())) - .setStatus( - ExecutionStatus.newBuilder() - .setExecuted(false) - .setSucceeded(false) - .setError( - e.getMissingDigest() == action.getCommandDigest() - ? ExecutionStatus.ErrorCode.MISSING_COMMAND - : ExecutionStatus.ErrorCode.MISSING_INPUT) - .setErrorDetail(e.toString())) - .build(); + @Override + public void setCachedResult( + ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) { + try { + ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest()); + cache.setCachedActionResult(actionKey, request.getResult()); + ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder(); + reply.getStatusBuilder().setSucceeded(true); + responseObserver.onNext(reply.build()); + } catch (Exception e) { + LOG.warning("setCachedActionResult request failed: " + e.toString()); + ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder(); + reply + .getStatusBuilder() + .setSucceeded(false) + .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN); + responseObserver.onNext(reply.build()); + } finally { + responseObserver.onCompleted(); + } } } - @Override - public void execute(ExecuteRequest 
request, StreamObserver<ExecuteReply> responseObserver) { - Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString()); - try { - tempRoot.createDirectory(); - if (LOG_FINER) { - LOG.fine( - "Work received has " - + request.getTotalInputFileCount() - + " input files and " - + request.getAction().getOutputPathCount() - + " output files."); + class ExecutionServer extends ExecuteServiceImplBase { + private final Path workPath; + private final RemoteWorkerOptions options; + + public ExecutionServer(Path workPath, RemoteWorkerOptions options) { + this.workPath = workPath; + this.options = options; + } + + private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) { + HashMap<String, String> result = new HashMap<>(); + for (EnvironmentEntry entry : command.getEnvironmentList()) { + result.put(entry.getVariable(), entry.getValue()); } - ExecuteReply reply = execute(request.getAction(), tempRoot); - responseObserver.onNext(reply); - if (options.debug) { - if (!reply.getStatus().getSucceeded()) { - LOG.warning("Work failed. 
Request: " + request.toString() + "."); - } else if (LOG_FINER) { - LOG.fine("Work completed."); + return result; + } + + public ExecuteReply execute(Action action, Path execRoot) + throws IOException, InterruptedException { + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + try { + RemoteProtocol.Command command = + RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest())); + cache.downloadTree(action.getInputRootDigest(), execRoot); + + List<Path> outputs = new ArrayList<>(action.getOutputPathList().size()); + for (String output : action.getOutputPathList()) { + Path file = execRoot.getRelative(output); + if (file.exists()) { + throw new FileAlreadyExistsException("Output file already exists: " + file); + } + FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); + outputs.add(file); } + + // TODO(olaola): time out after specified server-side deadline. + Command cmd = + new Command( + command.getArgvList().toArray(new String[] {}), + getEnvironmentVariables(command), + new File(execRoot.getPathString())); + cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); + + // Execute throws a CommandException on non-zero return values, so action has succeeded. 
+ ImmutableList<ContentDigest> outErrDigests = + cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); + ActionResult.Builder result = + ActionResult.newBuilder() + .setReturnCode(0) + .setStdoutDigest(outErrDigests.get(0)) + .setStderrDigest(outErrDigests.get(1)); + cache.uploadAllResults(execRoot, outputs, result); + cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build()); + return ExecuteReply.newBuilder() + .setResult(result) + .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true)) + .build(); + } catch (CommandException e) { + ImmutableList<ContentDigest> outErrDigests = + cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); + final int returnCode = + e instanceof AbnormalTerminationException + ? ((AbnormalTerminationException) e) + .getResult() + .getTerminationStatus() + .getExitCode() + : -1; + return ExecuteReply.newBuilder() + .setResult( + ActionResult.newBuilder() + .setReturnCode(returnCode) + .setStdoutDigest(outErrDigests.get(0)) + .setStderrDigest(outErrDigests.get(1))) + .setStatus( + ExecutionStatus.newBuilder() + .setExecuted(true) + .setSucceeded(false) + .setError(ExecutionStatus.ErrorCode.EXEC_FAILED) + .setErrorDetail(e.toString())) + .build(); + } catch (CacheNotFoundException e) { + LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest())); + return ExecuteReply.newBuilder() + .setCasError( + CasStatus.newBuilder() + .setSucceeded(false) + .addMissingDigest(e.getMissingDigest()) + .setError(CasStatus.ErrorCode.MISSING_DIGEST) + .setErrorDetail(e.toString())) + .setStatus( + ExecutionStatus.newBuilder() + .setExecuted(false) + .setSucceeded(false) + .setError( + e.getMissingDigest() == action.getCommandDigest() + ? 
ExecutionStatus.ErrorCode.MISSING_COMMAND + : ExecutionStatus.ErrorCode.MISSING_INPUT) + .setErrorDetail(e.toString())) + .build(); } - if (!options.debug) { - FileSystemUtils.deleteTree(tempRoot); - } else { - LOG.warning("Preserving work directory " + tempRoot.toString() + "."); - } - } catch (IOException | InterruptedException e) { - ExecuteReply.Builder reply = ExecuteReply.newBuilder(); - reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + } + + @Override + public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) { + Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString()); + try { + tempRoot.createDirectory(); + if (LOG_FINER) { + LOG.fine( + "Work received has " + + request.getTotalInputFileCount() + + " input files and " + + request.getAction().getOutputPathCount() + + " output files."); + } + ExecuteReply reply = execute(request.getAction(), tempRoot); + responseObserver.onNext(reply); + if (options.debug) { + if (!reply.getStatus().getSucceeded()) { + LOG.warning("Work failed. 
Request: " + request.toString() + "."); + } else if (LOG_FINER) { + LOG.fine("Work completed."); + } + } + if (!options.debug) { + FileSystemUtils.deleteTree(tempRoot); + } else { + LOG.warning("Preserving work directory " + tempRoot.toString() + "."); + } + } catch (IOException | InterruptedException e) { + ExecuteReply.Builder reply = ExecuteReply.newBuilder(); + reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString()); + responseObserver.onNext(reply.build()); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } finally { + responseObserver.onCompleted(); } - } finally { - responseObserver.onCompleted(); } } @@ -223,9 +473,11 @@ public class RemoteWorker extends ExecuteServiceImplBase { return; } - System.out.println("*** Starting Hazelcast server."); - ConcurrentMapActionCache cache = - new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions)); + System.out.println("*** Initializing in-memory cache server."); + ConcurrentMap<String, byte[]> cache = + ConcurrentMapFactory.isRemoteCacheOptions(remoteOptions) + ? 
ConcurrentMapFactory.create(remoteOptions) + : new ConcurrentHashMap<String, byte[]>(); System.out.println( "*** Starting grpc server on all locally bound IPs on port " @@ -233,9 +485,14 @@ public class RemoteWorker extends ExecuteServiceImplBase { + "."); Path workPath = getFileSystem().getPath(remoteWorkerOptions.workPath); FileSystemUtils.createDirectoryAndParents(workPath); - RemoteWorker worker = new RemoteWorker(workPath, remoteOptions, remoteWorkerOptions, cache); + RemoteWorker worker = + new RemoteWorker(workPath, remoteWorkerOptions, new ConcurrentMapActionCache(cache)); final Server server = - ServerBuilder.forPort(remoteWorkerOptions.listenPort).addService(worker).build(); + ServerBuilder.forPort(remoteWorkerOptions.listenPort) + .addService(worker.getCasServer()) + .addService(worker.getExecutionServer()) + .addService(worker.getExecCacheServer()) + .build(); server.start(); final Path pidFile; @@ -270,7 +527,7 @@ public class RemoteWorker extends ExecuteServiceImplBase { } public static void printUsage(OptionsParser parser) { - System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a RPC service."); + System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a gRPC service."); System.out.println( parser.describeOptions( Collections.<String, String>emptyMap(), OptionsParser.HelpVerbosity.LONG)); |