aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
author    Googler <noreply@google.com> 2018-04-20 08:32:04 -0700
committer Copybara-Service <copybara-piper@google.com> 2018-04-20 08:34:00 -0700
commit    2396db3f656781b038ca4fda9b41295d860a88c7 (patch)
tree      ed512bb13f64eb40d10b1778971fad54366be780
parent    63258233946d88feea3b414113aaade3d4145660 (diff)
Blaze: add --high_priority_workers flag.
When two or more instances of high priority workers are running, no other worker execution may start.

Tweak WorkerSpawnRunner. Only reserve resources for a worker after it has been obtained from the pool. This allows us to block waiting for high priority workers to have finished before resources are reserved.

RELNOTES: Add --high_priority_workers flag.

PiperOrigin-RevId: 193672343
-rw-r--r-- src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java     |   2
-rw-r--r-- src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java    |  11
-rw-r--r-- src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java       |  78
-rw-r--r-- src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java | 113
4 files changed, 141 insertions(+), 63 deletions(-)
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
index 650b85ad58..716be4dcbf 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java
@@ -105,7 +105,7 @@ public class WorkerModule extends BlazeModule {
if (workerPool == null) {
workerPoolConfig = newConfig;
- workerPool = new WorkerPool(workerFactory, workerPoolConfig);
+ workerPool = new WorkerPool(workerFactory, workerPoolConfig, options.highPriorityWorkers);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
index 1625b0f6a3..d0bb6749e3 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java
@@ -56,6 +56,17 @@ public class WorkerOptions extends OptionsBase {
public int workerMaxInstances;
@Option(
+ name = "high_priority_workers",
+ defaultValue = "",
+ documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+ effectTags = {OptionEffectTag.UNKNOWN},
+ help =
+ "Mnemonics of workers to run with high priority. When high priority workers are running "
+ + "all other workers are throttled.",
+ allowMultiple = true)
+ public List<String> highPriorityWorkers;
+
+ @Option(
name = "worker_quit_after_build",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
index d1eb29e93c..9b3afbbdda 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
@@ -14,13 +14,12 @@
package com.google.devtools.build.lib.worker;
import com.google.common.base.Throwables;
-
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
-
+import com.google.common.collect.ImmutableSet;
import java.io.IOException;
-
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
/**
* A worker pool that spawns multiple workers and delegates work to them.
@@ -30,23 +29,65 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@ThreadSafe
final class WorkerPool extends GenericKeyedObjectPool<WorkerKey, Worker> {
+ private final AtomicInteger highPriorityWorkersInUse = new AtomicInteger(0);
+ private final ImmutableSet<String> highPriorityWorkerMnemonics;
- public WorkerPool(WorkerFactory factory, GenericKeyedObjectPoolConfig config) {
+ /**
+ * @param factory worker factory
+ * @param config pool configuration
+ * @param highPriorityWorkers mnemonics of high priority workers
+ */
+ public WorkerPool(
+ WorkerFactory factory,
+ GenericKeyedObjectPoolConfig config,
+ Iterable<String> highPriorityWorkers) {
super(factory, config);
+ highPriorityWorkerMnemonics = ImmutableSet.copyOf(highPriorityWorkers);
}
+ /**
+ * Gets a worker.
+ *
+ * @param key worker key
+ * @return a worker
+ */
@Override
public Worker borrowObject(WorkerKey key) throws IOException, InterruptedException {
+ Worker result;
try {
- return super.borrowObject(key);
+ result = super.borrowObject(key);
} catch (Throwable t) {
Throwables.propagateIfPossible(t, IOException.class, InterruptedException.class);
throw new RuntimeException("unexpected", t);
}
+
+ if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
+ highPriorityWorkersInUse.incrementAndGet();
+ } else {
+ try {
+ waitForHighPriorityWorkersToFinish();
+ } catch (InterruptedException e) {
+ returnObject(key, result);
+ throw e;
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void returnObject(WorkerKey key, Worker obj) {
+ if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
+ decrementHighPriorityWorkerCount();
+ }
+ super.returnObject(key, obj);
}
@Override
public void invalidateObject(WorkerKey key, Worker obj) throws IOException, InterruptedException {
+ if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) {
+ decrementHighPriorityWorkerCount();
+ }
try {
super.invalidateObject(key, obj);
} catch (Throwable t) {
@@ -54,4 +95,27 @@ final class WorkerPool extends GenericKeyedObjectPool<WorkerKey, Worker> {
throw new RuntimeException("unexpected", t);
}
}
+
+ // Decrements the high-priority workers counts and pings waiting threads if appropriate.
+ private void decrementHighPriorityWorkerCount() {
+ if (highPriorityWorkersInUse.decrementAndGet() <= 1) {
+ synchronized (highPriorityWorkersInUse) {
+ highPriorityWorkersInUse.notifyAll();
+ }
+ }
+ }
+
+ // Returns once less than two high-priority workers are running.
+ private void waitForHighPriorityWorkersToFinish() throws InterruptedException {
+ // Fast path for the case where the high-priority workers feature is not in use.
+ if (highPriorityWorkerMnemonics.isEmpty()) {
+ return;
+ }
+
+ while (highPriorityWorkersInUse.get() > 1) {
+ synchronized (highPriorityWorkersInUse) {
+ highPriorityWorkersInUse.wait();
+ }
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
index 7efe410b19..122dfc7248 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java
@@ -109,12 +109,7 @@ final class WorkerSpawnRunner implements SpawnRunner {
}
context.report(ProgressStatus.SCHEDULING, getName());
- ActionExecutionMetadata owner = spawn.getResourceOwner();
- try (ResourceHandle handle =
- ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) {
- context.report(ProgressStatus.EXECUTING, getName());
- return actuallyExec(spawn, context);
- }
+ return actuallyExec(spawn, context);
}
private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context)
@@ -156,7 +151,7 @@ final class WorkerSpawnRunner implements SpawnRunner {
WorkRequest workRequest = createWorkRequest(spawn, context, flagFiles, inputFileCache);
long startTime = System.currentTimeMillis();
- WorkResponse response = execInWorker(key, workRequest, context, inputFiles, outputFiles);
+ WorkResponse response = execInWorker(spawn, key, workRequest, context, inputFiles, outputFiles);
Duration wallTime = Duration.ofMillis(System.currentTimeMillis() - startTime);
FileOutErr outErr = context.getFileOutErr();
@@ -246,8 +241,9 @@ final class WorkerSpawnRunner implements SpawnRunner {
static void expandArgument(Path execRoot, String arg, WorkRequest.Builder requestBuilder)
throws IOException {
if (arg.startsWith("@") && !arg.startsWith("@@")) {
- for (String line : Files.readAllLines(
- Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) {
+ for (String line :
+ Files.readAllLines(
+ Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) {
expandArgument(execRoot, line, requestBuilder);
}
} else {
@@ -256,6 +252,7 @@ final class WorkerSpawnRunner implements SpawnRunner {
}
private WorkResponse execInWorker(
+ Spawn spawn,
WorkerKey key,
WorkRequest request,
SpawnExecutionContext context,
@@ -265,6 +262,7 @@ final class WorkerSpawnRunner implements SpawnRunner {
Worker worker = null;
WorkResponse response;
+ ActionExecutionMetadata owner = spawn.getResourceOwner();
try {
try {
worker = workers.borrowObject(key);
@@ -277,54 +275,59 @@ final class WorkerSpawnRunner implements SpawnRunner {
.toString());
}
- try {
- worker.prepareExecution(inputFiles, outputFiles, key.getWorkerFilesWithHashes().keySet());
- } catch (IOException e) {
- throw new UserExecException(
- ErrorMessage.builder()
- .message("IOException while preparing the execution environment of a worker:")
- .logFile(worker.getLogFile())
- .exception(e)
- .build()
- .toString());
- }
+ try (ResourceHandle handle =
+ ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) {
+ context.report(ProgressStatus.EXECUTING, getName());
+ try {
+ worker.prepareExecution(inputFiles, outputFiles, key.getWorkerFilesWithHashes().keySet());
+ } catch (IOException e) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message("IOException while preparing the execution environment of a worker:")
+ .logFile(worker.getLogFile())
+ .exception(e)
+ .build()
+ .toString());
+ }
- try {
- request.writeDelimitedTo(worker.getOutputStream());
- worker.getOutputStream().flush();
- } catch (IOException e) {
- throw new UserExecException(
- ErrorMessage.builder()
- .message(
- "Worker process quit or closed its stdin stream when we tried to send a"
- + " WorkRequest:")
- .logFile(worker.getLogFile())
- .exception(e)
- .build()
- .toString());
- }
+ try {
+ request.writeDelimitedTo(worker.getOutputStream());
+ worker.getOutputStream().flush();
+ } catch (IOException e) {
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message(
+ "Worker process quit or closed its stdin stream when we tried to send a"
+ + " WorkRequest:")
+ .logFile(worker.getLogFile())
+ .exception(e)
+ .build()
+ .toString());
+ }
- RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream());
- recordingStream.startRecording(4096);
- try {
- // response can be null when the worker has already closed stdout at this point and thus the
- // InputStream is at EOF.
- response = WorkResponse.parseDelimitedFrom(recordingStream);
- } catch (IOException e) {
- // If protobuf couldn't parse the response, try to print whatever the failing worker wrote
- // to stdout - it's probably a stack trace or some kind of error message that will help the
- // user figure out why the compiler is failing.
- recordingStream.readRemaining();
- throw new UserExecException(
- ErrorMessage.builder()
- .message(
- "Worker process returned an unparseable WorkResponse!\n\n"
- + "Did you try to print something to stdout? Workers aren't allowed to do "
- + "this, as it breaks the protocol between Bazel and the worker process.")
- .logText(recordingStream.getRecordedDataAsString())
- .exception(e)
- .build()
- .toString());
+ RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream());
+ recordingStream.startRecording(4096);
+ try {
+ // response can be null when the worker has already closed stdout at this point and thus
+ // the InputStream is at EOF.
+ response = WorkResponse.parseDelimitedFrom(recordingStream);
+ } catch (IOException e) {
+ // If protobuf couldn't parse the response, try to print whatever the failing worker wrote
+ // to stdout - it's probably a stack trace or some kind of error message that will help
+ // the user figure out why the compiler is failing.
+ recordingStream.readRemaining();
+ throw new UserExecException(
+ ErrorMessage.builder()
+ .message(
+ "Worker process returned an unparseable WorkResponse!\n\n"
+ + "Did you try to print something to stdout? Workers aren't allowed to "
+ + "do this, as it breaks the protocol between Bazel and the worker "
+ + "process.")
+ .logText(recordingStream.getRecordedDataAsString())
+ .exception(e)
+ .build()
+ .toString());
+ }
}
context.lockOutputFiles();