diff options
Diffstat (limited to 'src/main')
13 files changed, 539 insertions, 730 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java index 0e4e52c1d4..5820e0bf95 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java @@ -14,20 +14,25 @@ package com.google.devtools.build.lib.remote; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; + /** - * An exception to indicate the cache is not found because of an expected - * problem. + * An exception to indicate cache misses. + * TODO(olaola): have a class of checked RemoteCacheExceptions. */ -final class CacheNotFoundException extends RuntimeException { - CacheNotFoundException() { - super(); +public final class CacheNotFoundException extends Exception { + private final ContentDigest missingDigest; + + CacheNotFoundException(ContentDigest missingDigest) { + this.missingDigest = missingDigest; } - CacheNotFoundException(String message) { - super(message); + public ContentDigest getMissingDigest() { + return missingDigest; } - CacheNotFoundException(String message, Throwable cause) { - super(message, cause); + @Override + public String toString() { + return "Missing digest: " + ContentDigests.toString(missingDigest); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java index 7e27653e76..72c75700b5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java @@ -14,18 +14,24 @@ package com.google.devtools.build.lib.remote; -import com.google.common.hash.HashCode; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.CacheEntry; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; +import com.google.devtools.build.lib.remote.RemoteProtocol.Output; +import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; @@ -38,121 +44,156 @@ import java.util.concurrent.Semaphore; */ @ThreadSafe public final class ConcurrentMapActionCache implements RemoteActionCache { - private final Path execRoot; private final ConcurrentMap<String, byte[]> cache; private static final int MAX_MEMORY_KBYTES = 512 * 1024; private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true); - public ConcurrentMapActionCache(Path execRoot, ConcurrentMap<String, byte[]> cache) { - this.execRoot = execRoot; + public ConcurrentMapActionCache(ConcurrentMap<String, byte[]> cache) { this.cache = cache; } @Override - public String putFileIfNotExist(ActionInputFileCache cache, ActionInput file) + public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) throws IOException, InterruptedException { - String contentKey = HashCode.fromBytes(cache.getDigest(file)).toString(); - if (containsFile(contentKey)) { - return contentKey; + repository.computeMerkleDigests(root); + for (FileNode fileNode : repository.treeToFileNodes(root)) { + uploadBlob(fileNode.toByteArray()); + } + for (TreeNode leaf : repository.leaves(root)) { + uploadFileContents(execRoot.getRelative(leaf.getActionInput().getExecPathString())); } - putFile(contentKey, execRoot.getRelative(file.getExecPathString())); - return contentKey; } - private void putFile(String key, Path file) throws IOException, InterruptedException { - int fileSizeKBytes = (int) (file.getFileSize() / 1024); - Preconditions.checkArgument(fileSizeKBytes < MAX_MEMORY_KBYTES); - try { - uploadMemoryAvailable.acquire(fileSizeKBytes); - // TODO(alpha): I should put the file content as chunks to avoid reading the entire - // file into memory. - try (InputStream stream = file.getInputStream()) { - cache.put( - key, - CacheEntry.newBuilder() - .setFileContent(ByteString.readFrom(stream)) - .build() - .toByteArray()); - } - } finally { - uploadMemoryAvailable.release(fileSizeKBytes); + @Override + public void downloadTree(ContentDigest rootDigest, Path rootLocation) + throws IOException, CacheNotFoundException { + FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest)); + if (fileNode.hasFileMetadata()) { + FileMetadata meta = fileNode.getFileMetadata(); + downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable()); + } + for (FileNode.Child child : fileNode.getChildList()) { + downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath())); } } @Override - public void writeFile(String key, Path dest, boolean executable) + public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException { + // This unconditionally reads the whole file into memory first! + return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray()); + } + + @Override + public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { - byte[] data = cache.get(key); - if (data == null) { - throw new CacheNotFoundException("File content cannot be found with key: " + key); - } - try (OutputStream stream = dest.getOutputStream()) { - CacheEntry.parseFrom(data).getFileContent().writeTo(stream); - dest.setExecutable(executable); + for (Output output : result.getOutputList()) { + if (output.getContentCase() == ContentCase.FILE_METADATA) { + FileMetadata m = output.getFileMetadata(); + downloadFileContents( + m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable()); + } else { + downloadTree(output.getDigest(), execRoot.getRelative(output.getPath())); + } } } - private boolean containsFile(String key) { - return cache.containsKey(key); + @Override + public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) + throws IOException, InterruptedException { + for (Path file : files) { + if (file.isDirectory()) { + // TODO(olaola): to implement this for a directory, will need to create or pass a + // TreeNodeRepository to call uploadTree. + throw new UnsupportedOperationException("Storing a directory is not yet supported."); + } + // First put the file content to cache. + ContentDigest digest = uploadFileContents(file); + // Add to protobuf. + result + .addOutputBuilder() + .setPath(file.relativeTo(execRoot).getPathString()) + .getFileMetadataBuilder() + .setDigest(digest) + .setExecutable(file.isExecutable()); + } } @Override - public void writeActionOutput(String key, Path execRoot) + public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { - byte[] data = cache.get(key); - if (data == null) { - throw new CacheNotFoundException("Action output cannot be found with key: " + key); + // This unconditionally downloads the whole file into memory first! + byte[] contents = downloadBlob(digest); + FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory()); + try (OutputStream stream = dest.getOutputStream()) { + stream.write(contents); } - CacheEntry cacheEntry = CacheEntry.parseFrom(data); - for (FileEntry file : cacheEntry.getFilesList()) { - writeFile(file.getContentKey(), execRoot.getRelative(file.getPath()), file.getExecutable()); + dest.setExecutable(executable); + } + + @Override + public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) + throws InterruptedException { + ArrayList<ContentDigest> digests = new ArrayList<>(); + for (byte[] blob : blobs) { + digests.add(uploadBlob(blob)); } + return ImmutableList.copyOf(digests); } @Override - public void putActionOutput(String key, Collection<? extends ActionInput> outputs) - throws IOException, InterruptedException { - CacheEntry.Builder actionOutput = CacheEntry.newBuilder(); - for (ActionInput output : outputs) { - Path file = execRoot.getRelative(output.getExecPathString()); - addToActionOutput(file, output.getExecPathString(), actionOutput); + public ContentDigest uploadBlob(byte[] blob) throws InterruptedException { + int blobSizeKBytes = blob.length / 1024; + Preconditions.checkArgument(blobSizeKBytes < MAX_MEMORY_KBYTES); + ContentDigest digest = ContentDigests.computeDigest(blob); + uploadMemoryAvailable.acquire(blobSizeKBytes); + try { + cache.put(ContentDigests.toHexString(digest), blob); + } finally { + uploadMemoryAvailable.release(blobSizeKBytes); } - cache.put(key, actionOutput.build().toByteArray()); + return digest; } @Override - public void putActionOutput(String key, Path execRoot, Collection<Path> files) - throws IOException, InterruptedException { - CacheEntry.Builder actionOutput = CacheEntry.newBuilder(); - for (Path file : files) { - addToActionOutput(file, file.relativeTo(execRoot).getPathString(), actionOutput); + public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException { + if (digest.getSizeBytes() == 0) { + return new byte[0]; + } + // This unconditionally downloads the whole blob into memory! + Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES); + byte[] data = cache.get(ContentDigests.toHexString(digest)); + if (data == null) { + throw new CacheNotFoundException(digest); } - cache.put(key, actionOutput.build().toByteArray()); + return data; } - /** Add the file to action output cache entry. Put the file to cache if necessary. */ - private void addToActionOutput(Path file, String execPathString, CacheEntry.Builder actionOutput) - throws IOException, InterruptedException { - if (file.isDirectory()) { - // TODO(alpha): Implement this for directory. - throw new UnsupportedOperationException("Storing a directory is not yet supported."); + @Override + public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests) + throws CacheNotFoundException { + ArrayList<byte[]> blobs = new ArrayList<>(); + for (ContentDigest c : digests) { + blobs.add(downloadBlob(c)); } - // First put the file content to cache. - String contentKey = putFileIfNotExist(file); - // Add to protobuf. - actionOutput - .addFilesBuilder() - .setPath(execPathString) - .setContentKey(contentKey) - .setExecutable(file.isExecutable()); + return ImmutableList.copyOf(blobs); } - private String putFileIfNotExist(Path file) throws IOException, InterruptedException { - String contentKey = HashCode.fromBytes(file.getMD5Digest()).toString(); - if (containsFile(contentKey)) { - return contentKey; + @Override + public ActionResult getCachedActionResult(ActionKey actionKey) { + byte[] data = cache.get(ContentDigests.toHexString(actionKey.getDigest())); + if (data == null) { + return null; } - putFile(contentKey, file); - return contentKey; + try { + return ActionResult.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + + @Override + public void setCachedActionResult(ActionKey actionKey, ActionResult result) + throws InterruptedException { + cache.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray()); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java index aa98b253e7..3461673011 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java @@ -14,6 +14,13 @@ package com.google.devtools.build.lib.remote; +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.client.config.ClientNetworkConfig; +import com.hazelcast.client.config.XmlClientConfigBuilder; +import com.hazelcast.config.Config; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collection; @@ -32,13 +39,45 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; /** - * A factory class for providing a {@link ConcurrentMap} object implemented by a REST service. The - * URL has to support PUT, GET, and HEAD operations + * A factory class for providing a {@link ConcurrentMap} objects to be used with + * {@link ConcurrentMapActionCache} objects. The underlying maps can be Hazelcast or RestUrl based. */ -public final class RestUrlCacheFactory { +public final class ConcurrentMapFactory { - public static ConcurrentMap<String, byte[]> create(RemoteOptions options) { - return new RestUrlCache(options.restCacheUrl); + private static final String HAZELCAST_CACHE_NAME = "hazelcast-build-cache"; + + private ConcurrentMapFactory() {} + + public static ConcurrentMap<String, byte[]> createHazelcast(RemoteOptions options) { + HazelcastInstance instance; + if (options.hazelcastClientConfig != null) { + try { + ClientConfig config = new XmlClientConfigBuilder(options.hazelcastClientConfig).build(); + instance = HazelcastClient.newHazelcastClient(config); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (options.hazelcastNode != null) { + // If --hazelcast_node is specified then create a client instance. + ClientConfig config = new ClientConfig(); + ClientNetworkConfig net = config.getNetworkConfig(); + net.addAddress(options.hazelcastNode.split(",")); + instance = HazelcastClient.newHazelcastClient(config); + } else if (options.hazelcastStandaloneListenPort != 0) { + Config config = new Config(); + config + .getNetworkConfig() + .setPort(options.hazelcastStandaloneListenPort) + .getJoin() + .getMulticastConfig() + .setEnabled(false); + instance = Hazelcast.newHazelcastInstance(config); + } else { + // Otherwise create a default instance. This is going to look at + // -Dhazelcast.config=some-hazelcast.xml for configuration. + instance = Hazelcast.newHazelcastInstance(); + } + return instance.getMap(HAZELCAST_CACHE_NAME); } private static class RestUrlCache implements ConcurrentMap<String, byte[]> { @@ -172,4 +211,32 @@ public final class RestUrlCacheFactory { throw new UnsupportedOperationException(); } } + + public static ConcurrentMap<String, byte[]> createRestUrl(RemoteOptions options) { + return new RestUrlCache(options.restCacheUrl); + } + + public static ConcurrentMap<String, byte[]> create(RemoteOptions options) { + if (isHazelcastOptions(options)) { + return createHazelcast(options); + } + if (isRestUrlOptions(options)) { + return createRestUrl(options); + } + throw new IllegalArgumentException( + "Unrecognized concurrent map RemoteOptions: must specify " + + "either Hazelcast or Rest URL options."); + } + + public static boolean isRemoteCacheOptions(RemoteOptions options) { + return isHazelcastOptions(options) || isRestUrlOptions(options); + } + + private static boolean isHazelcastOptions(RemoteOptions options) { + return options.hazelcastNode != null || options.hazelcastClientConfig != null; + } + + private static boolean isRestUrlOptions(RemoteOptions options) { + return options.restCacheUrl != null; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java b/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java deleted file mode 100644 index b135f67b6b..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import com.hazelcast.client.HazelcastClient; -import com.hazelcast.client.config.ClientConfig; -import com.hazelcast.client.config.ClientNetworkConfig; -import com.hazelcast.client.config.XmlClientConfigBuilder; -import com.hazelcast.config.Config; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; - -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; - -/** - * A factory class for providing a {@link ConcurrentMap} object implemented by Hazelcast. - * Hazelcast will work as a distributed memory cache. - */ -public final class HazelcastCacheFactory { - - private static final String CACHE_NAME = "hazelcast-build-cache"; - - public static ConcurrentMap<String, byte[]> create(RemoteOptions options) { - HazelcastInstance instance; - if (options.hazelcastClientConfig != null) { - try { - ClientConfig config = new XmlClientConfigBuilder(options.hazelcastClientConfig).build(); - instance = HazelcastClient.newHazelcastClient(config); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (options.hazelcastNode != null) { - // If --hazelcast_node is then create a client instance. - ClientConfig config = new ClientConfig(); - ClientNetworkConfig net = config.getNetworkConfig(); - net.addAddress(options.hazelcastNode.split(",")); - instance = HazelcastClient.newHazelcastClient(config); - } else if (options.hazelcastStandaloneListenPort != 0) { - Config config = new Config(); - config - .getNetworkConfig() - .setPort(options.hazelcastStandaloneListenPort) - .getJoin() - .getMulticastConfig() - .setEnabled(false); - instance = Hazelcast.newHazelcastInstance(config); - } else { - // Otherwise create a default instance. This is going to look at - // -Dhazelcast.config=some-hazelcast.xml for configuration. - instance = Hazelcast.newHazelcastInstance(); - } - return instance.getMap(CACHE_NAME); - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java deleted file mode 100644 index 20cbc6058d..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; -import com.google.devtools.build.lib.actions.Artifact; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; -import com.google.devtools.build.lib.remote.RemoteWorkGrpc.RemoteWorkFutureStub; -import com.google.devtools.build.lib.shell.Command; -import com.google.devtools.build.lib.shell.CommandException; -import com.google.devtools.build.lib.shell.CommandResult; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import io.grpc.ManagedChannel; -import io.grpc.netty.NettyChannelBuilder; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * Implementation of {@link RemoteWorkExecutor} that uses ConcurrentMapActionCache and gRPC for - * communicating the work, inputs and outputs. - */ -@ThreadSafe -public class MemcacheWorkExecutor implements RemoteWorkExecutor { - /** - * A cache used to store the input and output files as well as the build status of the remote - * work. - */ - protected final ConcurrentMapActionCache cache; - - /** Execution root for running this work locally. */ - private final Path execRoot; - - /** Channel over which to send work to run remotely. */ - private final ManagedChannel channel; - - private static final int MAX_WORK_SIZE_BYTES = 1024 * 1024 * 512; - - /** - * This constructor is used when this class is used in a client. It requires a host address and - * port to connect to a remote service. - */ - private MemcacheWorkExecutor(ConcurrentMapActionCache cache, String host, int port) { - this.cache = cache; - this.execRoot = null; - this.channel = NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build(); - } - - /** - * This constructor is used when this class is used in the remote worker. A path to the execution - * root is needed for executing work locally. - */ - private MemcacheWorkExecutor(ConcurrentMapActionCache cache, Path execRoot) { - this.cache = cache; - this.execRoot = execRoot; - this.channel = null; - } - - /** - * Create an instance of MemcacheWorkExecutor that talks to a remote server. - * - * @param cache An instance of ConcurrentMapActionCache. - * @param host Hostname of the server to connect to. - * @param port Port of the server to connect to. - * @return An instance of MemcacheWorkExecutor that talks to a remote server. - */ - public static MemcacheWorkExecutor createRemoteWorkExecutor( - ConcurrentMapActionCache cache, String host, int port) { - return new MemcacheWorkExecutor(cache, host, port); - } - - /** - * Create an instance of MemcacheWorkExecutor that runs locally. - * - * @param cache An instance of ConcurrentMapActionCache. - * @param execRoot Path of the execution root where work is executed. - * @return An instance of MemcacheWorkExecutor tthat runs locally in the execution root. - */ - public static MemcacheWorkExecutor createLocalWorkExecutor( - ConcurrentMapActionCache cache, Path execRoot) { - return new MemcacheWorkExecutor(cache, execRoot); - } - - @Override - public ListenableFuture<RemoteWorkResponse> executeRemotely( - Path execRoot, - ActionInputFileCache actionCache, - String actionOutputKey, - Collection<String> arguments, - Collection<ActionInput> inputs, - ImmutableMap<String, String> environment, - Collection<? extends ActionInput> outputs, - int timeout) - throws IOException, WorkTooLargeException, InterruptedException { - RemoteWorkRequest.Builder work = RemoteWorkRequest.newBuilder(); - work.setOutputKey(actionOutputKey); - - long workSize = 0; - for (ActionInput input : inputs) { - if (!(input instanceof Artifact)) { - continue; - } - if (!actionCache.isFile((Artifact) input)) { - continue; - } - workSize += actionCache.getSizeInBytes(input); - } - - if (workSize > MAX_WORK_SIZE_BYTES) { - throw new WorkTooLargeException("Work is too large: " + workSize + " bytes."); - } - - // Save all input files to cache. - for (ActionInput input : inputs) { - Path file = execRoot.getRelative(input.getExecPathString()); - - if (file.isDirectory()) { - // TODO(alpha): Handle this case better. - throw new UnsupportedOperationException( - "Does not support directory artifacts: " + file + "."); - } - - String contentKey = cache.putFileIfNotExist(actionCache, input); - work.addInputFilesBuilder() - .setPath(input.getExecPathString()) - .setContentKey(contentKey) - .setExecutable(file.isExecutable()); - } - - work.addAllArguments(arguments); - work.putAllEnvironment(environment); - for (ActionInput output : outputs) { - work.addOutputFilesBuilder().setPath(output.getExecPathString()); - } - - RemoteWorkFutureStub stub = RemoteWorkGrpc.newFutureStub(channel); - work.setTimeout(timeout); - return stub.executeSynchronously(work.build()); - } - - /** Execute a work item locally. */ - public RemoteWorkResponse executeLocally(RemoteWorkRequest work) - throws IOException, InterruptedException { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - try { - // Prepare directories and input files. - for (FileEntry input : work.getInputFilesList()) { - Path file = execRoot.getRelative(input.getPath()); - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - cache.writeFile(input.getContentKey(), file, input.getExecutable()); - } - - List<Path> outputs = new ArrayList<>(work.getOutputFilesList().size()); - for (FileEntry output : work.getOutputFilesList()) { - Path file = execRoot.getRelative(output.getPath()); - if (file.exists()) { - throw new FileAlreadyExistsException("Output file already exists: " + file); - } - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - outputs.add(file); - } - - Command cmd = - new Command( - work.getArgumentsList().toArray(new String[] {}), - work.getEnvironment(), - new File(execRoot.getPathString())); - CommandResult result = - cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); - cache.putActionOutput(work.getOutputKey(), execRoot, outputs); - return RemoteWorkResponse.newBuilder() - .setSuccess(result.getTerminationStatus().success()) - .setOut(stdout.toString()) - .setErr(stderr.toString()) - .build(); - } catch (CommandException e) { - return RemoteWorkResponse.newBuilder() - .setSuccess(false) - .setOut(stdout.toString()) - .setErr(stderr.toString()) - .setException(e.toString()) - .build(); - } - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index e865c96e2c..62790e8a76 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -14,47 +14,87 @@ package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.vfs.Path; import java.io.IOException; import java.util.Collection; +import javax.annotation.Nullable; -/** - * A cache for storing artifacts (input and output) as well as the output of running an action. - */ +/** A cache for storing artifacts (input and output) as well as the output of running an action. */ @ThreadCompatible interface RemoteActionCache { + // CAS API + + // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the + // underlying CasStatus messages and gRPC errors errors. + /** - * Put the file in cache if it is not already in it. No-op if the file is already stored in cache. - * - * @return The key for fetching the file from cache. + * Upload enough of the tree metadata and data into remote cache so that the entire tree can be + * reassembled remotely using the root digest. */ - String putFileIfNotExist(ActionInputFileCache cache, ActionInput file) + void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) throws IOException, InterruptedException; /** - * Write the file in cache identified by key to the file system. The key must uniquely identify - * the content of the file. Throws CacheNotFoundException if the file is not found in cache. + * Download the entire tree data rooted by the given digest and write it into the given location. */ - void writeFile(String key, Path dest, boolean executable) + void downloadTree(ContentDigest rootDigest, Path rootLocation) throws IOException, CacheNotFoundException; /** - * Write the action output files identified by the key to the file system. The key must uniquely - * identify the action and the content of action inputs. - * - * @throws CacheNotFoundException if action output is not found in cache. + * Download all results of a remotely executed action locally. TODO(olaola): will need to amend to + * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating. */ - void writeActionOutput(String key, Path execRoot) + void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException; - /** Update the cache with the action outputs for the specified key. */ - void putActionOutput(String key, Collection<? extends ActionInput> outputs) + /** + * Upload all results of a locally executed action to the cache. Add the files to the ActionResult + * builder. + */ + void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) throws IOException, InterruptedException; - /** Update the cache with the files for the specified key. */ - void putActionOutput(String key, Path execRoot, Collection<Path> files) - throws IOException, InterruptedException; + /** + * Put the file contents cache if it is not already in it. No-op if the file is already stored in + * cache. The given path must be a full absolute path. Note: this is horribly inefficient, need to + * patch through an overload that uses an ActionInputFile cache to compute the digests! + * + * @return The key for fetching the file contents blob from cache. + */ + ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException; + + /** + * Download a blob keyed by the given digest and write it to the specified path. Set the + * executable parameter to the specified value. + */ + void downloadFileContents(ContentDigest digest, Path dest, boolean executable) + throws IOException, CacheNotFoundException; + + /** Upload the given blobs to the cache, and return their digests. */ + ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException; + + /** Upload the given blob to the cache, and return its digests. */ + ContentDigest uploadBlob(byte[] blob) throws InterruptedException; + + /** Download and return a blob with a given digest from the cache. */ + byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException; + + /** Download and return blobs with given digests from the cache. */ + ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests) + throws CacheNotFoundException; + + // Execution Cache API + + /** Returns a cached result for a given Action digest, or null if not found in cache. */ + @Nullable + ActionResult getCachedActionResult(ActionKey actionKey); + + /** Sets the given result as result of the given Action. */ + void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 16ce15ca65..93f88ad244 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -17,6 +17,7 @@ package com.google.devtools.build.lib.remote; import com.google.common.collect.ImmutableList; import com.google.common.eventbus.Subscribe; import com.google.devtools.build.lib.actions.ActionContextProvider; +import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.buildtool.BuildRequest; import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent; import com.google.devtools.build.lib.events.Event; @@ -24,12 +25,8 @@ import com.google.devtools.build.lib.runtime.BlazeModule; import com.google.devtools.build.lib.runtime.Command; import com.google.devtools.build.lib.runtime.CommandEnvironment; import com.google.devtools.common.options.OptionsBase; -import java.net.URI; -import java.net.URISyntaxException; -/** - * RemoteModule provides distributed cache and remote execution for Bazel. - */ +/** RemoteModule provides distributed cache and remote execution for Bazel. */ public final class RemoteModule extends BlazeModule { private CommandEnvironment env; private BuildRequest buildRequest; @@ -61,38 +58,21 @@ public final class RemoteModule extends BlazeModule { buildRequest = event.getRequest(); RemoteOptions options = buildRequest.getOptions(RemoteOptions.class); - ConcurrentMapActionCache cache = null; + try { + // Reinitialize the remote cache and worker from options every time, because the options + // may change from build to build. - // Don't provide the remote spawn unless at least action cache is initialized. - if (actionCache == null) { - if (options.hazelcastNode != null || options.hazelcastClientConfig != null) { - cache = - new ConcurrentMapActionCache( - this.env.getDirectories().getExecRoot(), - HazelcastCacheFactory.create(options)); - } else if (options.restCacheUrl != null) { - cache = - new ConcurrentMapActionCache( - this.env.getDirectories().getExecRoot(), - RestUrlCacheFactory.create(options)); + // Don't provide the remote spawn unless at least action cache is initialized. + if (ConcurrentMapFactory.isRemoteCacheOptions(options)) { + actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options)); } - actionCache = cache; - } + // Otherwise actionCache remains null and remote caching/execution are disabled. - if (cache != null) { - if (workExecutor == null && options.remoteWorker != null) { - try { - URI uri = new URI("dummy://" + options.remoteWorker); - if (uri.getHost() == null || uri.getPort() == -1) { - throw new URISyntaxException("Invalid host or port.", ""); - } - workExecutor = - MemcacheWorkExecutor.createRemoteWorkExecutor(cache, uri.getHost(), uri.getPort()); - } catch (URISyntaxException e) { - env.getReporter() - .handle(Event.warn("Invalid argument for the address of remote worker.")); - } + if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) { + workExecutor = new RemoteWorkExecutor(options); } + } catch (InvalidConfigurationException e) { + env.getReporter().handle(Event.warn(e.toString())); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java index 78747c8090..9ed8ff2f8d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java @@ -17,9 +17,7 @@ package com.google.devtools.build.lib.remote; import com.google.devtools.common.options.Option; import com.google.devtools.common.options.OptionsBase; -/** - * Options for remote execution and distributed caching. - */ +/** Options for remote execution and distributed caching. */ public final class RemoteOptions extends OptionsBase { @Option( name = "rest_cache_url", @@ -66,4 +64,12 @@ public final class RemoteOptions extends OptionsBase { + "For client mode only." ) public String remoteWorker; + + @Option( + name = "grpc_timeout_seconds", + defaultValue = "60", + category = "remote", + help = "The maximal number of seconds to wait for remote calls. For client mode only." + ) + public int grpcTimeoutSeconds; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index 2244ca73cc..d2b30cef97 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -14,41 +14,44 @@ package com.google.devtools.build.lib.remote; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ActionExecutionContext; -import com.google.devtools.build.lib.actions.ActionExecutionMetadata; import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.ExecutionStrategy; -import com.google.devtools.build.lib.actions.Executor; import com.google.devtools.build.lib.actions.Spawn; import com.google.devtools.build.lib.actions.SpawnActionContext; import com.google.devtools.build.lib.actions.Spawns; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.Action; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.Command; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy; -import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; +import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.TreeSet; /** - * Strategy that uses a distributed cache for sharing action input and output files. - * Optionally this strategy also support offloading the work to a remote worker. + * Strategy that uses a distributed cache for sharing action input and output files. Optionally this + * strategy also support offloading the work to a remote worker. */ @ExecutionStrategy( name = {"remote"}, @@ -74,179 +77,170 @@ final class RemoteSpawnStrategy implements SpawnActionContext { this.remoteWorkExecutor = workExecutor; } - /** Executes the given {@code spawn}. */ - @Override - public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext) - throws ExecException, InterruptedException { - if (!spawn.isRemotable()) { - standaloneStrategy.exec(spawn, actionExecutionContext); - return; + private Action buildAction( + Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) { + Action.Builder action = Action.newBuilder(); + action.setCommandDigest(command); + action.setInputRootDigest(inputRoot); + // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. + for (ActionInput output : outputs) { + action.addOutputPath(output.getExecPathString()); } + // TODO(olaola): Need to set platform as well! + return action.build(); + } - Executor executor = actionExecutionContext.getExecutor(); - ActionExecutionMetadata actionMetadata = spawn.getResourceOwner(); - ActionInputFileCache inputFileCache = actionExecutionContext.getActionInputFileCache(); - EventHandler eventHandler = executor.getEventHandler(); - - if (remoteActionCache == null) { - eventHandler.handle( - Event.warn( - spawn.getMnemonic() + " Cannot instantiate remote action cache. Running locally.")); - standaloneStrategy.exec(spawn, actionExecutionContext); - return; + private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) { + Command.Builder command = Command.newBuilder(); + command.addAllArgv(arguments); + // Sorting the environment pairs by variable name. + TreeSet<String> variables = new TreeSet<>(environment.keySet()); + for (String var : variables) { + command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); } + return command.build(); + } - // Compute a hash code to uniquely identify the action plus the action inputs. - Hasher hasher = Hashing.sha256().newHasher(); - - // TODO(alpha): The action key is usually computed using the path to the tool and the - // arguments. It does not take into account the content / version of the system tool (e.g. gcc). - // Either I put information about the system tools in the hash or assume tools are always - // checked in. - Preconditions.checkNotNull(actionMetadata.getKey()); - hasher.putString(actionMetadata.getKey(), Charset.defaultCharset()); - - List<ActionInput> inputs = - ActionInputHelper.expandArtifacts( - spawn.getInputFiles(), actionExecutionContext.getArtifactExpander()); - for (ActionInput input : inputs) { - hasher.putString(input.getExecPathString(), Charset.defaultCharset()); + /** + * Fallback: execute the spawn locally. If an ActionKey is provided, try to upload results to + * remote action cache. + */ + private void execLocally( + Spawn spawn, ActionExecutionContext actionExecutionContext, ActionKey actionKey) + throws ExecException, InterruptedException { + standaloneStrategy.exec(spawn, actionExecutionContext); + if (remoteActionCache != null && actionKey != null) { + ArrayList<Path> outputFiles = new ArrayList<>(); + for (ActionInput output : spawn.getOutputFiles()) { + outputFiles.add(execRoot.getRelative(output.getExecPathString())); + } try { - // TODO(alpha): The digest from ActionInputFileCache is used to detect local file - // changes. It might not be sufficient to identify the input file globally in the - // remote action cache. Consider upgrading this to a better hash algorithm with - // less collision. - hasher.putBytes(inputFileCache.getDigest(input)); + ActionResult.Builder result = ActionResult.newBuilder(); + remoteActionCache.uploadAllResults(execRoot, outputFiles, result); + remoteActionCache.setCachedActionResult(actionKey, result.build()); + // Handle all cache errors here. } catch (IOException e) { - throw new UserExecException("Failed to get digest for input.", e); + throw new UserExecException("Unexpected IO error.", e); + } catch (UnsupportedOperationException e) { + actionExecutionContext + .getExecutor() + .getEventHandler() + .handle( + Event.warn( + spawn.getMnemonic() + " unsupported operation for action cache (" + e + ")")); } } + } - // Save the action output if found in the remote action cache. - String actionOutputKey = hasher.hash().toString(); + private void passRemoteOutErr(ActionResult result, FileOutErr outErr) { + if (remoteActionCache == null) { + return; + } + try { + ImmutableList<byte[]> streams = + remoteActionCache.downloadBlobs( + ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); + outErr.printOut(new String(streams.get(0), UTF_8)); + outErr.printErr(new String(streams.get(1), UTF_8)); + } catch (CacheNotFoundException e) { + // Ignoring. + } + } - // Timeout for running the remote spawn. - final int timeoutSeconds = Spawns.getTimeoutSeconds(spawn, 120); + /** Executes the given {@code spawn}. */ + @Override + public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext) + throws ExecException, InterruptedException { + if (!spawn.isRemotable() || remoteActionCache == null) { + standaloneStrategy.exec(spawn, actionExecutionContext); + return; + } + + ActionKey actionKey = null; + String mnemonic = spawn.getMnemonic(); + EventHandler eventHandler = actionExecutionContext.getExecutor().getEventHandler(); try { - // Look up action cache using |actionOutputKey|. Reuse the action output if it is found. - if (writeActionOutput(spawn.getMnemonic(), actionOutputKey, eventHandler, true)) { - return; + // Temporary hack: the TreeNodeRepository should be created and maintained upstream! + TreeNodeRepository repository = new TreeNodeRepository(execRoot); + List<ActionInput> inputs = + ActionInputHelper.expandArtifacts( + spawn.getInputFiles(), actionExecutionContext.getArtifactExpander()); + TreeNode inputRoot = repository.buildFromActionInputs(inputs); + repository.computeMerkleDigests(inputRoot); + Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment()); + Action action = + buildAction( + spawn.getOutputFiles(), + ContentDigests.computeDigest(command), + repository.getMerkleDigest(inputRoot)); + + // Look up action cache, and reuse the action output if it is found. + actionKey = ContentDigests.computeActionKey(action); + ActionResult result = remoteActionCache.getCachedActionResult(actionKey); + boolean acceptCached = true; + if (result != null) { + // We don't cache failed actions, so we know the outputs exist. + // For now, download all outputs locally; in the future, we can reuse the digests to + // just update the TreeNodeRepository and continue the build. + try { + remoteActionCache.downloadAllResults(result, execRoot); + return; + } catch (CacheNotFoundException e) { + acceptCached = false; // Retry the action remotely and invalidate the results. + } } - FileOutErr outErr = actionExecutionContext.getFileOutErr(); - if (executeWorkRemotely( - inputFileCache, - spawn.getMnemonic(), - actionOutputKey, - spawn.getArguments(), - inputs, - spawn.getEnvironment(), - spawn.getOutputFiles(), - timeoutSeconds, - eventHandler, - outErr)) { + if (remoteWorkExecutor == null) { + execLocally(spawn, actionExecutionContext, actionKey); return; } - // If nothing works then run spawn locally. - standaloneStrategy.exec(spawn, actionExecutionContext); - if (remoteActionCache != null) { - remoteActionCache.putActionOutput(actionOutputKey, spawn.getOutputFiles()); + // Upload the command and all the inputs into the remote cache. + remoteActionCache.uploadBlob(command.toByteArray()); + // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests! + remoteActionCache.uploadTree(repository, execRoot, inputRoot); + // TODO(olaola): set BuildInfo and input total bytes as well. + ExecuteRequest.Builder request = + ExecuteRequest.newBuilder() + .setAction(action) + .setAcceptCached(acceptCached) + .setTotalInputFileCount(inputs.size()) + .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120)); + // TODO(olaola): set sensible local and remote timouts. + ExecuteReply reply = remoteWorkExecutor.executeRemotely(request.build()); + ExecutionStatus status = reply.getStatus(); + result = reply.getResult(); + // We do not want to pass on the remote stdout and strerr if we are going to retry the + // action. + if (status.getSucceeded()) { + passRemoteOutErr(result, actionExecutionContext.getFileOutErr()); + remoteActionCache.downloadAllResults(result, execRoot); + return; } + if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED) { + passRemoteOutErr(result, actionExecutionContext.getFileOutErr()); + throw new UserExecException(status.getErrorDetail()); + } + // For now, we retry locally on all other remote errors. + // TODO(olaola): add remote retries on cache miss errors. + execLocally(spawn, actionExecutionContext, actionKey); } catch (IOException e) { throw new UserExecException("Unexpected IO error.", e); - } catch (UnsupportedOperationException e) { - eventHandler.handle( - Event.warn(spawn.getMnemonic() + " unsupported operation for action cache (" + e + ")")); - } - } - - /** - * Submit work to execute remotely. - * - * @return True in case the action succeeded and all expected action outputs are found. - */ - private boolean executeWorkRemotely( - ActionInputFileCache actionCache, - String mnemonic, - String actionOutputKey, - List<String> arguments, - List<ActionInput> inputs, - ImmutableMap<String, String> environment, - Collection<? extends ActionInput> outputs, - int timeout, - EventHandler eventHandler, - FileOutErr outErr) - throws IOException, InterruptedException { - if (remoteWorkExecutor == null) { - return false; - } - try { - ListenableFuture<RemoteWorkResponse> future = - remoteWorkExecutor.executeRemotely( - execRoot, - actionCache, - actionOutputKey, - arguments, - inputs, - environment, - outputs, - timeout); - RemoteWorkResponse response = future.get(timeout, TimeUnit.SECONDS); - if (!response.getSuccess()) { - String exception = ""; - if (!response.getException().isEmpty()) { - exception = " (" + response.getException() + ")"; - } - eventHandler.handle( - Event.warn( - mnemonic + " failed to execute work remotely" + exception + ", running locally")); - return false; - } - outErr.printOut(response.getOut()); - outErr.printErr(response.getErr()); - } catch (ExecutionException e) { - eventHandler.handle( - Event.warn(mnemonic + " failed to execute work remotely (" + e + "), running locally")); - return false; - } catch (TimeoutException e) { - eventHandler.handle( - Event.warn(mnemonic + " timed out executing work remotely (" + e + "), running locally")); - return false; } catch (InterruptedException e) { eventHandler.handle(Event.warn(mnemonic + " remote work interrupted (" + e + ")")); + Thread.currentThread().interrupt(); throw e; - } catch (WorkTooLargeException e) { - eventHandler.handle(Event.warn(mnemonic + " cannot be run remotely (" + e + ")")); - return false; - } - return writeActionOutput(mnemonic, actionOutputKey, eventHandler, false); - } - - /** - * Saves the action output from cache. Returns true if all action outputs are found. - */ - private boolean writeActionOutput( - String mnemonic, - String actionOutputKey, - EventHandler eventHandler, - boolean ignoreCacheNotFound) - throws IOException { - if (remoteActionCache == null) { - return false; - } - try { - remoteActionCache.writeActionOutput(actionOutputKey, execRoot); - Event.info(mnemonic + " reuse action outputs from cache"); - return true; + } catch (StatusRuntimeException e) { + eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")")); + execLocally(spawn, actionExecutionContext, actionKey); } catch (CacheNotFoundException e) { - if (!ignoreCacheNotFound) { - eventHandler.handle( - Event.warn(mnemonic + " some cache entries cannot be found (" + e + ")")); - } + eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")")); + execLocally(spawn, actionExecutionContext, actionKey); + } catch (UnsupportedOperationException e) { + eventHandler.handle( + Event.warn(mnemonic + " unsupported operation for action cache (" + e + ")")); } - return false; } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java new file mode 100644 index 0000000000..9747e3ef11 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java @@ -0,0 +1,42 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import io.grpc.ManagedChannel; +import io.grpc.netty.NettyChannelBuilder; +import java.net.URI; +import java.net.URISyntaxException; + +/** Helper methods for gRPC calls */ +@ThreadSafe +public final class RemoteUtils { + public static ManagedChannel createChannel(String hostAndPort) + throws InvalidConfigurationException { + try { + URI uri = new URI("dummy://" + hostAndPort); + if (uri.getHost() == null || uri.getPort() == -1) { + throw new URISyntaxException("Invalid host or port.", ""); + } + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) + .usePlaintext(true) + .build(); + } catch (URISyntaxException e) { + throw new InvalidConfigurationException( + "Invalid argument for the address of remote cache server: " + hostAndPort); + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java index 198e9262a7..9cb080b402 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java @@ -14,35 +14,53 @@ package com.google.devtools.build.lib.remote; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; -import com.google.devtools.build.lib.vfs.Path; -import java.io.IOException; -import java.util.Collection; +import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import io.grpc.ManagedChannel; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; -/** - * Interface for exeucting work remotely. - */ -@ThreadCompatible -public interface RemoteWorkExecutor { - /** - * Submit the work to this work executor. The output of running this action should be written to - * {@link RemoteActionCache} indexed by {@code actionOutputKey}. - * - * <p>Returns a future for the response of this work request. - */ - ListenableFuture<RemoteWorkResponse> executeRemotely( - Path execRoot, - ActionInputFileCache cache, - String actionOutputKey, - Collection<String> arguments, - Collection<ActionInput> inputs, - ImmutableMap<String, String> environment, - Collection<? extends ActionInput> outputs, - int timeout) - throws IOException, WorkTooLargeException, InterruptedException; +/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ +@ThreadSafe +public class RemoteWorkExecutor { + /** Channel over which to send work to run remotely. */ + private final ManagedChannel channel; + private final int grpcTimeoutSeconds; + + public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException { + channel = RemoteUtils.createChannel(options.remoteWorker); + grpcTimeoutSeconds = options.grpcTimeoutSeconds; + } + + public static boolean isRemoteExecutionOptions(RemoteOptions options) { + return options.remoteWorker != null; + } + + public ExecuteReply executeRemotely(ExecuteRequest request) { + ExecuteServiceBlockingStub stub = + ExecuteServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter( + grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); + Iterator<ExecuteReply> replies = stub.execute(request); + ExecuteReply reply = null; + while (replies.hasNext()) { + reply = replies.next(); + // We can handle the action execution progress here. + } + if (reply == null) { + return ExecuteReply.newBuilder() + .setStatus( + ExecutionStatus.newBuilder() + .setExecuted(false) + .setSucceeded(false) + .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR) + .setErrorDetail("Remote server terminated the connection")) + .build(); + } + return reply; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java b/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java deleted file mode 100644 index d1498d73df..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -/** An exception that indicates the work is too large to run remotely. */ -final class WorkTooLargeException extends RuntimeException { - WorkTooLargeException() { - super(); - } - - WorkTooLargeException(String message) { - super(message); - } - - WorkTooLargeException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/src/main/protobuf/remote_protocol.proto b/src/main/protobuf/remote_protocol.proto index a9dd6a9ed1..c807b00d45 100644 --- a/src/main/protobuf/remote_protocol.proto +++ b/src/main/protobuf/remote_protocol.proto @@ -322,81 +322,3 @@ message ExecutionStatus { // Optionally will add more details pertaining to current stage, for example // time executing, or position in queue, etc. } - -// Previous version of remote execution and caching API. -// Will be removed soon after all code paths are migrated! - -// A message for cache entry. -message CacheEntry { - // A list of files stored in this cache entry. - repeated FileEntry files = 1; - - // A blob for data that is a chunk of a file. - bytes file_content = 2; -} - -// A message for storing a file in cache. -message FileEntry { - // The path in the file system where to read this input artifact from. This is - // either a path relative to the execution root (the worker process is - // launched with the working directory set to the execution root), or an - // absolute path. - string path = 1; - - // The cache key to locate the file content. This key is usually generated - // from - // the content of the file such that different keys means the file content are - // different. - string content_key = 2; - - // Whether the file is an executable. - bool executable = 3; - - // TODO(alpha): For large files we need to break down into chunks to store - // in the cache. For that case we need a index for the chunks of the file. -} - -// A message for running a command remotely. -message RemoteWorkRequest { - // The key for writing the output of this work request. - string output_key = 1; - - // The arguments for running the command. The command itself is in - // arguments[0]. - repeated string arguments = 2; - - // The list of input files to this work request. - repeated FileEntry input_files = 3; - - // A map of environment variables for this command. - map<string, string> environment = 4; - - // The list of expected output files to this work request. - // The content keys for these entries will be empty since the files don't - // exist yet. - repeated FileEntry output_files = 5; - - // Timeout for running this command. - int32 timeout = 6; -} - -// A message for a work response. -message RemoteWorkResponse { - // True if the work was successful. - bool success = 1; - - // String from stdout of running the work. - string out = 2; - - // String from stderr of running the work. - string err = 3; - - // String for the exception when running this work. - string exception = 4; -} - -service RemoteWork { - // Perform work synchronously. - rpc ExecuteSynchronously(RemoteWorkRequest) returns (RemoteWorkResponse) { - } -} |