diff options
author | ulfjack <ulfjack@google.com> | 2017-07-05 15:17:18 -0400 |
---|---|---|
committer | John Cater <jcater@google.com> | 2017-07-06 07:13:39 -0400 |
commit | bc16cc2f49a398d57f68dcd8268b2bb6bd2b3baa (patch) | |
tree | 2d5aa737155c20d9b4d0b123a7d547a0394d79cb /src/tools | |
parent | 948e519093eb82c217e842eec61a88725689d9dc (diff) |
Rewrite the watcher / execution servers to use a thread-pool for execution
Also change them to use ListenableFuture, so multiple watchers can watch the
same action execution.
PiperOrigin-RevId: 160989277
Diffstat (limited to 'src/tools')
3 files changed, 370 insertions, 338 deletions
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java index fd9a6f9c86..034ff9ddc5 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java @@ -14,29 +14,337 @@ package com.google.devtools.build.remote; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; +import static java.util.logging.Level.WARNING; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.remote.CacheNotFoundException; +import com.google.devtools.build.lib.remote.Digests; +import com.google.devtools.build.lib.remote.Digests.ActionKey; +import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; +import com.google.devtools.build.lib.shell.AbnormalTerminationException; +import com.google.devtools.build.lib.shell.Command; +import com.google.devtools.build.lib.shell.CommandException; +import com.google.devtools.build.lib.shell.CommandResult; +import com.google.devtools.build.lib.shell.TimeoutKillableObserver; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.remoteexecution.v1test.Action; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable; import com.google.devtools.remoteexecution.v1test.ExecuteRequest; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; +import com.google.devtools.remoteexecution.v1test.Platform; import com.google.longrunning.Operation; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; +import com.google.rpc.Code; +import com.google.rpc.Status; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** A basic implementation of an {@link ExecutionImplBase} service. */ final class ExecutionServer extends ExecutionImplBase { - private final ConcurrentHashMap<String, ExecuteRequest> operationsCache; + private static final Logger logger = Logger.getLogger(ExecutionServer.class.getName()); + + // The name of the container image entry in the Platform proto + // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and + // experimental_remote_platform_override in + // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java) + private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image"; + + // How long to wait for the uid command. + private static final Duration uidTimeout = Durations.fromMicros(30); + + private static final int LOCAL_EXEC_ERROR = -1; - public ExecutionServer(ConcurrentHashMap<String, ExecuteRequest> operationsCache) { + private final Path workPath; + private final Path sandboxPath; + private final RemoteWorkerOptions workerOptions; + private final SimpleBlobStoreActionCache cache; + private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache; + private final ListeningExecutorService executorService = + MoreExecutors.listeningDecorator( + new ThreadPoolExecutor(0, 8, 1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>())); + + public ExecutionServer( + Path workPath, + Path sandboxPath, + RemoteWorkerOptions workerOptions, + SimpleBlobStoreActionCache cache, + ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) { + this.workPath = workPath; + this.sandboxPath = sandboxPath; + this.workerOptions = workerOptions; + this.cache = cache; this.operationsCache = operationsCache; } @Override public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { - // Defer the actual action execution to the Watcher.watch request. - // There are a lot of errors for which we could fail early here, but deferring them all is - // simpler. final String opName = UUID.randomUUID().toString(); - operationsCache.put(opName, request); + ListenableFuture<ActionResult> future = executorService.submit(new Callable<ActionResult>() { + @Override + public ActionResult call() throws Exception { + return execute(request, opName); + } + }); + operationsCache.put(opName, future); responseObserver.onNext(Operation.newBuilder().setName(opName).build()); responseObserver.onCompleted(); } + + private ActionResult execute(ExecuteRequest request, String id) + throws IOException, InterruptedException, CacheNotFoundException { + Path tempRoot = workPath.getRelative("build-" + id); + try { + tempRoot.createDirectory(); + logger.log( + FINE, + "Work received has {0} input files and {1} output files.", + new Object[]{ + request.getTotalInputFileCount(), request.getAction().getOutputFilesCount() + }); + ActionResult result = execute(request.getAction(), tempRoot); + logger.log(INFO, "Completed {0}.", id); + return result; + } catch (Exception e) { + logger.log(Level.SEVERE, "Work failed.", e); + throw e; + } finally { + if (workerOptions.debug) { + logger.log(INFO, "Preserving work directory {0}.", tempRoot); + } else { + try { + FileSystemUtils.deleteTree(tempRoot); + } catch (IOException e) { + logger.log(SEVERE, + String.format( + "Failed to delete tmp directory %s: %s", + tempRoot, Throwables.getStackTraceAsString(e))); + } + } + } + } + + private ActionResult execute(Action action, Path execRoot) + throws IOException, InterruptedException, CacheNotFoundException { + ByteArrayOutputStream stdoutBuffer = new ByteArrayOutputStream(); + ByteArrayOutputStream stderrBuffer = new ByteArrayOutputStream(); + com.google.devtools.remoteexecution.v1test.Command command = + com.google.devtools.remoteexecution.v1test.Command.parseFrom( + cache.downloadBlob(action.getCommandDigest())); + cache.downloadTree(action.getInputRootDigest(), execRoot); + + List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size()); + for (String output : action.getOutputFilesList()) { + Path file = execRoot.getRelative(output); + if (file.exists()) { + throw new FileAlreadyExistsException("Output file already exists: " + file); + } + FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); + outputs.add(file); + } + // TODO(olaola): support output directories. + + // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that + // implementation instead of copying it. + Command cmd = + getCommand( + action, + command.getArgumentsList(), + getEnvironmentVariables(command), + execRoot.getPathString()); + long startTime = System.currentTimeMillis(); + CommandResult cmdResult = null; + try { + cmdResult = + cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdoutBuffer, stderrBuffer, true); + } catch (AbnormalTerminationException e) { + cmdResult = e.getResult(); + } catch (CommandException e) { + // At the time this comment was written, this must be a ExecFailedException encapsulating + // an IOException from the underlying Subprocess.Factory. + cmdResult = null; + } + long timeoutMillis = TimeUnit.MINUTES.toMillis(15); + // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet. + boolean wasTimeout = + (cmdResult != null && cmdResult.getTerminationStatus().timedout()) + || wasTimeout(timeoutMillis, System.currentTimeMillis() - startTime); + final int exitCode; + if (wasTimeout) { + final String errMessage = + String.format( + "Command:\n%s\nexceeded deadline of %f seconds.", + Arrays.toString(command.getArgumentsList().toArray()), timeoutMillis / 1000.0); + logger.warning(errMessage); + throw StatusProto.toStatusRuntimeException( + Status.newBuilder() + .setCode(Code.DEADLINE_EXCEEDED.getNumber()) + .setMessage(errMessage) + .build()); + } else if (cmdResult == null) { + exitCode = LOCAL_EXEC_ERROR; + } else { + exitCode = cmdResult.getTerminationStatus().getRawExitCode(); + } + + ActionResult.Builder result = ActionResult.newBuilder(); + cache.upload(result, execRoot, outputs); + byte[] stdout = stdoutBuffer.toByteArray(); + byte[] stderr = stderrBuffer.toByteArray(); + cache.uploadOutErr(result, stdout, stderr); + ActionResult finalResult = result.setExitCode(exitCode).build(); + if (exitCode == 0) { + ActionKey actionKey = Digests.computeActionKey(action); + cache.setCachedActionResult(actionKey, finalResult); + } + return finalResult; + } + + private boolean wasTimeout(long timeoutMillis, long wallTimeMillis) { + return timeoutMillis > 0 && wallTimeMillis > timeoutMillis; + } + + private Map<String, String> getEnvironmentVariables( + com.google.devtools.remoteexecution.v1test.Command command) { + HashMap<String, String> result = new HashMap<>(); + for (EnvironmentVariable v : command.getEnvironmentVariablesList()) { + result.put(v.getName(), v.getValue()); + } + return result; + } + + // Gets the uid of the current user. If uid could not be successfully fetched (e.g., on other + // platforms, if for some reason the timeout was not met, if "id -u" returned non-numeric + // number, etc), logs a WARNING and return -1. + // This is used to set "-u UID" flag for commands running inside Docker containers. There are + // only a small handful of cases where uid is vital (e.g., if strict permissions are set on the + // output files), so most use cases would work without setting uid. + private long getUid() { + Command cmd = new Command(new String[] {"id", "-u"}); + try { + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + cmd.execute( + Command.NO_INPUT, + new TimeoutKillableObserver(com.google.protobuf.util.Durations.toMicros(uidTimeout)), + stdout, + stderr); + return Long.parseLong(stdout.toString().trim()); + } catch (CommandException | NumberFormatException e) { + logger.log( + WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e); + return -1; + } + } + + // Checks Action for docker container definition. If no docker container specified, returns + // null. Otherwise returns docker container name from the parameters. + private String dockerContainer(Action action) { + String result = null; + for (Platform.Property property : action.getPlatform().getPropertiesList()) { + if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) { + if (result != null) { + // Multiple container name entries + throw new IllegalArgumentException( + "Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform"); + } + result = property.getValue(); + } + } + return result; + } + + // Takes an Action and parameters that can be used to create a Command. Returns the Command. + // If no docker container is specified inside Action, creates a Command straight from the + // arguments. Otherwise, returns a Command that would run the specified command inside the + // specified docker container. + private Command getCommand( + Action action, + List<String> commandLineElements, + Map<String, String> environmentVariables, + String pathString) { + String container = dockerContainer(action); + if (container != null) { + // Run command inside a docker container. + ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size()); + newCommandLineElements.add("docker"); + newCommandLineElements.add("run"); + + long uid = getUid(); + if (uid >= 0) { + newCommandLineElements.add("-u"); + newCommandLineElements.add(Long.toString(uid)); + } + + String dockerPathString = pathString + "-docker"; + newCommandLineElements.add("-v"); + newCommandLineElements.add(pathString + ":" + dockerPathString); + newCommandLineElements.add("-w"); + newCommandLineElements.add(dockerPathString); + + for (Map.Entry<String, String> entry : environmentVariables.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + + newCommandLineElements.add("-e"); + newCommandLineElements.add(key + "=" + value); + } + + newCommandLineElements.add(container); + + newCommandLineElements.addAll(commandLineElements); + + return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString)); + } else if (sandboxPath != null) { + // Run command with sandboxing. + ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.size()); + newCommandLineElements.add(sandboxPath.getPathString()); + if (workerOptions.sandboxingBlockNetwork) { + newCommandLineElements.add("-N"); + } + for (String writablePath : workerOptions.sandboxingWritablePaths) { + newCommandLineElements.add("-w"); + newCommandLineElements.add(writablePath); + } + for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) { + newCommandLineElements.add("-e"); + newCommandLineElements.add(tmpfsDir); + } + newCommandLineElements.add("--"); + newCommandLineElements.addAll(commandLineElements); + return new Command( + newCommandLineElements.toArray(new String[0]), + environmentVariables, + new File(pathString)); + } else { + // Just run the command. + return new Command( + commandLineElements.toArray(new String[0]), environmentVariables, new File(pathString)); + } + } } diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index f245ae61a8..10da520e9b 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -23,6 +23,7 @@ import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.remote.RemoteOptions; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory; @@ -43,8 +44,8 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.common.options.OptionsParser; import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; +import com.google.devtools.remoteexecution.v1test.ActionResult; import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; -import com.google.devtools.remoteexecution.v1test.ExecuteRequest; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; import io.grpc.Server; @@ -102,10 +103,12 @@ public final class RemoteWorker { this.casServer = new CasServer(cache); if (workerOptions.workPath != null) { - ConcurrentHashMap<String, ExecuteRequest> operationsCache = new ConcurrentHashMap<>(); + ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache = + new ConcurrentHashMap<>(); FileSystemUtils.createDirectoryAndParents(workPath); - watchServer = new WatcherServer(workPath, cache, workerOptions, operationsCache, sandboxPath); - execServer = new ExecutionServer(operationsCache); + watchServer = new WatcherServer(operationsCache); + execServer = + new ExecutionServer(workPath, sandboxPath, workerOptions, cache, operationsCache); } else { watchServer = null; execServer = null; diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java index 3449e6e417..1bbd112c15 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java @@ -14,51 +14,23 @@ package com.google.devtools.build.remote; -import static java.util.logging.Level.FINE; -import static java.util.logging.Level.INFO; -import static java.util.logging.Level.WARNING; - import com.google.common.base.Throwables; -import com.google.devtools.build.lib.remote.CacheNotFoundException; -import com.google.devtools.build.lib.remote.Digests; -import com.google.devtools.build.lib.remote.Digests.ActionKey; -import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; -import com.google.devtools.build.lib.shell.AbnormalTerminationException; -import com.google.devtools.build.lib.shell.Command; -import com.google.devtools.build.lib.shell.CommandException; -import com.google.devtools.build.lib.shell.CommandResult; -import com.google.devtools.build.lib.shell.TimeoutKillableObserver; -import com.google.devtools.build.lib.vfs.FileSystemUtils; -import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.remoteexecution.v1test.Action; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable; -import com.google.devtools.remoteexecution.v1test.ExecuteRequest; import com.google.devtools.remoteexecution.v1test.ExecuteResponse; -import com.google.devtools.remoteexecution.v1test.Platform; import com.google.longrunning.Operation; import com.google.protobuf.Any; -import com.google.protobuf.Duration; -import com.google.protobuf.util.Durations; import com.google.rpc.Code; import com.google.rpc.Status; import com.google.watcher.v1.Change; import com.google.watcher.v1.ChangeBatch; import com.google.watcher.v1.Request; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; -import io.grpc.StatusRuntimeException; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -66,238 +38,17 @@ import java.util.logging.Logger; final class WatcherServer extends WatcherImplBase { private static final Logger logger = Logger.getLogger(WatcherServer.class.getName()); - // The name of the container image entry in the Platform proto - // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and - // experimental_remote_platform_override in - // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java) - private static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image"; - - // How long to wait for the uid command. - private static final Duration uidTimeout = Durations.fromMicros(30); - - private static final int LOCAL_EXEC_ERROR = -1; - - private final Path workPath; - private final SimpleBlobStoreActionCache cache; - private final RemoteWorkerOptions workerOptions; - private final ConcurrentHashMap<String, ExecuteRequest> operationsCache; - private final Path sandboxPath; + private final ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache; - public WatcherServer( - Path workPath, - SimpleBlobStoreActionCache cache, - RemoteWorkerOptions workerOptions, - ConcurrentHashMap<String, ExecuteRequest> operationsCache, - Path sandboxPath) { - this.workPath = workPath; - this.cache = cache; - this.workerOptions = workerOptions; + public WatcherServer(ConcurrentHashMap<String, ListenableFuture<ActionResult>> operationsCache) { this.operationsCache = operationsCache; - this.sandboxPath = sandboxPath; - } - - private Map<String, String> getEnvironmentVariables( - com.google.devtools.remoteexecution.v1test.Command command) { - HashMap<String, String> result = new HashMap<>(); - for (EnvironmentVariable v : command.getEnvironmentVariablesList()) { - result.put(v.getName(), v.getValue()); - } - return result; - } - - // Gets the uid of the current user. If uid could not be successfully fetched (e.g., on other - // platforms, if for some reason the timeout was not met, if "id -u" returned non-numeric - // number, etc), logs a WARNING and return -1. - // This is used to set "-u UID" flag for commands running inside Docker containers. There are - // only a small handful of cases where uid is vital (e.g., if strict permissions are set on the - // output files), so most use cases would work without setting uid. - private long getUid() { - Command cmd = new Command(new String[] {"id", "-u"}); - try { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - cmd.execute( - Command.NO_INPUT, - new TimeoutKillableObserver(Durations.toMicros(uidTimeout)), - stdout, - stderr); - return Long.parseLong(stdout.toString().trim()); - } catch (CommandException | NumberFormatException e) { - logger.log( - WARNING, "Could not get UID for passing to Docker container. Proceeding without it.", e); - return -1; - } - } - - // Checks Action for docker container definition. If no docker container specified, returns - // null. Otherwise returns docker container name from the parameters. - private String dockerContainer(Action action) throws IllegalArgumentException { - String result = null; - for (Platform.Property property : action.getPlatform().getPropertiesList()) { - if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) { - if (result != null) { - // Multiple container name entries - throw new IllegalArgumentException( - "Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform"); - } - result = property.getValue(); - } - } - return result; - } - - // Takes an Action and parameters that can be used to create a Command. Returns the Command. - // If no docker container is specified inside Action, creates a Command straight from the - // arguments. Otherwise, returns a Command that would run the specified command inside the - // specified docker container. - private Command getCommand( - Action action, - String[] commandLineElements, - Map<String, String> environmentVariables, - String pathString) - throws IllegalArgumentException { - String container = dockerContainer(action); - if (container != null) { - // Run command inside a docker container. - ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.length); - newCommandLineElements.add("docker"); - newCommandLineElements.add("run"); - - long uid = getUid(); - if (uid >= 0) { - newCommandLineElements.add("-u"); - newCommandLineElements.add(Long.toString(uid)); - } - - String dockerPathString = pathString + "-docker"; - newCommandLineElements.add("-v"); - newCommandLineElements.add(pathString + ":" + dockerPathString); - newCommandLineElements.add("-w"); - newCommandLineElements.add(dockerPathString); - - for (Map.Entry<String, String> entry : environmentVariables.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - - newCommandLineElements.add("-e"); - newCommandLineElements.add(key + "=" + value); - } - - newCommandLineElements.add(container); - - newCommandLineElements.addAll(Arrays.asList(commandLineElements)); - - return new Command(newCommandLineElements.toArray(new String[0]), null, new File(pathString)); - } else if (sandboxPath != null) { - // Run command with sandboxing. - ArrayList<String> newCommandLineElements = new ArrayList<>(commandLineElements.length); - newCommandLineElements.add(sandboxPath.getPathString()); - if (workerOptions.sandboxingBlockNetwork) { - newCommandLineElements.add("-N"); - } - for (String writablePath : workerOptions.sandboxingWritablePaths) { - newCommandLineElements.add("-w"); - newCommandLineElements.add(writablePath); - } - for (String tmpfsDir : workerOptions.sandboxingTmpfsDirs) { - newCommandLineElements.add("-e"); - newCommandLineElements.add(tmpfsDir); - } - newCommandLineElements.add("--"); - newCommandLineElements.addAll(Arrays.asList(commandLineElements)); - return new Command( - newCommandLineElements.toArray(new String[0]), - environmentVariables, - new File(pathString)); - } else { - // Just run the command. - return new Command(commandLineElements, environmentVariables, new File(pathString)); - } - } - - public ActionResult execute(Action action, Path execRoot) - throws IOException, InterruptedException, IllegalArgumentException, CacheNotFoundException { - ByteArrayOutputStream stdoutBuffer = new ByteArrayOutputStream(); - ByteArrayOutputStream stderrBuffer = new ByteArrayOutputStream(); - com.google.devtools.remoteexecution.v1test.Command command = - com.google.devtools.remoteexecution.v1test.Command.parseFrom( - cache.downloadBlob(action.getCommandDigest())); - cache.downloadTree(action.getInputRootDigest(), execRoot); - - List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size()); - for (String output : action.getOutputFilesList()) { - Path file = execRoot.getRelative(output); - if (file.exists()) { - throw new FileAlreadyExistsException("Output file already exists: " + file); - } - FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); - outputs.add(file); - } - // TODO(olaola): support output directories. - - // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that - // implementation instead of copying it. - Command cmd = - getCommand( - action, - command.getArgumentsList().toArray(new String[] {}), - getEnvironmentVariables(command), - execRoot.getPathString()); - long startTime = System.currentTimeMillis(); - CommandResult cmdResult = null; - try { - cmdResult = - cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdoutBuffer, stderrBuffer, true); - } catch (AbnormalTerminationException e) { - cmdResult = e.getResult(); - } catch (CommandException e) { - // At the time this comment was written, this must be a ExecFailedException encapsulating - // an IOException from the underlying Subprocess.Factory. - } - final int timeoutSeconds = 60 * 15; - // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet. - boolean wasTimeout = - cmdResult != null && cmdResult.getTerminationStatus().timedout() - || wasTimeout(timeoutSeconds, System.currentTimeMillis() - startTime); - int exitCode; - if (wasTimeout) { - final String errMessage = - String.format( - "Command:\n%s\nexceeded deadline of %d seconds.", - Arrays.toString(command.getArgumentsList().toArray()), timeoutSeconds); - logger.warning(errMessage); - throw StatusProto.toStatusRuntimeException( - Status.newBuilder() - .setCode(Code.DEADLINE_EXCEEDED.getNumber()) - .setMessage(errMessage) - .build()); - } else if (cmdResult == null) { - exitCode = LOCAL_EXEC_ERROR; - } else { - exitCode = cmdResult.getTerminationStatus().getRawExitCode(); - } - - ActionResult.Builder result = ActionResult.newBuilder(); - cache.upload(result, execRoot, outputs); - byte[] stdout = stdoutBuffer.toByteArray(); - byte[] stderr = stderrBuffer.toByteArray(); - cache.uploadOutErr(result, stdout, stderr); - ActionResult finalResult = result.setExitCode(exitCode).build(); - if (exitCode == 0) { - ActionKey actionKey = Digests.computeActionKey(action); - cache.setCachedActionResult(actionKey, finalResult); - } - return finalResult; - } - - private boolean wasTimeout(int timeoutSeconds, long wallTimeMillis) { - return timeoutSeconds > 0 && wallTimeMillis / 1000.0 > timeoutSeconds; } @Override public void watch(Request wr, StreamObserver<ChangeBatch> responseObserver) { final String opName = wr.getTarget(); - if (!operationsCache.containsKey(opName)) { + ListenableFuture<ActionResult> future = operationsCache.get(opName); + if (future == null) { responseObserver.onError( StatusProto.toStatusRuntimeException( Status.newBuilder() @@ -306,80 +57,50 @@ final class WatcherServer extends WatcherImplBase { .build())); return; } - ExecuteRequest request = operationsCache.get(opName); - Path tempRoot = workPath.getRelative("build-" + opName); - try { - tempRoot.createDirectory(); - logger.log( - FINE, - "Work received has {0} input files and {1} output files.", - new Object[] { - request.getTotalInputFileCount(), request.getAction().getOutputFilesCount() - }); - ActionResult result = execute(request.getAction(), tempRoot); - responseObserver.onNext( - ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData( - Any.pack( - Operation.newBuilder() - .setName(opName) - .setDone(true) - .setResponse( - Any.pack( - ExecuteResponse.newBuilder().setResult(result).build())) - .build())) - .build()) - .build()); - responseObserver.onCompleted(); - } catch (CacheNotFoundException e) { - logger.log(WARNING, "Cache miss on {0}.", e.getMissingDigest()); - responseObserver.onError(StatusUtils.notFoundError(e.getMissingDigest())); - } catch (StatusRuntimeException e) { - // In particular, command DEADLINE_EXCEEDED errors should go in the Operation.error field to - // distinguish them from gRPC request DEADLINE_EXCEEDED. - responseObserver.onNext( - ChangeBatch.newBuilder() - .addChanges( - Change.newBuilder() - .setState(Change.State.EXISTS) - .setData( - Any.pack( - Operation.newBuilder() - .setName(opName) - .setError(StatusProto.fromThrowable(e)) - .build())) - .build()) - .build()); - } catch (IllegalArgumentException e) { - responseObserver.onError( - StatusProto.toStatusRuntimeException( - Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT.getNumber()) - .setMessage(e.toString()) - .build())); - } catch (Exception e) { - logger.log(Level.SEVERE, "Work failed.", e); - responseObserver.onError(StatusUtils.internalError(e)); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } finally { - operationsCache.remove(opName); - if (workerOptions.debug) { - logger.log(INFO, "Preserving work directory {0}.", tempRoot); - } else { + + future.addListener(() -> { + try { try { - FileSystemUtils.deleteTree(tempRoot); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Failed to delete tmp directory %s: %s", - tempRoot, Throwables.getStackTraceAsString(e))); + ActionResult result = future.get(); + responseObserver.onNext( + packExists( + Operation.newBuilder() + .setName(opName) + .setDone(true) + .setResponse( + Any.pack(ExecuteResponse.newBuilder().setResult(result).build())))); + responseObserver.onCompleted(); + } catch (ExecutionException e) { + Throwables.throwIfUnchecked(e.getCause()); + throw (Exception) e.getCause(); + } + } catch (IllegalArgumentException e) { + responseObserver.onError( + StatusProto.toStatusRuntimeException( + Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT.getNumber()) + .setMessage(e.toString()) + .build())); + } catch (Exception e) { + logger.log(Level.SEVERE, "Work failed: " + opName, e); + responseObserver.onError(StatusUtils.internalError(e)); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } + } finally { + operationsCache.remove(opName); } - } + }, MoreExecutors.directExecutor()); + } + + /** Constructs a ChangeBatch with an exists state change that contains the given operation. */ + private ChangeBatch packExists(Operation.Builder message) { + return ChangeBatch.newBuilder() + .addChanges( + Change.newBuilder() + .setState(Change.State.EXISTS) + .setData( + Any.pack(message.build()))) + .build(); } } |