aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/tools
diff options
context:
space:
mode:
authorGravatar ulfjack <ulfjack@google.com>2017-07-05 15:17:18 -0400
committerGravatar John Cater <jcater@google.com>2017-07-06 07:13:39 -0400
commitbc16cc2f49a398d57f68dcd8268b2bb6bd2b3baa (patch)
tree2d5aa737155c20d9b4d0b123a7d547a0394d79cb /src/tools
parent948e519093eb82c217e842eec61a88725689d9dc (diff)
Rewrite the watcher / execution servers to use a thread-pool for execution
Also change them to use ListenableFuture, so multiple watchers can watch the same action execution. PiperOrigin-RevId: 160989277
Diffstat (limited to 'src/tools')
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java320
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java11
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java377
3 files changed, 370 insertions, 338 deletions
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
index fd9a6f9c86..034ff9ddc5 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
@@ -14,29 +14,337 @@
package com.google.devtools.build.remote;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.SEVERE;
+import static java.util.logging.Level.WARNING;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.lib.remote.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.Digests;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
+import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
+import com.google.devtools.build.lib.shell.AbnormalTerminationException;
+import com.google.devtools.build.lib.shell.Command;
+import com.google.devtools.build.lib.shell.CommandException;
+import com.google.devtools.build.lib.shell.CommandResult;
+import com.google.devtools.build.lib.shell.TimeoutKillableObserver;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
+import com.google.devtools.remoteexecution.v1test.Platform;
import com.google.longrunning.Operation;
+import com.google.protobuf.Duration;
+import com.google.protobuf.util.Durations;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/** A basic implementation of an {@link ExecutionImplBase} service. */
final class ExecutionServer extends ExecutionImplBase {
- private final ConcurrentHashMap<String, ExecuteRequest> operationsCache;
+ private static final Logger logger = Logger.getLogger(ExecutionServer.class.getName());
+
+ // The name of the container image entry in the Platform proto
+ // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
+ // experimental_remote_platform_override in
+ // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java)
+ private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image";
+
+ // How long to wait for the uid command.
+ private static final Duration uidTimeout = Durations.fromMicros(30);
+
+ private static final int LOCAL_EXEC_ERROR = -1;
- public ExecutionServer(ConcurrentHashMap<String, ExecuteRequest> operationsCache) {
+ private final Path workPath;
+ private final Path sandboxPath;
+ private final RemoteWorkerOptions workerOptions;
+ private final SimpleBlobStoreActionCache cache;
+ private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
+ private final ListeningExecutorService executorService =
+ MoreExecutors.listeningDecorator(
+ new ThreadPoolExecutor(0, 8, 1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>()));
+
+ public ExecutionServer(
+ Path workPath,
+ Path sandboxPath,
+ RemoteWorkerOptions workerOptions,
+ SimpleBlobStoreActionCache cache,
+ ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) {
+ this.workPath = workPath;
+ this.sandboxPath = sandboxPath;
+ this.workerOptions = workerOptions;
+ this.cache = cache;
this.operationsCache = operationsCache;
}
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
- // Defer the actual action execution to the Watcher.watch request.
- // There are a lot of errors for which we could fail early here, but deferring them all is
- // simpler.
final String opName = UUID.randomUUID().toString();
- operationsCache.put(opName, request);
+ ListenableFuture<ActionResult> future = executorService.submit(new Callable<ActionResult>() {
+ @Override
+ public ActionResult call() throws Exception {
+ return execute(request, opName);
+ }
+ });
+ operationsCache.put(opName, future);
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
responseObserver.onCompleted();
}
+
+ private ActionResult execute(ExecuteRequest request, String id)
+ throws IOException, InterruptedException, CacheNotFoundException {
+ Path tempRoot = workPath.getRelative("build-" + id);
+ try {
+ tempRoot.createDirectory();
+ logger.log(
+ FINE,
+ "Work received has {0} input files and {1} output files.",
+ new Object[]{
+ request.getTotalInputFileCount(), request.getAction().getOutputFilesCount()
+ });
+ ActionResult result = execute(request.getAction(), tempRoot);
+ logger.log(INFO, "Completed {0}.", id);
+ return result;
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "Work failed.", e);
+ throw e;
+ } finally {
+ if (workerOptions.debug) {
+ logger.log(INFO, "Preserving work directory {0}.", tempRoot);
+ } else {
+ try {
+ FileSystemUtils.deleteTree(tempRoot);
+ } catch (IOException e) {
+ logger.log(SEVERE,
+ String.format(
+ "Failed to delete tmp directory %s: %s",
+ tempRoot, Throwables.getStackTraceAsString(e)));
+ }
+ }
+ }
+ }
+
+ private ActionResult execute(Action action, Path execRoot)
+ throws IOException, InterruptedException, CacheNotFoundException {
+ ByteArrayOutputStream stdoutBuffer = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderrBuffer = new ByteArrayOutputStream();
+ com.google.devtools.remoteexecution.v1test.Command command =
+ com.google.devtools.remoteexecution.v1test.Command.parseFrom(
+ cache.downloadBlob(action.getCommandDigest()));
+ cache.downloadTree(action.getInputRootDigest(), execRoot);
+
+ List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
+ for (String output : action.getOutputFilesList()) {
+ Path file = execRoot.getRelative(output);
+ if (file.exists()) {
+ throw new FileAlreadyExistsException("Output file already exists: " + file);
+ }
+ FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
+ outputs.add(file);
+ }
+ // TODO(olaola): support output directories.
+
+ // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
+ // implementation instead of copying it.
+ Command cmd =
+ getCommand(
+ action,
+ command.getArgumentsList(),
+ getEnvironmentVariables(command),
+ execRoot.getPathString());
+ long startTime = System.currentTimeMillis();
+ CommandResult cmdResult = null;
+ try {
+ cmdResult =
+ cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdoutBuffer, stderrBuffer, true);
+ } catch (AbnormalTerminationException e) {
+ cmdResult = e.getResult();
+ } catch (CommandException e) {
+ // At the time this comment was written, this must be a ExecFailedException encapsulating
+ // an IOException from the underlying Subprocess.Factory.
+ cmdResult = null;
+ }
+ long timeoutMillis = TimeUnit.MINUTES.toMillis(15);
+ // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet.
+ boolean wasTimeout =
+ (cmdResult != null && cmdResult.getTerminationStatus().timedout())
+ || wasTimeout(timeoutMillis, System.currentTimeMillis() - startTime);
+ final int exitCode;
+ if (wasTimeout) {
+ final String errMessage =
+ String.format(
+ "Command:\n%s\nexceeded deadline of %f seconds.",
+ Arrays.toString(command.getArgumentsList().toArray()), timeoutMillis / 1000.0);
+ logger.warning(errMessage);
+ throw StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.DEADLINE_EXCEEDED.getNumber())
+ .setMessage(errMessage)
+ .build());
+ } else if (cmdResult == null) {
+ exitCode = LOCAL_EXEC_ERROR;
+ } else {
+ exitCode = cmdResult.getTerminationStatus().getRawExitCode();
+ }
+
+ ActionResult.Builder result = ActionResult.newBuilder();
+ cache.upload(result, execRoot, outputs);
+ byte[] stdout = stdoutBuffer.toByteArray();
+ byte[] stderr = stderrBuffer.toByteArray();
+ cache.uploadOutErr(result, stdout, stderr);
+ ActionResult finalResult = result.setExitCode(exitCode).build();
+ if (exitCode == 0) {
+ ActionKey actionKey = Digests.computeActionKey(action);
+ cache.setCachedActionResult(actionKey, finalResult);
+ }
+ return finalResult;
+ }
+
+ private boolean wasTimeout(long timeoutMillis, long wallTimeMillis) {
+ return timeoutMillis > 0 && wallTimeMillis > timeoutMillis;
+ }
+
+ private Map<String, String> getEnvironmentVariables(
+ com.google.devtools.remoteexecution.v1test.Command command) {
+ HashMap<String, String> result = new HashMap<>();
+ for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
+ result.put(v.getName(), v.getValue());
+ }
+ return result;
+ }
+
+ // Gets the uid of the current user. If uid could not be successfully fetched (e.g., on other
+ // platforms, if for some reason the timeout was not met, if "id -u" returned non-numeric
+ // number, etc), logs a WARNING and return -1.
+ // This is used to set "-u UID" flag for commands running inside Docker containers. There are
+ // only a small handful of cases where uid is vital (e.g., if strict permissions are set on the
+ // output files), so most use cases would work without setting uid.
+ private long getUid() {
+ Command cmd = new Command(new String[] {"id", "-u"});
+ try {
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ cmd.execute(
+ Command.NO_INPUT,
+ new TimeoutKillableObserver(com.google.protobuf.util.Durations.toMicros(uidTimeout)),
+ stdout,
+ stderr);
+ return Long.parseLong(stdout.toString().trim());
+ } catch (CommandException | NumberFormatException e) {
+ logger.log(
+ WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e);
+ return -1;
+ }
+ }
+
+ // Checks Action for docker container definition. If no docker container specified, returns
+ // null. Otherwise returns docker container name from the parameters.
+ private String dockerContainer(Action action) {
+ String result = null;
+ for (Platform.Property property : action.getPlatform().getPropertiesList()) {
+ if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
+ if (result != null) {
+ // Multiple container name entries
+ throw new IllegalArgumentException(
+ "Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform");
+ }
+ result = property.getValue();
+ }
+ }
+ return result;
+ }
+
+ // Takes an Action and parameters that can be used to create a Command. Returns the Command.
+ // If no docker container is specified inside Action, creates a Command straight from the
+ // arguments. Otherwise, returns a Command that would run the specified command inside the
+ // specified docker container.
+ private Command getCommand(
+ Action action,
+ List<String> commandLineElements,
+ Map<String, String> environmentVariables,
+ String pathString) {
+ String container = dockerContainer(action);
+ if (container != null) {
+ // Run command inside a docker container.
+ ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
+ newCommandLineElements.add("docker");
+ newCommandLineElements.add("run");
+
+ long uid = getUid();
+ if (uid >= 0) {
+ newCommandLineElements.add("-u");
+ newCommandLineElements.add(Long.toString(uid));
+ }
+
+ String dockerPathString = pathString + "-docker";
+ newCommandLineElements.add("-v");
+ newCommandLineElements.add(pathString + ":" + dockerPathString);
+ newCommandLineElements.add("-w");
+ newCommandLineElements.add(dockerPathString);
+
+ for (Map.Entry<String, String> entry : environmentVariables.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ newCommandLineElements.add("-e");
+ newCommandLineElements.add(key + "=" + value);
+ }
+
+ newCommandLineElements.add(container);
+
+ newCommandLineElements.addAll(commandLineElements);
+
+ return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString));
+ } else if (sandboxPath != null) {
+ // Run command with sandboxing.
+ ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size());
+ newCommandLineElements.add(sandboxPath.getPathString());
+ if (workerOptions.sandboxingBlockNetwork) {
+ newCommandLineElements.add("-N");
+ }
+ for (String writablePath : workerOptions.sandboxingWritablePaths) {
+ newCommandLineElements.add("-w");
+ newCommandLineElements.add(writablePath);
+ }
+ for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) {
+ newCommandLineElements.add("-e");
+ newCommandLineElements.add(tmpfsDir);
+ }
+ newCommandLineElements.add("--");
+ newCommandLineElements.addAll(commandLineElements);
+ return new Command(
+ newCommandLineElements.toArray(new String[0]),
+ environmentVariables,
+ new File(pathString));
+ } else {
+ // Just run the command.
+ return new Command(
+ commandLineElements.toArray(new String[0]), environmentVariables, new File(pathString));
+ }
+ }
}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index f245ae61a8..10da520e9b 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -23,6 +23,7 @@ import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.remote.RemoteOptions;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory;
@@ -43,8 +44,8 @@ import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.common.options.OptionsParser;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.Server;
@@ -102,10 +103,12 @@ public final class RemoteWorker {
this.casServer = new CasServer(cache);
if (workerOptions.workPath != null) {
- ConcurrentHashMap<String, ExecuteRequest> operationsCache = new ConcurrentHashMap<>();
+ ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache =
+ new ConcurrentHashMap<>();
FileSystemUtils.createDirectoryAndParents(workPath);
- watchServer = new WatcherServer(workPath, cache, workerOptions, operationsCache, sandboxPath);
- execServer = new ExecutionServer(operationsCache);
+ watchServer = new WatcherServer(operationsCache);
+ execServer =
+ new ExecutionServer(workPath, sandboxPath, workerOptions, cache, operationsCache);
} else {
watchServer = null;
execServer = null;
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java
index 3449e6e417..1bbd112c15 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java
@@ -14,51 +14,23 @@
package com.google.devtools.build.remote;
-import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.INFO;
-import static java.util.logging.Level.WARNING;
-
import com.google.common.base.Throwables;
-import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.Digests;
-import com.google.devtools.build.lib.remote.Digests.ActionKey;
-import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
-import com.google.devtools.build.lib.shell.AbnormalTerminationException;
-import com.google.devtools.build.lib.shell.Command;
-import com.google.devtools.build.lib.shell.CommandException;
-import com.google.devtools.build.lib.shell.CommandResult;
-import com.google.devtools.build.lib.shell.TimeoutKillableObserver;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
-import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.remoteexecution.v1test.ActionResult;
-import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
-import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
-import com.google.devtools.remoteexecution.v1test.Platform;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
-import com.google.protobuf.Duration;
-import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import com.google.rpc.Status;
import com.google.watcher.v1.Change;
import com.google.watcher.v1.ChangeBatch;
import com.google.watcher.v1.Request;
import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
-import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,238 +38,17 @@ import java.util.logging.Logger;
final class WatcherServer extends WatcherImplBase {
private static final Logger logger = Logger.getLogger(WatcherServer.class.getName());
- // The name of the container image entry in the Platform proto
- // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
- // experimental_remote_platform_override in
- // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java)
- private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image";
-
- // How long to wait for the uid command.
- private static final Duration uidTimeout = Durations.fromMicros(30);
-
- private static final int LOCAL_EXEC_ERROR = -1;
-
- private final Path workPath;
- private final SimpleBlobStoreActionCache cache;
- private final RemoteWorkerOptions workerOptions;
- private final ConcurrentHashMap<String, ExecuteRequest> operationsCache;
- private final Path sandboxPath;
+ private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache;
- public WatcherServer(
- Path workPath,
- SimpleBlobStoreActionCache cache,
- RemoteWorkerOptions workerOptions,
- ConcurrentHashMap<String, ExecuteRequest> operationsCache,
- Path sandboxPath) {
- this.workPath = workPath;
- this.cache = cache;
- this.workerOptions = workerOptions;
+ public WatcherServer(ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) {
this.operationsCache = operationsCache;
- this.sandboxPath = sandboxPath;
- }
-
- private Map<String, String> getEnvironmentVariables(
- com.google.devtools.remoteexecution.v1test.Command command) {
- HashMap<String, String> result = new HashMap<>();
- for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
- result.put(v.getName(), v.getValue());
- }
- return result;
- }
-
- // Gets the uid of the current user. If uid could not be successfully fetched (e.g., on other
- // platforms, if for some reason the timeout was not met, if "id -u" returned non-numeric
- // number, etc), logs a WARNING and return -1.
- // This is used to set "-u UID" flag for commands running inside Docker containers. There are
- // only a small handful of cases where uid is vital (e.g., if strict permissions are set on the
- // output files), so most use cases would work without setting uid.
- private long getUid() {
- Command cmd = new Command(new String[] {"id", "-u"});
- try {
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- cmd.execute(
- Command.NO_INPUT,
- new TimeoutKillableObserver(Durations.toMicros(uidTimeout)),
- stdout,
- stderr);
- return Long.parseLong(stdout.toString().trim());
- } catch (CommandException | NumberFormatException e) {
- logger.log(
- WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e);
- return -1;
- }
- }
-
- // Checks Action for docker container definition. If no docker container specified, returns
- // null. Otherwise returns docker container name from the parameters.
- private String dockerContainer(Action action) throws IllegalArgumentException {
- String result = null;
- for (Platform.Property property : action.getPlatform().getPropertiesList()) {
- if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
- if (result != null) {
- // Multiple container name entries
- throw new IllegalArgumentException(
- "Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform");
- }
- result = property.getValue();
- }
- }
- return result;
- }
-
- // Takes an Action and parameters that can be used to create a Command. Returns the Command.
- // If no docker container is specified inside Action, creates a Command straight from the
- // arguments. Otherwise, returns a Command that would run the specified command inside the
- // specified docker container.
- private Command getCommand(
- Action action,
- String[] commandLineElements,
- Map<String, String> environmentVariables,
- String pathString)
- throws IllegalArgumentException {
- String container = dockerContainer(action);
- if (container != null) {
- // Run command inside a docker container.
- ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.length);
- newCommandLineElements.add("docker");
- newCommandLineElements.add("run");
-
- long uid = getUid();
- if (uid >= 0) {
- newCommandLineElements.add("-u");
- newCommandLineElements.add(Long.toString(uid));
- }
-
- String dockerPathString = pathString + "-docker";
- newCommandLineElements.add("-v");
- newCommandLineElements.add(pathString + ":" + dockerPathString);
- newCommandLineElements.add("-w");
- newCommandLineElements.add(dockerPathString);
-
- for (Map.Entry<String, String> entry : environmentVariables.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- newCommandLineElements.add("-e");
- newCommandLineElements.add(key + "=" + value);
- }
-
- newCommandLineElements.add(container);
-
- newCommandLineElements.addAll(Arrays.asList(commandLineElements));
-
- return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString));
- } else if (sandboxPath != null) {
- // Run command with sandboxing.
- ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.length);
- newCommandLineElements.add(sandboxPath.getPathString());
- if (workerOptions.sandboxingBlockNetwork) {
- newCommandLineElements.add("-N");
- }
- for (String writablePath : workerOptions.sandboxingWritablePaths) {
- newCommandLineElements.add("-w");
- newCommandLineElements.add(writablePath);
- }
- for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) {
- newCommandLineElements.add("-e");
- newCommandLineElements.add(tmpfsDir);
- }
- newCommandLineElements.add("--");
- newCommandLineElements.addAll(Arrays.asList(commandLineElements));
- return new Command(
- newCommandLineElements.toArray(new String[0]),
- environmentVariables,
- new File(pathString));
- } else {
- // Just run the command.
- return new Command(commandLineElements, environmentVariables, new File(pathString));
- }
- }
-
- public ActionResult execute(Action action, Path execRoot)
- throws IOException, InterruptedException, IllegalArgumentException, CacheNotFoundException {
- ByteArrayOutputStream stdoutBuffer = new ByteArrayOutputStream();
- ByteArrayOutputStream stderrBuffer = new ByteArrayOutputStream();
- com.google.devtools.remoteexecution.v1test.Command command =
- com.google.devtools.remoteexecution.v1test.Command.parseFrom(
- cache.downloadBlob(action.getCommandDigest()));
- cache.downloadTree(action.getInputRootDigest(), execRoot);
-
- List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
- for (String output : action.getOutputFilesList()) {
- Path file = execRoot.getRelative(output);
- if (file.exists()) {
- throw new FileAlreadyExistsException("Output file already exists: " + file);
- }
- FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
- outputs.add(file);
- }
- // TODO(olaola): support output directories.
-
- // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
- // implementation instead of copying it.
- Command cmd =
- getCommand(
- action,
- command.getArgumentsList().toArray(new String[] {}),
- getEnvironmentVariables(command),
- execRoot.getPathString());
- long startTime = System.currentTimeMillis();
- CommandResult cmdResult = null;
- try {
- cmdResult =
- cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdoutBuffer, stderrBuffer, true);
- } catch (AbnormalTerminationException e) {
- cmdResult = e.getResult();
- } catch (CommandException e) {
- // At the time this comment was written, this must be a ExecFailedException encapsulating
- // an IOException from the underlying Subprocess.Factory.
- }
- final int timeoutSeconds = 60 * 15;
- // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet.
- boolean wasTimeout =
- cmdResult != null && cmdResult.getTerminationStatus().timedout()
- || wasTimeout(timeoutSeconds, System.currentTimeMillis() - startTime);
- int exitCode;
- if (wasTimeout) {
- final String errMessage =
- String.format(
- "Command:\n%s\nexceeded deadline of %d seconds.",
- Arrays.toString(command.getArgumentsList().toArray()), timeoutSeconds);
- logger.warning(errMessage);
- throw StatusProto.toStatusRuntimeException(
- Status.newBuilder()
- .setCode(Code.DEADLINE_EXCEEDED.getNumber())
- .setMessage(errMessage)
- .build());
- } else if (cmdResult == null) {
- exitCode = LOCAL_EXEC_ERROR;
- } else {
- exitCode = cmdResult.getTerminationStatus().getRawExitCode();
- }
-
- ActionResult.Builder result = ActionResult.newBuilder();
- cache.upload(result, execRoot, outputs);
- byte[] stdout = stdoutBuffer.toByteArray();
- byte[] stderr = stderrBuffer.toByteArray();
- cache.uploadOutErr(result, stdout, stderr);
- ActionResult finalResult = result.setExitCode(exitCode).build();
- if (exitCode == 0) {
- ActionKey actionKey = Digests.computeActionKey(action);
- cache.setCachedActionResult(actionKey, finalResult);
- }
- return finalResult;
- }
-
- private boolean wasTimeout(int timeoutSeconds, long wallTimeMillis) {
- return timeoutSeconds > 0 && wallTimeMillis / 1000.0 > timeoutSeconds;
}
@Override
public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) {
final String opName = wr.getTarget();
- if (!operationsCache.containsKey(opName)) {
+ ListenableFuture<ActionResult> future = operationsCache.get(opName);
+ if (future == null) {
responseObserver.onError(
StatusProto.toStatusRuntimeException(
Status.newBuilder()
@@ -306,80 +57,50 @@ final class WatcherServer extends WatcherImplBase {
.build()));
return;
}
- ExecuteRequest request = operationsCache.get(opName);
- Path tempRoot = workPath.getRelative("build-" + opName);
- try {
- tempRoot.createDirectory();
- logger.log(
- FINE,
- "Work received has {0} input files and {1} output files.",
- new Object[] {
- request.getTotalInputFileCount(), request.getAction().getOutputFilesCount()
- });
- ActionResult result = execute(request.getAction(), tempRoot);
- responseObserver.onNext(
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(
- Any.pack(
- Operation.newBuilder()
- .setName(opName)
- .setDone(true)
- .setResponse(
- Any.pack(
- ExecuteResponse.newBuilder().setResult(result).build()))
- .build()))
- .build())
- .build());
- responseObserver.onCompleted();
- } catch (CacheNotFoundException e) {
- logger.log(WARNING, "Cache miss on {0}.", e.getMissingDigest());
- responseObserver.onError(StatusUtils.notFoundError(e.getMissingDigest()));
- } catch (StatusRuntimeException e) {
- // In particular, command DEADLINE_EXCEEDED errors should go in the Operation.error field to
- // distinguish them from gRPC request DEADLINE_EXCEEDED.
- responseObserver.onNext(
- ChangeBatch.newBuilder()
- .addChanges(
- Change.newBuilder()
- .setState(Change.State.EXISTS)
- .setData(
- Any.pack(
- Operation.newBuilder()
- .setName(opName)
- .setError(StatusProto.fromThrowable(e))
- .build()))
- .build())
- .build());
- } catch (IllegalArgumentException e) {
- responseObserver.onError(
- StatusProto.toStatusRuntimeException(
- Status.newBuilder()
- .setCode(Code.INVALID_ARGUMENT.getNumber())
- .setMessage(e.toString())
- .build()));
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Work failed.", e);
- responseObserver.onError(StatusUtils.internalError(e));
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- } finally {
- operationsCache.remove(opName);
- if (workerOptions.debug) {
- logger.log(INFO, "Preserving work directory {0}.", tempRoot);
- } else {
+
+ future.addListener(() -> {
+ try {
try {
- FileSystemUtils.deleteTree(tempRoot);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format(
- "Failed to delete tmp directory %s: %s",
- tempRoot, Throwables.getStackTraceAsString(e)));
+ ActionResult result = future.get();
+ responseObserver.onNext(
+ packExists(
+ Operation.newBuilder()
+ .setName(opName)
+ .setDone(true)
+ .setResponse(
+ Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))));
+ responseObserver.onCompleted();
+ } catch (ExecutionException e) {
+ Throwables.throwIfUnchecked(e.getCause());
+ throw (Exception) e.getCause();
+ }
+ } catch (IllegalArgumentException e) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.INVALID_ARGUMENT.getNumber())
+ .setMessage(e.toString())
+ .build()));
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "Work failed: " + opName, e);
+ responseObserver.onError(StatusUtils.internalError(e));
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
+ } finally {
+ operationsCache.remove(opName);
}
- }
+ }, MoreExecutors.directExecutor());
+ }
+
+ /** Constructs a ChangeBatch with an exists state change that contains the given operation. */
+ private ChangeBatch packExists(Operation.Builder message) {
+ return ChangeBatch.newBuilder()
+ .addChanges(
+ Change.newBuilder()
+ .setState(Change.State.EXISTS)
+ .setData(
+ Any.pack(message.build())))
+ .build();
}
}