aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Ola Rozenfeld <olaola@google.com>2016-09-19 15:02:32 +0000
committerGravatar Laszlo Csomor <laszlocsomor@google.com>2016-09-20 06:45:40 +0000
commit72d117d6fe9485b2ab8cf7938c029cc26d1787da (patch)
treea30a0489c94897d3b1085dc3c147b48d0cd9a4fc
parent11fe6abc1703a552701a55241ff6ae8157d81950 (diff)
Description redacted.
-- MOS_MIGRATED_REVID=133584935
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java23
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java205
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java (renamed from src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java)77
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java67
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java209
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java84
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java46
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java318
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java42
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java78
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java30
-rw-r--r--src/main/protobuf/remote_protocol.proto78
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java163
14 files changed, 674 insertions, 758 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
index 0e4e52c1d4..5820e0bf95 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
@@ -14,20 +14,25 @@
package com.google.devtools.build.lib.remote;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+
/**
- * An exception to indicate the cache is not found because of an expected
- * problem.
+ * An exception to indicate cache misses.
+ * TODO(olaola): have a class of checked RemoteCacheExceptions.
*/
-final class CacheNotFoundException extends RuntimeException {
- CacheNotFoundException() {
- super();
+public final class CacheNotFoundException extends Exception {
+ private final ContentDigest missingDigest;
+
+ CacheNotFoundException(ContentDigest missingDigest) {
+ this.missingDigest = missingDigest;
}
- CacheNotFoundException(String message) {
- super(message);
+ public ContentDigest getMissingDigest() {
+ return missingDigest;
}
- CacheNotFoundException(String message, Throwable cause) {
- super(message, cause);
+ @Override
+ public String toString() {
+ return "Missing digest: " + ContentDigests.toString(missingDigest);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java
index 7e27653e76..72c75700b5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java
@@ -14,18 +14,24 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.hash.HashCode;
-import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CacheEntry;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry;
+import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
+import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
@@ -38,121 +44,156 @@ import java.util.concurrent.Semaphore;
*/
@ThreadSafe
public final class ConcurrentMapActionCache implements RemoteActionCache {
- private final Path execRoot;
private final ConcurrentMap<String, byte[]> cache;
private static final int MAX_MEMORY_KBYTES = 512 * 1024;
private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true);
- public ConcurrentMapActionCache(Path execRoot, ConcurrentMap<String, byte[]> cache) {
- this.execRoot = execRoot;
+ public ConcurrentMapActionCache(ConcurrentMap<String, byte[]> cache) {
this.cache = cache;
}
@Override
- public String putFileIfNotExist(ActionInputFileCache cache, ActionInput file)
+ public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
throws IOException, InterruptedException {
- String contentKey = HashCode.fromBytes(cache.getDigest(file)).toString();
- if (containsFile(contentKey)) {
- return contentKey;
+ repository.computeMerkleDigests(root);
+ for (FileNode fileNode : repository.treeToFileNodes(root)) {
+ uploadBlob(fileNode.toByteArray());
+ }
+ for (TreeNode leaf : repository.leaves(root)) {
+ uploadFileContents(execRoot.getRelative(leaf.getActionInput().getExecPathString()));
}
- putFile(contentKey, execRoot.getRelative(file.getExecPathString()));
- return contentKey;
}
- private void putFile(String key, Path file) throws IOException, InterruptedException {
- int fileSizeKBytes = (int) (file.getFileSize() / 1024);
- Preconditions.checkArgument(fileSizeKBytes < MAX_MEMORY_KBYTES);
- try {
- uploadMemoryAvailable.acquire(fileSizeKBytes);
- // TODO(alpha): I should put the file content as chunks to avoid reading the entire
- // file into memory.
- try (InputStream stream = file.getInputStream()) {
- cache.put(
- key,
- CacheEntry.newBuilder()
- .setFileContent(ByteString.readFrom(stream))
- .build()
- .toByteArray());
- }
- } finally {
- uploadMemoryAvailable.release(fileSizeKBytes);
+ @Override
+ public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ throws IOException, CacheNotFoundException {
+ FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest));
+ if (fileNode.hasFileMetadata()) {
+ FileMetadata meta = fileNode.getFileMetadata();
+ downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable());
+ }
+ for (FileNode.Child child : fileNode.getChildList()) {
+ downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath()));
}
}
@Override
- public void writeFile(String key, Path dest, boolean executable)
+ public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
+ // This unconditionally reads the whole file into memory first!
+ return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray());
+ }
+
+ @Override
+ public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- byte[] data = cache.get(key);
- if (data == null) {
- throw new CacheNotFoundException("File content cannot be found with key: " + key);
- }
- try (OutputStream stream = dest.getOutputStream()) {
- CacheEntry.parseFrom(data).getFileContent().writeTo(stream);
- dest.setExecutable(executable);
+ for (Output output : result.getOutputList()) {
+ if (output.getContentCase() == ContentCase.FILE_METADATA) {
+ FileMetadata m = output.getFileMetadata();
+ downloadFileContents(
+ m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable());
+ } else {
+ downloadTree(output.getDigest(), execRoot.getRelative(output.getPath()));
+ }
}
}
- private boolean containsFile(String key) {
- return cache.containsKey(key);
+ @Override
+ public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
+ throws IOException, InterruptedException {
+ for (Path file : files) {
+ if (file.isDirectory()) {
+ // TODO(olaola): to implement this for a directory, will need to create or pass a
+ // TreeNodeRepository to call uploadTree.
+ throw new UnsupportedOperationException("Storing a directory is not yet supported.");
+ }
+ // First put the file content to cache.
+ ContentDigest digest = uploadFileContents(file);
+ // Add to protobuf.
+ result
+ .addOutputBuilder()
+ .setPath(file.relativeTo(execRoot).getPathString())
+ .getFileMetadataBuilder()
+ .setDigest(digest)
+ .setExecutable(file.isExecutable());
+ }
}
@Override
- public void writeActionOutput(String key, Path execRoot)
+ public void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException {
- byte[] data = cache.get(key);
- if (data == null) {
- throw new CacheNotFoundException("Action output cannot be found with key: " + key);
+ // This unconditionally downloads the whole file into memory first!
+ byte[] contents = downloadBlob(digest);
+ FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
+ try (OutputStream stream = dest.getOutputStream()) {
+ stream.write(contents);
}
- CacheEntry cacheEntry = CacheEntry.parseFrom(data);
- for (FileEntry file : cacheEntry.getFilesList()) {
- writeFile(file.getContentKey(), execRoot.getRelative(file.getPath()), file.getExecutable());
+ dest.setExecutable(executable);
+ }
+
+ @Override
+ public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
+ throws InterruptedException {
+ ArrayList<ContentDigest> digests = new ArrayList<>();
+ for (byte[] blob : blobs) {
+ digests.add(uploadBlob(blob));
}
+ return ImmutableList.copyOf(digests);
}
@Override
- public void putActionOutput(String key, Collection<? extends ActionInput> outputs)
- throws IOException, InterruptedException {
- CacheEntry.Builder actionOutput = CacheEntry.newBuilder();
- for (ActionInput output : outputs) {
- Path file = execRoot.getRelative(output.getExecPathString());
- addToActionOutput(file, output.getExecPathString(), actionOutput);
+ public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
+ int blobSizeKBytes = blob.length / 1024;
+ Preconditions.checkArgument(blobSizeKBytes < MAX_MEMORY_KBYTES);
+ ContentDigest digest = ContentDigests.computeDigest(blob);
+ uploadMemoryAvailable.acquire(blobSizeKBytes);
+ try {
+ cache.put(ContentDigests.toHexString(digest), blob);
+ } finally {
+ uploadMemoryAvailable.release(blobSizeKBytes);
}
- cache.put(key, actionOutput.build().toByteArray());
+ return digest;
}
@Override
- public void putActionOutput(String key, Path execRoot, Collection<Path> files)
- throws IOException, InterruptedException {
- CacheEntry.Builder actionOutput = CacheEntry.newBuilder();
- for (Path file : files) {
- addToActionOutput(file, file.relativeTo(execRoot).getPathString(), actionOutput);
+ public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
+ if (digest.getSizeBytes() == 0) {
+ return new byte[0];
+ }
+ // This unconditionally downloads the whole blob into memory!
+ Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
+ byte[] data = cache.get(ContentDigests.toHexString(digest));
+ if (data == null) {
+ throw new CacheNotFoundException(digest);
}
- cache.put(key, actionOutput.build().toByteArray());
+ return data;
}
- /** Add the file to action output cache entry. Put the file to cache if necessary. */
- private void addToActionOutput(Path file, String execPathString, CacheEntry.Builder actionOutput)
- throws IOException, InterruptedException {
- if (file.isDirectory()) {
- // TODO(alpha): Implement this for directory.
- throw new UnsupportedOperationException("Storing a directory is not yet supported.");
+ @Override
+ public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
+ throws CacheNotFoundException {
+ ArrayList<byte[]> blobs = new ArrayList<>();
+ for (ContentDigest c : digests) {
+ blobs.add(downloadBlob(c));
}
- // First put the file content to cache.
- String contentKey = putFileIfNotExist(file);
- // Add to protobuf.
- actionOutput
- .addFilesBuilder()
- .setPath(execPathString)
- .setContentKey(contentKey)
- .setExecutable(file.isExecutable());
+ return ImmutableList.copyOf(blobs);
}
- private String putFileIfNotExist(Path file) throws IOException, InterruptedException {
- String contentKey = HashCode.fromBytes(file.getMD5Digest()).toString();
- if (containsFile(contentKey)) {
- return contentKey;
+ @Override
+ public ActionResult getCachedActionResult(ActionKey actionKey) {
+ byte[] data = cache.get(ContentDigests.toHexString(actionKey.getDigest()));
+ if (data == null) {
+ return null;
}
- putFile(contentKey, file);
- return contentKey;
+ try {
+ return ActionResult.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public void setCachedActionResult(ActionKey actionKey, ActionResult result)
+ throws InterruptedException {
+ cache.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
index aa98b253e7..3461673011 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java
@@ -14,6 +14,13 @@
package com.google.devtools.build.lib.remote;
+import com.hazelcast.client.HazelcastClient;
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.config.ClientNetworkConfig;
+import com.hazelcast.client.config.XmlClientConfigBuilder;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
@@ -32,13 +39,45 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
/**
- * A factory class for providing a {@link ConcurrentMap} object implemented by a REST service. The
- * URL has to support PUT, GET, and HEAD operations
+ * A factory class for providing a {@link ConcurrentMap} objects to be used with
+ * {@link ConcurrentMapActionCache} objects. The underlying maps can be Hazelcast or RestUrl based.
*/
-public final class RestUrlCacheFactory {
+public final class ConcurrentMapFactory {
- public static ConcurrentMap<String, byte[]> create(RemoteOptions options) {
- return new RestUrlCache(options.restCacheUrl);
+ private static final String HAZELCAST_CACHE_NAME = "hazelcast-build-cache";
+
+ private ConcurrentMapFactory() {}
+
+ public static ConcurrentMap<String, byte[]> createHazelcast(RemoteOptions options) {
+ HazelcastInstance instance;
+ if (options.hazelcastClientConfig != null) {
+ try {
+ ClientConfig config = new XmlClientConfigBuilder(options.hazelcastClientConfig).build();
+ instance = HazelcastClient.newHazelcastClient(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else if (options.hazelcastNode != null) {
+ // If --hazelcast_node is specified then create a client instance.
+ ClientConfig config = new ClientConfig();
+ ClientNetworkConfig net = config.getNetworkConfig();
+ net.addAddress(options.hazelcastNode.split(","));
+ instance = HazelcastClient.newHazelcastClient(config);
+ } else if (options.hazelcastStandaloneListenPort != 0) {
+ Config config = new Config();
+ config
+ .getNetworkConfig()
+ .setPort(options.hazelcastStandaloneListenPort)
+ .getJoin()
+ .getMulticastConfig()
+ .setEnabled(false);
+ instance = Hazelcast.newHazelcastInstance(config);
+ } else {
+ // Otherwise create a default instance. This is going to look at
+ // -Dhazelcast.config=some-hazelcast.xml for configuration.
+ instance = Hazelcast.newHazelcastInstance();
+ }
+ return instance.getMap(HAZELCAST_CACHE_NAME);
}
private static class RestUrlCache implements ConcurrentMap<String, byte[]> {
@@ -172,4 +211,32 @@ public final class RestUrlCacheFactory {
throw new UnsupportedOperationException();
}
}
+
+ public static ConcurrentMap<String, byte[]> createRestUrl(RemoteOptions options) {
+ return new RestUrlCache(options.restCacheUrl);
+ }
+
+ public static ConcurrentMap<String, byte[]> create(RemoteOptions options) {
+ if (isHazelcastOptions(options)) {
+ return createHazelcast(options);
+ }
+ if (isRestUrlOptions(options)) {
+ return createRestUrl(options);
+ }
+ throw new IllegalArgumentException(
+ "Unrecognized concurrent map RemoteOptions: must specify "
+ + "either Hazelcast or Rest URL options.");
+ }
+
+ public static boolean isRemoteCacheOptions(RemoteOptions options) {
+ return isHazelcastOptions(options) || isRestUrlOptions(options);
+ }
+
+ private static boolean isHazelcastOptions(RemoteOptions options) {
+ return options.hazelcastNode != null || options.hazelcastClientConfig != null;
+ }
+
+ private static boolean isRestUrlOptions(RemoteOptions options) {
+ return options.restCacheUrl != null;
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java b/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java
deleted file mode 100644
index b135f67b6b..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import com.hazelcast.client.HazelcastClient;
-import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.client.config.ClientNetworkConfig;
-import com.hazelcast.client.config.XmlClientConfigBuilder;
-import com.hazelcast.config.Config;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A factory class for providing a {@link ConcurrentMap} object implemented by Hazelcast.
- * Hazelcast will work as a distributed memory cache.
- */
-public final class HazelcastCacheFactory {
-
- private static final String CACHE_NAME = "hazelcast-build-cache";
-
- public static ConcurrentMap<String, byte[]> create(RemoteOptions options) {
- HazelcastInstance instance;
- if (options.hazelcastClientConfig != null) {
- try {
- ClientConfig config = new XmlClientConfigBuilder(options.hazelcastClientConfig).build();
- instance = HazelcastClient.newHazelcastClient(config);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else if (options.hazelcastNode != null) {
- // If --hazelcast_node is then create a client instance.
- ClientConfig config = new ClientConfig();
- ClientNetworkConfig net = config.getNetworkConfig();
- net.addAddress(options.hazelcastNode.split(","));
- instance = HazelcastClient.newHazelcastClient(config);
- } else if (options.hazelcastStandaloneListenPort != 0) {
- Config config = new Config();
- config
- .getNetworkConfig()
- .setPort(options.hazelcastStandaloneListenPort)
- .getJoin()
- .getMulticastConfig()
- .setEnabled(false);
- instance = Hazelcast.newHazelcastInstance(config);
- } else {
- // Otherwise create a default instance. This is going to look at
- // -Dhazelcast.config=some-hazelcast.xml for configuration.
- instance = Hazelcast.newHazelcastInstance();
- }
- return instance.getMap(CACHE_NAME);
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java
deleted file mode 100644
index 20cbc6058d..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java
+++ /dev/null
@@ -1,209 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
-import com.google.devtools.build.lib.actions.Artifact;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry;
-import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse;
-import com.google.devtools.build.lib.remote.RemoteWorkGrpc.RemoteWorkFutureStub;
-import com.google.devtools.build.lib.shell.Command;
-import com.google.devtools.build.lib.shell.CommandException;
-import com.google.devtools.build.lib.shell.CommandResult;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import io.grpc.ManagedChannel;
-import io.grpc.netty.NettyChannelBuilder;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Implementation of {@link RemoteWorkExecutor} that uses ConcurrentMapActionCache and gRPC for
- * communicating the work, inputs and outputs.
- */
-@ThreadSafe
-public class MemcacheWorkExecutor implements RemoteWorkExecutor {
- /**
- * A cache used to store the input and output files as well as the build status of the remote
- * work.
- */
- protected final ConcurrentMapActionCache cache;
-
- /** Execution root for running this work locally. */
- private final Path execRoot;
-
- /** Channel over which to send work to run remotely. */
- private final ManagedChannel channel;
-
- private static final int MAX_WORK_SIZE_BYTES = 1024 * 1024 * 512;
-
- /**
- * This constructor is used when this class is used in a client. It requires a host address and
- * port to connect to a remote service.
- */
- private MemcacheWorkExecutor(ConcurrentMapActionCache cache, String host, int port) {
- this.cache = cache;
- this.execRoot = null;
- this.channel = NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build();
- }
-
- /**
- * This constructor is used when this class is used in the remote worker. A path to the execution
- * root is needed for executing work locally.
- */
- private MemcacheWorkExecutor(ConcurrentMapActionCache cache, Path execRoot) {
- this.cache = cache;
- this.execRoot = execRoot;
- this.channel = null;
- }
-
- /**
- * Create an instance of MemcacheWorkExecutor that talks to a remote server.
- *
- * @param cache An instance of ConcurrentMapActionCache.
- * @param host Hostname of the server to connect to.
- * @param port Port of the server to connect to.
- * @return An instance of MemcacheWorkExecutor that talks to a remote server.
- */
- public static MemcacheWorkExecutor createRemoteWorkExecutor(
- ConcurrentMapActionCache cache, String host, int port) {
- return new MemcacheWorkExecutor(cache, host, port);
- }
-
- /**
- * Create an instance of MemcacheWorkExecutor that runs locally.
- *
- * @param cache An instance of ConcurrentMapActionCache.
- * @param execRoot Path of the execution root where work is executed.
- * @return An instance of MemcacheWorkExecutor tthat runs locally in the execution root.
- */
- public static MemcacheWorkExecutor createLocalWorkExecutor(
- ConcurrentMapActionCache cache, Path execRoot) {
- return new MemcacheWorkExecutor(cache, execRoot);
- }
-
- @Override
- public ListenableFuture<RemoteWorkResponse> executeRemotely(
- Path execRoot,
- ActionInputFileCache actionCache,
- String actionOutputKey,
- Collection<String> arguments,
- Collection<ActionInput> inputs,
- ImmutableMap<String, String> environment,
- Collection<? extends ActionInput> outputs,
- int timeout)
- throws IOException, WorkTooLargeException, InterruptedException {
- RemoteWorkRequest.Builder work = RemoteWorkRequest.newBuilder();
- work.setOutputKey(actionOutputKey);
-
- long workSize = 0;
- for (ActionInput input : inputs) {
- if (!(input instanceof Artifact)) {
- continue;
- }
- if (!actionCache.isFile((Artifact) input)) {
- continue;
- }
- workSize += actionCache.getSizeInBytes(input);
- }
-
- if (workSize > MAX_WORK_SIZE_BYTES) {
- throw new WorkTooLargeException("Work is too large: " + workSize + " bytes.");
- }
-
- // Save all input files to cache.
- for (ActionInput input : inputs) {
- Path file = execRoot.getRelative(input.getExecPathString());
-
- if (file.isDirectory()) {
- // TODO(alpha): Handle this case better.
- throw new UnsupportedOperationException(
- "Does not support directory artifacts: " + file + ".");
- }
-
- String contentKey = cache.putFileIfNotExist(actionCache, input);
- work.addInputFilesBuilder()
- .setPath(input.getExecPathString())
- .setContentKey(contentKey)
- .setExecutable(file.isExecutable());
- }
-
- work.addAllArguments(arguments);
- work.putAllEnvironment(environment);
- for (ActionInput output : outputs) {
- work.addOutputFilesBuilder().setPath(output.getExecPathString());
- }
-
- RemoteWorkFutureStub stub = RemoteWorkGrpc.newFutureStub(channel);
- work.setTimeout(timeout);
- return stub.executeSynchronously(work.build());
- }
-
- /** Execute a work item locally. */
- public RemoteWorkResponse executeLocally(RemoteWorkRequest work)
- throws IOException, InterruptedException {
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- try {
- // Prepare directories and input files.
- for (FileEntry input : work.getInputFilesList()) {
- Path file = execRoot.getRelative(input.getPath());
- FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
- cache.writeFile(input.getContentKey(), file, input.getExecutable());
- }
-
- List<Path> outputs = new ArrayList<>(work.getOutputFilesList().size());
- for (FileEntry output : work.getOutputFilesList()) {
- Path file = execRoot.getRelative(output.getPath());
- if (file.exists()) {
- throw new FileAlreadyExistsException("Output file already exists: " + file);
- }
- FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
- outputs.add(file);
- }
-
- Command cmd =
- new Command(
- work.getArgumentsList().toArray(new String[] {}),
- work.getEnvironment(),
- new File(execRoot.getPathString()));
- CommandResult result =
- cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
- cache.putActionOutput(work.getOutputKey(), execRoot, outputs);
- return RemoteWorkResponse.newBuilder()
- .setSuccess(result.getTerminationStatus().success())
- .setOut(stdout.toString())
- .setErr(stderr.toString())
- .build();
- } catch (CommandException e) {
- return RemoteWorkResponse.newBuilder()
- .setSuccess(false)
- .setOut(stdout.toString())
- .setErr(stderr.toString())
- .setException(e.toString())
- .build();
- }
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
index e865c96e2c..62790e8a76 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
@@ -14,47 +14,87 @@
package com.google.devtools.build.lib.remote;
-import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
+import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.vfs.Path;
import java.io.IOException;
import java.util.Collection;
+import javax.annotation.Nullable;
-/**
- * A cache for storing artifacts (input and output) as well as the output of running an action.
- */
+/** A cache for storing artifacts (input and output) as well as the output of running an action. */
@ThreadCompatible
interface RemoteActionCache {
+ // CAS API
+
+ // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the
+ // underlying CasStatus messages and gRPC errors errors.
+
/**
- * Put the file in cache if it is not already in it. No-op if the file is already stored in cache.
- *
- * @return The key for fetching the file from cache.
+ * Upload enough of the tree metadata and data into remote cache so that the entire tree can be
+ * reassembled remotely using the root digest.
*/
- String putFileIfNotExist(ActionInputFileCache cache, ActionInput file)
+ void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
throws IOException, InterruptedException;
/**
- * Write the file in cache identified by key to the file system. The key must uniquely identify
- * the content of the file. Throws CacheNotFoundException if the file is not found in cache.
+ * Download the entire tree data rooted by the given digest and write it into the given location.
*/
- void writeFile(String key, Path dest, boolean executable)
+ void downloadTree(ContentDigest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException;
/**
- * Write the action output files identified by the key to the file system. The key must uniquely
- * identify the action and the content of action inputs.
- *
- * @throws CacheNotFoundException if action output is not found in cache.
+ * Download all results of a remotely executed action locally. TODO(olaola): will need to amend to
+ * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
*/
- void writeActionOutput(String key, Path execRoot)
+ void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException;
- /** Update the cache with the action outputs for the specified key. */
- void putActionOutput(String key, Collection<? extends ActionInput> outputs)
+ /**
+ * Upload all results of a locally executed action to the cache. Add the files to the ActionResult
+ * builder.
+ */
+ void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
throws IOException, InterruptedException;
- /** Update the cache with the files for the specified key. */
- void putActionOutput(String key, Path execRoot, Collection<Path> files)
- throws IOException, InterruptedException;
+ /**
+ * Put the file contents cache if it is not already in it. No-op if the file is already stored in
+ * cache. The given path must be a full absolute path. Note: this is horribly inefficient, need to
+ * patch through an overload that uses an ActionInputFile cache to compute the digests!
+ *
+ * @return The key for fetching the file contents blob from cache.
+ */
+ ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException;
+
+ /**
+ * Download a blob keyed by the given digest and write it to the specified path. Set the
+ * executable parameter to the specified value.
+ */
+ void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
+ throws IOException, CacheNotFoundException;
+
+ /** Upload the given blobs to the cache, and return their digests. */
+ ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException;
+
+ /** Upload the given blob to the cache, and return its digests. */
+ ContentDigest uploadBlob(byte[] blob) throws InterruptedException;
+
+ /** Download and return a blob with a given digest from the cache. */
+ byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException;
+
+ /** Download and return blobs with given digests from the cache. */
+ ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
+ throws CacheNotFoundException;
+
+ // Execution Cache API
+
+ /** Returns a cached result for a given Action digest, or null if not found in cache. */
+ @Nullable
+ ActionResult getCachedActionResult(ActionKey actionKey);
+
+ /** Sets the given result as result of the given Action. */
+ void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 16ce15ca65..93f88ad244 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -17,6 +17,7 @@ package com.google.devtools.build.lib.remote;
import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.ActionContextProvider;
+import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
import com.google.devtools.build.lib.buildtool.BuildRequest;
import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent;
import com.google.devtools.build.lib.events.Event;
@@ -24,12 +25,8 @@ import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.Command;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import com.google.devtools.common.options.OptionsBase;
-import java.net.URI;
-import java.net.URISyntaxException;
-/**
- * RemoteModule provides distributed cache and remote execution for Bazel.
- */
+/** RemoteModule provides distributed cache and remote execution for Bazel. */
public final class RemoteModule extends BlazeModule {
private CommandEnvironment env;
private BuildRequest buildRequest;
@@ -61,38 +58,21 @@ public final class RemoteModule extends BlazeModule {
buildRequest = event.getRequest();
RemoteOptions options = buildRequest.getOptions(RemoteOptions.class);
- ConcurrentMapActionCache cache = null;
+ try {
+ // Reinitialize the remote cache and worker from options every time, because the options
+ // may change from build to build.
- // Don't provide the remote spawn unless at least action cache is initialized.
- if (actionCache == null) {
- if (options.hazelcastNode != null || options.hazelcastClientConfig != null) {
- cache =
- new ConcurrentMapActionCache(
- this.env.getDirectories().getExecRoot(),
- HazelcastCacheFactory.create(options));
- } else if (options.restCacheUrl != null) {
- cache =
- new ConcurrentMapActionCache(
- this.env.getDirectories().getExecRoot(),
- RestUrlCacheFactory.create(options));
+ // Don't provide the remote spawn unless at least action cache is initialized.
+ if (ConcurrentMapFactory.isRemoteCacheOptions(options)) {
+ actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options));
}
- actionCache = cache;
- }
+ // Otherwise actionCache remains null and remote caching/execution are disabled.
- if (cache != null) {
- if (workExecutor == null && options.remoteWorker != null) {
- try {
- URI uri = new URI("dummy://" + options.remoteWorker);
- if (uri.getHost() == null || uri.getPort() == -1) {
- throw new URISyntaxException("Invalid host or port.", "");
- }
- workExecutor =
- MemcacheWorkExecutor.createRemoteWorkExecutor(cache, uri.getHost(), uri.getPort());
- } catch (URISyntaxException e) {
- env.getReporter()
- .handle(Event.warn("Invalid argument for the address of remote worker."));
- }
+ if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) {
+ workExecutor = new RemoteWorkExecutor(options);
}
+ } catch (InvalidConfigurationException e) {
+ env.getReporter().handle(Event.warn(e.toString()));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index 78747c8090..9ed8ff2f8d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -17,9 +17,7 @@ package com.google.devtools.build.lib.remote;
import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionsBase;
-/**
- * Options for remote execution and distributed caching.
- */
+/** Options for remote execution and distributed caching. */
public final class RemoteOptions extends OptionsBase {
@Option(
name = "rest_cache_url",
@@ -66,4 +64,12 @@ public final class RemoteOptions extends OptionsBase {
+ "For client mode only."
)
public String remoteWorker;
+
+ @Option(
+ name = "grpc_timeout_seconds",
+ defaultValue = "60",
+ category = "remote",
+ help = "The maximal number of seconds to wait for remote calls. For client mode only."
+ )
+ public int grpcTimeoutSeconds;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index 2244ca73cc..d2b30cef97 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -14,41 +14,44 @@
package com.google.devtools.build.lib.remote;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
-import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.ExecutionStrategy;
-import com.google.devtools.build.lib.actions.Executor;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnActionContext;
import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
-import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse;
+import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy;
-import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
+import io.grpc.StatusRuntimeException;
import java.io.IOException;
-import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.TreeSet;
/**
- * Strategy that uses a distributed cache for sharing action input and output files.
- * Optionally this strategy also support offloading the work to a remote worker.
+ * Strategy that uses a distributed cache for sharing action input and output files. Optionally this
+ * strategy also support offloading the work to a remote worker.
*/
@ExecutionStrategy(
name = {"remote"},
@@ -74,179 +77,170 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
this.remoteWorkExecutor = workExecutor;
}
- /** Executes the given {@code spawn}. */
- @Override
- public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
- throws ExecException, InterruptedException {
- if (!spawn.isRemotable()) {
- standaloneStrategy.exec(spawn, actionExecutionContext);
- return;
+ private Action buildAction(
+ Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Action.Builder action = Action.newBuilder();
+ action.setCommandDigest(command);
+ action.setInputRootDigest(inputRoot);
+ // Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
+ for (ActionInput output : outputs) {
+ action.addOutputPath(output.getExecPathString());
}
+ // TODO(olaola): Need to set platform as well!
+ return action.build();
+ }
- Executor executor = actionExecutionContext.getExecutor();
- ActionExecutionMetadata actionMetadata = spawn.getResourceOwner();
- ActionInputFileCache inputFileCache = actionExecutionContext.getActionInputFileCache();
- EventHandler eventHandler = executor.getEventHandler();
-
- if (remoteActionCache == null) {
- eventHandler.handle(
- Event.warn(
- spawn.getMnemonic() + " Cannot instantiate remote action cache. Running locally."));
- standaloneStrategy.exec(spawn, actionExecutionContext);
- return;
+ private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
+ Command.Builder command = Command.newBuilder();
+ command.addAllArgv(arguments);
+ // Sorting the environment pairs by variable name.
+ TreeSet<String> variables = new TreeSet<>(environment.keySet());
+ for (String var : variables) {
+ command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
}
+ return command.build();
+ }
- // Compute a hash code to uniquely identify the action plus the action inputs.
- Hasher hasher = Hashing.sha256().newHasher();
-
- // TODO(alpha): The action key is usually computed using the path to the tool and the
- // arguments. It does not take into account the content / version of the system tool (e.g. gcc).
- // Either I put information about the system tools in the hash or assume tools are always
- // checked in.
- Preconditions.checkNotNull(actionMetadata.getKey());
- hasher.putString(actionMetadata.getKey(), Charset.defaultCharset());
-
- List<ActionInput> inputs =
- ActionInputHelper.expandArtifacts(
- spawn.getInputFiles(), actionExecutionContext.getArtifactExpander());
- for (ActionInput input : inputs) {
- hasher.putString(input.getExecPathString(), Charset.defaultCharset());
+ /**
+ * Fallback: execute the spawn locally. If an ActionKey is provided, try to upload results to
+ * remote action cache.
+ */
+ private void execLocally(
+ Spawn spawn, ActionExecutionContext actionExecutionContext, ActionKey actionKey)
+ throws ExecException, InterruptedException {
+ standaloneStrategy.exec(spawn, actionExecutionContext);
+ if (remoteActionCache != null && actionKey != null) {
+ ArrayList<Path> outputFiles = new ArrayList<>();
+ for (ActionInput output : spawn.getOutputFiles()) {
+ outputFiles.add(execRoot.getRelative(output.getExecPathString()));
+ }
try {
- // TODO(alpha): The digest from ActionInputFileCache is used to detect local file
- // changes. It might not be sufficient to identify the input file globally in the
- // remote action cache. Consider upgrading this to a better hash algorithm with
- // less collision.
- hasher.putBytes(inputFileCache.getDigest(input));
+ ActionResult.Builder result = ActionResult.newBuilder();
+ remoteActionCache.uploadAllResults(execRoot, outputFiles, result);
+ remoteActionCache.setCachedActionResult(actionKey, result.build());
+ // Handle all cache errors here.
} catch (IOException e) {
- throw new UserExecException("Failed to get digest for input.", e);
+ throw new UserExecException("Unexpected IO error.", e);
+ } catch (UnsupportedOperationException e) {
+ actionExecutionContext
+ .getExecutor()
+ .getEventHandler()
+ .handle(
+ Event.warn(
+ spawn.getMnemonic() + " unsupported operation for action cache (" + e + ")"));
}
}
+ }
- // Save the action output if found in the remote action cache.
- String actionOutputKey = hasher.hash().toString();
+ private void passRemoteOutErr(ActionResult result, FileOutErr outErr) {
+ if (remoteActionCache == null) {
+ return;
+ }
+ try {
+ ImmutableList<byte[]> streams =
+ remoteActionCache.downloadBlobs(
+ ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
+ outErr.printOut(new String(streams.get(0), UTF_8));
+ outErr.printErr(new String(streams.get(1), UTF_8));
+ } catch (CacheNotFoundException e) {
+ // Ignoring.
+ }
+ }
- // Timeout for running the remote spawn.
- final int timeoutSeconds = Spawns.getTimeoutSeconds(spawn, 120);
+ /** Executes the given {@code spawn}. */
+ @Override
+ public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext)
+ throws ExecException, InterruptedException {
+ if (!spawn.isRemotable() || remoteActionCache == null) {
+ standaloneStrategy.exec(spawn, actionExecutionContext);
+ return;
+ }
+
+ ActionKey actionKey = null;
+ String mnemonic = spawn.getMnemonic();
+ EventHandler eventHandler = actionExecutionContext.getExecutor().getEventHandler();
try {
- // Look up action cache using |actionOutputKey|. Reuse the action output if it is found.
- if (writeActionOutput(spawn.getMnemonic(), actionOutputKey, eventHandler, true)) {
- return;
+ // Temporary hack: the TreeNodeRepository should be created and maintained upstream!
+ TreeNodeRepository repository = new TreeNodeRepository(execRoot);
+ List<ActionInput> inputs =
+ ActionInputHelper.expandArtifacts(
+ spawn.getInputFiles(), actionExecutionContext.getArtifactExpander());
+ TreeNode inputRoot = repository.buildFromActionInputs(inputs);
+ repository.computeMerkleDigests(inputRoot);
+ Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment());
+ Action action =
+ buildAction(
+ spawn.getOutputFiles(),
+ ContentDigests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot));
+
+ // Look up action cache, and reuse the action output if it is found.
+ actionKey = ContentDigests.computeActionKey(action);
+ ActionResult result = remoteActionCache.getCachedActionResult(actionKey);
+ boolean acceptCached = true;
+ if (result != null) {
+ // We don't cache failed actions, so we know the outputs exist.
+ // For now, download all outputs locally; in the future, we can reuse the digests to
+ // just update the TreeNodeRepository and continue the build.
+ try {
+ remoteActionCache.downloadAllResults(result, execRoot);
+ return;
+ } catch (CacheNotFoundException e) {
+ acceptCached = false; // Retry the action remotely and invalidate the results.
+ }
}
- FileOutErr outErr = actionExecutionContext.getFileOutErr();
- if (executeWorkRemotely(
- inputFileCache,
- spawn.getMnemonic(),
- actionOutputKey,
- spawn.getArguments(),
- inputs,
- spawn.getEnvironment(),
- spawn.getOutputFiles(),
- timeoutSeconds,
- eventHandler,
- outErr)) {
+ if (remoteWorkExecutor == null) {
+ execLocally(spawn, actionExecutionContext, actionKey);
return;
}
- // If nothing works then run spawn locally.
- standaloneStrategy.exec(spawn, actionExecutionContext);
- if (remoteActionCache != null) {
- remoteActionCache.putActionOutput(actionOutputKey, spawn.getOutputFiles());
+ // Upload the command and all the inputs into the remote cache.
+ remoteActionCache.uploadBlob(command.toByteArray());
+ // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
+ remoteActionCache.uploadTree(repository, execRoot, inputRoot);
+ // TODO(olaola): set BuildInfo and input total bytes as well.
+ ExecuteRequest.Builder request =
+ ExecuteRequest.newBuilder()
+ .setAction(action)
+ .setAcceptCached(acceptCached)
+ .setTotalInputFileCount(inputs.size())
+ .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120));
+ // TODO(olaola): set sensible local and remote timouts.
+ ExecuteReply reply = remoteWorkExecutor.executeRemotely(request.build());
+ ExecutionStatus status = reply.getStatus();
+ result = reply.getResult();
+ // We do not want to pass on the remote stdout and strerr if we are going to retry the
+ // action.
+ if (status.getSucceeded()) {
+ passRemoteOutErr(result, actionExecutionContext.getFileOutErr());
+ remoteActionCache.downloadAllResults(result, execRoot);
+ return;
}
+ if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED) {
+ passRemoteOutErr(result, actionExecutionContext.getFileOutErr());
+ throw new UserExecException(status.getErrorDetail());
+ }
+ // For now, we retry locally on all other remote errors.
+ // TODO(olaola): add remote retries on cache miss errors.
+ execLocally(spawn, actionExecutionContext, actionKey);
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
- } catch (UnsupportedOperationException e) {
- eventHandler.handle(
- Event.warn(spawn.getMnemonic() + " unsupported operation for action cache (" + e + ")"));
- }
- }
-
- /**
- * Submit work to execute remotely.
- *
- * @return True in case the action succeeded and all expected action outputs are found.
- */
- private boolean executeWorkRemotely(
- ActionInputFileCache actionCache,
- String mnemonic,
- String actionOutputKey,
- List<String> arguments,
- List<ActionInput> inputs,
- ImmutableMap<String, String> environment,
- Collection<? extends ActionInput> outputs,
- int timeout,
- EventHandler eventHandler,
- FileOutErr outErr)
- throws IOException, InterruptedException {
- if (remoteWorkExecutor == null) {
- return false;
- }
- try {
- ListenableFuture<RemoteWorkResponse> future =
- remoteWorkExecutor.executeRemotely(
- execRoot,
- actionCache,
- actionOutputKey,
- arguments,
- inputs,
- environment,
- outputs,
- timeout);
- RemoteWorkResponse response = future.get(timeout, TimeUnit.SECONDS);
- if (!response.getSuccess()) {
- String exception = "";
- if (!response.getException().isEmpty()) {
- exception = " (" + response.getException() + ")";
- }
- eventHandler.handle(
- Event.warn(
- mnemonic + " failed to execute work remotely" + exception + ", running locally"));
- return false;
- }
- outErr.printOut(response.getOut());
- outErr.printErr(response.getErr());
- } catch (ExecutionException e) {
- eventHandler.handle(
- Event.warn(mnemonic + " failed to execute work remotely (" + e + "), running locally"));
- return false;
- } catch (TimeoutException e) {
- eventHandler.handle(
- Event.warn(mnemonic + " timed out executing work remotely (" + e + "), running locally"));
- return false;
} catch (InterruptedException e) {
eventHandler.handle(Event.warn(mnemonic + " remote work interrupted (" + e + ")"));
+ Thread.currentThread().interrupt();
throw e;
- } catch (WorkTooLargeException e) {
- eventHandler.handle(Event.warn(mnemonic + " cannot be run remotely (" + e + ")"));
- return false;
- }
- return writeActionOutput(mnemonic, actionOutputKey, eventHandler, false);
- }
-
- /**
- * Saves the action output from cache. Returns true if all action outputs are found.
- */
- private boolean writeActionOutput(
- String mnemonic,
- String actionOutputKey,
- EventHandler eventHandler,
- boolean ignoreCacheNotFound)
- throws IOException {
- if (remoteActionCache == null) {
- return false;
- }
- try {
- remoteActionCache.writeActionOutput(actionOutputKey, execRoot);
- Event.info(mnemonic + " reuse action outputs from cache");
- return true;
+ } catch (StatusRuntimeException e) {
+ eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")"));
+ execLocally(spawn, actionExecutionContext, actionKey);
} catch (CacheNotFoundException e) {
- if (!ignoreCacheNotFound) {
- eventHandler.handle(
- Event.warn(mnemonic + " some cache entries cannot be found (" + e + ")"));
- }
+ eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")"));
+ execLocally(spawn, actionExecutionContext, actionKey);
+ } catch (UnsupportedOperationException e) {
+ eventHandler.handle(
+ Event.warn(mnemonic + " unsupported operation for action cache (" + e + ")"));
}
- return false;
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
new file mode 100644
index 0000000000..9747e3ef11
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
@@ -0,0 +1,42 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/** Helper methods for gRPC calls */
+@ThreadSafe
+public final class RemoteUtils {
+ public static ManagedChannel createChannel(String hostAndPort)
+ throws InvalidConfigurationException {
+ try {
+ URI uri = new URI("dummy://" + hostAndPort);
+ if (uri.getHost() == null || uri.getPort() == -1) {
+ throw new URISyntaxException("Invalid host or port.", "");
+ }
+ return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort())
+ .usePlaintext(true)
+ .build();
+ } catch (URISyntaxException e) {
+ throw new InvalidConfigurationException(
+ "Invalid argument for the address of remote cache server: " + hostAndPort);
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
index 198e9262a7..9cb080b402 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java
@@ -14,35 +14,53 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.devtools.build.lib.actions.ActionInput;
-import com.google.devtools.build.lib.actions.ActionInputFileCache;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse;
-import com.google.devtools.build.lib.vfs.Path;
-import java.io.IOException;
-import java.util.Collection;
+import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import io.grpc.ManagedChannel;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
-/**
- * Interface for exeucting work remotely.
- */
-@ThreadCompatible
-public interface RemoteWorkExecutor {
- /**
- * Submit the work to this work executor. The output of running this action should be written to
- * {@link RemoteActionCache} indexed by {@code actionOutputKey}.
- *
- * <p>Returns a future for the response of this work request.
- */
- ListenableFuture<RemoteWorkResponse> executeRemotely(
- Path execRoot,
- ActionInputFileCache cache,
- String actionOutputKey,
- Collection<String> arguments,
- Collection<ActionInput> inputs,
- ImmutableMap<String, String> environment,
- Collection<? extends ActionInput> outputs,
- int timeout)
- throws IOException, WorkTooLargeException, InterruptedException;
+/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
+@ThreadSafe
+public class RemoteWorkExecutor {
+ /** Channel over which to send work to run remotely. */
+ private final ManagedChannel channel;
+ private final int grpcTimeoutSeconds;
+
+ public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException {
+ channel = RemoteUtils.createChannel(options.remoteWorker);
+ grpcTimeoutSeconds = options.grpcTimeoutSeconds;
+ }
+
+ public static boolean isRemoteExecutionOptions(RemoteOptions options) {
+ return options.remoteWorker != null;
+ }
+
+ public ExecuteReply executeRemotely(ExecuteRequest request) {
+ ExecuteServiceBlockingStub stub =
+ ExecuteServiceGrpc.newBlockingStub(channel)
+ .withDeadlineAfter(
+ grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
+ Iterator<ExecuteReply> replies = stub.execute(request);
+ ExecuteReply reply = null;
+ while (replies.hasNext()) {
+ reply = replies.next();
+ // We can handle the action execution progress here.
+ }
+ if (reply == null) {
+ return ExecuteReply.newBuilder()
+ .setStatus(
+ ExecutionStatus.newBuilder()
+ .setExecuted(false)
+ .setSucceeded(false)
+ .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR)
+ .setErrorDetail("Remote server terminated the connection"))
+ .build();
+ }
+ return reply;
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java b/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java
deleted file mode 100644
index d1498d73df..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.remote;
-
-/** An exception that indicates the work is too large to run remotely. */
-final class WorkTooLargeException extends RuntimeException {
- WorkTooLargeException() {
- super();
- }
-
- WorkTooLargeException(String message) {
- super(message);
- }
-
- WorkTooLargeException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git a/src/main/protobuf/remote_protocol.proto b/src/main/protobuf/remote_protocol.proto
index a9dd6a9ed1..c807b00d45 100644
--- a/src/main/protobuf/remote_protocol.proto
+++ b/src/main/protobuf/remote_protocol.proto
@@ -322,81 +322,3 @@ message ExecutionStatus {
// Optionally will add more details pertaining to current stage, for example
// time executing, or position in queue, etc.
}
-
-// Previous version of remote execution and caching API.
-// Will be removed soon after all code paths are migrated!
-
-// A message for cache entry.
-message CacheEntry {
- // A list of files stored in this cache entry.
- repeated FileEntry files = 1;
-
- // A blob for data that is a chunk of a file.
- bytes file_content = 2;
-}
-
-// A message for storing a file in cache.
-message FileEntry {
- // The path in the file system where to read this input artifact from. This is
- // either a path relative to the execution root (the worker process is
- // launched with the working directory set to the execution root), or an
- // absolute path.
- string path = 1;
-
- // The cache key to locate the file content. This key is usually generated
- // from
- // the content of the file such that different keys means the file content are
- // different.
- string content_key = 2;
-
- // Whether the file is an executable.
- bool executable = 3;
-
- // TODO(alpha): For large files we need to break down into chunks to store
- // in the cache. For that case we need a index for the chunks of the file.
-}
-
-// A message for running a command remotely.
-message RemoteWorkRequest {
- // The key for writing the output of this work request.
- string output_key = 1;
-
- // The arguments for running the command. The command itself is in
- // arguments[0].
- repeated string arguments = 2;
-
- // The list of input files to this work request.
- repeated FileEntry input_files = 3;
-
- // A map of environment variables for this command.
- map<string, string> environment = 4;
-
- // The list of expected output files to this work request.
- // The content keys for these entries will be empty since the files don't
- // exist yet.
- repeated FileEntry output_files = 5;
-
- // Timeout for running this command.
- int32 timeout = 6;
-}
-
-// A message for a work response.
-message RemoteWorkResponse {
- // True if the work was successful.
- bool success = 1;
-
- // String from stdout of running the work.
- string out = 2;
-
- // String from stderr of running the work.
- string err = 3;
-
- // String for the exception when running this work.
- string exception = 4;
-}
-
-service RemoteWork {
- // Perform work synchronously.
- rpc ExecuteSynchronously(RemoteWorkRequest) returns (RemoteWorkResponse) {
- }
-}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 414737a810..5325b8fd4b 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -14,13 +14,25 @@
package com.google.devtools.build.remote;
+import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.remote.CacheNotFoundException;
import com.google.devtools.build.lib.remote.ConcurrentMapActionCache;
-import com.google.devtools.build.lib.remote.HazelcastCacheFactory;
-import com.google.devtools.build.lib.remote.MemcacheWorkExecutor;
+import com.google.devtools.build.lib.remote.ConcurrentMapFactory;
+import com.google.devtools.build.lib.remote.ContentDigests;
+import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
import com.google.devtools.build.lib.remote.RemoteOptions;
-import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse;
-import com.google.devtools.build.lib.remote.RemoteWorkGrpc.RemoteWorkImplBase;
+import com.google.devtools.build.lib.remote.RemoteProtocol;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
+import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
+import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
+import com.google.devtools.build.lib.shell.AbnormalTerminationException;
+import com.google.devtools.build.lib.shell.Command;
+import com.google.devtools.build.lib.shell.CommandException;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.util.ProcessUtils;
import com.google.devtools.build.lib.vfs.FileSystem;
@@ -32,11 +44,17 @@ import com.google.devtools.common.options.OptionsParser;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -44,61 +62,147 @@ import java.util.logging.Logger;
* Implements a remote worker that accepts work items as protobufs. The server implementation is
* based on grpc.
*/
-public class RemoteWorker extends RemoteWorkImplBase {
+public class RemoteWorker extends ExecuteServiceImplBase {
private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
private final Path workPath;
private final RemoteOptions remoteOptions;
private final RemoteWorkerOptions options;
- private final ConcurrentMap<String, byte[]> cache;
+ private final ConcurrentMapActionCache cache;
public RemoteWorker(
Path workPath,
RemoteOptions remoteOptions,
RemoteWorkerOptions options,
- ConcurrentMap<String, byte[]> cache) {
+ ConcurrentMapActionCache cache) {
this.workPath = workPath;
this.remoteOptions = remoteOptions;
this.options = options;
this.cache = cache;
}
+ private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
+ HashMap<String, String> result = new HashMap<>();
+ for (EnvironmentEntry entry : command.getEnvironmentList()) {
+ result.put(entry.getVariable(), entry.getValue());
+ }
+ return result;
+ }
+
+ public ExecuteReply execute(Action action, Path execRoot)
+ throws IOException, InterruptedException {
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ try {
+ RemoteProtocol.Command command =
+ RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
+ cache.downloadTree(action.getInputRootDigest(), execRoot);
+
+ List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
+ for (String output : action.getOutputPathList()) {
+ Path file = execRoot.getRelative(output);
+ if (file.exists()) {
+ throw new FileAlreadyExistsException("Output file already exists: " + file);
+ }
+ FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
+ outputs.add(file);
+ }
+
+ // TODO(olaola): time out after specified server-side deadline.
+ Command cmd =
+ new Command(
+ command.getArgvList().toArray(new String[] {}),
+ getEnvironmentVariables(command),
+ new File(execRoot.getPathString()));
+ cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
+
+ // Execute throws a CommandException on non-zero return values, so action has succeeded.
+ ImmutableList<ContentDigest> outErrDigests =
+ cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
+ ActionResult.Builder result =
+ ActionResult.newBuilder()
+ .setReturnCode(0)
+ .setStdoutDigest(outErrDigests.get(0))
+ .setStderrDigest(outErrDigests.get(1));
+ cache.uploadAllResults(execRoot, outputs, result);
+ cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build());
+ return ExecuteReply.newBuilder()
+ .setResult(result)
+ .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
+ .build();
+ } catch (CommandException e) {
+ ImmutableList<ContentDigest> outErrDigests =
+ cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
+ final int returnCode =
+ e instanceof AbnormalTerminationException
+ ? ((AbnormalTerminationException) e).getResult().getTerminationStatus().getExitCode()
+ : -1;
+ return ExecuteReply.newBuilder()
+ .setResult(
+ ActionResult.newBuilder()
+ .setReturnCode(returnCode)
+ .setStdoutDigest(outErrDigests.get(0))
+ .setStderrDigest(outErrDigests.get(1)))
+ .setStatus(
+ ExecutionStatus.newBuilder()
+ .setExecuted(true)
+ .setSucceeded(false)
+ .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
+ .setErrorDetail(e.toString()))
+ .build();
+ } catch (CacheNotFoundException e) {
+ LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
+ return ExecuteReply.newBuilder()
+ .setCasError(
+ CasStatus.newBuilder()
+ .setSucceeded(false)
+ .addMissingDigest(e.getMissingDigest())
+ .setError(CasStatus.ErrorCode.MISSING_DIGEST)
+ .setErrorDetail(e.toString()))
+ .setStatus(
+ ExecutionStatus.newBuilder()
+ .setExecuted(false)
+ .setSucceeded(false)
+ .setError(
+ e.getMissingDigest() == action.getCommandDigest()
+ ? ExecutionStatus.ErrorCode.MISSING_COMMAND
+ : ExecutionStatus.ErrorCode.MISSING_INPUT)
+ .setErrorDetail(e.toString()))
+ .build();
+ }
+ }
+
@Override
- public void executeSynchronously(
- RemoteWorkRequest request, StreamObserver<RemoteWorkResponse> responseObserver) {
+ public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
try {
- FileSystemUtils.createDirectoryAndParents(tempRoot);
- final ConcurrentMapActionCache actionCache = new ConcurrentMapActionCache(tempRoot, cache);
- final MemcacheWorkExecutor workExecutor =
- MemcacheWorkExecutor.createLocalWorkExecutor(actionCache, tempRoot);
+ tempRoot.createDirectory();
if (LOG_FINER) {
LOG.fine(
"Work received has "
- + request.getInputFilesCount()
+ + request.getTotalInputFileCount()
+ " input files and "
- + request.getOutputFilesCount()
+ + request.getAction().getOutputPathCount()
+ " output files.");
}
- RemoteWorkResponse response = workExecutor.executeLocally(request);
- responseObserver.onNext(response);
+ ExecuteReply reply = execute(request.getAction(), tempRoot);
+ responseObserver.onNext(reply);
if (options.debug) {
- if (!response.getSuccess()) {
+ if (!reply.getStatus().getSucceeded()) {
LOG.warning("Work failed. Request: " + request.toString() + ".");
-
} else if (LOG_FINER) {
LOG.fine("Work completed.");
}
}
- if (!options.debug || response.getSuccess()) {
+ if (!options.debug) {
FileSystemUtils.deleteTree(tempRoot);
} else {
LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
}
} catch (IOException | InterruptedException e) {
- RemoteWorkResponse.Builder response = RemoteWorkResponse.newBuilder();
- response.setSuccess(false).setOut("").setErr("").setException(e.toString());
- responseObserver.onNext(response.build());
+ ExecuteReply.Builder reply = ExecuteReply.newBuilder();
+ reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
+ responseObserver.onNext(reply.build());
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
@@ -120,10 +224,13 @@ public class RemoteWorker extends RemoteWorkImplBase {
}
System.out.println("*** Starting Hazelcast server.");
- ConcurrentMap<String, byte[]> cache = new HazelcastCacheFactory().create(remoteOptions);
+ ConcurrentMapActionCache cache =
+ new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions));
- System.out.println("*** Starting grpc server on all locally bound IPs on port "
- + remoteWorkerOptions.listenPort + ".");
+ System.out.println(
+ "*** Starting grpc server on all locally bound IPs on port "
+ + remoteWorkerOptions.listenPort
+ + ".");
Path workPath = getFileSystem().getPath(remoteWorkerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
RemoteWorker worker = new RemoteWorker(workPath, remoteOptions, remoteWorkerOptions, cache);