diff options
author | 2017-12-05 12:41:19 -0800 | |
---|---|---|
committer | 2017-12-05 12:43:25 -0800 | |
commit | 2f3d7df5750ec7f3ac293c92c31d2969af2dd1cf (patch) | |
tree | 20c679cba7b15b05e87e58d3f5b21444fdde9286 /src/tools/remote_worker | |
parent | 110432fdfe2b030745bdf8aaa750bbcf9d313327 (diff) |
Moving the RemoteWorker into tools/remote directory.
This is because I want to add another remote execution related tool, the remote_client, which will use the Remote Execution API to fetch blobs from a remote cache. I will use this tool as part of end-to-end tests for remote execution.
TESTED=remote integration tests, presubmit
RELNOTES: None
PiperOrigin-RevId: 177995895
Diffstat (limited to 'src/tools/remote_worker')
11 files changed, 0 insertions, 1637 deletions
diff --git a/src/tools/remote_worker/BUILD b/src/tools/remote_worker/BUILD deleted file mode 100644 index 3a45276870..0000000000 --- a/src/tools/remote_worker/BUILD +++ /dev/null @@ -1,16 +0,0 @@ -filegroup( - name = "srcs", - srcs = glob(["**"]) + ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote:srcs"], - visibility = ["//src:__pkg__"], -) - -java_binary( - name = "remote_worker", - jvm_flags = [ - # Enables REST for Hazelcast server for testing. - "-Dhazelcast.rest.enabled=true", - ], - main_class = "com.google.devtools.build.remote.RemoteWorker", - visibility = ["//visibility:public"], - runtime_deps = ["//src/tools/remote_worker/src/main/java/com/google/devtools/build/remote"], -) diff --git a/src/tools/remote_worker/README.md b/src/tools/remote_worker/README.md deleted file mode 100644 index 408ab67d47..0000000000 --- a/src/tools/remote_worker/README.md +++ /dev/null @@ -1,42 +0,0 @@ -# Remote worker - -This program implements a remote execution worker that uses gRPC to accept work -requests. It can work as a remote execution worker, a cache worker, or both. -The simplest setup is as follows: - -- First build remote_worker and run it. - - bazel build src/tools/remote_worker:all - bazel-bin/src/tools/remote_worker/remote_worker \ - --work_path=/tmp/test \ - --listen_port=8080 - -- Then you run Bazel pointing to the remote_worker instance. - - bazel build \ - --spawn_strategy=remote --remote_cache=localhost:8080 \ - --remote_executor=localhost:8080 src/tools/generate_workspace:all - -The above command will build generate_workspace with remote spawn strategy that -uses the local worker as the distributed caching and execution backend. - -## Sandboxing - -If you run the remote_worker on Linux, you can also enable sandboxing for increased hermeticity: - - bazel-bin/src/tools/remote_worker/remote_worker \ - --work_path=/tmp/test \ - --listen_port=8080 \ - --sandboxing \ - --sandboxing_writable_path=/run/shm \ - --sandboxing_tmpfs_dir=/tmp \ - --sandboxing_block_network - -As you can see, the specific behavior of the sandbox can be tuned via the flags - -- --sandboxing_writable_path=<path> makes a path writable for running actions. -- --sandboxing_tmpfs_dir=<path> will mount a fresh, empty tmpfs for each running action on a path. -- --sandboxing_block_network will put each running action into its own network namespace that has - no network connectivity except for its own "localhost". Note that due to a Linux kernel issue this - might result in a loss of performance if you run many actions in parallel. For long running tests - it probably won't matter much, though. diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java deleted file mode 100644 index 8e9ba2fafb..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ActionCacheServer.java +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import static java.util.logging.Level.WARNING; - -import com.google.devtools.build.lib.remote.DigestUtil; -import com.google.devtools.build.lib.remote.DigestUtil.ActionKey; -import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; -import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; -import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; -import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; -import io.grpc.stub.StreamObserver; -import java.util.logging.Logger; - -/** A basic implementation of an {@link ActionCacheImplBase} service. */ -final class ActionCacheServer extends ActionCacheImplBase { - private static final Logger logger = Logger.getLogger(ActionCacheImplBase.class.getName()); - - private final SimpleBlobStoreActionCache cache; - private final DigestUtil digestUtil; - - public ActionCacheServer(SimpleBlobStoreActionCache cache, DigestUtil digestUtil) { - this.cache = cache; - this.digestUtil = digestUtil; - } - - @Override - public void getActionResult( - GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { - try { - ActionKey actionKey = digestUtil.asActionKey(request.getActionDigest()); - ActionResult result = cache.getCachedActionResult(actionKey); - - if (result == null) { - responseObserver.onError(StatusUtils.notFoundError(request.getActionDigest())); - return; - } - - responseObserver.onNext(result); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.log(WARNING, "getActionResult request failed.", e); - responseObserver.onError(StatusUtils.internalError(e)); - } - } - - @Override - public void updateActionResult( - UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) { - try { - ActionKey actionKey = digestUtil.asActionKey(request.getActionDigest()); - cache.setCachedActionResult(actionKey, request.getActionResult()); - responseObserver.onNext(request.getActionResult()); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.log(WARNING, "updateActionResult request failed.", e); - responseObserver.onError(StatusUtils.internalError(e)); - } - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD deleted file mode 100644 index 8f36c4d0a8..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD +++ /dev/null @@ -1,43 +0,0 @@ -filegroup( - name = "srcs", - srcs = glob(["**"]), - visibility = ["//src/tools/remote_worker:__pkg__"], -) - -java_library( - name = "remote", - srcs = glob(["*.java"]), - data = ["//src:libunix"], - resources = ["//src/main/tools:linux-sandbox"], - visibility = ["//src/tools/remote_worker:__subpackages__"], - deps = [ - "//src/main/java/com/google/devtools/build/lib:build-base", - "//src/main/java/com/google/devtools/build/lib:os_util", - "//src/main/java/com/google/devtools/build/lib:packages-internal", - "//src/main/java/com/google/devtools/build/lib:process_util", - "//src/main/java/com/google/devtools/build/lib:single-line-formatter", - "//src/main/java/com/google/devtools/build/lib:unix", - "//src/main/java/com/google/devtools/build/lib:util", - "//src/main/java/com/google/devtools/build/lib/actions", - "//src/main/java/com/google/devtools/build/lib/remote", - "//src/main/java/com/google/devtools/build/lib/shell", - "//src/main/java/com/google/devtools/build/lib/vfs", - "//src/main/java/com/google/devtools/common/options", - "//third_party:guava", - "//third_party:hazelcast", - "//third_party:netty", - "//third_party/grpc:grpc-jar", - "//third_party/protobuf:protobuf_java", - "//third_party/protobuf:protobuf_java_util", - "@googleapis//:google_bytestream_bytestream_java_grpc", - "@googleapis//:google_bytestream_bytestream_java_proto", - "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc", - "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto", - "@googleapis//:google_longrunning_operations_java_proto", - "@googleapis//:google_rpc_code_java_proto", - "@googleapis//:google_rpc_error_details_java_proto", - "@googleapis//:google_rpc_status_java_proto", - "@googleapis//:google_watch_v1_java_grpc", - "@googleapis//:google_watch_v1_java_proto", - ], -) diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java deleted file mode 100644 index 778d74335a..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import static java.util.logging.Level.SEVERE; -import static java.util.logging.Level.WARNING; - -import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; -import com.google.bytestream.ByteStreamProto.ReadRequest; -import com.google.bytestream.ByteStreamProto.ReadResponse; -import com.google.bytestream.ByteStreamProto.WriteRequest; -import com.google.bytestream.ByteStreamProto.WriteResponse; -import com.google.devtools.build.lib.remote.CacheNotFoundException; -import com.google.devtools.build.lib.remote.Chunker; -import com.google.devtools.build.lib.remote.DigestUtil; -import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.remoteexecution.v1test.Digest; -import io.grpc.Status; -import io.grpc.protobuf.StatusProto; -import io.grpc.stub.StreamObserver; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.UUID; -import java.util.logging.Logger; -import javax.annotation.Nullable; - -/** A basic implementation of a {@link ByteStreamImplBase} service. */ -final class ByteStreamServer extends ByteStreamImplBase { - private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName()); - private final SimpleBlobStoreActionCache cache; - private final Path workPath; - private final DigestUtil digestUtil; - - static @Nullable Digest parseDigestFromResourceName(String resourceName) { - try { - String[] tokens = resourceName.split("/"); - if (tokens.length < 2) { - return null; - } - String hash = tokens[tokens.length - 2]; - long size = Long.parseLong(tokens[tokens.length - 1]); - return DigestUtil.buildDigest(hash, size); - } catch (NumberFormatException e) { - return null; - } - } - - public ByteStreamServer(SimpleBlobStoreActionCache cache, Path workPath, DigestUtil digestUtil) { - this.cache = cache; - this.workPath = workPath; - this.digestUtil = digestUtil; - } - - @Override - public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { - Digest digest = parseDigestFromResourceName(request.getResourceName()); - - if (digest == null) { - responseObserver.onError( - StatusUtils.invalidArgumentError( - "resource_name", - "Failed parsing digest from resource_name:" + request.getResourceName())); - } - - try { - // This still relies on the blob size to be small enough to fit in memory. - // TODO(olaola): refactor to fix this if the need arises. - Chunker c = new Chunker(cache.downloadBlob(digest), digestUtil); - while (c.hasNext()) { - responseObserver.onNext( - ReadResponse.newBuilder().setData(c.next().getData()).build()); - } - responseObserver.onCompleted(); - } catch (CacheNotFoundException e) { - responseObserver.onError(StatusUtils.notFoundError(digest)); - } catch (Exception e) { - logger.log(WARNING, "Read request failed.", e); - responseObserver.onError(StatusUtils.internalError(e)); - } - } - - @Override - public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) { - Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString()); - try { - FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory()); - temp.getOutputStream().close(); - } catch (IOException e) { - logger.log(SEVERE, "Failed to create temporary file for upload", e); - responseObserver.onError(StatusUtils.internalError(e)); - // We need to make sure that subsequent onNext or onCompleted calls don't make any further - // calls on the responseObserver after the onError above, so we return a no-op observer. - return new NoOpStreamObserver<>(); - } - return new StreamObserver<WriteRequest>() { - private Digest digest; - private long offset; - private String resourceName; - private boolean closed; - - @Override - public void onNext(WriteRequest request) { - if (closed) { - return; - } - - if (digest == null) { - resourceName = request.getResourceName(); - digest = parseDigestFromResourceName(resourceName); - } - - if (digest == null) { - responseObserver.onError( - StatusUtils.invalidArgumentError( - "resource_name", - "Failed parsing digest from resource_name: " + request.getResourceName())); - closed = true; - return; - } - - if (offset == 0) { - try { - if (cache.containsKey(digest)) { - responseObserver.onNext( - WriteResponse.newBuilder().setCommittedSize(digest.getSizeBytes()).build()); - responseObserver.onCompleted(); - closed = true; - return; - } - } catch (InterruptedException e) { - responseObserver.onError(StatusUtils.interruptedError(digest)); - Thread.currentThread().interrupt(); - closed = true; - return; - } catch (IOException e) { - responseObserver.onError(StatusUtils.internalError(e)); - closed = true; - return; - } - } - - if (request.getWriteOffset() != offset) { - responseObserver.onError( - StatusUtils.invalidArgumentError( - "write_offset", - "Expected: " + offset + ", received: " + request.getWriteOffset())); - closed = true; - return; - } - - if (!request.getResourceName().isEmpty() - && !request.getResourceName().equals(resourceName)) { - responseObserver.onError( - StatusUtils.invalidArgumentError( - "resource_name", - "Expected: " + resourceName + ", received: " + request.getResourceName())); - closed = true; - return; - } - - long size = request.getData().size(); - - if (size > 0) { - try (OutputStream out = temp.getOutputStream(true)) { - request.getData().writeTo(out); - } catch (IOException e) { - responseObserver.onError(StatusUtils.internalError(e)); - closed = true; - return; - } - offset += size; - } - - boolean shouldFinishWrite = offset == digest.getSizeBytes(); - - if (shouldFinishWrite != request.getFinishWrite()) { - responseObserver.onError( - StatusUtils.invalidArgumentError( - "finish_write", - "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite())); - closed = true; - return; - } - } - - @Override - public void onError(Throwable t) { - if (Status.fromThrowable(t).getCode() != Status.Code.CANCELLED) { - logger.log(WARNING, "Write request failed remotely.", t); - } - closed = true; - try { - temp.delete(); - } catch (IOException e) { - logger.log(WARNING, "Could not delete temp file.", e); - } - } - - @Override - public void onCompleted() { - if (closed) { - return; - } - - if (digest == null || offset != digest.getSizeBytes()) { - responseObserver.onError( - StatusProto.toStatusRuntimeException( - com.google.rpc.Status.newBuilder() - .setCode(Status.Code.FAILED_PRECONDITION.value()) - .setMessage("Request completed before all data was sent.") - .build())); - closed = true; - return; - } - - try { - Digest d = digestUtil.compute(temp); - try (InputStream in = temp.getInputStream()) { - cache.uploadStream(d, in); - } - try { - temp.delete(); - } catch (IOException e) { - logger.log(WARNING, "Could not delete temp file.", e); - } - - if (!d.equals(digest)) { - responseObserver.onError( - StatusUtils.invalidArgumentError( - "resource_name", - "Received digest " + digest + " does not match computed digest " + d)); - closed = true; - return; - } - - responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(offset).build()); - responseObserver.onCompleted(); - } catch (Exception e) { - logger.log(WARNING, "Write request failed.", e); - responseObserver.onError(StatusUtils.internalError(e)); - closed = true; - } - } - }; - } - - private static class NoOpStreamObserver<T> implements StreamObserver<T> { - @Override - public void onNext(T value) { - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onCompleted() { - } - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java deleted file mode 100644 index 13e42b239d..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/CasServer.java +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; -import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest; -import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse; -import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; -import com.google.devtools.remoteexecution.v1test.Digest; -import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; -import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; -import com.google.devtools.remoteexecution.v1test.UpdateBlobRequest; -import com.google.rpc.Code; -import io.grpc.stub.StreamObserver; -import java.io.IOException; - -/** A basic implementation of a {@link ContentAddressableStorageImplBase} service. */ -final class CasServer extends ContentAddressableStorageImplBase { - private final SimpleBlobStoreActionCache cache; - - public CasServer(SimpleBlobStoreActionCache cache) { - this.cache = cache; - } - - @Override - public void findMissingBlobs( - FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { - FindMissingBlobsResponse.Builder response = FindMissingBlobsResponse.newBuilder(); - - try { - for (Digest digest : request.getBlobDigestsList()) { - try { - if (!cache.containsKey(digest)) { - response.addMissingBlobDigests(digest); - } - } catch (InterruptedException e) { - responseObserver.onError(StatusUtils.interruptedError(digest)); - Thread.currentThread().interrupt(); - return; - } - } - responseObserver.onNext(response.build()); - responseObserver.onCompleted(); - } catch (IOException e) { - responseObserver.onError(StatusUtils.internalError(e)); - } - } - - @Override - public void batchUpdateBlobs( - BatchUpdateBlobsRequest request, StreamObserver<BatchUpdateBlobsResponse> responseObserver) { - BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder(); - for (UpdateBlobRequest r : request.getRequestsList()) { - BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder(); - try { - Digest digest = cache.uploadBlob(r.getData().toByteArray()); - if (!r.getContentDigest().equals(digest)) { - String err = - "Upload digest " + r.getContentDigest() + " did not match data digest: " + digest; - resp.setStatus(StatusUtils.invalidArgumentStatus("content_digest", err)); - continue; - } - resp.getStatusBuilder().setCode(Code.OK.getNumber()); - } catch (Exception e) { - resp.setStatus(StatusUtils.internalErrorStatus(e)); - } - } - responseObserver.onNext(batchResponse.build()); - responseObserver.onCompleted(); - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java deleted file mode 100644 index 0b3f2780bf..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java +++ /dev/null @@ -1,407 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import static java.util.logging.Level.FINE; -import static java.util.logging.Level.INFO; -import static java.util.logging.Level.SEVERE; -import static java.util.logging.Level.WARNING; - -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.devtools.build.lib.remote.CacheNotFoundException; -import com.google.devtools.build.lib.remote.DigestUtil; -import com.google.devtools.build.lib.remote.DigestUtil.ActionKey; -import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; -import com.google.devtools.build.lib.remote.TracingMetadataUtils; -import com.google.devtools.build.lib.shell.AbnormalTerminationException; -import com.google.devtools.build.lib.shell.Command; -import com.google.devtools.build.lib.shell.CommandException; -import com.google.devtools.build.lib.shell.CommandResult; -import com.google.devtools.build.lib.shell.FutureCommandResult; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.remoteexecution.v1test.Action; -import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable; -import com.google.devtools.remoteexecution.v1test.ExecuteRequest; -import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; -import com.google.devtools.remoteexecution.v1test.Platform; -import com.google.devtools.remoteexecution.v1test.RequestMetadata; -import com.google.longrunning.Operation; -import com.google.protobuf.util.Durations; -import com.google.rpc.Code; -import com.google.rpc.Status; -import io.grpc.Context; -import io.grpc.StatusException; -import io.grpc.protobuf.StatusProto; -import io.grpc.stub.StreamObserver; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** A basic implementation of an {@link ExecutionImplBase} service. */ -final class ExecutionServer extends ExecutionImplBase { - private static final Logger logger = Logger.getLogger(ExecutionServer.class.getName()); - - private final Object lock = new Object(); - - // The name of the container image entry in the Platform proto - // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and - // experimental_remote_platform_override in - // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java) - private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image"; - private static final String DOCKER_IMAGE_PREFIX = "docker://"; - - // How long to wait for the uid command. - private static final Duration uidTimeout = Duration.ofMillis(30); - - private static final int LOCAL_EXEC_ERROR = -1; - - private final Path workPath; - private final Path sandboxPath; - private final RemoteWorkerOptions workerOptions; - private final SimpleBlobStoreActionCache cache; - private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache; - private final ListeningExecutorService executorService; - private final DigestUtil digestUtil; - - public ExecutionServer( - Path workPath, - Path sandboxPath, - RemoteWorkerOptions workerOptions, - SimpleBlobStoreActionCache cache, - ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache, - DigestUtil digestUtil) { - this.workPath = workPath; - this.sandboxPath = sandboxPath; - this.workerOptions = workerOptions; - this.cache = cache; - this.operationsCache = operationsCache; - this.digestUtil = digestUtil; - ThreadPoolExecutor realExecutor = new ThreadPoolExecutor( - // This is actually the max number of concurrent jobs. - workerOptions.jobs, - // Since we use an unbounded queue, the executor ignores this value, but it still checks - // that it is greater or equal to the value above. - workerOptions.jobs, - // Shut down idle threads after one minute. Threads aren't all that expensive, but we also - // don't need to keep them around if we don't need them. - 1, TimeUnit.MINUTES, - // We use an unbounded queue for now. - // TODO(ulfjack): We need to reject work eventually. - new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("subprocess-handler-%d").build()); - // Allow the core threads to die. - realExecutor.allowCoreThreadTimeOut(true); - this.executorService = MoreExecutors.listeningDecorator(realExecutor); - } - - @Override - public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { - final String opName = UUID.randomUUID().toString(); - ListenableFuture<ActionResult> future = - executorService.submit(Context.current().wrap(() -> execute(request, opName))); - operationsCache.put(opName, future); - responseObserver.onNext(Operation.newBuilder().setName(opName).build()); - responseObserver.onCompleted(); - } - - private ActionResult execute(ExecuteRequest request, String id) - throws IOException, InterruptedException, StatusException { - Path tempRoot = workPath.getRelative("build-" + id); - String workDetails = ""; - try { - tempRoot.createDirectory(); - RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); - workDetails = - String.format( - "build-request-id: %s command-id: %s action-id: %s", - meta.getCorrelatedInvocationsId(), meta.getToolInvocationId(), meta.getActionId()); - logger.log(FINE, "Received work for: {0}", workDetails); - ActionResult result = execute(request.getAction(), tempRoot); - logger.log(FINE, "Completed {0}.", workDetails); - return result; - } catch (Exception e) { - logger.log(Level.SEVERE, "Work failed: {0} {1}.", new Object[] {workDetails, e}); - throw e; - } finally { - if (workerOptions.debug) { - logger.log(INFO, "Preserving work directory {0}.", tempRoot); - } else { - try { - FileSystemUtils.deleteTree(tempRoot); - } catch (IOException e) { - logger.log(SEVERE, - String.format( - "Failed to delete tmp directory %s: %s", - tempRoot, Throwables.getStackTraceAsString(e))); - } - } - } - } - - private ActionResult execute(Action action, Path execRoot) - throws IOException, InterruptedException, StatusException { - com.google.devtools.remoteexecution.v1test.Command command = null; - try { - command = - com.google.devtools.remoteexecution.v1test.Command.parseFrom( - cache.downloadBlob(action.getCommandDigest())); - cache.downloadTree(action.getInputRootDigest(), execRoot); - } catch (CacheNotFoundException e) { - throw StatusUtils.notFoundError(e.getMissingDigest()); - } - - List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size()); - for (String output : action.getOutputFilesList()) { - Path file = execRoot.getRelative(output); - if (file.exists()) { - throw new FileAlreadyExistsException("Output file already exists: " + file); - } - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - outputs.add(file); - } - // TODO(olaola): support output directories. - - // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that - // implementation instead of copying it. - Command cmd = - getCommand( - action, - command.getArgumentsList(), - getEnvironmentVariables(command), - execRoot.getPathString()); - long startTime = System.currentTimeMillis(); - CommandResult cmdResult = null; - - FutureCommandResult futureCmdResult = null; - synchronized (lock) { - // Linux does not provide a safe API for a multi-threaded program to fork a subprocess. - // Consider the case where two threads both write an executable file and then try to execute - // it. It can happen that the first thread writes its executable file, with the file - // descriptor still being open when the second thread forks, with the fork inheriting a copy - // of the file descriptor. Then the first thread closes the original file descriptor, and - // proceeds to execute the file. At that point Linux sees an open file descriptor to the file - // and returns ETXTBSY (Text file busy) as an error. This race is inherent in the fork / exec - // duality, with fork always inheriting a copy of the file descriptor table; if there was a - // way to fork without copying the entire file descriptor table (e.g., only copy specific - // entries), we could avoid this race. - // - // I was able to reproduce this problem reliably by running significantly more threads than - // there are CPU cores on my workstation - the more threads the more likely it happens. - // - // As a workaround, we put a synchronized block around the fork. - try { - futureCmdResult = cmd.executeAsync(); - } catch (CommandException e) { - Throwables.throwIfInstanceOf(e.getCause(), IOException.class); - } - } - - if (futureCmdResult != null) { - try { - cmdResult = futureCmdResult.get(); - } catch (AbnormalTerminationException e) { - cmdResult = e.getResult(); - } - } - - long timeoutMillis = - action.hasTimeout() - ? Durations.toMillis(action.getTimeout()) - : TimeUnit.MINUTES.toMillis(15); - boolean wasTimeout = - (cmdResult != null && cmdResult.getTerminationStatus().timedOut()) - || wasTimeout(timeoutMillis, System.currentTimeMillis() - startTime); - final int exitCode; - if (wasTimeout) { - final String errMessage = - String.format( - "Command:\n%s\nexceeded deadline of %f seconds.", - Arrays.toString(command.getArgumentsList().toArray()), timeoutMillis / 1000.0); - logger.warning(errMessage); - throw StatusProto.toStatusException( - Status.newBuilder() - .setCode(Code.DEADLINE_EXCEEDED.getNumber()) - .setMessage(errMessage) - .build()); - } else if (cmdResult == null) { - exitCode = LOCAL_EXEC_ERROR; - } else { - exitCode = cmdResult.getTerminationStatus().getRawExitCode(); - } - - ActionResult.Builder result = ActionResult.newBuilder(); - cache.upload(result, execRoot, outputs); - byte[] stdout = cmdResult.getStdout(); - byte[] stderr = cmdResult.getStderr(); - cache.uploadOutErr(result, stdout, stderr); - ActionResult finalResult = result.setExitCode(exitCode).build(); - if (exitCode == 0) { - ActionKey actionKey = digestUtil.computeActionKey(action); - cache.setCachedActionResult(actionKey, finalResult); - } - return finalResult; - } - - private boolean wasTimeout(long timeoutMillis, long wallTimeMillis) { - return timeoutMillis > 0 && wallTimeMillis > timeoutMillis; - } - - private Map<String, String> getEnvironmentVariables( - com.google.devtools.remoteexecution.v1test.Command command) { - HashMap<String, String> result = new HashMap<>(); - for (EnvironmentVariable v : command.getEnvironmentVariablesList()) { - result.put(v.getName(), v.getValue()); - } - return result; - } - - // Gets the uid of the current user. If uid could not be successfully fetched (e.g., on other - // platforms, if for some reason the timeout was not met, if "id -u" returned non-numeric - // number, etc), logs a WARNING and return -1. - // This is used to set "-u UID" flag for commands running inside Docker containers. There are - // only a small handful of cases where uid is vital (e.g., if strict permissions are set on the - // output files), so most use cases would work without setting uid. - private long getUid() { - Command cmd = - new Command(new String[] {"id", "-u"}, /*env=*/null, /*workingDir=*/null, uidTimeout); - try { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - cmd.execute(stdout, stderr); - return Long.parseLong(stdout.toString().trim()); - } catch (CommandException | NumberFormatException e) { - logger.log( - WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e); - return -1; - } - } - - // Checks Action for docker container definition. If no docker container specified, returns - // null. Otherwise returns docker container name from the parameters. - private String dockerContainer(Action action) throws StatusException { - String result = null; - for (Platform.Property property : action.getPlatform().getPropertiesList()) { - if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) { - if (result != null) { - // Multiple container name entries - throw StatusUtils.invalidArgumentError( - "platform", // Field name. - String.format( - "Multiple entries for %s in action.Platform", CONTAINER_IMAGE_ENTRY_NAME)); - } - result = property.getValue(); - if (!result.startsWith(DOCKER_IMAGE_PREFIX)) { - throw StatusUtils.invalidArgumentError( - "platform", // Field name. - String.format( - "%s: Docker images must be stored in gcr.io with an image spec in the form " - + "'docker://gcr.io/{IMAGE_NAME}'", - CONTAINER_IMAGE_ENTRY_NAME)); - } - result = result.substring(DOCKER_IMAGE_PREFIX.length()); - } - } - return result; - } - - // Takes an Action and parameters that can be used to create a Command. Returns the Command. - // If no docker container is specified inside Action, creates a Command straight from the - // arguments. Otherwise, returns a Command that would run the specified command inside the - // specified docker container. - private Command getCommand( - Action action, - List<String> commandLineElements, - Map<String, String> environmentVariables, - String pathString) throws StatusException { - String container = dockerContainer(action); - if (container != null) { - // Run command inside a docker container. - ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size()); - newCommandLineElements.add("docker"); - newCommandLineElements.add("run"); - - long uid = getUid(); - if (uid >= 0) { - newCommandLineElements.add("-u"); - newCommandLineElements.add(Long.toString(uid)); - } - - String dockerPathString = pathString + "-docker"; - newCommandLineElements.add("-v"); - newCommandLineElements.add(pathString + ":" + dockerPathString); - newCommandLineElements.add("-w"); - newCommandLineElements.add(dockerPathString); - - for (Map.Entry<String, String> entry : environmentVariables.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - newCommandLineElements.add("-e"); - newCommandLineElements.add(key + "=" + value); - } - - newCommandLineElements.add(container); - - newCommandLineElements.addAll(commandLineElements); - - return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString)); - } else if (sandboxPath != null) { - // Run command with sandboxing. - ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size()); - newCommandLineElements.add(sandboxPath.getPathString()); - if (workerOptions.sandboxingBlockNetwork) { - newCommandLineElements.add("-N"); - } - for (String writablePath : workerOptions.sandboxingWritablePaths) { - newCommandLineElements.add("-w"); - newCommandLineElements.add(writablePath); - } - for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) { - newCommandLineElements.add("-e"); - newCommandLineElements.add(tmpfsDir); - } - newCommandLineElements.add("--"); - newCommandLineElements.addAll(commandLineElements); - return new Command( - newCommandLineElements.toArray(new String[0]), - environmentVariables, - new File(pathString)); - } else { - // Just run the command. - return new Command( - commandLineElements.toArray(new String[0]), environmentVariables, new File(pathString)); - } - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java deleted file mode 100644 index c05076a627..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.logging.Level.FINE; -import static java.util.logging.Level.INFO; -import static java.util.logging.Level.SEVERE; - -import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.ByteStreams; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.remote.DigestUtil; -import com.google.devtools.build.lib.remote.RemoteOptions; -import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; -import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory; -import com.google.devtools.build.lib.remote.TracingMetadataUtils; -import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore; -import com.google.devtools.build.lib.remote.blobstore.OnDiskBlobStore; -import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore; -import com.google.devtools.build.lib.shell.Command; -import com.google.devtools.build.lib.shell.CommandException; -import com.google.devtools.build.lib.shell.CommandResult; -import com.google.devtools.build.lib.unix.UnixFileSystem; -import com.google.devtools.build.lib.util.OS; -import com.google.devtools.build.lib.util.ProcessUtils; -import com.google.devtools.build.lib.util.SingleLineFormatter; -import com.google.devtools.build.lib.vfs.FileSystem; -import com.google.devtools.build.lib.vfs.FileSystem.HashFunction; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.JavaIoFileSystem; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.build.lib.vfs.PathFragment; -import com.google.devtools.common.options.OptionsParser; -import com.google.devtools.common.options.OptionsParsingException; -import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; -import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; -import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; -import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; -import com.hazelcast.config.Config; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; -import io.grpc.Server; -import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; -import io.grpc.netty.NettyServerBuilder; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ConcurrentHashMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Implements a remote worker that accepts work items as protobufs. The server implementation is - * based on gRPC. - */ -public final class RemoteWorker { - // We need to keep references to the root and netty loggers to prevent them from being garbage - // collected, which would cause us to loose their configuration. - private static final Logger rootLogger = Logger.getLogger(""); - private static final Logger nettyLogger = Logger.getLogger("io.grpc.netty"); - private static final Logger logger = Logger.getLogger(RemoteWorker.class.getName()); - - private final RemoteWorkerOptions workerOptions; - private final ActionCacheImplBase actionCacheServer; - private final ByteStreamImplBase bsServer; - private final ContentAddressableStorageImplBase casServer; - private final WatcherImplBase watchServer; - private final ExecutionImplBase execServer; - - static FileSystem getFileSystem() { - final HashFunction hashFunction; - String value = null; - try { - value = System.getProperty("bazel.DigestFunction", "SHA256"); - hashFunction = new HashFunction.Converter().convert(value); - } catch (OptionsParsingException e) { - throw new Error("The specified hash function '" + value + "' is not supported."); - } - return OS.getCurrent() == OS.WINDOWS - ? new JavaIoFileSystem(hashFunction) - : new UnixFileSystem(hashFunction); - } - - public RemoteWorker( - FileSystem fs, - RemoteWorkerOptions workerOptions, - SimpleBlobStoreActionCache cache, - Path sandboxPath, - DigestUtil digestUtil) - throws IOException { - this.workerOptions = workerOptions; - this.actionCacheServer = new ActionCacheServer(cache, digestUtil); - Path workPath; - if (workerOptions.workPath != null) { - workPath = fs.getPath(workerOptions.workPath); - } else { - // TODO(ulfjack): The plan is to make the on-disk storage the default, so we always need to - // provide a path to the remote worker, and we can then also use that as the work path. E.g.: - // /given/path/cas/ - // /given/path/upload/ - // /given/path/work/ - // We could technically use a different path for temporary files and execution, but we want - // the cas/ directory to be on the same file system as the upload/ and work/ directories so - // that we can atomically move files between them, and / or use hard-links for the exec - // directories. - // For now, we use a temporary path if no work path was provided. - workPath = fs.getPath("/tmp/remote-worker"); - } - this.bsServer = new ByteStreamServer(cache, workPath, digestUtil); - this.casServer = new CasServer(cache); - - if (workerOptions.workPath != null) { - ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache = - new ConcurrentHashMap<>(); - FileSystemUtils.createDirectoryAndParents(workPath); - watchServer = new WatcherServer(operationsCache); - execServer = - new ExecutionServer( - workPath, sandboxPath, workerOptions, cache, operationsCache, digestUtil); - } else { - watchServer = null; - execServer = null; - } - } - - public Server startServer() throws IOException { - ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor(); - NettyServerBuilder b = - NettyServerBuilder.forPort(workerOptions.listenPort) - .addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor)) - .addService(ServerInterceptors.intercept(bsServer, headersInterceptor)) - .addService(ServerInterceptors.intercept(casServer, headersInterceptor)); - - if (execServer != null) { - b.addService(ServerInterceptors.intercept(execServer, headersInterceptor)); - b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor)); - } else { - logger.info("Execution disabled, only serving cache requests."); - } - - Server server = b.build(); - logger.log(INFO, "Starting gRPC server on port {0,number,#}.", workerOptions.listenPort); - server.start(); - - return server; - } - - private void createPidFile() throws IOException { - if (workerOptions.pidFile == null) { - return; - } - - final Path pidFile = getFileSystem().getPath(workerOptions.pidFile); - try (Writer writer = - new OutputStreamWriter(pidFile.getOutputStream(), StandardCharsets.UTF_8)) { - writer.write(Integer.toString(ProcessUtils.getpid())); - writer.write("\n"); - } - - Runtime.getRuntime() - .addShutdownHook( - new Thread() { - @Override - public void run() { - try { - pidFile.delete(); - } catch (IOException e) { - System.err.println("Cannot remove pid file: " + pidFile); - } - } - }); - } - - /** - * Construct a {@link SimpleBlobStore} using Hazelcast's version of {@link ConcurrentMap}. This - * will start a standalone Hazelcast server in the same JVM. There will also be a REST server - * started for accessing the maps. - */ - private static SimpleBlobStore createHazelcast(RemoteWorkerOptions options) { - Config config = new Config(); - config - .getNetworkConfig() - .setPort(options.hazelcastStandaloneListenPort) - .getJoin() - .getMulticastConfig() - .setEnabled(false); - HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); - return new ConcurrentMapBlobStore(instance.<String, byte[]>getMap("cache")); - } - - public static void main(String[] args) throws Exception { - OptionsParser parser = - OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class); - parser.parseAndExitUponError(args); - RemoteOptions remoteOptions = parser.getOptions(RemoteOptions.class); - RemoteWorkerOptions remoteWorkerOptions = parser.getOptions(RemoteWorkerOptions.class); - - rootLogger.getHandlers()[0].setFormatter(new SingleLineFormatter()); - if (remoteWorkerOptions.debug) { - rootLogger.getHandlers()[0].setLevel(FINE); - } - - // Only log severe log messages from Netty. Otherwise it logs warnings that look like this: - // - // 170714 08:16:28.552:WT 18 [io.grpc.netty.NettyServerHandler.onStreamError] Stream Error - // io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an - // unknown stream 11369 - // - // As far as we can tell, these do not indicate any problem with the connection. We believe they - // happen when the local side closes a stream, but the remote side hasn't received that - // notification yet, so there may still be packets for that stream en-route to the local - // machine. The wording 'unknown stream' is misleading - the stream was previously known, but - // was recently closed. I'm told upstream discussed this, but didn't want to keep information - // about closed streams around. - nettyLogger.setLevel(Level.SEVERE); - - FileSystem fs = getFileSystem(); - Path sandboxPath = null; - if (remoteWorkerOptions.sandboxing) { - sandboxPath = prepareSandboxRunner(fs, remoteWorkerOptions); - } - - logger.info("Initializing in-memory cache server."); - boolean usingRemoteCache = SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions); - if (!usingRemoteCache) { - logger.warning("Not using remote cache. This should be used for testing only!"); - } - if ((remoteWorkerOptions.casPath != null) - && (!PathFragment.create(remoteWorkerOptions.casPath).isAbsolute() - || !fs.getPath(remoteWorkerOptions.casPath).exists())) { - logger.severe("--cas_path must refer to an existing, absolute path!"); - System.exit(1); - return; - } - - // The instance of SimpleBlobStore used is based on these criteria in order: - // 1. If remote cache or local disk cache is specified then use it first. - // 2. Otherwise start a standalone Hazelcast instance and use it as the blob store. This also - // creates a REST server for testing. - // 3. Finally use a ConcurrentMap to back the blob store. - final SimpleBlobStore blobStore; - if (usingRemoteCache) { - blobStore = SimpleBlobStoreFactory.create(remoteOptions, null); - } else if (remoteWorkerOptions.casPath != null) { - blobStore = new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath)); - } else if (remoteWorkerOptions.hazelcastStandaloneListenPort != 0) { - blobStore = createHazelcast(remoteWorkerOptions); - } else { - blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>()); - } - - DigestUtil digestUtil = new DigestUtil(fs.getDigestFunction()); - RemoteWorker worker = - new RemoteWorker( - fs, - remoteWorkerOptions, - new SimpleBlobStoreActionCache(blobStore, digestUtil), - sandboxPath, - digestUtil); - - final Server server = worker.startServer(); - worker.createPidFile(); - server.awaitTermination(); - } - - private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) { - if (OS.getCurrent() != OS.LINUX) { - logger.severe("Sandboxing requested, but it is currently only available on Linux."); - System.exit(1); - } - - if (remoteWorkerOptions.workPath == null) { - logger.severe("Sandboxing requested, but --work_path was not specified."); - System.exit(1); - } - - InputStream sandbox = RemoteWorker.class.getResourceAsStream("/main/tools/linux-sandbox"); - if (sandbox == null) { - logger.severe( - "Sandboxing requested, but could not find bundled linux-sandbox binary. " - + "Please rebuild a remote_worker_deploy.jar on Linux to make this work."); - System.exit(1); - } - - Path sandboxPath = null; - try { - sandboxPath = fs.getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox"); - try (FileOutputStream fos = new FileOutputStream(sandboxPath.getPathString())) { - ByteStreams.copy(sandbox, fos); - } - sandboxPath.setExecutable(true); - } catch (IOException e) { - logger.log(SEVERE, "Could not extract the bundled linux-sandbox binary to " + sandboxPath, e); - System.exit(1); - } - - CommandResult cmdResult = null; - Command cmd = - new Command( - ImmutableList.of(sandboxPath.getPathString(), "--", "true").toArray(new String[0]), - ImmutableMap.<String, String>of(), - sandboxPath.getParentDirectory().getPathFile()); - try { - cmdResult = cmd.execute(); - } catch (CommandException e) { - logger.log( - SEVERE, - "Sandboxing requested, but it failed to execute 'true' as a self-check: " - + new String(cmdResult.getStderr(), UTF_8), - e); - System.exit(1); - } - - return sandboxPath; - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java deleted file mode 100644 index 9dee83a349..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorkerOptions.java +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import com.google.devtools.build.lib.actions.LocalHostCapacity; -import com.google.devtools.common.options.Converters.RangeConverter; -import com.google.devtools.common.options.Option; -import com.google.devtools.common.options.OptionDocumentationCategory; -import com.google.devtools.common.options.OptionEffectTag; -import com.google.devtools.common.options.OptionsBase; -import com.google.devtools.common.options.OptionsParsingException; -import java.util.List; - -/** Options for remote worker. */ -public class RemoteWorkerOptions extends OptionsBase { - @Option( - name = "listen_port", - defaultValue = "8080", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = "Listening port for the netty server." - ) - public int listenPort; - - @Option( - name = "work_path", - defaultValue = "null", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = "A directory for the build worker to do work." - ) - public String workPath; - - @Option( - name = "cas_path", - defaultValue = "null", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = "A directory for the build worker to store it's files in. If left unset, and if no " - + "other store is set, the worker falls back to an in-memory store." - ) - public String casPath; - - @Option( - name = "debug", - defaultValue = "false", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = - "Turn this on for debugging remote job failures. There will be extra messages and the " - + "work directory will be preserved in the case of failure." - ) - public boolean debug; - - @Option( - name = "pid_file", - defaultValue = "null", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = "File for writing the process id for this worker when it is fully started." - ) - public String pidFile; - - @Option( - name = "sandboxing", - defaultValue = "false", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = "If supported on this platform, use sandboxing for increased hermeticity." - ) - public boolean sandboxing; - - @Option( - name = "sandboxing_writable_path", - defaultValue = "", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - allowMultiple = true, - help = "When using sandboxing, allow running actions to write to this path." - ) - public List<String> sandboxingWritablePaths; - - @Option( - name = "sandboxing_tmpfs_dir", - defaultValue = "", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - allowMultiple = true, - help = "When using sandboxing, mount an empty tmpfs onto this path for each running action." - ) - public List<String> sandboxingTmpfsDirs; - - @Option( - name = "sandboxing_block_network", - defaultValue = "false", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = "When using sandboxing, block network access for running actions." - ) - public boolean sandboxingBlockNetwork; - - @Option( - name = "jobs", - defaultValue = "auto", - converter = JobsConverter.class, - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = - "The maximum number of concurrent jobs to run. \"auto\" means to use a reasonable value" - + " derived from the machine's hardware profile (e.g. the number of processors)." - + " Values above " + MAX_JOBS + " are not allowed." - ) - public int jobs; - - @Option( - name = "hazelcast_standalone_listen_port", - defaultValue = "0", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = - "Runs an embedded hazelcast server that listens to this port. The server does not join" - + " any cluster. This is useful for testing." - ) - public int hazelcastStandaloneListenPort; - - private static final int MAX_JOBS = 16384; - - /** Converter for jobs: [0, MAX_JOBS] or "auto". */ - public static class JobsConverter extends RangeConverter { - public JobsConverter() { - super(0, MAX_JOBS); - } - - @Override - public Integer convert(String input) throws OptionsParsingException { - if (input.equals("auto")) { - int autoJobs = (int) Math.ceil(LocalHostCapacity.getLocalHostCapacity().getCpuUsage()); - return Math.min(autoJobs, MAX_JOBS); - } else { - return super.convert(input); - } - } - - @Override - public String getTypeDescription() { - return "\"auto\" or " + super.getTypeDescription(); - } - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java deleted file mode 100644 index 494ee58453..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import com.google.devtools.remoteexecution.v1test.Digest; -import com.google.protobuf.Any; -import com.google.rpc.BadRequest; -import com.google.rpc.BadRequest.FieldViolation; -import com.google.rpc.Code; -import com.google.rpc.Status; -import io.grpc.StatusException; -import io.grpc.protobuf.StatusProto; - -/** Some utility methods to convert exceptions to Status results. */ -final class StatusUtils { - private StatusUtils() {} - - static StatusException internalError(Exception e) { - return StatusProto.toStatusException(internalErrorStatus(e)); - } - - static Status internalErrorStatus(Exception e) { - // StatusProto.fromThrowable returns null on non-status errors or errors with no trailers, - // unlike Status.fromThrowable which returns the UNKNOWN code for these. - Status st = StatusProto.fromThrowable(e); - return st != null - ? st - : Status.newBuilder().setCode(Code.INTERNAL.getNumber()).setMessage(e.getMessage()).build(); - } - - static StatusException notFoundError(Digest digest) { - return StatusProto.toStatusException(notFoundStatus(digest)); - } - - static com.google.rpc.Status notFoundStatus(Digest digest) { - return Status.newBuilder() - .setCode(Code.NOT_FOUND.getNumber()) - .setMessage("Digest not found:" + digest) - .build(); - } - - static StatusException interruptedError(Digest digest) { - return StatusProto.toStatusException(interruptedStatus(digest)); - } - - static com.google.rpc.Status interruptedStatus(Digest digest) { - return Status.newBuilder() - .setCode(Code.CANCELLED.getNumber()) - .setMessage("Server operation was interrupted for " + digest) - .build(); - } - - static StatusException invalidArgumentError(String field, String desc) { - return StatusProto.toStatusException(invalidArgumentStatus(field, desc)); - } - - static com.google.rpc.Status invalidArgumentStatus(String field, String desc) { - FieldViolation v = FieldViolation.newBuilder().setField(field).setDescription(desc).build(); - return Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT.getNumber()) - .setMessage("invalid argument(s): " + field + ": " + desc) - .addDetails(Any.pack(BadRequest.newBuilder().addFieldViolations(v).build())) - .build(); - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java deleted file mode 100644 index 9a242f92f1..0000000000 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.remote; - -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.ExecuteResponse; -import com.google.longrunning.Operation; -import com.google.protobuf.Any; -import com.google.rpc.Code; -import com.google.rpc.Status; -import com.google.watcher.v1.Change; -import com.google.watcher.v1.ChangeBatch; -import com.google.watcher.v1.Request; -import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; -import io.grpc.protobuf.StatusProto; -import io.grpc.stub.StreamObserver; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** A basic implementation of an {@link WatcherImplBase} service. */ -final class WatcherServer extends WatcherImplBase { - private static final Logger logger = Logger.getLogger(WatcherServer.class.getName()); - - private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache; - - public WatcherServer(ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) { - this.operationsCache = operationsCache; - } - - @Override - public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) { - final String opName = wr.getTarget(); - ListenableFuture<ActionResult> future = operationsCache.get(opName); - if (future == null) { - responseObserver.onError( - StatusProto.toStatusRuntimeException( - Status.newBuilder() - .setCode(Code.NOT_FOUND.getNumber()) - .setMessage("Operation not found: " + opName) - .build())); - return; - } - - future.addListener(() -> { - try { - try { - ActionResult result = future.get(); - responseObserver.onNext( - packExists( - Operation.newBuilder() - .setName(opName) - .setDone(true) - .setResponse( - Any.pack(ExecuteResponse.newBuilder().setResult(result).build())))); - responseObserver.onCompleted(); - } catch (ExecutionException e) { - Throwables.throwIfUnchecked(e.getCause()); - throw (Exception) e.getCause(); - } - } catch (Exception e) { - logger.log(Level.SEVERE, "Work failed: " + opName, e); - responseObserver.onNext( - ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData( - Any.pack( - Operation.newBuilder() - .setName(opName) - .setError(StatusUtils.internalErrorStatus(e)) - .build())) - .build()) - .build()); - responseObserver.onCompleted(); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } finally { - operationsCache.remove(opName); - } - }, MoreExecutors.directExecutor()); - } - - /** Constructs a ChangeBatch with an exists state change that contains the given operation. */ - private ChangeBatch packExists(Operation.Builder message) { - return ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData( - Any.pack(message.build()))) - .build(); - } -} |