From 72d117d6fe9485b2ab8cf7938c029cc26d1787da Mon Sep 17 00:00:00 2001 From: Ola Rozenfeld Date: Mon, 19 Sep 2016 15:02:32 +0000 Subject: Description redacted. -- MOS_MIGRATED_REVID=133584935 --- .../build/lib/remote/CacheNotFoundException.java | 23 +- .../build/lib/remote/ConcurrentMapActionCache.java | 205 +++++++------ .../build/lib/remote/ConcurrentMapFactory.java | 242 ++++++++++++++++ .../build/lib/remote/HazelcastCacheFactory.java | 67 ----- .../build/lib/remote/MemcacheWorkExecutor.java | 209 -------------- .../build/lib/remote/RemoteActionCache.java | 84 ++++-- .../devtools/build/lib/remote/RemoteModule.java | 46 +-- .../devtools/build/lib/remote/RemoteOptions.java | 12 +- .../build/lib/remote/RemoteSpawnStrategy.java | 318 ++++++++++----------- .../devtools/build/lib/remote/RemoteUtils.java | 42 +++ .../build/lib/remote/RemoteWorkExecutor.java | 78 +++-- .../build/lib/remote/RestUrlCacheFactory.java | 175 ------------ .../build/lib/remote/WorkTooLargeException.java | 30 -- src/main/protobuf/remote_protocol.proto | 78 ----- .../google/devtools/build/remote/RemoteWorker.java | 163 +++++++++-- 15 files changed, 844 insertions(+), 928 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java delete mode 100644 src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java delete mode 100644 src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java create mode 100644 src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java delete mode 100644 src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java delete mode 100644 src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java index 0e4e52c1d4..5820e0bf95 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java @@ -14,20 +14,25 @@ package com.google.devtools.build.lib.remote; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; + /** - * An exception to indicate the cache is not found because of an expected - * problem. + * An exception to indicate cache misses. + * TODO(olaola): have a class of checked RemoteCacheExceptions. */ -final class CacheNotFoundException extends RuntimeException { - CacheNotFoundException() { - super(); +public final class CacheNotFoundException extends Exception { + private final ContentDigest missingDigest; + + CacheNotFoundException(ContentDigest missingDigest) { + this.missingDigest = missingDigest; } - CacheNotFoundException(String message) { - super(message); + public ContentDigest getMissingDigest() { + return missingDigest; } - CacheNotFoundException(String message, Throwable cause) { - super(message, cause); + @Override + public String toString() { + return "Missing digest: " + ContentDigests.toString(missingDigest); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java index 7e27653e76..72c75700b5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java @@ -14,18 +14,24 @@ package com.google.devtools.build.lib.remote; -import com.google.common.hash.HashCode; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.CacheEntry; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; +import com.google.devtools.build.lib.remote.RemoteProtocol.Output; +import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; @@ -38,121 +44,156 @@ import java.util.concurrent.Semaphore; */ @ThreadSafe public final class ConcurrentMapActionCache implements RemoteActionCache { - private final Path execRoot; private final ConcurrentMap cache; private static final int MAX_MEMORY_KBYTES = 512 * 1024; private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true); - public ConcurrentMapActionCache(Path execRoot, ConcurrentMap cache) { - this.execRoot = execRoot; + public ConcurrentMapActionCache(ConcurrentMap cache) { this.cache = cache; } @Override - public String putFileIfNotExist(ActionInputFileCache cache, ActionInput file) + public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) throws IOException, InterruptedException { - String contentKey = HashCode.fromBytes(cache.getDigest(file)).toString(); - if (containsFile(contentKey)) { - return contentKey; + repository.computeMerkleDigests(root); + for (FileNode fileNode : repository.treeToFileNodes(root)) { + uploadBlob(fileNode.toByteArray()); + } + for (TreeNode leaf : repository.leaves(root)) { + uploadFileContents(execRoot.getRelative(leaf.getActionInput().getExecPathString())); } - putFile(contentKey, execRoot.getRelative(file.getExecPathString())); - return contentKey; } - private void putFile(String key, Path file) throws IOException, InterruptedException { - int fileSizeKBytes = (int) (file.getFileSize() / 1024); - Preconditions.checkArgument(fileSizeKBytes < MAX_MEMORY_KBYTES); - try { - uploadMemoryAvailable.acquire(fileSizeKBytes); - // TODO(alpha): I should put the file content as chunks to avoid reading the entire - // file into memory. - try (InputStream stream = file.getInputStream()) { - cache.put( - key, - CacheEntry.newBuilder() - .setFileContent(ByteString.readFrom(stream)) - .build() - .toByteArray()); - } - } finally { - uploadMemoryAvailable.release(fileSizeKBytes); + @Override + public void downloadTree(ContentDigest rootDigest, Path rootLocation) + throws IOException, CacheNotFoundException { + FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest)); + if (fileNode.hasFileMetadata()) { + FileMetadata meta = fileNode.getFileMetadata(); + downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable()); + } + for (FileNode.Child child : fileNode.getChildList()) { + downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath())); } } @Override - public void writeFile(String key, Path dest, boolean executable) + public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException { + // This unconditionally reads the whole file into memory first! + return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray()); + } + + @Override + public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { - byte[] data = cache.get(key); - if (data == null) { - throw new CacheNotFoundException("File content cannot be found with key: " + key); - } - try (OutputStream stream = dest.getOutputStream()) { - CacheEntry.parseFrom(data).getFileContent().writeTo(stream); - dest.setExecutable(executable); + for (Output output : result.getOutputList()) { + if (output.getContentCase() == ContentCase.FILE_METADATA) { + FileMetadata m = output.getFileMetadata(); + downloadFileContents( + m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable()); + } else { + downloadTree(output.getDigest(), execRoot.getRelative(output.getPath())); + } } } - private boolean containsFile(String key) { - return cache.containsKey(key); + @Override + public void uploadAllResults(Path execRoot, Collection files, ActionResult.Builder result) + throws IOException, InterruptedException { + for (Path file : files) { + if (file.isDirectory()) { + // TODO(olaola): to implement this for a directory, will need to create or pass a + // TreeNodeRepository to call uploadTree. + throw new UnsupportedOperationException("Storing a directory is not yet supported."); + } + // First put the file content to cache. + ContentDigest digest = uploadFileContents(file); + // Add to protobuf. + result + .addOutputBuilder() + .setPath(file.relativeTo(execRoot).getPathString()) + .getFileMetadataBuilder() + .setDigest(digest) + .setExecutable(file.isExecutable()); + } } @Override - public void writeActionOutput(String key, Path execRoot) + public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { - byte[] data = cache.get(key); - if (data == null) { - throw new CacheNotFoundException("Action output cannot be found with key: " + key); + // This unconditionally downloads the whole file into memory first! + byte[] contents = downloadBlob(digest); + FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory()); + try (OutputStream stream = dest.getOutputStream()) { + stream.write(contents); } - CacheEntry cacheEntry = CacheEntry.parseFrom(data); - for (FileEntry file : cacheEntry.getFilesList()) { - writeFile(file.getContentKey(), execRoot.getRelative(file.getPath()), file.getExecutable()); + dest.setExecutable(executable); + } + + @Override + public ImmutableList uploadBlobs(Iterable blobs) + throws InterruptedException { + ArrayList digests = new ArrayList<>(); + for (byte[] blob : blobs) { + digests.add(uploadBlob(blob)); } + return ImmutableList.copyOf(digests); } @Override - public void putActionOutput(String key, Collection outputs) - throws IOException, InterruptedException { - CacheEntry.Builder actionOutput = CacheEntry.newBuilder(); - for (ActionInput output : outputs) { - Path file = execRoot.getRelative(output.getExecPathString()); - addToActionOutput(file, output.getExecPathString(), actionOutput); + public ContentDigest uploadBlob(byte[] blob) throws InterruptedException { + int blobSizeKBytes = blob.length / 1024; + Preconditions.checkArgument(blobSizeKBytes < MAX_MEMORY_KBYTES); + ContentDigest digest = ContentDigests.computeDigest(blob); + uploadMemoryAvailable.acquire(blobSizeKBytes); + try { + cache.put(ContentDigests.toHexString(digest), blob); + } finally { + uploadMemoryAvailable.release(blobSizeKBytes); } - cache.put(key, actionOutput.build().toByteArray()); + return digest; } @Override - public void putActionOutput(String key, Path execRoot, Collection files) - throws IOException, InterruptedException { - CacheEntry.Builder actionOutput = CacheEntry.newBuilder(); - for (Path file : files) { - addToActionOutput(file, file.relativeTo(execRoot).getPathString(), actionOutput); + public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException { + if (digest.getSizeBytes() == 0) { + return new byte[0]; + } + // This unconditionally downloads the whole blob into memory! + Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES); + byte[] data = cache.get(ContentDigests.toHexString(digest)); + if (data == null) { + throw new CacheNotFoundException(digest); } - cache.put(key, actionOutput.build().toByteArray()); + return data; } - /** Add the file to action output cache entry. Put the file to cache if necessary. */ - private void addToActionOutput(Path file, String execPathString, CacheEntry.Builder actionOutput) - throws IOException, InterruptedException { - if (file.isDirectory()) { - // TODO(alpha): Implement this for directory. - throw new UnsupportedOperationException("Storing a directory is not yet supported."); + @Override + public ImmutableList downloadBlobs(Iterable digests) + throws CacheNotFoundException { + ArrayList blobs = new ArrayList<>(); + for (ContentDigest c : digests) { + blobs.add(downloadBlob(c)); } - // First put the file content to cache. - String contentKey = putFileIfNotExist(file); - // Add to protobuf. - actionOutput - .addFilesBuilder() - .setPath(execPathString) - .setContentKey(contentKey) - .setExecutable(file.isExecutable()); + return ImmutableList.copyOf(blobs); } - private String putFileIfNotExist(Path file) throws IOException, InterruptedException { - String contentKey = HashCode.fromBytes(file.getMD5Digest()).toString(); - if (containsFile(contentKey)) { - return contentKey; + @Override + public ActionResult getCachedActionResult(ActionKey actionKey) { + byte[] data = cache.get(ContentDigests.toHexString(actionKey.getDigest())); + if (data == null) { + return null; } - putFile(contentKey, file); - return contentKey; + try { + return ActionResult.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + + @Override + public void setCachedActionResult(ActionKey actionKey, ActionResult result) + throws InterruptedException { + cache.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray()); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java new file mode 100644 index 0000000000..3461673011 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapFactory.java @@ -0,0 +1,242 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.client.config.ClientNetworkConfig; +import com.hazelcast.client.config.XmlClientConfigBuilder; +import com.hazelcast.config.Config; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; + +/** + * A factory class for providing a {@link ConcurrentMap} objects to be used with + * {@link ConcurrentMapActionCache} objects. The underlying maps can be Hazelcast or RestUrl based. + */ +public final class ConcurrentMapFactory { + + private static final String HAZELCAST_CACHE_NAME = "hazelcast-build-cache"; + + private ConcurrentMapFactory() {} + + public static ConcurrentMap createHazelcast(RemoteOptions options) { + HazelcastInstance instance; + if (options.hazelcastClientConfig != null) { + try { + ClientConfig config = new XmlClientConfigBuilder(options.hazelcastClientConfig).build(); + instance = HazelcastClient.newHazelcastClient(config); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (options.hazelcastNode != null) { + // If --hazelcast_node is specified then create a client instance. + ClientConfig config = new ClientConfig(); + ClientNetworkConfig net = config.getNetworkConfig(); + net.addAddress(options.hazelcastNode.split(",")); + instance = HazelcastClient.newHazelcastClient(config); + } else if (options.hazelcastStandaloneListenPort != 0) { + Config config = new Config(); + config + .getNetworkConfig() + .setPort(options.hazelcastStandaloneListenPort) + .getJoin() + .getMulticastConfig() + .setEnabled(false); + instance = Hazelcast.newHazelcastInstance(config); + } else { + // Otherwise create a default instance. This is going to look at + // -Dhazelcast.config=some-hazelcast.xml for configuration. + instance = Hazelcast.newHazelcastInstance(); + } + return instance.getMap(HAZELCAST_CACHE_NAME); + } + + private static class RestUrlCache implements ConcurrentMap { + + final String baseUrl; + + RestUrlCache(String baseUrl) { + this.baseUrl = baseUrl; + } + + @Override + public boolean containsKey(Object key) { + try { + HttpClient client = new DefaultHttpClient(); + HttpHead head = new HttpHead(baseUrl + "/" + key); + HttpResponse response = client.execute(head); + int statusCode = response.getStatusLine().getStatusCode(); + return HttpStatus.SC_OK == statusCode; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] get(Object key) { + try { + HttpClient client = new DefaultHttpClient(); + HttpGet get = new HttpGet(baseUrl + "/" + key); + HttpResponse response = client.execute(get); + int statusCode = response.getStatusLine().getStatusCode(); + if (HttpStatus.SC_NOT_FOUND == statusCode) { + return null; + } + if (HttpStatus.SC_OK != statusCode) { + throw new RuntimeException("GET failed with status code " + statusCode); + } + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + HttpEntity entity = response.getEntity(); + entity.writeTo(buffer); + buffer.flush(); + EntityUtils.consume(entity); + + return buffer.toByteArray(); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] put(String key, byte[] value) { + try { + HttpClient client = new DefaultHttpClient(); + HttpPut put = new HttpPut(baseUrl + "/" + key); + put.setEntity(new ByteArrayEntity(value)); + HttpResponse response = client.execute(put); + int statusCode = response.getStatusLine().getStatusCode(); + + if (HttpStatus.SC_OK != statusCode) { + throw new RuntimeException("PUT failed with status code " + statusCode); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + + //UnsupportedOperationExceptions from here down + @Override + public int size() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsValue(Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] remove(Object key) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection values() { + throw new UnsupportedOperationException(); + } + + @Override + public Set> entrySet() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] putIfAbsent(String key, byte[] value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object key, Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean replace(String key, byte[] oldValue, byte[] newValue) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] replace(String key, byte[] value) { + throw new UnsupportedOperationException(); + } + } + + public static ConcurrentMap createRestUrl(RemoteOptions options) { + return new RestUrlCache(options.restCacheUrl); + } + + public static ConcurrentMap create(RemoteOptions options) { + if (isHazelcastOptions(options)) { + return createHazelcast(options); + } + if (isRestUrlOptions(options)) { + return createRestUrl(options); + } + throw new IllegalArgumentException( + "Unrecognized concurrent map RemoteOptions: must specify " + + "either Hazelcast or Rest URL options."); + } + + public static boolean isRemoteCacheOptions(RemoteOptions options) { + return isHazelcastOptions(options) || isRestUrlOptions(options); + } + + private static boolean isHazelcastOptions(RemoteOptions options) { + return options.hazelcastNode != null || options.hazelcastClientConfig != null; + } + + private static boolean isRestUrlOptions(RemoteOptions options) { + return options.restCacheUrl != null; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java b/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java deleted file mode 100644 index b135f67b6b..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/HazelcastCacheFactory.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import com.hazelcast.client.HazelcastClient; -import com.hazelcast.client.config.ClientConfig; -import com.hazelcast.client.config.ClientNetworkConfig; -import com.hazelcast.client.config.XmlClientConfigBuilder; -import com.hazelcast.config.Config; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; - -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; - -/** - * A factory class for providing a {@link ConcurrentMap} object implemented by Hazelcast. - * Hazelcast will work as a distributed memory cache. - */ -public final class HazelcastCacheFactory { - - private static final String CACHE_NAME = "hazelcast-build-cache"; - - public static ConcurrentMap create(RemoteOptions options) { - HazelcastInstance instance; - if (options.hazelcastClientConfig != null) { - try { - ClientConfig config = new XmlClientConfigBuilder(options.hazelcastClientConfig).build(); - instance = HazelcastClient.newHazelcastClient(config); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (options.hazelcastNode != null) { - // If --hazelcast_node is then create a client instance. - ClientConfig config = new ClientConfig(); - ClientNetworkConfig net = config.getNetworkConfig(); - net.addAddress(options.hazelcastNode.split(",")); - instance = HazelcastClient.newHazelcastClient(config); - } else if (options.hazelcastStandaloneListenPort != 0) { - Config config = new Config(); - config - .getNetworkConfig() - .setPort(options.hazelcastStandaloneListenPort) - .getJoin() - .getMulticastConfig() - .setEnabled(false); - instance = Hazelcast.newHazelcastInstance(config); - } else { - // Otherwise create a default instance. This is going to look at - // -Dhazelcast.config=some-hazelcast.xml for configuration. - instance = Hazelcast.newHazelcastInstance(); - } - return instance.getMap(CACHE_NAME); - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java deleted file mode 100644 index 20cbc6058d..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/MemcacheWorkExecutor.java +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; -import com.google.devtools.build.lib.actions.Artifact; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; -import com.google.devtools.build.lib.remote.RemoteWorkGrpc.RemoteWorkFutureStub; -import com.google.devtools.build.lib.shell.Command; -import com.google.devtools.build.lib.shell.CommandException; -import com.google.devtools.build.lib.shell.CommandResult; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import io.grpc.ManagedChannel; -import io.grpc.netty.NettyChannelBuilder; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * Implementation of {@link RemoteWorkExecutor} that uses ConcurrentMapActionCache and gRPC for - * communicating the work, inputs and outputs. - */ -@ThreadSafe -public class MemcacheWorkExecutor implements RemoteWorkExecutor { - /** - * A cache used to store the input and output files as well as the build status of the remote - * work. - */ - protected final ConcurrentMapActionCache cache; - - /** Execution root for running this work locally. */ - private final Path execRoot; - - /** Channel over which to send work to run remotely. */ - private final ManagedChannel channel; - - private static final int MAX_WORK_SIZE_BYTES = 1024 * 1024 * 512; - - /** - * This constructor is used when this class is used in a client. It requires a host address and - * port to connect to a remote service. - */ - private MemcacheWorkExecutor(ConcurrentMapActionCache cache, String host, int port) { - this.cache = cache; - this.execRoot = null; - this.channel = NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build(); - } - - /** - * This constructor is used when this class is used in the remote worker. A path to the execution - * root is needed for executing work locally. - */ - private MemcacheWorkExecutor(ConcurrentMapActionCache cache, Path execRoot) { - this.cache = cache; - this.execRoot = execRoot; - this.channel = null; - } - - /** - * Create an instance of MemcacheWorkExecutor that talks to a remote server. - * - * @param cache An instance of ConcurrentMapActionCache. - * @param host Hostname of the server to connect to. - * @param port Port of the server to connect to. - * @return An instance of MemcacheWorkExecutor that talks to a remote server. - */ - public static MemcacheWorkExecutor createRemoteWorkExecutor( - ConcurrentMapActionCache cache, String host, int port) { - return new MemcacheWorkExecutor(cache, host, port); - } - - /** - * Create an instance of MemcacheWorkExecutor that runs locally. - * - * @param cache An instance of ConcurrentMapActionCache. - * @param execRoot Path of the execution root where work is executed. - * @return An instance of MemcacheWorkExecutor tthat runs locally in the execution root. - */ - public static MemcacheWorkExecutor createLocalWorkExecutor( - ConcurrentMapActionCache cache, Path execRoot) { - return new MemcacheWorkExecutor(cache, execRoot); - } - - @Override - public ListenableFuture executeRemotely( - Path execRoot, - ActionInputFileCache actionCache, - String actionOutputKey, - Collection arguments, - Collection inputs, - ImmutableMap environment, - Collection outputs, - int timeout) - throws IOException, WorkTooLargeException, InterruptedException { - RemoteWorkRequest.Builder work = RemoteWorkRequest.newBuilder(); - work.setOutputKey(actionOutputKey); - - long workSize = 0; - for (ActionInput input : inputs) { - if (!(input instanceof Artifact)) { - continue; - } - if (!actionCache.isFile((Artifact) input)) { - continue; - } - workSize += actionCache.getSizeInBytes(input); - } - - if (workSize > MAX_WORK_SIZE_BYTES) { - throw new WorkTooLargeException("Work is too large: " + workSize + " bytes."); - } - - // Save all input files to cache. - for (ActionInput input : inputs) { - Path file = execRoot.getRelative(input.getExecPathString()); - - if (file.isDirectory()) { - // TODO(alpha): Handle this case better. - throw new UnsupportedOperationException( - "Does not support directory artifacts: " + file + "."); - } - - String contentKey = cache.putFileIfNotExist(actionCache, input); - work.addInputFilesBuilder() - .setPath(input.getExecPathString()) - .setContentKey(contentKey) - .setExecutable(file.isExecutable()); - } - - work.addAllArguments(arguments); - work.putAllEnvironment(environment); - for (ActionInput output : outputs) { - work.addOutputFilesBuilder().setPath(output.getExecPathString()); - } - - RemoteWorkFutureStub stub = RemoteWorkGrpc.newFutureStub(channel); - work.setTimeout(timeout); - return stub.executeSynchronously(work.build()); - } - - /** Execute a work item locally. */ - public RemoteWorkResponse executeLocally(RemoteWorkRequest work) - throws IOException, InterruptedException { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - try { - // Prepare directories and input files. - for (FileEntry input : work.getInputFilesList()) { - Path file = execRoot.getRelative(input.getPath()); - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - cache.writeFile(input.getContentKey(), file, input.getExecutable()); - } - - List outputs = new ArrayList<>(work.getOutputFilesList().size()); - for (FileEntry output : work.getOutputFilesList()) { - Path file = execRoot.getRelative(output.getPath()); - if (file.exists()) { - throw new FileAlreadyExistsException("Output file already exists: " + file); - } - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - outputs.add(file); - } - - Command cmd = - new Command( - work.getArgumentsList().toArray(new String[] {}), - work.getEnvironment(), - new File(execRoot.getPathString())); - CommandResult result = - cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); - cache.putActionOutput(work.getOutputKey(), execRoot, outputs); - return RemoteWorkResponse.newBuilder() - .setSuccess(result.getTerminationStatus().success()) - .setOut(stdout.toString()) - .setErr(stderr.toString()) - .build(); - } catch (CommandException e) { - return RemoteWorkResponse.newBuilder() - .setSuccess(false) - .setOut(stdout.toString()) - .setErr(stderr.toString()) - .setException(e.toString()) - .build(); - } - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index e865c96e2c..62790e8a76 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -14,47 +14,87 @@ package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.vfs.Path; import java.io.IOException; import java.util.Collection; +import javax.annotation.Nullable; -/** - * A cache for storing artifacts (input and output) as well as the output of running an action. - */ +/** A cache for storing artifacts (input and output) as well as the output of running an action. */ @ThreadCompatible interface RemoteActionCache { + // CAS API + + // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the + // underlying CasStatus messages and gRPC errors errors. + /** - * Put the file in cache if it is not already in it. No-op if the file is already stored in cache. - * - * @return The key for fetching the file from cache. + * Upload enough of the tree metadata and data into remote cache so that the entire tree can be + * reassembled remotely using the root digest. */ - String putFileIfNotExist(ActionInputFileCache cache, ActionInput file) + void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) throws IOException, InterruptedException; /** - * Write the file in cache identified by key to the file system. The key must uniquely identify - * the content of the file. Throws CacheNotFoundException if the file is not found in cache. + * Download the entire tree data rooted by the given digest and write it into the given location. */ - void writeFile(String key, Path dest, boolean executable) + void downloadTree(ContentDigest rootDigest, Path rootLocation) throws IOException, CacheNotFoundException; /** - * Write the action output files identified by the key to the file system. The key must uniquely - * identify the action and the content of action inputs. - * - * @throws CacheNotFoundException if action output is not found in cache. + * Download all results of a remotely executed action locally. TODO(olaola): will need to amend to + * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating. */ - void writeActionOutput(String key, Path execRoot) + void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException; - /** Update the cache with the action outputs for the specified key. */ - void putActionOutput(String key, Collection outputs) + /** + * Upload all results of a locally executed action to the cache. Add the files to the ActionResult + * builder. + */ + void uploadAllResults(Path execRoot, Collection files, ActionResult.Builder result) throws IOException, InterruptedException; - /** Update the cache with the files for the specified key. */ - void putActionOutput(String key, Path execRoot, Collection files) - throws IOException, InterruptedException; + /** + * Put the file contents cache if it is not already in it. No-op if the file is already stored in + * cache. The given path must be a full absolute path. Note: this is horribly inefficient, need to + * patch through an overload that uses an ActionInputFile cache to compute the digests! + * + * @return The key for fetching the file contents blob from cache. + */ + ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException; + + /** + * Download a blob keyed by the given digest and write it to the specified path. Set the + * executable parameter to the specified value. + */ + void downloadFileContents(ContentDigest digest, Path dest, boolean executable) + throws IOException, CacheNotFoundException; + + /** Upload the given blobs to the cache, and return their digests. */ + ImmutableList uploadBlobs(Iterable blobs) throws InterruptedException; + + /** Upload the given blob to the cache, and return its digests. */ + ContentDigest uploadBlob(byte[] blob) throws InterruptedException; + + /** Download and return a blob with a given digest from the cache. */ + byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException; + + /** Download and return blobs with given digests from the cache. */ + ImmutableList downloadBlobs(Iterable digests) + throws CacheNotFoundException; + + // Execution Cache API + + /** Returns a cached result for a given Action digest, or null if not found in cache. */ + @Nullable + ActionResult getCachedActionResult(ActionKey actionKey); + + /** Sets the given result as result of the given Action. */ + void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 16ce15ca65..93f88ad244 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -17,6 +17,7 @@ package com.google.devtools.build.lib.remote; import com.google.common.collect.ImmutableList; import com.google.common.eventbus.Subscribe; import com.google.devtools.build.lib.actions.ActionContextProvider; +import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.buildtool.BuildRequest; import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent; import com.google.devtools.build.lib.events.Event; @@ -24,12 +25,8 @@ import com.google.devtools.build.lib.runtime.BlazeModule; import com.google.devtools.build.lib.runtime.Command; import com.google.devtools.build.lib.runtime.CommandEnvironment; import com.google.devtools.common.options.OptionsBase; -import java.net.URI; -import java.net.URISyntaxException; -/** - * RemoteModule provides distributed cache and remote execution for Bazel. - */ +/** RemoteModule provides distributed cache and remote execution for Bazel. */ public final class RemoteModule extends BlazeModule { private CommandEnvironment env; private BuildRequest buildRequest; @@ -61,38 +58,21 @@ public final class RemoteModule extends BlazeModule { buildRequest = event.getRequest(); RemoteOptions options = buildRequest.getOptions(RemoteOptions.class); - ConcurrentMapActionCache cache = null; + try { + // Reinitialize the remote cache and worker from options every time, because the options + // may change from build to build. - // Don't provide the remote spawn unless at least action cache is initialized. - if (actionCache == null) { - if (options.hazelcastNode != null || options.hazelcastClientConfig != null) { - cache = - new ConcurrentMapActionCache( - this.env.getDirectories().getExecRoot(), - HazelcastCacheFactory.create(options)); - } else if (options.restCacheUrl != null) { - cache = - new ConcurrentMapActionCache( - this.env.getDirectories().getExecRoot(), - RestUrlCacheFactory.create(options)); + // Don't provide the remote spawn unless at least action cache is initialized. + if (ConcurrentMapFactory.isRemoteCacheOptions(options)) { + actionCache = new ConcurrentMapActionCache(ConcurrentMapFactory.create(options)); } - actionCache = cache; - } + // Otherwise actionCache remains null and remote caching/execution are disabled. - if (cache != null) { - if (workExecutor == null && options.remoteWorker != null) { - try { - URI uri = new URI("dummy://" + options.remoteWorker); - if (uri.getHost() == null || uri.getPort() == -1) { - throw new URISyntaxException("Invalid host or port.", ""); - } - workExecutor = - MemcacheWorkExecutor.createRemoteWorkExecutor(cache, uri.getHost(), uri.getPort()); - } catch (URISyntaxException e) { - env.getReporter() - .handle(Event.warn("Invalid argument for the address of remote worker.")); - } + if (actionCache != null && RemoteWorkExecutor.isRemoteExecutionOptions(options)) { + workExecutor = new RemoteWorkExecutor(options); } + } catch (InvalidConfigurationException e) { + env.getReporter().handle(Event.warn(e.toString())); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java index 78747c8090..9ed8ff2f8d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java @@ -17,9 +17,7 @@ package com.google.devtools.build.lib.remote; import com.google.devtools.common.options.Option; import com.google.devtools.common.options.OptionsBase; -/** - * Options for remote execution and distributed caching. - */ +/** Options for remote execution and distributed caching. */ public final class RemoteOptions extends OptionsBase { @Option( name = "rest_cache_url", @@ -66,4 +64,12 @@ public final class RemoteOptions extends OptionsBase { + "For client mode only." ) public String remoteWorker; + + @Option( + name = "grpc_timeout_seconds", + defaultValue = "60", + category = "remote", + help = "The maximal number of seconds to wait for remote calls. For client mode only." + ) + public int grpcTimeoutSeconds; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index 2244ca73cc..d2b30cef97 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -14,41 +14,44 @@ package com.google.devtools.build.lib.remote; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ActionExecutionContext; -import com.google.devtools.build.lib.actions.ActionExecutionMetadata; import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.ExecutionStrategy; -import com.google.devtools.build.lib.actions.Executor; import com.google.devtools.build.lib.actions.Spawn; import com.google.devtools.build.lib.actions.SpawnActionContext; import com.google.devtools.build.lib.actions.Spawns; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.Action; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.Command; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy; -import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; +import io.grpc.StatusRuntimeException; import java.io.IOException; -import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.TreeSet; /** - * Strategy that uses a distributed cache for sharing action input and output files. - * Optionally this strategy also support offloading the work to a remote worker. + * Strategy that uses a distributed cache for sharing action input and output files. Optionally this + * strategy also support offloading the work to a remote worker. */ @ExecutionStrategy( name = {"remote"}, @@ -74,179 +77,170 @@ final class RemoteSpawnStrategy implements SpawnActionContext { this.remoteWorkExecutor = workExecutor; } - /** Executes the given {@code spawn}. */ - @Override - public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext) - throws ExecException, InterruptedException { - if (!spawn.isRemotable()) { - standaloneStrategy.exec(spawn, actionExecutionContext); - return; + private Action buildAction( + Collection outputs, ContentDigest command, ContentDigest inputRoot) { + Action.Builder action = Action.newBuilder(); + action.setCommandDigest(command); + action.setInputRootDigest(inputRoot); + // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. + for (ActionInput output : outputs) { + action.addOutputPath(output.getExecPathString()); } + // TODO(olaola): Need to set platform as well! + return action.build(); + } - Executor executor = actionExecutionContext.getExecutor(); - ActionExecutionMetadata actionMetadata = spawn.getResourceOwner(); - ActionInputFileCache inputFileCache = actionExecutionContext.getActionInputFileCache(); - EventHandler eventHandler = executor.getEventHandler(); - - if (remoteActionCache == null) { - eventHandler.handle( - Event.warn( - spawn.getMnemonic() + " Cannot instantiate remote action cache. Running locally.")); - standaloneStrategy.exec(spawn, actionExecutionContext); - return; + private Command buildCommand(List arguments, ImmutableMap environment) { + Command.Builder command = Command.newBuilder(); + command.addAllArgv(arguments); + // Sorting the environment pairs by variable name. + TreeSet variables = new TreeSet<>(environment.keySet()); + for (String var : variables) { + command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); } + return command.build(); + } - // Compute a hash code to uniquely identify the action plus the action inputs. - Hasher hasher = Hashing.sha256().newHasher(); - - // TODO(alpha): The action key is usually computed using the path to the tool and the - // arguments. It does not take into account the content / version of the system tool (e.g. gcc). - // Either I put information about the system tools in the hash or assume tools are always - // checked in. - Preconditions.checkNotNull(actionMetadata.getKey()); - hasher.putString(actionMetadata.getKey(), Charset.defaultCharset()); - - List inputs = - ActionInputHelper.expandArtifacts( - spawn.getInputFiles(), actionExecutionContext.getArtifactExpander()); - for (ActionInput input : inputs) { - hasher.putString(input.getExecPathString(), Charset.defaultCharset()); + /** + * Fallback: execute the spawn locally. If an ActionKey is provided, try to upload results to + * remote action cache. + */ + private void execLocally( + Spawn spawn, ActionExecutionContext actionExecutionContext, ActionKey actionKey) + throws ExecException, InterruptedException { + standaloneStrategy.exec(spawn, actionExecutionContext); + if (remoteActionCache != null && actionKey != null) { + ArrayList outputFiles = new ArrayList<>(); + for (ActionInput output : spawn.getOutputFiles()) { + outputFiles.add(execRoot.getRelative(output.getExecPathString())); + } try { - // TODO(alpha): The digest from ActionInputFileCache is used to detect local file - // changes. It might not be sufficient to identify the input file globally in the - // remote action cache. Consider upgrading this to a better hash algorithm with - // less collision. - hasher.putBytes(inputFileCache.getDigest(input)); + ActionResult.Builder result = ActionResult.newBuilder(); + remoteActionCache.uploadAllResults(execRoot, outputFiles, result); + remoteActionCache.setCachedActionResult(actionKey, result.build()); + // Handle all cache errors here. } catch (IOException e) { - throw new UserExecException("Failed to get digest for input.", e); + throw new UserExecException("Unexpected IO error.", e); + } catch (UnsupportedOperationException e) { + actionExecutionContext + .getExecutor() + .getEventHandler() + .handle( + Event.warn( + spawn.getMnemonic() + " unsupported operation for action cache (" + e + ")")); } } + } - // Save the action output if found in the remote action cache. - String actionOutputKey = hasher.hash().toString(); + private void passRemoteOutErr(ActionResult result, FileOutErr outErr) { + if (remoteActionCache == null) { + return; + } + try { + ImmutableList streams = + remoteActionCache.downloadBlobs( + ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); + outErr.printOut(new String(streams.get(0), UTF_8)); + outErr.printErr(new String(streams.get(1), UTF_8)); + } catch (CacheNotFoundException e) { + // Ignoring. + } + } - // Timeout for running the remote spawn. - final int timeoutSeconds = Spawns.getTimeoutSeconds(spawn, 120); + /** Executes the given {@code spawn}. */ + @Override + public void exec(Spawn spawn, ActionExecutionContext actionExecutionContext) + throws ExecException, InterruptedException { + if (!spawn.isRemotable() || remoteActionCache == null) { + standaloneStrategy.exec(spawn, actionExecutionContext); + return; + } + + ActionKey actionKey = null; + String mnemonic = spawn.getMnemonic(); + EventHandler eventHandler = actionExecutionContext.getExecutor().getEventHandler(); try { - // Look up action cache using |actionOutputKey|. Reuse the action output if it is found. - if (writeActionOutput(spawn.getMnemonic(), actionOutputKey, eventHandler, true)) { - return; + // Temporary hack: the TreeNodeRepository should be created and maintained upstream! + TreeNodeRepository repository = new TreeNodeRepository(execRoot); + List inputs = + ActionInputHelper.expandArtifacts( + spawn.getInputFiles(), actionExecutionContext.getArtifactExpander()); + TreeNode inputRoot = repository.buildFromActionInputs(inputs); + repository.computeMerkleDigests(inputRoot); + Command command = buildCommand(spawn.getArguments(), spawn.getEnvironment()); + Action action = + buildAction( + spawn.getOutputFiles(), + ContentDigests.computeDigest(command), + repository.getMerkleDigest(inputRoot)); + + // Look up action cache, and reuse the action output if it is found. + actionKey = ContentDigests.computeActionKey(action); + ActionResult result = remoteActionCache.getCachedActionResult(actionKey); + boolean acceptCached = true; + if (result != null) { + // We don't cache failed actions, so we know the outputs exist. + // For now, download all outputs locally; in the future, we can reuse the digests to + // just update the TreeNodeRepository and continue the build. + try { + remoteActionCache.downloadAllResults(result, execRoot); + return; + } catch (CacheNotFoundException e) { + acceptCached = false; // Retry the action remotely and invalidate the results. + } } - FileOutErr outErr = actionExecutionContext.getFileOutErr(); - if (executeWorkRemotely( - inputFileCache, - spawn.getMnemonic(), - actionOutputKey, - spawn.getArguments(), - inputs, - spawn.getEnvironment(), - spawn.getOutputFiles(), - timeoutSeconds, - eventHandler, - outErr)) { + if (remoteWorkExecutor == null) { + execLocally(spawn, actionExecutionContext, actionKey); return; } - // If nothing works then run spawn locally. - standaloneStrategy.exec(spawn, actionExecutionContext); - if (remoteActionCache != null) { - remoteActionCache.putActionOutput(actionOutputKey, spawn.getOutputFiles()); + // Upload the command and all the inputs into the remote cache. + remoteActionCache.uploadBlob(command.toByteArray()); + // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests! + remoteActionCache.uploadTree(repository, execRoot, inputRoot); + // TODO(olaola): set BuildInfo and input total bytes as well. + ExecuteRequest.Builder request = + ExecuteRequest.newBuilder() + .setAction(action) + .setAcceptCached(acceptCached) + .setTotalInputFileCount(inputs.size()) + .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120)); + // TODO(olaola): set sensible local and remote timouts. + ExecuteReply reply = remoteWorkExecutor.executeRemotely(request.build()); + ExecutionStatus status = reply.getStatus(); + result = reply.getResult(); + // We do not want to pass on the remote stdout and strerr if we are going to retry the + // action. + if (status.getSucceeded()) { + passRemoteOutErr(result, actionExecutionContext.getFileOutErr()); + remoteActionCache.downloadAllResults(result, execRoot); + return; } + if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED) { + passRemoteOutErr(result, actionExecutionContext.getFileOutErr()); + throw new UserExecException(status.getErrorDetail()); + } + // For now, we retry locally on all other remote errors. + // TODO(olaola): add remote retries on cache miss errors. + execLocally(spawn, actionExecutionContext, actionKey); } catch (IOException e) { throw new UserExecException("Unexpected IO error.", e); - } catch (UnsupportedOperationException e) { - eventHandler.handle( - Event.warn(spawn.getMnemonic() + " unsupported operation for action cache (" + e + ")")); - } - } - - /** - * Submit work to execute remotely. - * - * @return True in case the action succeeded and all expected action outputs are found. - */ - private boolean executeWorkRemotely( - ActionInputFileCache actionCache, - String mnemonic, - String actionOutputKey, - List arguments, - List inputs, - ImmutableMap environment, - Collection outputs, - int timeout, - EventHandler eventHandler, - FileOutErr outErr) - throws IOException, InterruptedException { - if (remoteWorkExecutor == null) { - return false; - } - try { - ListenableFuture future = - remoteWorkExecutor.executeRemotely( - execRoot, - actionCache, - actionOutputKey, - arguments, - inputs, - environment, - outputs, - timeout); - RemoteWorkResponse response = future.get(timeout, TimeUnit.SECONDS); - if (!response.getSuccess()) { - String exception = ""; - if (!response.getException().isEmpty()) { - exception = " (" + response.getException() + ")"; - } - eventHandler.handle( - Event.warn( - mnemonic + " failed to execute work remotely" + exception + ", running locally")); - return false; - } - outErr.printOut(response.getOut()); - outErr.printErr(response.getErr()); - } catch (ExecutionException e) { - eventHandler.handle( - Event.warn(mnemonic + " failed to execute work remotely (" + e + "), running locally")); - return false; - } catch (TimeoutException e) { - eventHandler.handle( - Event.warn(mnemonic + " timed out executing work remotely (" + e + "), running locally")); - return false; } catch (InterruptedException e) { eventHandler.handle(Event.warn(mnemonic + " remote work interrupted (" + e + ")")); + Thread.currentThread().interrupt(); throw e; - } catch (WorkTooLargeException e) { - eventHandler.handle(Event.warn(mnemonic + " cannot be run remotely (" + e + ")")); - return false; - } - return writeActionOutput(mnemonic, actionOutputKey, eventHandler, false); - } - - /** - * Saves the action output from cache. Returns true if all action outputs are found. - */ - private boolean writeActionOutput( - String mnemonic, - String actionOutputKey, - EventHandler eventHandler, - boolean ignoreCacheNotFound) - throws IOException { - if (remoteActionCache == null) { - return false; - } - try { - remoteActionCache.writeActionOutput(actionOutputKey, execRoot); - Event.info(mnemonic + " reuse action outputs from cache"); - return true; + } catch (StatusRuntimeException e) { + eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")")); + execLocally(spawn, actionExecutionContext, actionKey); } catch (CacheNotFoundException e) { - if (!ignoreCacheNotFound) { - eventHandler.handle( - Event.warn(mnemonic + " some cache entries cannot be found (" + e + ")")); - } + eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")")); + execLocally(spawn, actionExecutionContext, actionKey); + } catch (UnsupportedOperationException e) { + eventHandler.handle( + Event.warn(mnemonic + " unsupported operation for action cache (" + e + ")")); } - return false; } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java new file mode 100644 index 0000000000..9747e3ef11 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java @@ -0,0 +1,42 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import io.grpc.ManagedChannel; +import io.grpc.netty.NettyChannelBuilder; +import java.net.URI; +import java.net.URISyntaxException; + +/** Helper methods for gRPC calls */ +@ThreadSafe +public final class RemoteUtils { + public static ManagedChannel createChannel(String hostAndPort) + throws InvalidConfigurationException { + try { + URI uri = new URI("dummy://" + hostAndPort); + if (uri.getHost() == null || uri.getPort() == -1) { + throw new URISyntaxException("Invalid host or port.", ""); + } + return NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort()) + .usePlaintext(true) + .build(); + } catch (URISyntaxException e) { + throw new InvalidConfigurationException( + "Invalid argument for the address of remote cache server: " + hostAndPort); + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java index 198e9262a7..9cb080b402 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteWorkExecutor.java @@ -14,35 +14,53 @@ package com.google.devtools.build.lib.remote; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; -import com.google.devtools.build.lib.vfs.Path; -import java.io.IOException; -import java.util.Collection; +import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import io.grpc.ManagedChannel; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; -/** - * Interface for exeucting work remotely. - */ -@ThreadCompatible -public interface RemoteWorkExecutor { - /** - * Submit the work to this work executor. The output of running this action should be written to - * {@link RemoteActionCache} indexed by {@code actionOutputKey}. - * - *

Returns a future for the response of this work request. - */ - ListenableFuture executeRemotely( - Path execRoot, - ActionInputFileCache cache, - String actionOutputKey, - Collection arguments, - Collection inputs, - ImmutableMap environment, - Collection outputs, - int timeout) - throws IOException, WorkTooLargeException, InterruptedException; +/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ +@ThreadSafe +public class RemoteWorkExecutor { + /** Channel over which to send work to run remotely. */ + private final ManagedChannel channel; + private final int grpcTimeoutSeconds; + + public RemoteWorkExecutor(RemoteOptions options) throws InvalidConfigurationException { + channel = RemoteUtils.createChannel(options.remoteWorker); + grpcTimeoutSeconds = options.grpcTimeoutSeconds; + } + + public static boolean isRemoteExecutionOptions(RemoteOptions options) { + return options.remoteWorker != null; + } + + public ExecuteReply executeRemotely(ExecuteRequest request) { + ExecuteServiceBlockingStub stub = + ExecuteServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter( + grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); + Iterator replies = stub.execute(request); + ExecuteReply reply = null; + while (replies.hasNext()) { + reply = replies.next(); + // We can handle the action execution progress here. + } + if (reply == null) { + return ExecuteReply.newBuilder() + .setStatus( + ExecutionStatus.newBuilder() + .setExecuted(false) + .setSucceeded(false) + .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR) + .setErrorDetail("Remote server terminated the connection")) + .build(); + } + return reply; + } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java deleted file mode 100644 index aa98b253e7..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/RestUrlCacheFactory.java +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpHead; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.util.EntityUtils; - -/** - * A factory class for providing a {@link ConcurrentMap} object implemented by a REST service. The - * URL has to support PUT, GET, and HEAD operations - */ -public final class RestUrlCacheFactory { - - public static ConcurrentMap create(RemoteOptions options) { - return new RestUrlCache(options.restCacheUrl); - } - - private static class RestUrlCache implements ConcurrentMap { - - final String baseUrl; - - RestUrlCache(String baseUrl) { - this.baseUrl = baseUrl; - } - - @Override - public boolean containsKey(Object key) { - try { - HttpClient client = new DefaultHttpClient(); - HttpHead head = new HttpHead(baseUrl + "/" + key); - HttpResponse response = client.execute(head); - int statusCode = response.getStatusLine().getStatusCode(); - return HttpStatus.SC_OK == statusCode; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] get(Object key) { - try { - HttpClient client = new DefaultHttpClient(); - HttpGet get = new HttpGet(baseUrl + "/" + key); - HttpResponse response = client.execute(get); - int statusCode = response.getStatusLine().getStatusCode(); - if (HttpStatus.SC_NOT_FOUND == statusCode) { - return null; - } - if (HttpStatus.SC_OK != statusCode) { - throw new RuntimeException("GET failed with status code " + statusCode); - } - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - HttpEntity entity = response.getEntity(); - entity.writeTo(buffer); - buffer.flush(); - EntityUtils.consume(entity); - - return buffer.toByteArray(); - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] put(String key, byte[] value) { - try { - HttpClient client = new DefaultHttpClient(); - HttpPut put = new HttpPut(baseUrl + "/" + key); - put.setEntity(new ByteArrayEntity(value)); - HttpResponse response = client.execute(put); - int statusCode = response.getStatusLine().getStatusCode(); - - if (HttpStatus.SC_OK != statusCode) { - throw new RuntimeException("PUT failed with status code " + statusCode); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - } - - //UnsupportedOperationExceptions from here down - @Override - public int size() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsValue(Object value) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] remove(Object key) { - throw new UnsupportedOperationException(); - } - - @Override - public void putAll(Map m) { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - throw new UnsupportedOperationException(); - } - - @Override - public Set keySet() { - throw new UnsupportedOperationException(); - } - - @Override - public Collection values() { - throw new UnsupportedOperationException(); - } - - @Override - public Set> entrySet() { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] putIfAbsent(String key, byte[] value) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object key, Object value) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean replace(String key, byte[] oldValue, byte[] newValue) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] replace(String key, byte[] value) { - throw new UnsupportedOperationException(); - } - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java b/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java deleted file mode 100644 index d1498d73df..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/WorkTooLargeException.java +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.google.devtools.build.lib.remote; - -/** An exception that indicates the work is too large to run remotely. */ -final class WorkTooLargeException extends RuntimeException { - WorkTooLargeException() { - super(); - } - - WorkTooLargeException(String message) { - super(message); - } - - WorkTooLargeException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/src/main/protobuf/remote_protocol.proto b/src/main/protobuf/remote_protocol.proto index a9dd6a9ed1..c807b00d45 100644 --- a/src/main/protobuf/remote_protocol.proto +++ b/src/main/protobuf/remote_protocol.proto @@ -322,81 +322,3 @@ message ExecutionStatus { // Optionally will add more details pertaining to current stage, for example // time executing, or position in queue, etc. } - -// Previous version of remote execution and caching API. -// Will be removed soon after all code paths are migrated! - -// A message for cache entry. -message CacheEntry { - // A list of files stored in this cache entry. - repeated FileEntry files = 1; - - // A blob for data that is a chunk of a file. - bytes file_content = 2; -} - -// A message for storing a file in cache. -message FileEntry { - // The path in the file system where to read this input artifact from. This is - // either a path relative to the execution root (the worker process is - // launched with the working directory set to the execution root), or an - // absolute path. - string path = 1; - - // The cache key to locate the file content. This key is usually generated - // from - // the content of the file such that different keys means the file content are - // different. - string content_key = 2; - - // Whether the file is an executable. - bool executable = 3; - - // TODO(alpha): For large files we need to break down into chunks to store - // in the cache. For that case we need a index for the chunks of the file. -} - -// A message for running a command remotely. -message RemoteWorkRequest { - // The key for writing the output of this work request. - string output_key = 1; - - // The arguments for running the command. The command itself is in - // arguments[0]. - repeated string arguments = 2; - - // The list of input files to this work request. - repeated FileEntry input_files = 3; - - // A map of environment variables for this command. - map environment = 4; - - // The list of expected output files to this work request. - // The content keys for these entries will be empty since the files don't - // exist yet. - repeated FileEntry output_files = 5; - - // Timeout for running this command. - int32 timeout = 6; -} - -// A message for a work response. -message RemoteWorkResponse { - // True if the work was successful. - bool success = 1; - - // String from stdout of running the work. - string out = 2; - - // String from stderr of running the work. - string err = 3; - - // String for the exception when running this work. - string exception = 4; -} - -service RemoteWork { - // Perform work synchronously. - rpc ExecuteSynchronously(RemoteWorkRequest) returns (RemoteWorkResponse) { - } -} diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index 414737a810..5325b8fd4b 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -14,13 +14,25 @@ package com.google.devtools.build.remote; +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.remote.CacheNotFoundException; import com.google.devtools.build.lib.remote.ConcurrentMapActionCache; -import com.google.devtools.build.lib.remote.HazelcastCacheFactory; -import com.google.devtools.build.lib.remote.MemcacheWorkExecutor; +import com.google.devtools.build.lib.remote.ConcurrentMapFactory; +import com.google.devtools.build.lib.remote.ContentDigests; +import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase; import com.google.devtools.build.lib.remote.RemoteOptions; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.RemoteWorkResponse; -import com.google.devtools.build.lib.remote.RemoteWorkGrpc.RemoteWorkImplBase; +import com.google.devtools.build.lib.remote.RemoteProtocol; +import com.google.devtools.build.lib.remote.RemoteProtocol.Action; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; +import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; +import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; +import com.google.devtools.build.lib.shell.AbnormalTerminationException; +import com.google.devtools.build.lib.shell.Command; +import com.google.devtools.build.lib.shell.CommandException; import com.google.devtools.build.lib.util.OS; import com.google.devtools.build.lib.util.ProcessUtils; import com.google.devtools.build.lib.vfs.FileSystem; @@ -32,11 +44,17 @@ import com.google.devtools.common.options.OptionsParser; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -44,61 +62,147 @@ import java.util.logging.Logger; * Implements a remote worker that accepts work items as protobufs. The server implementation is * based on grpc. */ -public class RemoteWorker extends RemoteWorkImplBase { +public class RemoteWorker extends ExecuteServiceImplBase { private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName()); private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER); private final Path workPath; private final RemoteOptions remoteOptions; private final RemoteWorkerOptions options; - private final ConcurrentMap cache; + private final ConcurrentMapActionCache cache; public RemoteWorker( Path workPath, RemoteOptions remoteOptions, RemoteWorkerOptions options, - ConcurrentMap cache) { + ConcurrentMapActionCache cache) { this.workPath = workPath; this.remoteOptions = remoteOptions; this.options = options; this.cache = cache; } + private Map getEnvironmentVariables(RemoteProtocol.Command command) { + HashMap result = new HashMap<>(); + for (EnvironmentEntry entry : command.getEnvironmentList()) { + result.put(entry.getVariable(), entry.getValue()); + } + return result; + } + + public ExecuteReply execute(Action action, Path execRoot) + throws IOException, InterruptedException { + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + try { + RemoteProtocol.Command command = + RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest())); + cache.downloadTree(action.getInputRootDigest(), execRoot); + + List outputs = new ArrayList<>(action.getOutputPathList().size()); + for (String output : action.getOutputPathList()) { + Path file = execRoot.getRelative(output); + if (file.exists()) { + throw new FileAlreadyExistsException("Output file already exists: " + file); + } + FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); + outputs.add(file); + } + + // TODO(olaola): time out after specified server-side deadline. + Command cmd = + new Command( + command.getArgvList().toArray(new String[] {}), + getEnvironmentVariables(command), + new File(execRoot.getPathString())); + cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); + + // Execute throws a CommandException on non-zero return values, so action has succeeded. + ImmutableList outErrDigests = + cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); + ActionResult.Builder result = + ActionResult.newBuilder() + .setReturnCode(0) + .setStdoutDigest(outErrDigests.get(0)) + .setStderrDigest(outErrDigests.get(1)); + cache.uploadAllResults(execRoot, outputs, result); + cache.setCachedActionResult(ContentDigests.computeActionKey(action), result.build()); + return ExecuteReply.newBuilder() + .setResult(result) + .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true)) + .build(); + } catch (CommandException e) { + ImmutableList outErrDigests = + cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); + final int returnCode = + e instanceof AbnormalTerminationException + ? ((AbnormalTerminationException) e).getResult().getTerminationStatus().getExitCode() + : -1; + return ExecuteReply.newBuilder() + .setResult( + ActionResult.newBuilder() + .setReturnCode(returnCode) + .setStdoutDigest(outErrDigests.get(0)) + .setStderrDigest(outErrDigests.get(1))) + .setStatus( + ExecutionStatus.newBuilder() + .setExecuted(true) + .setSucceeded(false) + .setError(ExecutionStatus.ErrorCode.EXEC_FAILED) + .setErrorDetail(e.toString())) + .build(); + } catch (CacheNotFoundException e) { + LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest())); + return ExecuteReply.newBuilder() + .setCasError( + CasStatus.newBuilder() + .setSucceeded(false) + .addMissingDigest(e.getMissingDigest()) + .setError(CasStatus.ErrorCode.MISSING_DIGEST) + .setErrorDetail(e.toString())) + .setStatus( + ExecutionStatus.newBuilder() + .setExecuted(false) + .setSucceeded(false) + .setError( + e.getMissingDigest() == action.getCommandDigest() + ? ExecutionStatus.ErrorCode.MISSING_COMMAND + : ExecutionStatus.ErrorCode.MISSING_INPUT) + .setErrorDetail(e.toString())) + .build(); + } + } + @Override - public void executeSynchronously( - RemoteWorkRequest request, StreamObserver responseObserver) { + public void execute(ExecuteRequest request, StreamObserver responseObserver) { Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString()); try { - FileSystemUtils.createDirectoryAndParents(tempRoot); - final ConcurrentMapActionCache actionCache = new ConcurrentMapActionCache(tempRoot, cache); - final MemcacheWorkExecutor workExecutor = - MemcacheWorkExecutor.createLocalWorkExecutor(actionCache, tempRoot); + tempRoot.createDirectory(); if (LOG_FINER) { LOG.fine( "Work received has " - + request.getInputFilesCount() + + request.getTotalInputFileCount() + " input files and " - + request.getOutputFilesCount() + + request.getAction().getOutputPathCount() + " output files."); } - RemoteWorkResponse response = workExecutor.executeLocally(request); - responseObserver.onNext(response); + ExecuteReply reply = execute(request.getAction(), tempRoot); + responseObserver.onNext(reply); if (options.debug) { - if (!response.getSuccess()) { + if (!reply.getStatus().getSucceeded()) { LOG.warning("Work failed. Request: " + request.toString() + "."); - } else if (LOG_FINER) { LOG.fine("Work completed."); } } - if (!options.debug || response.getSuccess()) { + if (!options.debug) { FileSystemUtils.deleteTree(tempRoot); } else { LOG.warning("Preserving work directory " + tempRoot.toString() + "."); } } catch (IOException | InterruptedException e) { - RemoteWorkResponse.Builder response = RemoteWorkResponse.newBuilder(); - response.setSuccess(false).setOut("").setErr("").setException(e.toString()); - responseObserver.onNext(response.build()); + ExecuteReply.Builder reply = ExecuteReply.newBuilder(); + reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString()); + responseObserver.onNext(reply.build()); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } @@ -120,10 +224,13 @@ public class RemoteWorker extends RemoteWorkImplBase { } System.out.println("*** Starting Hazelcast server."); - ConcurrentMap cache = new HazelcastCacheFactory().create(remoteOptions); + ConcurrentMapActionCache cache = + new ConcurrentMapActionCache(ConcurrentMapFactory.createHazelcast(remoteOptions)); - System.out.println("*** Starting grpc server on all locally bound IPs on port " - + remoteWorkerOptions.listenPort + "."); + System.out.println( + "*** Starting grpc server on all locally bound IPs on port " + + remoteWorkerOptions.listenPort + + "."); Path workPath = getFileSystem().getPath(remoteWorkerOptions.workPath); FileSystemUtils.createDirectoryAndParents(workPath); RemoteWorker worker = new RemoteWorker(workPath, remoteOptions, remoteWorkerOptions, cache); -- cgit v1.2.3