aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
diff options
context:
space:
mode:
authorGravatar Benjamin Peterson <bp@benjamin.pe>2017-08-11 13:34:52 +0200
committerGravatar Marcel Hlopko <hlopko@google.com>2017-08-11 15:43:45 +0200
commit740cd903ee7666a7ddef512c65b8455bde46abae (patch)
tree997d58e3a891c4f14d240aa13cb7d94d9d93d249 /src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
parenta3acb6630b7e0693b42781b8ec7ff6860efc7c3f (diff)
Refactor persistent workers to use SpawnRunner.
Change the persistent worker spawn strategy to extend AbstractSpawnStrategy and put the actual logic into WorkerSpawnRunner. WorkerTestStrategy is unaffected. I had to extend SpawnPolicy with a speculating() method. Persistent workers need to know if speculation is happening in order to require sandboxing. Additionally, I added java_test rules for the local runner tests and worker tests. See https://github.com/bazelbuild/bazel/issues/3481. NOTE: ulfjack@ made some changes to this change before merging: - changed Reporter to EventHandler; added TODO about its usage - reverted non-semantic indentation change in AbstractSpawnStrategy - reverted a non-semantic indentation change in WorkerSpawnRunner - updated some internal classes to match - removed catch IOException in WorkerSpawnRunner in some cases, removed verboseFailures flag from WorkerSpawnRunner, updated callers - disable some tests on Windows; we were previously not running them, now that we do, they fail :-( Change-Id: I207b3938f0dc84d374ab052d5030020886451d47 PiperOrigin-RevId: 164965398
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java350
1 files changed, 350 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
new file mode 100644
index 0000000000..a38244ec1b
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
@@ -0,0 +1,350 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.worker;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.hash.HashCode;
+import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
+import com.google.devtools.build.lib.actions.ExecException;
+import com.google.devtools.build.lib.actions.ExecutionRequirements;
+import com.google.devtools.build.lib.actions.ResourceManager;
+import com.google.devtools.build.lib.actions.ResourceManager.ResourceHandle;
+import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.UserExecException;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.exec.SpawnResult;
+import com.google.devtools.build.lib.exec.SpawnRunner;
+import com.google.devtools.build.lib.sandbox.SandboxHelpers;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.io.FileOutErr;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * A spawn runner that launches Spawns the first time they are used in a persistent mode and then
+ * shards work over all the processes.
+ */
+final class WorkerSpawnRunner implements SpawnRunner {
+ public static final String ERROR_MESSAGE_PREFIX =
+ "Worker strategy cannot execute this %s action, ";
+ public static final String REASON_NO_FLAGFILE =
+ "because the command-line arguments do not contain at least one @flagfile or --flagfile=";
+ public static final String REASON_NO_TOOLS = "because the action has no tools";
+ public static final String REASON_NO_EXECUTION_INFO =
+ "because the action's execution info does not contain 'supports-workers=1'";
+
+ /** Pattern for @flagfile.txt and --flagfile=flagfile.txt */
+ private static final Pattern FLAG_FILE_PATTERN = Pattern.compile("(?:@|--?flagfile=)(.+)");
+
+ private final Path execRoot;
+ private final WorkerPool workers;
+ private final Multimap<String, String> extraFlags;
+ private final EventHandler reporter;
+ private final SpawnRunner fallbackRunner;
+
+ public WorkerSpawnRunner(
+ Path execRoot,
+ WorkerPool workers,
+ Multimap<String, String> extraFlags,
+ EventHandler reporter,
+ SpawnRunner fallbackRunner) {
+ this.execRoot = execRoot;
+ this.workers = Preconditions.checkNotNull(workers);
+ this.extraFlags = extraFlags;
+ this.reporter = reporter;
+ this.fallbackRunner = fallbackRunner;
+ }
+
+ @Override
+ public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
+ throws ExecException, IOException, InterruptedException {
+ if (!spawn.getExecutionInfo().containsKey(ExecutionRequirements.SUPPORTS_WORKERS)
+ || !spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS).equals("1")) {
+ // TODO(ulfjack): Don't circumvent SpawnExecutionPolicy. Either drop the warning here, or
+ // provide a mechanism in SpawnExectionPolicy to report warnings.
+ reporter.handle(
+ Event.warn(
+ String.format(ERROR_MESSAGE_PREFIX + REASON_NO_EXECUTION_INFO, spawn.getMnemonic())));
+ return fallbackRunner.exec(spawn, policy);
+ }
+
+ policy.report(ProgressStatus.SCHEDULING, "worker");
+ ActionExecutionMetadata owner = spawn.getResourceOwner();
+ try (ResourceHandle handle =
+ ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) {
+ policy.report(ProgressStatus.EXECUTING, "worker");
+ return actuallyExec(spawn, policy);
+ }
+ }
+
+ private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionPolicy policy)
+ throws ExecException, IOException, InterruptedException {
+ if (Iterables.isEmpty(spawn.getToolFiles())) {
+ throw new UserExecException(
+ String.format(ERROR_MESSAGE_PREFIX + REASON_NO_TOOLS, spawn.getMnemonic()));
+ }
+
+ // We assume that the spawn to be executed always gets at least one @flagfile.txt or
+ // --flagfile=flagfile.txt argument, which contains the flags related to the work itself (as
+ // opposed to start-up options for the executed tool). Thus, we can extract those elements from
+ // its args and put them into the WorkRequest instead.
+ List<String> flagFiles = new ArrayList<>();
+ ImmutableList<String> workerArgs = splitSpawnArgsIntoWorkerArgsAndFlagFiles(spawn, flagFiles);
+ ImmutableMap<String, String> env = spawn.getEnvironment();
+
+ ActionInputFileCache inputFileCache = policy.getActionInputFileCache();
+
+ HashCode workerFilesHash =
+ WorkerFilesHash.getWorkerFilesHash(spawn.getToolFiles(), inputFileCache);
+ Map<PathFragment, Path> inputFiles = SandboxHelpers.getInputFiles(spawn, policy, execRoot);
+ Set<PathFragment> outputFiles = SandboxHelpers.getOutputFiles(spawn);
+
+ WorkerKey key =
+ new WorkerKey(
+ workerArgs,
+ env,
+ execRoot,
+ spawn.getMnemonic(),
+ workerFilesHash,
+ inputFiles,
+ outputFiles,
+ policy.speculating());
+
+ WorkRequest workRequest = createWorkRequest(spawn, policy, flagFiles, inputFileCache);
+
+ long startTime = System.currentTimeMillis();
+ WorkResponse response = execInWorker(key, workRequest, policy);
+ long wallTimeMillis = System.currentTimeMillis() - startTime;
+
+ FileOutErr outErr = policy.getFileOutErr();
+ response.getOutputBytes().writeTo(outErr.getErrorStream());
+
+ return new SpawnResult.Builder()
+ .setExitCode(response.getExitCode())
+ .setStatus(SpawnResult.Status.SUCCESS)
+ .setWallTimeMillis(wallTimeMillis)
+ .build();
+ }
+
+ /**
+ * Splits the command-line arguments of the {@code Spawn} into the part that is used to start the
+ * persistent worker ({@code workerArgs}) and the part that goes into the {@code WorkRequest}
+ * protobuf ({@code flagFiles}).
+ */
+ private ImmutableList<String> splitSpawnArgsIntoWorkerArgsAndFlagFiles(
+ Spawn spawn, List<String> flagFiles) throws UserExecException {
+ ImmutableList.Builder<String> workerArgs = ImmutableList.builder();
+ for (String arg : spawn.getArguments()) {
+ if (FLAG_FILE_PATTERN.matcher(arg).matches()) {
+ flagFiles.add(arg);
+ } else {
+ workerArgs.add(arg);
+ }
+ }
+
+ if (flagFiles.isEmpty()) {
+ throw new UserExecException(
+ String.format(ERROR_MESSAGE_PREFIX + REASON_NO_FLAGFILE, spawn.getMnemonic()));
+ }
+
+ return workerArgs
+ .add("--persistent_worker")
+ .addAll(
+ MoreObjects.firstNonNull(
+ extraFlags.get(spawn.getMnemonic()), ImmutableList.<String>of()))
+ .build();
+ }
+
+ private WorkRequest createWorkRequest(
+ Spawn spawn,
+ SpawnExecutionPolicy policy,
+ List<String> flagfiles,
+ ActionInputFileCache inputFileCache)
+ throws IOException {
+ WorkRequest.Builder requestBuilder = WorkRequest.newBuilder();
+ for (String flagfile : flagfiles) {
+ expandArgument(requestBuilder, flagfile);
+ }
+
+ List<ActionInput> inputs =
+ ActionInputHelper.expandArtifacts(spawn.getInputFiles(), policy.getArtifactExpander());
+
+ for (ActionInput input : inputs) {
+ byte[] digestBytes = inputFileCache.getMetadata(input).getDigest();
+ ByteString digest;
+ if (digestBytes == null) {
+ digest = ByteString.EMPTY;
+ } else {
+ digest = ByteString.copyFromUtf8(HashCode.fromBytes(digestBytes).toString());
+ }
+
+ requestBuilder
+ .addInputsBuilder()
+ .setPath(input.getExecPathString())
+ .setDigest(digest)
+ .build();
+ }
+ return requestBuilder.build();
+ }
+
+ /**
+ * Recursively expands arguments by replacing @filename args with the contents of the referenced
+ * files. The @ itself can be escaped with @@. This deliberately does not expand --flagfile= style
+ * arguments, because we want to get rid of the expansion entirely at some point in time.
+ *
+ * @param requestBuilder the WorkRequest.Builder that the arguments should be added to.
+ * @param arg the argument to expand.
+ * @throws java.io.IOException if one of the files containing options cannot be read.
+ */
+ private void expandArgument(WorkRequest.Builder requestBuilder, String arg) throws IOException {
+ if (arg.startsWith("@") && !arg.startsWith("@@")) {
+ for (String line : Files.readAllLines(
+ Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) {
+ if (line.length() > 0) {
+ expandArgument(requestBuilder, line);
+ }
+ }
+ } else {
+ requestBuilder.addArguments(arg);
+ }
+ }
+
+ private WorkResponse execInWorker(WorkerKey key, WorkRequest request, SpawnExecutionPolicy policy)
+ throws InterruptedException, ExecException {
+ Worker worker = null;
+ WorkResponse response;
+
+ try {
+ try {
+ worker = workers.borrowObject(key);
+ } catch (IOException e) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message("IOException while borrowing a worker from the pool:")
+ .exception(e)
+ .build()
+ .toString());
+ }
+
+ try {
+ worker.prepareExecution(key);
+ } catch (IOException e) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message("IOException while preparing the execution environment of a worker:")
+ .logFile(worker.getLogFile())
+ .exception(e)
+ .build()
+ .toString());
+ }
+
+ try {
+ request.writeDelimitedTo(worker.getOutputStream());
+ worker.getOutputStream().flush();
+ } catch (IOException e) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message(
+ "Worker process quit or closed its stdin stream when we tried to send a"
+ + " WorkRequest:")
+ .logFile(worker.getLogFile())
+ .exception(e)
+ .build()
+ .toString());
+ }
+
+ RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream());
+ recordingStream.startRecording(4096);
+ try {
+ // response can be null when the worker has already closed stdout at this point and thus the
+ // InputStream is at EOF.
+ response = WorkResponse.parseDelimitedFrom(recordingStream);
+ } catch (IOException e) {
+ // If protobuf couldn't parse the response, try to print whatever the failing worker wrote
+ // to stdout - it's probably a stack trace or some kind of error message that will help the
+ // user figure out why the compiler is failing.
+ recordingStream.readRemaining();
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message("Worker process returned an unparseable WorkResponse:")
+ .logText(recordingStream.getRecordedDataAsString())
+ .exception(e)
+ .build()
+ .toString());
+ }
+
+ policy.lockOutputFiles();
+
+ if (response == null) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message("Worker process did not return a WorkResponse:")
+ .logFile(worker.getLogFile())
+ .logSizeLimit(4096)
+ .build()
+ .toString());
+ }
+
+ try {
+ worker.finishExecution(key);
+ } catch (IOException e) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message("IOException while finishing worker execution:")
+ .exception(e)
+ .build()
+ .toString());
+ }
+ } catch (ExecException e) {
+ if (worker != null) {
+ try {
+ workers.invalidateObject(key, worker);
+ } catch (IOException e1) {
+ // The original exception is more important / helpful, so we'll just ignore this one.
+ }
+ worker = null;
+ }
+
+ throw e;
+ } finally {
+ if (worker != null) {
+ workers.returnObject(key, worker);
+ }
+ }
+
+ return response;
+ }
+}