author     olaola <olaola@google.com>                    2017-12-05 12:41:19 -0800
committer  Copybara-Service <copybara-piper@google.com>  2017-12-05 12:43:25 -0800
commit     2f3d7df5750ec7f3ac293c92c31d2969af2dd1cf (patch)
tree       20c679cba7b15b05e87e58d3f5b21444fdde9286 /src/tools/remote_worker
parent     110432fdfe2b030745bdf8aaa750bbcf9d313327 (diff)
Moving the RemoteWorker into the tools/remote directory.

This is because I want to add another remote-execution-related tool, the
remote_client, which will use the Remote Execution API to fetch blobs from a
remote cache. I will use this tool as part of end-to-end tests for remote
execution.

TESTED=remote integration tests, presubmit
RELNOTES: None
PiperOrigin-RevId: 177995895
Diffstat (limited to 'src/tools/remote_worker')
-rw-r--r--  src/tools/remote_worker/BUILD                                                                 16
-rw-r--r--  src/tools/remote_worker/README.md                                                             42
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java     74
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD                      43
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java     275
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java             83
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java      407
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java         336
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java  172
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java           77
-rw-r--r--  src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java        112
11 files changed, 0 insertions, 1637 deletions
diff --git a/src/tools/remote_worker/BUILD b/src/tools/remote_worker/BUILD
deleted file mode 100644
index 3a45276870..0000000000
--- a/src/tools/remote_worker/BUILD
+++ /dev/null
@@ -1,16 +0,0 @@
-filegroup(
- name = "srcs",
- srcs = glob(["**"]) + ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote:srcs"],
- visibility = ["//src:__pkg__"],
-)
-
-java_binary(
- name = "remote_worker",
- jvm_flags = [
- # Enables REST on the Hazelcast server for testing.
- "-Dhazelcast.rest.enabled=true",
- ],
- main_class = "com.google.devtools.build.remote.RemoteWorker",
- visibility = ["//visibility:public"],
- runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"],
-)
diff --git a/src/tools/remote_worker/README.md b/src/tools/remote_worker/README.md
deleted file mode 100644
index 408ab67d47..0000000000
--- a/src/tools/remote_worker/README.md
+++ /dev/null
@@ -1,42 +0,0 @@
-# Remote worker
-
-This program implements a remote execution worker that uses gRPC to accept work
-requests. It can work as a remote execution worker, a cache worker, or both.
-The simplest setup is as follows:
-
-- First, build remote_worker and run it:
-
- bazel build src/tools/remote_worker:all
- bazel-bin/src/tools/remote_worker/remote_worker \
- --work_path=/tmp/test \
- --listen_port=8080
-
-- Then run Bazel, pointing it at the remote_worker instance:
-
- bazel build \
- --spawn_strategy=remote --remote_cache=localhost:8080 \
- --remote_executor=localhost:8080 src/tools/generate_workspace:all
-
-The above command builds generate_workspace with the remote spawn strategy,
-using the local worker as the distributed caching and execution backend.
-
-## Sandboxing
-
-If you run the remote_worker on Linux, you can also enable sandboxing for increased hermeticity:
-
- bazel-bin/src/tools/remote_worker/remote_worker \
- --work_path=/tmp/test \
- --listen_port=8080 \
- --sandboxing \
- --sandboxing_writable_path=/run/shm \
- --sandboxing_tmpfs_dir=/tmp \
- --sandboxing_block_network
-
-The specific behavior of the sandbox can be tuned via these flags:
-
-- --sandboxing_writable_path=<path> makes the given path writable for running actions.
-- --sandboxing_tmpfs_dir=<path> mounts a fresh, empty tmpfs on the given path for each running
- action.
-- --sandboxing_block_network puts each running action into its own network namespace, with no
- network connectivity except for its own "localhost". Note that due to a Linux kernel issue this
- might cost performance when you run many actions in parallel. For long-running tests it probably
- won't matter much, though.
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java
deleted file mode 100644
index 8e9ba2fafb..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import static java.util.logging.Level.WARNING;
-
-import com.google.devtools.build.lib.remote.DigestUtil;
-import com.google.devtools.build.lib.remote.DigestUtil.ActionKey;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
-import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
-import io.grpc.stub.StreamObserver;
-import java.util.logging.Logger;
-
-/** A basic implementation of an {@link ActionCacheImplBase} service. */
-final class ActionCacheServer extends ActionCacheImplBase {
- private static final Logger logger = Logger.getLogger(ActionCacheServer.class.getName());
-
- private final SimpleBlobStoreActionCache cache;
- private final DigestUtil digestUtil;
-
- public ActionCacheServer(SimpleBlobStoreActionCache cache, DigestUtil digestUtil) {
- this.cache = cache;
- this.digestUtil = digestUtil;
- }
-
- @Override
- public void getActionResult(
- GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
- try {
- ActionKey actionKey = digestUtil.asActionKey(request.getActionDigest());
- ActionResult result = cache.getCachedActionResult(actionKey);
-
- if (result == null) {
- responseObserver.onError(StatusUtils.notFoundError(request.getActionDigest()));
- return;
- }
-
- responseObserver.onNext(result);
- responseObserver.onCompleted();
- } catch (Exception e) {
- logger.log(WARNING, "getActionResult request failed.", e);
- responseObserver.onError(StatusUtils.internalError(e));
- }
- }
-
- @Override
- public void updateActionResult(
- UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
- try {
- ActionKey actionKey = digestUtil.asActionKey(request.getActionDigest());
- cache.setCachedActionResult(actionKey, request.getActionResult());
- responseObserver.onNext(request.getActionResult());
- responseObserver.onCompleted();
- } catch (Exception e) {
- logger.log(WARNING, "updateActionResult request failed.", e);
- responseObserver.onError(StatusUtils.internalError(e));
- }
- }
-}
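For reference, and not part of this change: a minimal client sketch for the service above, assuming a worker on localhost:8080 (the default --listen_port) and a hypothetical action digest. It uses only the v1test stubs already imported by this file; usePlaintext(true) matches the gRPC API of this era.

    import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
    import com.google.devtools.remoteexecution.v1test.ActionResult;
    import com.google.devtools.remoteexecution.v1test.Digest;
    import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    class ActionCacheClientSketch {
      public static void main(String[] args) {
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext(true).build();
        Digest actionDigest =  // hypothetical digest of an Action proto
            Digest.newBuilder().setHash("0123abcd...").setSizeBytes(142).build();
        GetActionResultRequest request =
            GetActionResultRequest.newBuilder().setActionDigest(actionDigest).build();
        // Unary call served by getActionResult() above; fails with NOT_FOUND
        // (see StatusUtils.notFoundError) if the action was never cached.
        ActionResult result = ActionCacheGrpc.newBlockingStub(channel).getActionResult(request);
        System.out.println("cached exit code: " + result.getExitCode());
        channel.shutdownNow();
      }
    }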
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
deleted file mode 100644
index 8f36c4d0a8..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
+++ /dev/null
@@ -1,43 +0,0 @@
-filegroup(
- name = "srcs",
- srcs = glob(["**"]),
- visibility = ["//src/tools/remote_worker:__pkg__"],
-)
-
-java_library(
- name = "remote",
- srcs = glob(["*.java"]),
- data = ["//src:libunix"],
- resources = ["//src/main/tools:linux-sandbox"],
- visibility = ["//src/tools/remote_worker:__subpackages__"],
- deps = [
- "//src/main/java/com/google/devtools/build/lib:build-base",
- "//src/main/java/com/google/devtools/build/lib:os_util",
- "//src/main/java/com/google/devtools/build/lib:packages-internal",
- "//src/main/java/com/google/devtools/build/lib:process_util",
- "//src/main/java/com/google/devtools/build/lib:single-line-formatter",
- "//src/main/java/com/google/devtools/build/lib:unix",
- "//src/main/java/com/google/devtools/build/lib:util",
- "//src/main/java/com/google/devtools/build/lib/actions",
- "//src/main/java/com/google/devtools/build/lib/remote",
- "//src/main/java/com/google/devtools/build/lib/shell",
- "//src/main/java/com/google/devtools/build/lib/vfs",
- "//src/main/java/com/google/devtools/common/options",
- "//third_party:guava",
- "//third_party:hazelcast",
- "//third_party:netty",
- "//third_party/grpc:grpc-jar",
- "//third_party/protobuf:protobuf_java",
- "//third_party/protobuf:protobuf_java_util",
- "@googleapis//:google_bytestream_bytestream_java_grpc",
- "@googleapis//:google_bytestream_bytestream_java_proto",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
- "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
- "@googleapis//:google_longrunning_operations_java_proto",
- "@googleapis//:google_rpc_code_java_proto",
- "@googleapis//:google_rpc_error_details_java_proto",
- "@googleapis//:google_rpc_status_java_proto",
- "@googleapis//:google_watch_v1_java_grpc",
- "@googleapis//:google_watch_v1_java_proto",
- ],
-)
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
deleted file mode 100644
index 778d74335a..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ /dev/null
@@ -1,275 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-
-import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
-import com.google.bytestream.ByteStreamProto.ReadRequest;
-import com.google.bytestream.ByteStreamProto.ReadResponse;
-import com.google.bytestream.ByteStreamProto.WriteRequest;
-import com.google.bytestream.ByteStreamProto.WriteResponse;
-import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.Chunker;
-import com.google.devtools.build.lib.remote.DigestUtil;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import io.grpc.Status;
-import io.grpc.protobuf.StatusProto;
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.UUID;
-import java.util.logging.Logger;
-import javax.annotation.Nullable;
-
-/** A basic implementation of a {@link ByteStreamImplBase} service. */
-final class ByteStreamServer extends ByteStreamImplBase {
- private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName());
- private final SimpleBlobStoreActionCache cache;
- private final Path workPath;
- private final DigestUtil digestUtil;
-
- static @Nullable Digest parseDigestFromResourceName(String resourceName) {
- try {
- String[] tokens = resourceName.split("/");
- if (tokens.length < 2) {
- return null;
- }
- String hash = tokens[tokens.length - 2];
- long size = Long.parseLong(tokens[tokens.length - 1]);
- return DigestUtil.buildDigest(hash, size);
- } catch (NumberFormatException e) {
- return null;
- }
- }
-
- public ByteStreamServer(SimpleBlobStoreActionCache cache, Path workPath, DigestUtil digestUtil) {
- this.cache = cache;
- this.workPath = workPath;
- this.digestUtil = digestUtil;
- }
-
- @Override
- public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
- Digest digest = parseDigestFromResourceName(request.getResourceName());
-
- if (digest == null) {
- responseObserver.onError(
- StatusUtils.invalidArgumentError(
- "resource_name",
- "Failed parsing digest from resource_name: " + request.getResourceName()));
- return;
- }
-
- try {
- // This still relies on the blob size to be small enough to fit in memory.
- // TODO(olaola): refactor to fix this if the need arises.
- Chunker c = new Chunker(cache.downloadBlob(digest), digestUtil);
- while (c.hasNext()) {
- responseObserver.onNext(
- ReadResponse.newBuilder().setData(c.next().getData()).build());
- }
- responseObserver.onCompleted();
- } catch (CacheNotFoundException e) {
- responseObserver.onError(StatusUtils.notFoundError(digest));
- } catch (Exception e) {
- logger.log(WARNING, "Read request failed.", e);
- responseObserver.onError(StatusUtils.internalError(e));
- }
- }
-
- @Override
- public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) {
- Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString());
- try {
- FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory());
- temp.getOutputStream().close();
- } catch (IOException e) {
- logger.log(SEVERE, "Failed to create temporary file for upload", e);
- responseObserver.onError(StatusUtils.internalError(e));
- // We need to make sure that subsequent onNext or onCompleted calls don't make any further
- // calls on the responseObserver after the onError above, so we return a no-op observer.
- return new NoOpStreamObserver<>();
- }
- return new StreamObserver<WriteRequest>() {
- private Digest digest;
- private long offset;
- private String resourceName;
- private boolean closed;
-
- @Override
- public void onNext(WriteRequest request) {
- if (closed) {
- return;
- }
-
- if (digest == null) {
- resourceName = request.getResourceName();
- digest = parseDigestFromResourceName(resourceName);
- }
-
- if (digest == null) {
- responseObserver.onError(
- StatusUtils.invalidArgumentError(
- "resource_name",
- "Failed parsing digest from resource_name: " + request.getResourceName()));
- closed = true;
- return;
- }
-
- if (offset == 0) {
- try {
- if (cache.containsKey(digest)) {
- responseObserver.onNext(
- WriteResponse.newBuilder().setCommittedSize(digest.getSizeBytes()).build());
- responseObserver.onCompleted();
- closed = true;
- return;
- }
- } catch (InterruptedException e) {
- responseObserver.onError(StatusUtils.interruptedError(digest));
- Thread.currentThread().interrupt();
- closed = true;
- return;
- } catch (IOException e) {
- responseObserver.onError(StatusUtils.internalError(e));
- closed = true;
- return;
- }
- }
-
- if (request.getWriteOffset() != offset) {
- responseObserver.onError(
- StatusUtils.invalidArgumentError(
- "write_offset",
- "Expected: " + offset + ", received: " + request.getWriteOffset()));
- closed = true;
- return;
- }
-
- if (!request.getResourceName().isEmpty()
- && !request.getResourceName().equals(resourceName)) {
- responseObserver.onError(
- StatusUtils.invalidArgumentError(
- "resource_name",
- "Expected: " + resourceName + ", received: " + request.getResourceName()));
- closed = true;
- return;
- }
-
- long size = request.getData().size();
-
- if (size > 0) {
- try (OutputStream out = temp.getOutputStream(true)) {
- request.getData().writeTo(out);
- } catch (IOException e) {
- responseObserver.onError(StatusUtils.internalError(e));
- closed = true;
- return;
- }
- offset += size;
- }
-
- boolean shouldFinishWrite = offset == digest.getSizeBytes();
-
- if (shouldFinishWrite != request.getFinishWrite()) {
- responseObserver.onError(
- StatusUtils.invalidArgumentError(
- "finish_write",
- "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite()));
- closed = true;
- return;
- }
- }
-
- @Override
- public void onError(Throwable t) {
- if (Status.fromThrowable(t).getCode() != Status.Code.CANCELLED) {
- logger.log(WARNING, "Write request failed remotely.", t);
- }
- closed = true;
- try {
- temp.delete();
- } catch (IOException e) {
- logger.log(WARNING, "Could not delete temp file.", e);
- }
- }
-
- @Override
- public void onCompleted() {
- if (closed) {
- return;
- }
-
- if (digest == null || offset != digest.getSizeBytes()) {
- responseObserver.onError(
- StatusProto.toStatusRuntimeException(
- com.google.rpc.Status.newBuilder()
- .setCode(Status.Code.FAILED_PRECONDITION.value())
- .setMessage("Request completed before all data was sent.")
- .build()));
- closed = true;
- return;
- }
-
- try {
- Digest d = digestUtil.compute(temp);
- try (InputStream in = temp.getInputStream()) {
- cache.uploadStream(d, in);
- }
- try {
- temp.delete();
- } catch (IOException e) {
- logger.log(WARNING, "Could not delete temp file.", e);
- }
-
- if (!d.equals(digest)) {
- responseObserver.onError(
- StatusUtils.invalidArgumentError(
- "resource_name",
- "Received digest " + digest + " does not match computed digest " + d));
- closed = true;
- return;
- }
-
- responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(offset).build());
- responseObserver.onCompleted();
- } catch (Exception e) {
- logger.log(WARNING, "Write request failed.", e);
- responseObserver.onError(StatusUtils.internalError(e));
- closed = true;
- }
- }
- };
- }
-
- private static class NoOpStreamObserver<T> implements StreamObserver<T> {
- @Override
- public void onNext(T value) {
- }
-
- @Override
- public void onError(Throwable t) {
- }
-
- @Override
- public void onCompleted() {
- }
- }
-}
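For reference, and not part of this change: a read-side sketch against the service above. parseDigestFromResourceName() only inspects the last two path components, so a resource name of the form "blobs/{hash}/{size}" is sufficient here; the digest and address are hypothetical.

    import com.google.bytestream.ByteStreamGrpc;
    import com.google.bytestream.ByteStreamProto.ReadRequest;
    import com.google.bytestream.ByteStreamProto.ReadResponse;
    import com.google.protobuf.ByteString;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import java.util.Iterator;

    class ByteStreamReadSketch {
      public static void main(String[] args) {
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext(true).build();
        // tokens[n-2] is parsed as the hash, tokens[n-1] as the size in bytes.
        String resourceName = "blobs/0123abcd.../42";  // hypothetical digest
        Iterator<ReadResponse> chunks =
            ByteStreamGrpc.newBlockingStub(channel)
                .read(ReadRequest.newBuilder().setResourceName(resourceName).build());
        ByteString data = ByteString.EMPTY;
        while (chunks.hasNext()) {
          data = data.concat(chunks.next().getData());  // chunks arrive in order
        }
        System.out.println("downloaded " + data.size() + " bytes");
        channel.shutdownNow();
      }
    }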
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java
deleted file mode 100644
index 13e42b239d..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
-import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
-import com.google.devtools.remoteexecution.v1test.UpdateBlobRequest;
-import com.google.rpc.Code;
-import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-
-/** A basic implementation of a {@link ContentAddressableStorageImplBase} service. */
-final class CasServer extends ContentAddressableStorageImplBase {
- private final SimpleBlobStoreActionCache cache;
-
- public CasServer(SimpleBlobStoreActionCache cache) {
- this.cache = cache;
- }
-
- @Override
- public void findMissingBlobs(
- FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) {
- FindMissingBlobsResponse.Builder response = FindMissingBlobsResponse.newBuilder();
-
- try {
- for (Digest digest : request.getBlobDigestsList()) {
- try {
- if (!cache.containsKey(digest)) {
- response.addMissingBlobDigests(digest);
- }
- } catch (InterruptedException e) {
- responseObserver.onError(StatusUtils.interruptedError(digest));
- Thread.currentThread().interrupt();
- return;
- }
- }
- responseObserver.onNext(response.build());
- responseObserver.onCompleted();
- } catch (IOException e) {
- responseObserver.onError(StatusUtils.internalError(e));
- }
- }
-
- @Override
- public void batchUpdateBlobs(
- BatchUpdateBlobsRequest request, StreamObserver<BatchUpdateBlobsResponse> responseObserver) {
- BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder();
- for (UpdateBlobRequest r : request.getRequestsList()) {
- BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
- try {
- Digest digest = cache.uploadBlob(r.getData().toByteArray());
- if (!r.getContentDigest().equals(digest)) {
- String err =
- "Upload digest " + r.getContentDigest() + " did not match data digest: " + digest;
- resp.setStatus(StatusUtils.invalidArgumentStatus("content_digest", err));
- continue;
- }
- resp.getStatusBuilder().setCode(Code.OK.getNumber());
- } catch (Exception e) {
- resp.setStatus(StatusUtils.internalErrorStatus(e));
- }
- }
- responseObserver.onNext(batchResponse.build());
- responseObserver.onCompleted();
- }
-}
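For reference, and not part of this change: a client sketch of the usual CAS round trip against the service above, with a hypothetical digest. Any digest echoed back by findMissingBlobs() is absent from the store and would then be uploaded, e.g. via batchUpdateBlobs() or the ByteStream Write RPC.

    import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
    import com.google.devtools.remoteexecution.v1test.Digest;
    import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
    import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    class CasClientSketch {
      public static void main(String[] args) {
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext(true).build();
        Digest digest =  // hypothetical blob digest
            Digest.newBuilder().setHash("0123abcd...").setSizeBytes(42).build();
        FindMissingBlobsResponse response =
            ContentAddressableStorageGrpc.newBlockingStub(channel)
                .findMissingBlobs(
                    FindMissingBlobsRequest.newBuilder().addBlobDigests(digest).build());
        // Every digest in the response is missing and needs to be uploaded.
        System.out.println("missing: " + response.getMissingBlobDigestsList());
        channel.shutdownNow();
      }
    }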
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
deleted file mode 100644
index 0b3f2780bf..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
+++ /dev/null
@@ -1,407 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.INFO;
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.DigestUtil;
-import com.google.devtools.build.lib.remote.DigestUtil.ActionKey;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.build.lib.remote.TracingMetadataUtils;
-import com.google.devtools.build.lib.shell.AbnormalTerminationException;
-import com.google.devtools.build.lib.shell.Command;
-import com.google.devtools.build.lib.shell.CommandException;
-import com.google.devtools.build.lib.shell.CommandResult;
-import com.google.devtools.build.lib.shell.FutureCommandResult;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Action;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
-import com.google.devtools.remoteexecution.v1test.Platform;
-import com.google.devtools.remoteexecution.v1test.RequestMetadata;
-import com.google.longrunning.Operation;
-import com.google.protobuf.util.Durations;
-import com.google.rpc.Code;
-import com.google.rpc.Status;
-import io.grpc.Context;
-import io.grpc.StatusException;
-import io.grpc.protobuf.StatusProto;
-import io.grpc.stub.StreamObserver;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** A basic implementation of an {@link ExecutionImplBase} service. */
-final class ExecutionServer extends ExecutionImplBase {
- private static final Logger logger = Logger.getLogger(ExecutionServer.class.getName());
-
- private final Object lock = new Object();
-
- // The name of the container image entry in the Platform proto
- // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
- // experimental_remote_platform_override in
- // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java)
- private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image";
- private static final String DOCKER_IMAGE_PREFIX = "docker://";
-
- // How long to wait for the uid command.
- private static final Duration uidTimeout = Duration.ofMillis(30);
-
- private static final int LOCAL_EXEC_ERROR = -1;
-
- private final Path workPath;
- private final Path sandboxPath;
- private final RemoteWorkerOptions workerOptions;
- private final SimpleBlobStoreActionCache cache;
- private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
- private final ListeningExecutorService executorService;
- private final DigestUtil digestUtil;
-
- public ExecutionServer(
- Path workPath,
- Path sandboxPath,
- RemoteWorkerOptions workerOptions,
- SimpleBlobStoreActionCache cache,
- ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache,
- DigestUtil digestUtil) {
- this.workPath = workPath;
- this.sandboxPath = sandboxPath;
- this.workerOptions = workerOptions;
- this.cache = cache;
- this.operationsCache = operationsCache;
- this.digestUtil = digestUtil;
- ThreadPoolExecutor realExecutor = new ThreadPoolExecutor(
- // This is actually the max number of concurrent jobs.
- workerOptions.jobs,
- // Since we use an unbounded queue, the executor ignores this value, but it still checks
- // that it is greater or equal to the value above.
- workerOptions.jobs,
- // Shut down idle threads after one minute. Threads aren't all that expensive, but we also
- // don't need to keep them around if we don't need them.
- 1, TimeUnit.MINUTES,
- // We use an unbounded queue for now.
- // TODO(ulfjack): We need to reject work eventually.
- new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setNameFormat("subprocess-handler-%d").build());
- // Allow the core threads to die.
- realExecutor.allowCoreThreadTimeOut(true);
- this.executorService = MoreExecutors.listeningDecorator(realExecutor);
- }
-
- @Override
- public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
- final String opName = UUID.randomUUID().toString();
- ListenableFuture<ActionResult> future =
- executorService.submit(Context.current().wrap(() -> execute(request, opName)));
- operationsCache.put(opName, future);
- responseObserver.onNext(Operation.newBuilder().setName(opName).build());
- responseObserver.onCompleted();
- }
-
- private ActionResult execute(ExecuteRequest request, String id)
- throws IOException, InterruptedException, StatusException {
- Path tempRoot = workPath.getRelative("build-" + id);
- String workDetails = "";
- try {
- tempRoot.createDirectory();
- RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
- workDetails =
- String.format(
- "build-request-id: %s command-id: %s action-id: %s",
- meta.getCorrelatedInvocationsId(), meta.getToolInvocationId(), meta.getActionId());
- logger.log(FINE, "Received work for: {0}", workDetails);
- ActionResult result = execute(request.getAction(), tempRoot);
- logger.log(FINE, "Completed {0}.", workDetails);
- return result;
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Work failed: {0} {1}.", new Object[] {workDetails, e});
- throw e;
- } finally {
- if (workerOptions.debug) {
- logger.log(INFO, "Preserving work directory {0}.", tempRoot);
- } else {
- try {
- FileSystemUtils.deleteTree(tempRoot);
- } catch (IOException e) {
- logger.log(SEVERE,
- String.format(
- "Failed to delete tmp directory %s: %s",
- tempRoot, Throwables.getStackTraceAsString(e)));
- }
- }
- }
- }
-
- private ActionResult execute(Action action, Path execRoot)
- throws IOException, InterruptedException, StatusException {
- com.google.devtools.remoteexecution.v1test.Command command = null;
- try {
- command =
- com.google.devtools.remoteexecution.v1test.Command.parseFrom(
- cache.downloadBlob(action.getCommandDigest()));
- cache.downloadTree(action.getInputRootDigest(), execRoot);
- } catch (CacheNotFoundException e) {
- throw StatusUtils.notFoundError(e.getMissingDigest());
- }
-
- List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
- for (String output : action.getOutputFilesList()) {
- Path file = execRoot.getRelative(output);
- if (file.exists()) {
- throw new FileAlreadyExistsException("Output file already exists: " + file);
- }
- FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
- outputs.add(file);
- }
- // TODO(olaola): support output directories.
-
- // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
- // implementation instead of copying it.
- Command cmd =
- getCommand(
- action,
- command.getArgumentsList(),
- getEnvironmentVariables(command),
- execRoot.getPathString());
- long startTime = System.currentTimeMillis();
- CommandResult cmdResult = null;
-
- FutureCommandResult futureCmdResult = null;
- synchronized (lock) {
- // Linux does not provide a safe API for a multi-threaded program to fork a subprocess.
- // Consider the case where two threads both write an executable file and then try to execute
- // it. It can happen that the first thread writes its executable file, with the file
- // descriptor still being open when the second thread forks, with the fork inheriting a copy
- // of the file descriptor. Then the first thread closes the original file descriptor, and
- // proceeds to execute the file. At that point Linux sees an open file descriptor to the file
- // and returns ETXTBSY (Text file busy) as an error. This race is inherent in the fork / exec
- // duality, with fork always inheriting a copy of the file descriptor table; if there was a
- // way to fork without copying the entire file descriptor table (e.g., only copy specific
- // entries), we could avoid this race.
- //
- // I was able to reproduce this problem reliably by running significantly more threads than
- // there are CPU cores on my workstation - the more threads the more likely it happens.
- //
- // As a workaround, we put a synchronized block around the fork.
- try {
- futureCmdResult = cmd.executeAsync();
- } catch (CommandException e) {
- Throwables.throwIfInstanceOf(e.getCause(), IOException.class);
- }
- }
-
- if (futureCmdResult != null) {
- try {
- cmdResult = futureCmdResult.get();
- } catch (AbnormalTerminationException e) {
- cmdResult = e.getResult();
- }
- }
-
- long timeoutMillis =
- action.hasTimeout()
- ? Durations.toMillis(action.getTimeout())
- : TimeUnit.MINUTES.toMillis(15);
- boolean wasTimeout =
- (cmdResult != null && cmdResult.getTerminationStatus().timedOut())
- || wasTimeout(timeoutMillis, System.currentTimeMillis() - startTime);
- final int exitCode;
- if (wasTimeout) {
- final String errMessage =
- String.format(
- "Command:\n%s\nexceeded deadline of %f seconds.",
- Arrays.toString(command.getArgumentsList().toArray()), timeoutMillis / 1000.0);
- logger.warning(errMessage);
- throw StatusProto.toStatusException(
- Status.newBuilder()
- .setCode(Code.DEADLINE_EXCEEDED.getNumber())
- .setMessage(errMessage)
- .build());
- } else if (cmdResult == null) {
- exitCode = LOCAL_EXEC_ERROR;
- } else {
- exitCode = cmdResult.getTerminationStatus().getRawExitCode();
- }
-
- ActionResult.Builder result = ActionResult.newBuilder();
- cache.upload(result, execRoot, outputs);
- byte[] stdout = cmdResult.getStdout();
- byte[] stderr = cmdResult.getStderr();
- cache.uploadOutErr(result, stdout, stderr);
- ActionResult finalResult = result.setExitCode(exitCode).build();
- if (exitCode == 0) {
- ActionKey actionKey = digestUtil.computeActionKey(action);
- cache.setCachedActionResult(actionKey, finalResult);
- }
- return finalResult;
- }
-
- private boolean wasTimeout(long timeoutMillis, long wallTimeMillis) {
- return timeoutMillis > 0 && wallTimeMillis > timeoutMillis;
- }
-
- private Map<String, String> getEnvironmentVariables(
- com.google.devtools.remoteexecution.v1test.Command command) {
- HashMap<String, String> result = new HashMap<>();
- for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
- result.put(v.getName(), v.getValue());
- }
- return result;
- }
-
- // Gets the uid of the current user. If the uid could not be fetched (e.g., on non-POSIX
- // platforms, if the timeout was exceeded, or if "id -u" returned a non-numeric value),
- // logs a WARNING and returns -1.
- // This is used to set "-u UID" flag for commands running inside Docker containers. There are
- // only a small handful of cases where uid is vital (e.g., if strict permissions are set on the
- // output files), so most use cases would work without setting uid.
- private long getUid() {
- Command cmd =
- new Command(new String[] {"id", "-u"}, /*env=*/null, /*workingDir=*/null, uidTimeout);
- try {
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- cmd.execute(stdout, stderr);
- return Long.parseLong(stdout.toString().trim());
- } catch (CommandException | NumberFormatException e) {
- logger.log(
- WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e);
- return -1;
- }
- }
-
- // Checks Action for docker container definition. If no docker container specified, returns
- // null. Otherwise returns docker container name from the parameters.
- private String dockerContainer(Action action) throws StatusException {
- String result = null;
- for (Platform.Property property : action.getPlatform().getPropertiesList()) {
- if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
- if (result != null) {
- // Multiple container name entries
- throw StatusUtils.invalidArgumentError(
- "platform", // Field name.
- String.format(
- "Multiple entries for %s in action.Platform", CONTAINER_IMAGE_ENTRY_NAME));
- }
- result = property.getValue();
- if (!result.startsWith(DOCKER_IMAGE_PREFIX)) {
- throw StatusUtils.invalidArgumentError(
- "platform", // Field name.
- String.format(
- "%s: Docker images must be stored in gcr.io with an image spec in the form "
- + "'docker://gcr.io/{IMAGE_NAME}'",
- CONTAINER_IMAGE_ENTRY_NAME));
- }
- result = result.substring(DOCKER_IMAGE_PREFIX.length());
- }
- }
- return result;
- }
-
- // Takes an Action and parameters that can be used to create a Command. Returns the Command.
- // If no docker container is specified inside Action, creates a Command straight from the
- // arguments. Otherwise, returns a Command that would run the specified command inside the
- // specified docker container.
- private Command getCommand(
- Action action,
- List<String> commandLineElements,
- Map<String, String> environmentVariables,
- String pathString) throws StatusException {
- String container = dockerContainer(action);
- if (container != null) {
- // Run command inside a docker container.
- ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
- newCommandLineElements.add("docker");
- newCommandLineElements.add("run");
-
- long uid = getUid();
- if (uid >= 0) {
- newCommandLineElements.add("-u");
- newCommandLineElements.add(Long.toString(uid));
- }
-
- String dockerPathString = pathString + "-docker";
- newCommandLineElements.add("-v");
- newCommandLineElements.add(pathString + ":" + dockerPathString);
- newCommandLineElements.add("-w");
- newCommandLineElements.add(dockerPathString);
-
- for (Map.Entry<String, String> entry : environmentVariables.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- newCommandLineElements.add("-e");
- newCommandLineElements.add(key + "=" + value);
- }
-
- newCommandLineElements.add(container);
-
- newCommandLineElements.addAll(commandLineElements);
-
- return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString));
- } else if (sandboxPath != null) {
- // Run command with sandboxing.
- ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
- newCommandLineElements.add(sandboxPath.getPathString());
- if (workerOptions.sandboxingBlockNetwork) {
- newCommandLineElements.add("-N");
- }
- for (String writablePath : workerOptions.sandboxingWritablePaths) {
- newCommandLineElements.add("-w");
- newCommandLineElements.add(writablePath);
- }
- for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) {
- newCommandLineElements.add("-e");
- newCommandLineElements.add(tmpfsDir);
- }
- newCommandLineElements.add("--");
- newCommandLineElements.addAll(commandLineElements);
- return new Command(
- newCommandLineElements.toArray(new String[0]),
- environmentVariables,
- new File(pathString));
- } else {
- // Just run the command.
- return new Command(
- commandLineElements.toArray(new String[0]), environmentVariables, new File(pathString));
- }
- }
-}
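To make the container wrapping in getCommand() above concrete, here is a standalone sketch (all values hypothetical) that rebuilds the same command line the worker would pass to the shell library when an action requests container-image=docker://gcr.io/my/image:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class DockerCommandSketch {
      public static void main(String[] args) {
        List<String> actionArgs = Arrays.asList("gcc", "-c", "foo.c");  // hypothetical
        Map<String, String> env = new LinkedHashMap<>();
        env.put("PATH", "/usr/bin");
        String pathString = "/tmp/work/build-1234";  // the action's exec root
        String image = "gcr.io/my/image";  // docker:// prefix already stripped
        long uid = 1000;  // what getUid() would have returned

        List<String> cmd = new ArrayList<>();
        cmd.addAll(Arrays.asList("docker", "run", "-u", Long.toString(uid)));
        String dockerPath = pathString + "-docker";
        cmd.addAll(Arrays.asList("-v", pathString + ":" + dockerPath));  // mount exec root
        cmd.addAll(Arrays.asList("-w", dockerPath));  // run inside the mounted copy
        env.forEach((k, v) -> cmd.addAll(Arrays.asList("-e", k + "=" + v)));
        cmd.add(image);
        cmd.addAll(actionArgs);
        // Prints: docker run -u 1000 -v /tmp/work/build-1234:/tmp/work/build-1234-docker
        //   -w /tmp/work/build-1234-docker -e PATH=/usr/bin gcr.io/my/image gcc -c foo.c
        System.out.println(String.join(" ", cmd));
      }
    }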
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
deleted file mode 100644
index c05076a627..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ /dev/null
@@ -1,336 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.INFO;
-import static java.util.logging.Level.SEVERE;
-
-import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.devtools.build.lib.remote.DigestUtil;
-import com.google.devtools.build.lib.remote.RemoteOptions;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory;
-import com.google.devtools.build.lib.remote.TracingMetadataUtils;
-import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
-import com.google.devtools.build.lib.remote.blobstore.OnDiskBlobStore;
-import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
-import com.google.devtools.build.lib.shell.Command;
-import com.google.devtools.build.lib.shell.CommandException;
-import com.google.devtools.build.lib.shell.CommandResult;
-import com.google.devtools.build.lib.unix.UnixFileSystem;
-import com.google.devtools.build.lib.util.OS;
-import com.google.devtools.build.lib.util.ProcessUtils;
-import com.google.devtools.build.lib.util.SingleLineFormatter;
-import com.google.devtools.build.lib.vfs.FileSystem;
-import com.google.devtools.build.lib.vfs.FileSystem.HashFunction;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.build.lib.vfs.PathFragment;
-import com.google.devtools.common.options.OptionsParser;
-import com.google.devtools.common.options.OptionsParsingException;
-import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
-import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
-import com.hazelcast.config.Config;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import io.grpc.Server;
-import io.grpc.ServerInterceptor;
-import io.grpc.ServerInterceptors;
-import io.grpc.netty.NettyServerBuilder;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Implements a remote worker that accepts work items as protobufs. The server implementation is
- * based on gRPC.
- */
-public final class RemoteWorker {
- // We need to keep references to the root and netty loggers to prevent them from being garbage
- // collected, which would cause us to lose their configuration.
- private static final Logger rootLogger = Logger.getLogger("");
- private static final Logger nettyLogger = Logger.getLogger("io.grpc.netty");
- private static final Logger logger = Logger.getLogger(RemoteWorker.class.getName());
-
- private final RemoteWorkerOptions workerOptions;
- private final ActionCacheImplBase actionCacheServer;
- private final ByteStreamImplBase bsServer;
- private final ContentAddressableStorageImplBase casServer;
- private final WatcherImplBase watchServer;
- private final ExecutionImplBase execServer;
-
- static FileSystem getFileSystem() {
- final HashFunction hashFunction;
- String value = null;
- try {
- value = System.getProperty("bazel.DigestFunction", "SHA256");
- hashFunction = new HashFunction.Converter().convert(value);
- } catch (OptionsParsingException e) {
- throw new Error("The specified hash function '" + value + "' is not supported.");
- }
- return OS.getCurrent() == OS.WINDOWS
- ? new JavaIoFileSystem(hashFunction)
- : new UnixFileSystem(hashFunction);
- }
-
- public RemoteWorker(
- FileSystem fs,
- RemoteWorkerOptions workerOptions,
- SimpleBlobStoreActionCache cache,
- Path sandboxPath,
- DigestUtil digestUtil)
- throws IOException {
- this.workerOptions = workerOptions;
- this.actionCacheServer = new ActionCacheServer(cache, digestUtil);
- Path workPath;
- if (workerOptions.workPath != null) {
- workPath = fs.getPath(workerOptions.workPath);
- } else {
- // TODO(ulfjack): The plan is to make the on-disk storage the default, so we always need to
- // provide a path to the remote worker, and we can then also use that as the work path. E.g.:
- // /given/path/cas/
- // /given/path/upload/
- // /given/path/work/
- // We could technically use a different path for temporary files and execution, but we want
- // the cas/ directory to be on the same file system as the upload/ and work/ directories so
- // that we can atomically move files between them, and / or use hard-links for the exec
- // directories.
- // For now, we use a temporary path if no work path was provided.
- workPath = fs.getPath("/tmp/remote-worker");
- }
- this.bsServer = new ByteStreamServer(cache, workPath, digestUtil);
- this.casServer = new CasServer(cache);
-
- if (workerOptions.workPath != null) {
- ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache =
- new ConcurrentHashMap<>();
- FileSystemUtils.createDirectoryAndParents(workPath);
- watchServer = new WatcherServer(operationsCache);
- execServer =
- new ExecutionServer(
- workPath, sandboxPath, workerOptions, cache, operationsCache, digestUtil);
- } else {
- watchServer = null;
- execServer = null;
- }
- }
-
- public Server startServer() throws IOException {
- ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor();
- NettyServerBuilder b =
- NettyServerBuilder.forPort(workerOptions.listenPort)
- .addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor))
- .addService(ServerInterceptors.intercept(bsServer, headersInterceptor))
- .addService(ServerInterceptors.intercept(casServer, headersInterceptor));
-
- if (execServer != null) {
- b.addService(ServerInterceptors.intercept(execServer, headersInterceptor));
- b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor));
- } else {
- logger.info("Execution disabled, only serving cache requests.");
- }
-
- Server server = b.build();
- logger.log(INFO, "Starting gRPC server on port {0,number,#}.", workerOptions.listenPort);
- server.start();
-
- return server;
- }
-
- private void createPidFile() throws IOException {
- if (workerOptions.pidFile == null) {
- return;
- }
-
- final Path pidFile = getFileSystem().getPath(workerOptions.pidFile);
- try (Writer writer =
- new OutputStreamWriter(pidFile.getOutputStream(), StandardCharsets.UTF_8)) {
- writer.write(Integer.toString(ProcessUtils.getpid()));
- writer.write("\n");
- }
-
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread() {
- @Override
- public void run() {
- try {
- pidFile.delete();
- } catch (IOException e) {
- System.err.println("Cannot remove pid file: " + pidFile);
- }
- }
- });
- }
-
- /**
- * Construct a {@link SimpleBlobStore} using Hazelcast's version of {@link ConcurrentMap}. This
- * will start a standalone Hazelcast server in the same JVM. There will also be a REST server
- * started for accessing the maps.
- */
- private static SimpleBlobStore createHazelcast(RemoteWorkerOptions options) {
- Config config = new Config();
- config
- .getNetworkConfig()
- .setPort(options.hazelcastStandaloneListenPort)
- .getJoin()
- .getMulticastConfig()
- .setEnabled(false);
- HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
- return new ConcurrentMapBlobStore(instance.<String, byte[]>getMap("cache"));
- }
-
- public static void main(String[] args) throws Exception {
- OptionsParser parser =
- OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);
- parser.parseAndExitUponError(args);
- RemoteOptions remoteOptions = parser.getOptions(RemoteOptions.class);
- RemoteWorkerOptions remoteWorkerOptions = parser.getOptions(RemoteWorkerOptions.class);
-
- rootLogger.getHandlers()[0].setFormatter(new SingleLineFormatter());
- if (remoteWorkerOptions.debug) {
- rootLogger.getHandlers()[0].setLevel(FINE);
- }
-
- // Only log severe log messages from Netty. Otherwise it logs warnings that look like this:
- //
- // 170714 08:16:28.552:WT 18 [io.grpc.netty.NettyServerHandler.onStreamError] Stream Error
- // io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an
- // unknown stream 11369
- //
- // As far as we can tell, these do not indicate any problem with the connection. We believe they
- // happen when the local side closes a stream, but the remote side hasn't received that
- // notification yet, so there may still be packets for that stream en-route to the local
- // machine. The wording 'unknown stream' is misleading - the stream was previously known, but
- // was recently closed. I'm told upstream discussed this, but didn't want to keep information
- // about closed streams around.
- nettyLogger.setLevel(Level.SEVERE);
-
- FileSystem fs = getFileSystem();
- Path sandboxPath = null;
- if (remoteWorkerOptions.sandboxing) {
- sandboxPath = prepareSandboxRunner(fs, remoteWorkerOptions);
- }
-
- logger.info("Initializing in-memory cache server.");
- boolean usingRemoteCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions);
- if (!usingRemoteCache) {
- logger.warning("Not using remote cache. This should be used for testing only!");
- }
- if ((remoteWorkerOptions.casPath != null)
- && (!PathFragment.create(remoteWorkerOptions.casPath).isAbsolute()
- || !fs.getPath(remoteWorkerOptions.casPath).exists())) {
- logger.severe("--cas_path must refer to an existing, absolute path!");
- System.exit(1);
- return;
- }
-
- // The instance of SimpleBlobStore used is based on these criteria in order:
- // 1. If remote cache or local disk cache is specified then use it first.
- // 2. Otherwise start a standalone Hazelcast instance and use it as the blob store. This also
- // creates a REST server for testing.
- // 3. Finally use a ConcurrentMap to back the blob store.
- final SimpleBlobStore blobStore;
- if (usingRemoteCache) {
- blobStore = SimpleBlobStoreFactory.create(remoteOptions, null);
- } else if (remoteWorkerOptions.casPath != null) {
- blobStore = new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath));
- } else if (remoteWorkerOptions.hazelcastStandaloneListenPort != 0) {
- blobStore = createHazelcast(remoteWorkerOptions);
- } else {
- blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
- }
-
- DigestUtil digestUtil = new DigestUtil(fs.getDigestFunction());
- RemoteWorker worker =
- new RemoteWorker(
- fs,
- remoteWorkerOptions,
- new SimpleBlobStoreActionCache(blobStore, digestUtil),
- sandboxPath,
- digestUtil);
-
- final Server server = worker.startServer();
- worker.createPidFile();
- server.awaitTermination();
- }
-
- private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {
- if (OS.getCurrent() != OS.LINUX) {
- logger.severe("Sandboxing requested, but it is currently only available on Linux.");
- System.exit(1);
- }
-
- if (remoteWorkerOptions.workPath == null) {
- logger.severe("Sandboxing requested, but --work_path was not specified.");
- System.exit(1);
- }
-
- InputStream sandbox = RemoteWorker.class.getResourceAsStream("/main/tools/linux-sandbox");
- if (sandbox == null) {
- logger.severe(
- "Sandboxing requested, but could not find bundled linux-sandbox binary. "
- + "Please rebuild a remote_worker_deploy.jar on Linux to make this work.");
- System.exit(1);
- }
-
- Path sandboxPath = null;
- try {
- sandboxPath = fs.getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
- try (FileOutputStream fos = new FileOutputStream(sandboxPath.getPathString())) {
- ByteStreams.copy(sandbox, fos);
- }
- sandboxPath.setExecutable(true);
- } catch (IOException e) {
- logger.log(SEVERE, "Could not extract the bundled linux-sandbox binary to " + sandboxPath, e);
- System.exit(1);
- }
-
- CommandResult cmdResult = null;
- Command cmd =
- new Command(
- ImmutableList.of(sandboxPath.getPathString(), "--", "true").toArray(new String[0]),
- ImmutableMap.<String, String>of(),
- sandboxPath.getParentDirectory().getPathFile());
- try {
- cmdResult = cmd.execute();
- } catch (CommandException e) {
- // cmd.execute() either returns or throws, so cmdResult is still null here;
- // don't dereference it when logging the failure.
- logger.log(
- SEVERE, "Sandboxing requested, but it failed to execute 'true' as a self-check.", e);
- System.exit(1);
- }
-
- return sandboxPath;
- }
-}
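For reference, and not part of this change: an in-process startup sketch mirroring main() for a cache-only worker (no --work_path, so the execution and watcher services stay disabled) with an in-memory blob store. It would have to live in the com.google.devtools.build.remote package, since getFileSystem() is package-private.

    package com.google.devtools.build.remote;

    import com.google.devtools.build.lib.remote.DigestUtil;
    import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
    import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
    import com.google.devtools.build.lib.vfs.FileSystem;
    import com.google.devtools.common.options.OptionsParser;
    import io.grpc.Server;
    import java.util.concurrent.ConcurrentHashMap;

    class EmbeddedWorkerSketch {
      public static void main(String[] args) throws Exception {
        // Default options: listen_port=8080, work_path=null (cache only).
        OptionsParser parser = OptionsParser.newOptionsParser(RemoteWorkerOptions.class);
        parser.parse();
        RemoteWorkerOptions options = parser.getOptions(RemoteWorkerOptions.class);

        FileSystem fs = RemoteWorker.getFileSystem();
        DigestUtil digestUtil = new DigestUtil(fs.getDigestFunction());
        SimpleBlobStoreActionCache cache =
            new SimpleBlobStoreActionCache(
                new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>()), digestUtil);
        RemoteWorker worker =
            new RemoteWorker(fs, options, cache, /* sandboxPath= */ null, digestUtil);
        Server server = worker.startServer();
        server.awaitTermination();
      }
    }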
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java
deleted file mode 100644
index 9dee83a349..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java
+++ /dev/null
@@ -1,172 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import com.google.devtools.build.lib.actions.LocalHostCapacity;
-import com.google.devtools.common.options.Converters.RangeConverter;
-import com.google.devtools.common.options.Option;
-import com.google.devtools.common.options.OptionDocumentationCategory;
-import com.google.devtools.common.options.OptionEffectTag;
-import com.google.devtools.common.options.OptionsBase;
-import com.google.devtools.common.options.OptionsParsingException;
-import java.util.List;
-
-/** Options for remote worker. */
-public class RemoteWorkerOptions extends OptionsBase {
- @Option(
- name = "listen_port",
- defaultValue = "8080",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "Listening port for the netty server."
- )
- public int listenPort;
-
- @Option(
- name = "work_path",
- defaultValue = "null",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "A directory for the build worker to do work."
- )
- public String workPath;
-
- @Option(
- name = "cas_path",
- defaultValue = "null",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "A directory for the build worker to store it's files in. If left unset, and if no "
- + "other store is set, the worker falls back to an in-memory store."
- )
- public String casPath;
-
- @Option(
- name = "debug",
- defaultValue = "false",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help =
- "Turn this on for debugging remote job failures. There will be extra messages and the "
- + "work directory will be preserved in the case of failure."
- )
- public boolean debug;
-
- @Option(
- name = "pid_file",
- defaultValue = "null",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "File for writing the process id for this worker when it is fully started."
- )
- public String pidFile;
-
- @Option(
- name = "sandboxing",
- defaultValue = "false",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "If supported on this platform, use sandboxing for increased hermeticity."
- )
- public boolean sandboxing;
-
- @Option(
- name = "sandboxing_writable_path",
- defaultValue = "",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- allowMultiple = true,
- help = "When using sandboxing, allow running actions to write to this path."
- )
- public List<String> sandboxingWritablePaths;
-
- @Option(
- name = "sandboxing_tmpfs_dir",
- defaultValue = "",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- allowMultiple = true,
- help = "When using sandboxing, mount an empty tmpfs onto this path for each running action."
- )
- public List<String> sandboxingTmpfsDirs;
-
- @Option(
- name = "sandboxing_block_network",
- defaultValue = "false",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help = "When using sandboxing, block network access for running actions."
- )
- public boolean sandboxingBlockNetwork;
-
- @Option(
- name = "jobs",
- defaultValue = "auto",
- converter = JobsConverter.class,
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help =
- "The maximum number of concurrent jobs to run. \"auto\" means to use a reasonable value"
- + " derived from the machine's hardware profile (e.g. the number of processors)."
- + " Values above " + MAX_JOBS + " are not allowed."
- )
- public int jobs;
-
- @Option(
- name = "hazelcast_standalone_listen_port",
- defaultValue = "0",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help =
-        "Runs an embedded Hazelcast server that listens on this port. The server does not join"
-            + " any cluster. This is useful for testing."
- )
- public int hazelcastStandaloneListenPort;
-
- private static final int MAX_JOBS = 16384;
-
- /** Converter for jobs: [0, MAX_JOBS] or "auto". */
- public static class JobsConverter extends RangeConverter {
- public JobsConverter() {
- super(0, MAX_JOBS);
- }
-
- @Override
- public Integer convert(String input) throws OptionsParsingException {
- if (input.equals("auto")) {
- int autoJobs = (int) Math.ceil(LocalHostCapacity.getLocalHostCapacity().getCpuUsage());
- return Math.min(autoJobs, MAX_JOBS);
- } else {
- return super.convert(input);
- }
- }
-
- @Override
- public String getTypeDescription() {
- return "\"auto\" or " + super.getTypeDescription();
- }
- }
-}
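For context, the options class above is consumed through the common options
parser at worker startup. Below is a minimal sketch of that wiring, assuming
the OptionsParser entry points of this period; the OptionsDemo wrapper and the
printed fields are illustrative, not the worker's actual main:

    import com.google.devtools.build.remote.RemoteWorkerOptions;
    import com.google.devtools.common.options.OptionsParser;

    public class OptionsDemo {
      public static void main(String[] args) {
        // Parse the command line against the @Option fields declared above.
        OptionsParser parser = OptionsParser.newOptionsParser(RemoteWorkerOptions.class);
        parser.parseAndExitUponError(args);
        RemoteWorkerOptions options = parser.getOptions(RemoteWorkerOptions.class);
        // "--jobs=auto" has already been resolved to a concrete number by JobsConverter.
        System.out.println("listen_port=" + options.listenPort + " jobs=" + options.jobs);
      }
    }

Running it with, e.g., --listen_port=8080 --jobs=auto exercises the
JobsConverter above: "auto" is derived from the machine's CPU capacity and
capped at MAX_JOBS.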
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
deleted file mode 100644
index 494ee58453..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.protobuf.Any;
-import com.google.rpc.BadRequest;
-import com.google.rpc.BadRequest.FieldViolation;
-import com.google.rpc.Code;
-import com.google.rpc.Status;
-import io.grpc.StatusException;
-import io.grpc.protobuf.StatusProto;
-
-/** Some utility methods to convert exceptions to Status results. */
-final class StatusUtils {
- private StatusUtils() {}
-
- static StatusException internalError(Exception e) {
- return StatusProto.toStatusException(internalErrorStatus(e));
- }
-
- static Status internalErrorStatus(Exception e) {
- // StatusProto.fromThrowable returns null on non-status errors or errors with no trailers,
- // unlike Status.fromThrowable which returns the UNKNOWN code for these.
- Status st = StatusProto.fromThrowable(e);
- return st != null
- ? st
- : Status.newBuilder().setCode(Code.INTERNAL.getNumber()).setMessage(e.getMessage()).build();
- }
-
- static StatusException notFoundError(Digest digest) {
- return StatusProto.toStatusException(notFoundStatus(digest));
- }
-
- static com.google.rpc.Status notFoundStatus(Digest digest) {
- return Status.newBuilder()
- .setCode(Code.NOT_FOUND.getNumber())
-        .setMessage("Digest not found: " + digest)
- .build();
- }
-
- static StatusException interruptedError(Digest digest) {
- return StatusProto.toStatusException(interruptedStatus(digest));
- }
-
- static com.google.rpc.Status interruptedStatus(Digest digest) {
- return Status.newBuilder()
- .setCode(Code.CANCELLED.getNumber())
- .setMessage("Server operation was interrupted for " + digest)
- .build();
- }
-
- static StatusException invalidArgumentError(String field, String desc) {
- return StatusProto.toStatusException(invalidArgumentStatus(field, desc));
- }
-
- static com.google.rpc.Status invalidArgumentStatus(String field, String desc) {
- FieldViolation v = FieldViolation.newBuilder().setField(field).setDescription(desc).build();
- return Status.newBuilder()
- .setCode(Code.INVALID_ARGUMENT.getNumber())
- .setMessage("invalid argument(s): " + field + ": " + desc)
- .addDetails(Any.pack(BadRequest.newBuilder().addFieldViolations(v).build()))
- .build();
- }
-}
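The class and its methods are package-private, so callers live alongside the
servers in com.google.devtools.build.remote. A small sketch of the two most
common conversions; the demo class, sample digest, and exception are
illustrative:

    package com.google.devtools.build.remote;

    import com.google.devtools.remoteexecution.v1test.Digest;
    import com.google.rpc.Code;
    import com.google.rpc.Status;

    final class StatusUtilsDemo {
      public static void main(String[] args) {
        // Hypothetical digest, for illustration only.
        Digest digest = Digest.newBuilder().setHash("deadbeef").setSizeBytes(4).build();

        // A missing blob becomes NOT_FOUND, with the digest baked into the message.
        Status notFound = StatusUtils.notFoundStatus(digest);
        System.out.println(notFound.getCode() == Code.NOT_FOUND.getNumber()); // true

        // An exception carrying no gRPC status trailers falls back to INTERNAL.
        Status internal = StatusUtils.internalErrorStatus(new java.io.IOException("disk full"));
        System.out.println(internal.getMessage()); // disk full
      }
    }

In the servers, the StatusException variants are handed straight to
StreamObserver.onError(), which is how a cache miss surfaces to the client as
a canonical NOT_FOUND.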
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java
deleted file mode 100644
index 9a242f92f1..0000000000
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.remote;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.longrunning.Operation;
-import com.google.protobuf.Any;
-import com.google.rpc.Code;
-import com.google.rpc.Status;
-import com.google.watcher.v1.Change;
-import com.google.watcher.v1.ChangeBatch;
-import com.google.watcher.v1.Request;
-import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
-import io.grpc.protobuf.StatusProto;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/** A basic implementation of a {@link WatcherImplBase} service. */
-final class WatcherServer extends WatcherImplBase {
- private static final Logger logger = Logger.getLogger(WatcherServer.class.getName());
-
- private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
-
- public WatcherServer(ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) {
- this.operationsCache = operationsCache;
- }
-
- @Override
- public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) {
- final String opName = wr.getTarget();
- ListenableFuture<ActionResult> future = operationsCache.get(opName);
- if (future == null) {
- responseObserver.onError(
- StatusProto.toStatusRuntimeException(
- Status.newBuilder()
- .setCode(Code.NOT_FOUND.getNumber())
- .setMessage("Operation not found: " + opName)
- .build()));
- return;
- }
-
- future.addListener(() -> {
- try {
- try {
- ActionResult result = future.get();
- responseObserver.onNext(
- packExists(
- Operation.newBuilder()
- .setName(opName)
- .setDone(true)
- .setResponse(
- Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))));
- responseObserver.onCompleted();
- } catch (ExecutionException e) {
- Throwables.throwIfUnchecked(e.getCause());
- throw (Exception) e.getCause();
- }
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Work failed: " + opName, e);
- responseObserver.onNext(
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(
- Any.pack(
- Operation.newBuilder()
- .setName(opName)
- .setError(StatusUtils.internalErrorStatus(e))
- .build()))
- .build())
- .build());
- responseObserver.onCompleted();
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- } finally {
- operationsCache.remove(opName);
- }
- }, MoreExecutors.directExecutor());
- }
-
-  /** Constructs a ChangeBatch with an EXISTS state change that contains the given operation. */
- private ChangeBatch packExists(Operation.Builder message) {
- return ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(
- Any.pack(message.build())))
- .build();
- }
-}
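The client half of this exchange is not part of this change; the following is
a rough sketch of what a caller would do, assuming a ready gRPC channel and an
operation name obtained from the Execute call. WatchClientDemo and its error
handling are illustrative, not Bazel's actual client code:

    import com.google.longrunning.Operation;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.watcher.v1.Change;
    import com.google.watcher.v1.ChangeBatch;
    import com.google.watcher.v1.Request;
    import com.google.watcher.v1.WatcherGrpc;
    import io.grpc.Channel;
    import java.util.Iterator;

    final class WatchClientDemo {
      /** Blocks until the watched operation reports a terminal EXISTS change. */
      static Operation waitForOperation(Channel channel, String operationName)
          throws InvalidProtocolBufferException {
        WatcherGrpc.WatcherBlockingStub stub = WatcherGrpc.newBlockingStub(channel);
        Request request = Request.newBuilder().setTarget(operationName).build();
        // Watch is server-streaming, so the blocking stub yields an iterator of batches.
        Iterator<ChangeBatch> batches = stub.watch(request);
        while (batches.hasNext()) {
          for (Change change : batches.next().getChangesList()) {
            if (change.getState() == Change.State.EXISTS) {
              // The server packs the finished (or failed) Operation into the change data.
              return change.getData().unpack(Operation.class);
            }
          }
        }
        throw new IllegalStateException("watch stream ended without an EXISTS change");
      }
    }

Note that WatcherServer sends exactly one EXISTS change per watch: either the
completed Operation carrying an ExecuteResponse payload, or an Operation whose
error field carries the INTERNAL status built by StatusUtils.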