aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Ola Rozenfeld <olaola@google.com>2017-03-08 20:34:15 +0000
committerGravatar Vladimir Moskva <vladmos@google.com>2017-03-09 10:29:34 +0000
commitad7fa4db34d204b16b810feed08071cf4814ce81 (patch)
tree37041fe4b5203eeeb09c614b7c3a0fbce6a4c982
parentb4b1d88bb5739335ed86481dc7689dd1e3734455 (diff)
Merging remote_cache and remote_worker into a single binary.
It can still be used as only a cache server, or only a worker with a wrapper of Hazelcast, so no functionality is lost, but it is now simpler to use in local testing / prototyping. Changed README files appropriately. TESTED=locally -- Change-Id: I3fdff9d434ce8cae5a6a700df0cb9f5bc364b60c Reviewed-on: https://cr.bazel.build/9253 PiperOrigin-RevId: 149569790 MOS_MIGRATED_REVID=149569790
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/README.md33
-rwxr-xr-xsrc/test/shell/bazel/remote_execution_test.sh4
-rw-r--r--src/tools/remote_worker/BUILD7
-rw-r--r--src/tools/remote_worker/README.md15
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java315
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java521
7 files changed, 417 insertions, 482 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
index 6867c222e5..6c3fbffbc2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
@@ -237,7 +237,9 @@ public final class ConcurrentMapFactory {
}
private static boolean isHazelcastOptions(RemoteOptions options) {
- return options.hazelcastNode != null || options.hazelcastClientConfig != null;
+ return options.hazelcastNode != null
+ || options.hazelcastClientConfig != null
+ || options.hazelcastStandaloneListenPort != 0;
}
private static boolean isRestUrlOptions(RemoteOptions options) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md
index a62fc38df2..5040732ffe 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/README.md
+++ b/src/main/java/com/google/devtools/build/lib/remote/README.md
@@ -125,27 +125,26 @@ startup --host_jvm_args=-Dbazel.DigestFunction=SHA1
#### Running the sample gRPC cache server
-Bazel currently provides a sample gRPC CAS implementation with Hazelcast as caching backend.
+Bazel currently provides a sample gRPC CAS implementation with a ConcurrentHashMap or Hazelcast as caching backend.
To use it you need to clone from [Bazel](https://github.com/bazelbuild/bazel) and then build it.
```
-bazel build //src/tools/remote_worker:remote_cache
+bazel build //src/tools/remote_worker:all
```
-The following command will then start the cache server listening on port 8081 with the default
-Hazelcast settings.
+The following command will then start the cache server listening on port 8080 using a local in-memory cache.
```
-bazel-bin/src/tools/remote_worker/remote_cache --listen_port 8081
+bazel-bin/src/tools/remote_worker/remote_worker --listen_port=8080
```
-To run everything in a single command.
+To connect to a running instance of Hazelcast instead, use.
```
-bazel run //src/tools/remote_worker:remote_cache -- --listen_port 8081
+bazel run //src/tools/remote_worker:remote_worker -- --listen_port=8080 --hazelcast_node=address:port
```
If you want to change Hazelcast settings to enable distributed memory cache you can provide your
own hazelcast.xml with the following command.
```
-bazel-bin/src/tools/remote_worker/remote_cache --jvm_flags=-Dhazelcast.config=/path/to/hz.xml --listen_port 8081
+bazel-bin/src/tools/remote_worker/remote_worker --jvm_flags=-Dhazelcast.config=/path/to/hz.xml --listen_port 8080
```
You can copy and edit the [default](https://github.com/hazelcast/hazelcast/blob/master/hazelcast/src/main/resources/hazelcast-default.xml) Hazelcast configuration. Refer to Hazelcast [manual](http://docs.hazelcast.org/docs/3.6/manual/html-single/index.html#checking-configuration)
for more details.
@@ -153,10 +152,10 @@ for more details.
#### Using the gRPC CAS endpoint
Use the following build options to use the gRPC CAS endpoint for sharing build artifacts. Change
-`address:8081` to the correct server address and port number.
+`address:8080` to the correct server address and port number.
```
-build --spawn_strategy=remote --remote_cache=address:8081
+build --spawn_strategy=remote --remote_cache=address:8080
```
### Distributed caching with Hazelcast (TO BE REMOVED)
@@ -201,19 +200,13 @@ following line:
startup --host_jvm_args=-Dbazel.DigestFunction=SHA1
```
-## Running the sample gRPC cache server
-```
-bazel build //src/tools/remote_worker:remote_cache
-bazel-bin/src/tools/remote_worker/remote_cache --listen_port 8081
-```
-
-## Running the sample gRPC remote worker
+## Running the sample gRPC remote worker / cache server
```
bazel build //src/tools/remote_worker:remote_worker
-bazel-bin/src/tools/remote_worker/remote_cache --work_path=/tmp --listen_port 8080
+bazel-bin/src/tools/remote_worker/remote_worker --work_path=/tmp --listen_port 8080
```
-The sample gRPC cache server and gRPC remote worker both use Hazelcast and shares the **same
+The sample gRPC cache server and gRPC remote worker share the **same
distributed memory cluster** for storing and accessing CAS objects. It is important the CAS objects
are shared between the two server processes.
@@ -225,5 +218,5 @@ memory cluster.
Use the following build options.
```
-build --spawn_strategy=remote --remote_worker=localhost:8080 --remote_cache=localhost:8081
+build --spawn_strategy=remote --remote_worker=localhost:8080 --remote_cache=localhost:8080
```
diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh
index 897bdbb0e6..a33805037f 100755
--- a/src/test/shell/bazel/remote_execution_test.sh
+++ b/src/test/shell/bazel/remote_execution_test.sh
@@ -71,9 +71,9 @@ function test_cc_binary() {
bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
--spawn_strategy=remote \
- --hazelcast_node=localhost:${hazelcast_port} \
--remote_worker=localhost:${worker_port} \
- //a:test >& $TEST_log \
+ --remote_cache=localhost:${worker_port} \
+ //a:test >& $TEST_log \
|| fail "Failed to build //a:test with remote execution"
diff bazel-bin/a/test ${TEST_TMPDIR}/test_expected \
|| fail "Remote execution generated different result"
diff --git a/src/tools/remote_worker/BUILD b/src/tools/remote_worker/BUILD
index a319119601..b8ed480ae1 100644
--- a/src/tools/remote_worker/BUILD
+++ b/src/tools/remote_worker/BUILD
@@ -10,10 +10,3 @@ java_binary(
visibility = ["//visibility:public"],
runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"],
)
-
-java_binary(
- name = "remote_cache",
- main_class = "com.google.devtools.build.remote.RemoteCache",
- visibility = ["//visibility:public"],
- runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"],
-)
diff --git a/src/tools/remote_worker/README.md b/src/tools/remote_worker/README.md
index 7664d3345a..088ea7849e 100644
--- a/src/tools/remote_worker/README.md
+++ b/src/tools/remote_worker/README.md
@@ -1,5 +1,6 @@
This program implements a remote execution worker that uses gRPC to accept work
-requests. It also serves as a Hazelcast server for distributed caching.
+requests. It can work as a remote execution worker, a cache worker, or both.
+The simplest setup is as follows:
- First build remote_worker and run it.
@@ -9,9 +10,13 @@ requests. It also serves as a Hazelcast server for distributed caching.
- Then you run Bazel pointing to the remote_worker instance.
- bazel build --hazelcast_node=127.0.0.1:5701 --spawn_strategy=remote \
- --remote_worker=127.0.0.1:8080 src/tools/generate_workspace:all
+ bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
+ --spawn_strategy=remote --remote_cache=localhost:8080 \
+ --remote_worker=localhost:8080 src/tools/generate_workspace:all
The above command will build generate_workspace with remote spawn strategy that
-uses Hazelcast as the distributed caching backend and executes work remotely on
-the localhost remote_worker.
+uses the local worker as the distributed caching and execution backend.
+
+You can also use a Hazelcast server for the distributed cache as follows:
+Suppose your Hazelcast server is listening on address:port. Then, run the
+remote worker with --hazelcast_node=address:port.
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java
deleted file mode 100644
index 023145f230..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteCache.java
+++ /dev/null
@@ -1,315 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
-import com.google.devtools.build.lib.remote.ConcurrentMapActionCache;
-import com.google.devtools.build.lib.remote.ConcurrentMapFactory;
-import com.google.devtools.build.lib.remote.ContentDigests;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase;
-import com.google.devtools.build.lib.remote.RemoteOptions;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.util.Preconditions;
-import com.google.devtools.common.options.OptionsParser;
-import com.google.protobuf.ByteString;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.stub.StreamObserver;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** A server which acts as a gRPC wrapper around concurrent map based remote cache. */
-public class RemoteCache {
- private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
- private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
- private final ConcurrentMapActionCache cache;
- private final CasServiceImplBase casServer;
- private final ExecutionCacheServiceImplBase execCacheServer;
-
- public RemoteCache(ConcurrentMapActionCache cache) {
- this.cache = cache;
- casServer = new CasServer();
- execCacheServer = new ExecutionCacheServer();
- }
-
- public CasServiceImplBase getCasServer() {
- return casServer;
- }
-
- public ExecutionCacheServiceImplBase getExecCacheServer() {
- return execCacheServer;
- }
-
- class CasServer extends CasServiceImplBase {
- @Override
- public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) {
- CasLookupReply.Builder reply = CasLookupReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- if (!cache.containsKey(digest)) {
- status.addMissingDigest(digest);
- }
- }
- if (status.getMissingDigestCount() > 0) {
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- } else {
- status.setSucceeded(true);
- }
- responseObserver.onNext(reply.build());
- responseObserver.onCompleted();
- }
-
- @Override
- public void uploadTreeMetadata(
- CasUploadTreeMetadataRequest request,
- StreamObserver<CasUploadTreeMetadataReply> responseObserver) {
- try {
- for (FileNode treeNode : request.getTreeNodeList()) {
- cache.uploadBlob(treeNode.toByteArray());
- }
- responseObserver.onNext(
- CasUploadTreeMetadataReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build());
- } catch (Exception e) {
- LOG.warning("Request failed: " + e.toString());
- CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
- }
- }
-
- @Override
- public void downloadBlob(
- CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) {
- CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- if (!cache.containsKey(digest)) {
- status.addMissingDigest(digest);
- }
- }
- if (status.getMissingDigestCount() > 0) {
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- responseObserver.onNext(reply.build());
- responseObserver.onCompleted();
- return;
- }
- status.setSucceeded(true);
- try {
- for (ContentDigest digest : request.getDigestList()) {
- reply.setData(
- BlobChunk.newBuilder()
- .setDigest(digest)
- .setData(ByteString.copyFrom(cache.downloadBlob(digest)))
- .build());
- responseObserver.onNext(reply.build());
- if (reply.hasStatus()) {
- reply.clearStatus(); // Only send status on first chunk.
- }
- }
- } catch (CacheNotFoundException e) {
- // This can only happen if an item gets evicted right after we check.
- reply.clearData();
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- status.addMissingDigest(e.getMissingDigest());
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
- }
- }
-
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlob(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
- byte[] blob = null;
- ContentDigest digest = null;
- long offset = 0;
-
- @Override
- public void onNext(CasUploadBlobRequest request) {
- BlobChunk chunk = request.getData();
- try {
- if (chunk.hasDigest()) {
- // Check if the previous chunk was really done.
- Preconditions.checkArgument(
- digest == null || offset == 0,
- "Missing input chunk for digest %s",
- digest == null ? "" : ContentDigests.toString(digest));
- digest = chunk.getDigest();
- // This unconditionally downloads the whole blob into memory!
- Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
- blob = new byte[(int) digest.getSizeBytes()];
- }
- Preconditions.checkArgument(digest != null, "First chunk contains no digest");
- Preconditions.checkArgument(
- offset == chunk.getOffset(),
- "Missing input chunk for digest %s",
- ContentDigests.toString(digest));
- if (digest.getSizeBytes() > 0) {
- chunk.getData().copyTo(blob, (int) offset);
- offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
- }
- if (offset == 0) {
- ContentDigest uploadedDigest = cache.uploadBlob(blob);
- Preconditions.checkArgument(
- uploadedDigest.equals(digest),
- "Digest mismatch: client sent %s, server computed %s",
- ContentDigests.toString(digest),
- ContentDigests.toString(uploadedDigest));
- }
- } catch (Exception e) {
- LOG.warning("Request failed: " + e.toString());
- CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(
- e instanceof IllegalArgumentException
- ? CasStatus.ErrorCode.INVALID_ARGUMENT
- : CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
- }
- }
-
- @Override
- public void onError(Throwable t) {
- LOG.warning("Request errored remotely: " + t);
- }
-
- @Override
- public void onCompleted() {
- responseObserver.onCompleted();
- }
- };
- }
- }
-
- class ExecutionCacheServer extends ExecutionCacheServiceImplBase {
- @Override
- public void getCachedResult(
- ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) {
- try {
- ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
- ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
- ActionResult result = cache.getCachedActionResult(actionKey);
- if (result != null) {
- reply.setResult(result);
- }
- reply.getStatusBuilder().setSucceeded(true);
- responseObserver.onNext(reply.build());
- } catch (Exception e) {
- LOG.warning("getCachedActionResult request failed: " + e.toString());
- ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
- }
- }
-
- @Override
- public void setCachedResult(
- ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) {
- try {
- ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
- cache.setCachedActionResult(actionKey, request.getResult());
- ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
- reply.getStatusBuilder().setSucceeded(true);
- responseObserver.onNext(reply.build());
- } catch (Exception e) {
- LOG.warning("setCachedActionResult request failed: " + e.toString());
- ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
- OptionsParser parser =
- OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);
- parser.parseAndExitUponError(args);
- RemoteOptions remoteOptions = parser.getOptions(RemoteOptions.class);
- RemoteWorkerOptions remoteWorkerOptions = parser.getOptions(RemoteWorkerOptions.class);
-
- System.out.println("*** Starting Hazelcast server.");
- ConcurrentMapActionCache cache =
- new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions));
-
- System.out.println(
- "*** Starting grpc server on all locally bound IPs on port "
- + remoteWorkerOptions.listenPort
- + ".");
- RemoteCache worker = new RemoteCache(cache);
- final Server server =
- ServerBuilder.forPort(remoteWorkerOptions.listenPort)
- .addService(worker.getCasServer())
- .addService(worker.getExecCacheServer())
- .build();
- server.start();
-
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread() {
- @Override
- public void run() {
- System.err.println("*** Shutting down grpc server.");
- server.shutdown();
- System.err.println("*** Server shut down.");
- }
- });
- server.awaitTermination();
- }
-}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index cca971a93d..3e2ef4aeb0 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -16,31 +16,51 @@ package com.google.devtools.build.remote;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.remote.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
import com.google.devtools.build.lib.remote.ConcurrentMapActionCache;
import com.google.devtools.build.lib.remote.ConcurrentMapFactory;
import com.google.devtools.build.lib.remote.ContentDigests;
+import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
+import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase;
import com.google.devtools.build.lib.remote.RemoteOptions;
import com.google.devtools.build.lib.remote.RemoteProtocol;
import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry;
import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.shell.AbnormalTerminationException;
import com.google.devtools.build.lib.shell.Command;
import com.google.devtools.build.lib.shell.CommandException;
import com.google.devtools.build.lib.unix.UnixFileSystem;
import com.google.devtools.build.lib.util.OS;
+import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.ProcessUtils;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.common.options.OptionsParser;
+import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
@@ -55,6 +75,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -62,152 +84,380 @@ import java.util.logging.Logger;
* Implements a remote worker that accepts work items as protobufs. The server implementation is
* based on grpc.
*/
-public class RemoteWorker extends ExecuteServiceImplBase {
+public class RemoteWorker {
private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
- private final Path workPath;
- private final RemoteOptions remoteOptions;
- private final RemoteWorkerOptions options;
private final ConcurrentMapActionCache cache;
+ private final CasServiceImplBase casServer;
+ private final ExecuteServiceImplBase execServer;
+ private final ExecutionCacheServiceImplBase execCacheServer;
- public RemoteWorker(
- Path workPath,
- RemoteOptions remoteOptions,
- RemoteWorkerOptions options,
- ConcurrentMapActionCache cache) {
- this.workPath = workPath;
- this.remoteOptions = remoteOptions;
- this.options = options;
+ public RemoteWorker(Path workPath, RemoteWorkerOptions options, ConcurrentMapActionCache cache) {
this.cache = cache;
+ casServer = new CasServer();
+ execServer = new ExecutionServer(workPath, options);
+ execCacheServer = new ExecutionCacheServer();
}
- private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
- HashMap<String, String> result = new HashMap<>();
- for (EnvironmentEntry entry : command.getEnvironmentList()) {
- result.put(entry.getVariable(), entry.getValue());
+ public CasServiceImplBase getCasServer() {
+ return casServer;
+ }
+
+ public ExecuteServiceImplBase getExecutionServer() {
+ return execServer;
+ }
+
+ public ExecutionCacheServiceImplBase getExecCacheServer() {
+ return execCacheServer;
+ }
+
+ class CasServer extends CasServiceImplBase {
+ private static final int MAX_MEMORY_KBYTES = 512 * 1024;
+
+ @Override
+ public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) {
+ CasLookupReply.Builder reply = CasLookupReply.newBuilder();
+ CasStatus.Builder status = reply.getStatusBuilder();
+ for (ContentDigest digest : request.getDigestList()) {
+ if (!cache.containsKey(digest)) {
+ status.addMissingDigest(digest);
+ }
+ }
+ if (status.getMissingDigestCount() > 0) {
+ status.setSucceeded(false);
+ status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
+ } else {
+ status.setSucceeded(true);
+ }
+ responseObserver.onNext(reply.build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void uploadTreeMetadata(
+ CasUploadTreeMetadataRequest request,
+ StreamObserver<CasUploadTreeMetadataReply> responseObserver) {
+ try {
+ for (FileNode treeNode : request.getTreeNodeList()) {
+ cache.uploadBlob(treeNode.toByteArray());
+ }
+ responseObserver.onNext(
+ CasUploadTreeMetadataReply.newBuilder()
+ .setStatus(CasStatus.newBuilder().setSucceeded(true))
+ .build());
+ } catch (Exception e) {
+ LOG.warning("Request failed: " + e.toString());
+ CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder();
+ reply
+ .getStatusBuilder()
+ .setSucceeded(false)
+ .setError(CasStatus.ErrorCode.UNKNOWN)
+ .setErrorDetail(e.toString());
+ responseObserver.onNext(reply.build());
+ } finally {
+ responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public void downloadBlob(
+ CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) {
+ CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
+ CasStatus.Builder status = reply.getStatusBuilder();
+ for (ContentDigest digest : request.getDigestList()) {
+ if (!cache.containsKey(digest)) {
+ status.addMissingDigest(digest);
+ }
+ }
+ if (status.getMissingDigestCount() > 0) {
+ status.setSucceeded(false);
+ status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
+ responseObserver.onNext(reply.build());
+ responseObserver.onCompleted();
+ return;
+ }
+ status.setSucceeded(true);
+ try {
+ for (ContentDigest digest : request.getDigestList()) {
+ reply.setData(
+ BlobChunk.newBuilder()
+ .setDigest(digest)
+ .setData(ByteString.copyFrom(cache.downloadBlob(digest)))
+ .build());
+ responseObserver.onNext(reply.build());
+ if (reply.hasStatus()) {
+ reply.clearStatus(); // Only send status on first chunk.
+ }
+ }
+ } catch (CacheNotFoundException e) {
+ // This can only happen if an item gets evicted right after we check.
+ reply.clearData();
+ status.setSucceeded(false);
+ status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
+ status.addMissingDigest(e.getMissingDigest());
+ responseObserver.onNext(reply.build());
+ } finally {
+ responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public StreamObserver<CasUploadBlobRequest> uploadBlob(
+ final StreamObserver<CasUploadBlobReply> responseObserver) {
+ return new StreamObserver<CasUploadBlobRequest>() {
+ byte[] blob = null;
+ ContentDigest digest = null;
+ long offset = 0;
+
+ @Override
+ public void onNext(CasUploadBlobRequest request) {
+ BlobChunk chunk = request.getData();
+ try {
+ if (chunk.hasDigest()) {
+ // Check if the previous chunk was really done.
+ Preconditions.checkArgument(
+ digest == null || offset == 0,
+ "Missing input chunk for digest %s",
+ digest == null ? "" : ContentDigests.toString(digest));
+ digest = chunk.getDigest();
+ // This unconditionally downloads the whole blob into memory!
+ Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
+ blob = new byte[(int) digest.getSizeBytes()];
+ }
+ Preconditions.checkArgument(digest != null, "First chunk contains no digest");
+ Preconditions.checkArgument(
+ offset == chunk.getOffset(),
+ "Missing input chunk for digest %s",
+ ContentDigests.toString(digest));
+ if (digest.getSizeBytes() > 0) {
+ chunk.getData().copyTo(blob, (int) offset);
+ offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
+ }
+ if (offset == 0) {
+ ContentDigest uploadedDigest = cache.uploadBlob(blob);
+ Preconditions.checkArgument(
+ uploadedDigest.equals(digest),
+ "Digest mismatch: client sent %s, server computed %s",
+ ContentDigests.toString(digest),
+ ContentDigests.toString(uploadedDigest));
+ }
+ } catch (Exception e) {
+ LOG.warning("Request failed: " + e.toString());
+ CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
+ reply
+ .getStatusBuilder()
+ .setSucceeded(false)
+ .setError(
+ e instanceof IllegalArgumentException
+ ? CasStatus.ErrorCode.INVALID_ARGUMENT
+ : CasStatus.ErrorCode.UNKNOWN)
+ .setErrorDetail(e.toString());
+ responseObserver.onNext(reply.build());
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.warning("Request errored remotely: " + t);
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+ };
}
- return result;
}
- public ExecuteReply execute(Action action, Path execRoot)
- throws IOException, InterruptedException {
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- try {
- RemoteProtocol.Command command =
- RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
- cache.downloadTree(action.getInputRootDigest(), execRoot);
-
- List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
- for (String output : action.getOutputPathList()) {
- Path file = execRoot.getRelative(output);
- if (file.exists()) {
- throw new FileAlreadyExistsException("Output file already exists: " + file);
+ class ExecutionCacheServer extends ExecutionCacheServiceImplBase {
+ @Override
+ public void getCachedResult(
+ ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) {
+ try {
+ ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
+ ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
+ ActionResult result = cache.getCachedActionResult(actionKey);
+ if (result != null) {
+ reply.setResult(result);
}
- FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
- outputs.add(file);
+ reply.getStatusBuilder().setSucceeded(true);
+ responseObserver.onNext(reply.build());
+ } catch (Exception e) {
+ LOG.warning("getCachedActionResult request failed: " + e.toString());
+ ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
+ reply
+ .getStatusBuilder()
+ .setSucceeded(false)
+ .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
+ responseObserver.onNext(reply.build());
+ } finally {
+ responseObserver.onCompleted();
}
+ }
- // TODO(olaola): time out after specified server-side deadline.
- Command cmd =
- new Command(
- command.getArgvList().toArray(new String[] {}),
- getEnvironmentVariables(command),
- new File(execRoot.getPathString()));
- cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
-
- // Execute throws a CommandException on non-zero return values, so action has succeeded.
- ImmutableList<ContentDigest> outErrDigests =
- cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
- ActionResult.Builder result =
- ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(outErrDigests.get(0))
- .setStderrDigest(outErrDigests.get(1));
- cache.uploadAllResults(execRoot, outputs, result);
- cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build());
- return ExecuteReply.newBuilder()
- .setResult(result)
- .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
- .build();
- } catch (CommandException e) {
- ImmutableList<ContentDigest> outErrDigests =
- cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
- final int returnCode =
- e instanceof AbnormalTerminationException
- ? ((AbnormalTerminationException) e).getResult().getTerminationStatus().getExitCode()
- : -1;
- return ExecuteReply.newBuilder()
- .setResult(
- ActionResult.newBuilder()
- .setReturnCode(returnCode)
- .setStdoutDigest(outErrDigests.get(0))
- .setStderrDigest(outErrDigests.get(1)))
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(true)
- .setSucceeded(false)
- .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
- .setErrorDetail(e.toString()))
- .build();
- } catch (CacheNotFoundException e) {
- LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
- return ExecuteReply.newBuilder()
- .setCasError(
- CasStatus.newBuilder()
- .setSucceeded(false)
- .addMissingDigest(e.getMissingDigest())
- .setError(CasStatus.ErrorCode.MISSING_DIGEST)
- .setErrorDetail(e.toString()))
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(
- e.getMissingDigest() == action.getCommandDigest()
- ? ExecutionStatus.ErrorCode.MISSING_COMMAND
- : ExecutionStatus.ErrorCode.MISSING_INPUT)
- .setErrorDetail(e.toString()))
- .build();
+ @Override
+ public void setCachedResult(
+ ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) {
+ try {
+ ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
+ cache.setCachedActionResult(actionKey, request.getResult());
+ ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
+ reply.getStatusBuilder().setSucceeded(true);
+ responseObserver.onNext(reply.build());
+ } catch (Exception e) {
+ LOG.warning("setCachedActionResult request failed: " + e.toString());
+ ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
+ reply
+ .getStatusBuilder()
+ .setSucceeded(false)
+ .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
+ responseObserver.onNext(reply.build());
+ } finally {
+ responseObserver.onCompleted();
+ }
}
}
- @Override
- public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
- Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
- try {
- tempRoot.createDirectory();
- if (LOG_FINER) {
- LOG.fine(
- "Work received has "
- + request.getTotalInputFileCount()
- + " input files and "
- + request.getAction().getOutputPathCount()
- + " output files.");
+ class ExecutionServer extends ExecuteServiceImplBase {
+ private final Path workPath;
+ private final RemoteWorkerOptions options;
+
+ public ExecutionServer(Path workPath, RemoteWorkerOptions options) {
+ this.workPath = workPath;
+ this.options = options;
+ }
+
+ private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
+ HashMap<String, String> result = new HashMap<>();
+ for (EnvironmentEntry entry : command.getEnvironmentList()) {
+ result.put(entry.getVariable(), entry.getValue());
}
- ExecuteReply reply = execute(request.getAction(), tempRoot);
- responseObserver.onNext(reply);
- if (options.debug) {
- if (!reply.getStatus().getSucceeded()) {
- LOG.warning("Work failed. Request: " + request.toString() + ".");
- } else if (LOG_FINER) {
- LOG.fine("Work completed.");
+ return result;
+ }
+
+ public ExecuteReply execute(Action action, Path execRoot)
+ throws IOException, InterruptedException {
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ try {
+ RemoteProtocol.Command command =
+ RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
+ cache.downloadTree(action.getInputRootDigest(), execRoot);
+
+ List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
+ for (String output : action.getOutputPathList()) {
+ Path file = execRoot.getRelative(output);
+ if (file.exists()) {
+ throw new FileAlreadyExistsException("Output file already exists: " + file);
+ }
+ FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
+ outputs.add(file);
}
+
+ // TODO(olaola): time out after specified server-side deadline.
+ Command cmd =
+ new Command(
+ command.getArgvList().toArray(new String[] {}),
+ getEnvironmentVariables(command),
+ new File(execRoot.getPathString()));
+ cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
+
+ // Execute throws a CommandException on non-zero return values, so action has succeeded.
+ ImmutableList<ContentDigest> outErrDigests =
+ cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
+ ActionResult.Builder result =
+ ActionResult.newBuilder()
+ .setReturnCode(0)
+ .setStdoutDigest(outErrDigests.get(0))
+ .setStderrDigest(outErrDigests.get(1));
+ cache.uploadAllResults(execRoot, outputs, result);
+ cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build());
+ return ExecuteReply.newBuilder()
+ .setResult(result)
+ .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
+ .build();
+ } catch (CommandException e) {
+ ImmutableList<ContentDigest> outErrDigests =
+ cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
+ final int returnCode =
+ e instanceof AbnormalTerminationException
+ ? ((AbnormalTerminationException) e)
+ .getResult()
+ .getTerminationStatus()
+ .getExitCode()
+ : -1;
+ return ExecuteReply.newBuilder()
+ .setResult(
+ ActionResult.newBuilder()
+ .setReturnCode(returnCode)
+ .setStdoutDigest(outErrDigests.get(0))
+ .setStderrDigest(outErrDigests.get(1)))
+ .setStatus(
+ ExecutionStatus.newBuilder()
+ .setExecuted(true)
+ .setSucceeded(false)
+ .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
+ .setErrorDetail(e.toString()))
+ .build();
+ } catch (CacheNotFoundException e) {
+ LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
+ return ExecuteReply.newBuilder()
+ .setCasError(
+ CasStatus.newBuilder()
+ .setSucceeded(false)
+ .addMissingDigest(e.getMissingDigest())
+ .setError(CasStatus.ErrorCode.MISSING_DIGEST)
+ .setErrorDetail(e.toString()))
+ .setStatus(
+ ExecutionStatus.newBuilder()
+ .setExecuted(false)
+ .setSucceeded(false)
+ .setError(
+ e.getMissingDigest() == action.getCommandDigest()
+ ? ExecutionStatus.ErrorCode.MISSING_COMMAND
+ : ExecutionStatus.ErrorCode.MISSING_INPUT)
+ .setErrorDetail(e.toString()))
+ .build();
}
- if (!options.debug) {
- FileSystemUtils.deleteTree(tempRoot);
- } else {
- LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
- }
- } catch (IOException | InterruptedException e) {
- ExecuteReply.Builder reply = ExecuteReply.newBuilder();
- reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ }
+
+ @Override
+ public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
+ Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
+ try {
+ tempRoot.createDirectory();
+ if (LOG_FINER) {
+ LOG.fine(
+ "Work received has "
+ + request.getTotalInputFileCount()
+ + " input files and "
+ + request.getAction().getOutputPathCount()
+ + " output files.");
+ }
+ ExecuteReply reply = execute(request.getAction(), tempRoot);
+ responseObserver.onNext(reply);
+ if (options.debug) {
+ if (!reply.getStatus().getSucceeded()) {
+ LOG.warning("Work failed. Request: " + request.toString() + ".");
+ } else if (LOG_FINER) {
+ LOG.fine("Work completed.");
+ }
+ }
+ if (!options.debug) {
+ FileSystemUtils.deleteTree(tempRoot);
+ } else {
+ LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
+ }
+ } catch (IOException | InterruptedException e) {
+ ExecuteReply.Builder reply = ExecuteReply.newBuilder();
+ reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
+ responseObserver.onNext(reply.build());
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ } finally {
+ responseObserver.onCompleted();
}
- } finally {
- responseObserver.onCompleted();
}
}
@@ -223,9 +473,11 @@ public class RemoteWorker extends ExecuteServiceImplBase {
return;
}
- System.out.println("*** Starting Hazelcast server.");
- ConcurrentMapActionCache cache =
- new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions));
+ System.out.println("*** Initializing in-memory cache server.");
+ ConcurrentMap<String, byte[]> cache =
+ ConcurrentMapFactory.isRemoteCacheOptions(remoteOptions)
+ ? ConcurrentMapFactory.create(remoteOptions)
+ : new ConcurrentHashMap<String, byte[]>();
System.out.println(
"*** Starting grpc server on all locally bound IPs on port "
@@ -233,9 +485,14 @@ public class RemoteWorker extends ExecuteServiceImplBase {
+ ".");
Path workPath = getFileSystem().getPath(remoteWorkerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
- RemoteWorker worker = new RemoteWorker(workPath, remoteOptions, remoteWorkerOptions, cache);
+ RemoteWorker worker =
+ new RemoteWorker(workPath, remoteWorkerOptions, new ConcurrentMapActionCache(cache));
final Server server =
- ServerBuilder.forPort(remoteWorkerOptions.listenPort).addService(worker).build();
+ ServerBuilder.forPort(remoteWorkerOptions.listenPort)
+ .addService(worker.getCasServer())
+ .addService(worker.getExecutionServer())
+ .addService(worker.getExecCacheServer())
+ .build();
server.start();
final Path pidFile;
@@ -270,7 +527,7 @@ public class RemoteWorker extends ExecuteServiceImplBase {
}
public static void printUsage(OptionsParser parser) {
- System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a RPC service.");
+ System.out.println("Usage: remote_worker \n\n" + "Starts a worker that runs a gRPC service.");
System.out.println(
parser.describeOptions(
Collections.<String, String>emptyMap(), OptionsParser.HelpVerbosity.LONG));