diff options
author | 2018-04-20 08:32:04 -0700 | |
---|---|---|
committer | 2018-04-20 08:34:00 -0700 | |
commit | 2396db3f656781b038ca4fda9b41295d860a88c7 (patch) | |
tree | ed512bb13f64eb40d10b1778971fad54366be780 | |
parent | 63258233946d88feea3b414113aaade3d4145660 (diff) |
Blaze: add --high_priority_workers flag.
When two or more instances of high priority workers are running, no other worker execution may start.
Tweak WorkerSpawnRunner. Only reserve resources for a worker after it has been obtained from the pool.
This lets us wait for running high priority workers to finish before any resources are reserved.
RELNOTES: Add --high_priority_workers flag.
PiperOrigin-RevId: 193672343
4 files changed, 141 insertions, 63 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java index 650b85ad58..716be4dcbf 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerModule.java @@ -105,7 +105,7 @@ public class WorkerModule extends BlazeModule { if (workerPool == null) { workerPoolConfig = newConfig; - workerPool = new WorkerPool(workerFactory, workerPoolConfig); + workerPool = new WorkerPool(workerFactory, workerPoolConfig, options.highPriorityWorkers); } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java index 1625b0f6a3..d0bb6749e3 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerOptions.java @@ -56,6 +56,17 @@ public class WorkerOptions extends OptionsBase { public int workerMaxInstances; @Option( + name = "high_priority_workers", + defaultValue = "", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.UNKNOWN}, + help = + "Mnemonics of workers to run with high priority. 
When high priority workers are running " + + "all other workers are throttled.", + allowMultiple = true) + public List<String> highPriorityWorkers; + + @Option( name = "worker_quit_after_build", defaultValue = "false", documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java index d1eb29e93c..9b3afbbdda 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java @@ -14,13 +14,12 @@ package com.google.devtools.build.lib.worker; import com.google.common.base.Throwables; - -import org.apache.commons.pool2.impl.GenericKeyedObjectPool; -import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; - +import com.google.common.collect.ImmutableSet; import java.io.IOException; - +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; /** * A worker pool that spawns multiple workers and delegates work to them. 
@@ -30,23 +29,65 @@ import javax.annotation.concurrent.ThreadSafe; */ @ThreadSafe final class WorkerPool extends GenericKeyedObjectPool<WorkerKey, Worker> { + private final AtomicInteger highPriorityWorkersInUse = new AtomicInteger(0); + private final ImmutableSet<String> highPriorityWorkerMnemonics; - public WorkerPool(WorkerFactory factory, GenericKeyedObjectPoolConfig config) { + /** + * @param factory worker factory + * @param config pool configuration + * @param highPriorityWorkers mnemonics of high priority workers + */ + public WorkerPool( + WorkerFactory factory, + GenericKeyedObjectPoolConfig config, + Iterable<String> highPriorityWorkers) { super(factory, config); + highPriorityWorkerMnemonics = ImmutableSet.copyOf(highPriorityWorkers); } + /** + * Gets a worker. + * + * @param key worker key + * @return a worker + */ @Override public Worker borrowObject(WorkerKey key) throws IOException, InterruptedException { + Worker result; try { - return super.borrowObject(key); + result = super.borrowObject(key); } catch (Throwable t) { Throwables.propagateIfPossible(t, IOException.class, InterruptedException.class); throw new RuntimeException("unexpected", t); } + + if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) { + highPriorityWorkersInUse.incrementAndGet(); + } else { + try { + waitForHighPriorityWorkersToFinish(); + } catch (InterruptedException e) { + returnObject(key, result); + throw e; + } + } + + return result; + } + + @Override + public void returnObject(WorkerKey key, Worker obj) { + if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) { + decrementHighPriorityWorkerCount(); + } + super.returnObject(key, obj); } @Override public void invalidateObject(WorkerKey key, Worker obj) throws IOException, InterruptedException { + if (highPriorityWorkerMnemonics.contains(key.getMnemonic())) { + decrementHighPriorityWorkerCount(); + } try { super.invalidateObject(key, obj); } catch (Throwable t) { @@ -54,4 +95,27 @@ final class WorkerPool 
extends GenericKeyedObjectPool<WorkerKey, Worker> { throw new RuntimeException("unexpected", t); } } + + // Decrements the high-priority workers counts and pings waiting threads if appropriate. + private void decrementHighPriorityWorkerCount() { + if (highPriorityWorkersInUse.decrementAndGet() <= 1) { + synchronized (highPriorityWorkersInUse) { + highPriorityWorkersInUse.notifyAll(); + } + } + } + + // Returns once less than two high-priority workers are running. + private void waitForHighPriorityWorkersToFinish() throws InterruptedException { + // Fast path for the case where the high-priority workers feature is not in use. + if (highPriorityWorkerMnemonics.isEmpty()) { + return; + } + + while (highPriorityWorkersInUse.get() > 1) { + synchronized (highPriorityWorkersInUse) { + highPriorityWorkersInUse.wait(); + } + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 7efe410b19..122dfc7248 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -109,12 +109,7 @@ final class WorkerSpawnRunner implements SpawnRunner { } context.report(ProgressStatus.SCHEDULING, getName()); - ActionExecutionMetadata owner = spawn.getResourceOwner(); - try (ResourceHandle handle = - ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) { - context.report(ProgressStatus.EXECUTING, getName()); - return actuallyExec(spawn, context); - } + return actuallyExec(spawn, context); } private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context) @@ -156,7 +151,7 @@ final class WorkerSpawnRunner implements SpawnRunner { WorkRequest workRequest = createWorkRequest(spawn, context, flagFiles, inputFileCache); long startTime = System.currentTimeMillis(); - WorkResponse response = execInWorker(key, workRequest, context, 
inputFiles, outputFiles); + WorkResponse response = execInWorker(spawn, key, workRequest, context, inputFiles, outputFiles); Duration wallTime = Duration.ofMillis(System.currentTimeMillis() - startTime); FileOutErr outErr = context.getFileOutErr(); @@ -246,8 +241,9 @@ final class WorkerSpawnRunner implements SpawnRunner { static void expandArgument(Path execRoot, String arg, WorkRequest.Builder requestBuilder) throws IOException { if (arg.startsWith("@") && !arg.startsWith("@@")) { - for (String line : Files.readAllLines( - Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) { + for (String line : + Files.readAllLines( + Paths.get(execRoot.getRelative(arg.substring(1)).getPathString()), UTF_8)) { expandArgument(execRoot, line, requestBuilder); } } else { @@ -256,6 +252,7 @@ final class WorkerSpawnRunner implements SpawnRunner { } private WorkResponse execInWorker( + Spawn spawn, WorkerKey key, WorkRequest request, SpawnExecutionContext context, @@ -265,6 +262,7 @@ final class WorkerSpawnRunner implements SpawnRunner { Worker worker = null; WorkResponse response; + ActionExecutionMetadata owner = spawn.getResourceOwner(); try { try { worker = workers.borrowObject(key); @@ -277,54 +275,59 @@ final class WorkerSpawnRunner implements SpawnRunner { .toString()); } - try { - worker.prepareExecution(inputFiles, outputFiles, key.getWorkerFilesWithHashes().keySet()); - } catch (IOException e) { - throw new UserExecException( - ErrorMessage.builder() - .message("IOException while preparing the execution environment of a worker:") - .logFile(worker.getLogFile()) - .exception(e) - .build() - .toString()); - } + try (ResourceHandle handle = + ResourceManager.instance().acquireResources(owner, spawn.getLocalResources())) { + context.report(ProgressStatus.EXECUTING, getName()); + try { + worker.prepareExecution(inputFiles, outputFiles, key.getWorkerFilesWithHashes().keySet()); + } catch (IOException e) { + throw new UserExecException( + 
ErrorMessage.builder() + .message("IOException while preparing the execution environment of a worker:") + .logFile(worker.getLogFile()) + .exception(e) + .build() + .toString()); + } - try { - request.writeDelimitedTo(worker.getOutputStream()); - worker.getOutputStream().flush(); - } catch (IOException e) { - throw new UserExecException( - ErrorMessage.builder() - .message( - "Worker process quit or closed its stdin stream when we tried to send a" - + " WorkRequest:") - .logFile(worker.getLogFile()) - .exception(e) - .build() - .toString()); - } + try { + request.writeDelimitedTo(worker.getOutputStream()); + worker.getOutputStream().flush(); + } catch (IOException e) { + throw new UserExecException( + ErrorMessage.builder() + .message( + "Worker process quit or closed its stdin stream when we tried to send a" + + " WorkRequest:") + .logFile(worker.getLogFile()) + .exception(e) + .build() + .toString()); + } - RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream()); - recordingStream.startRecording(4096); - try { - // response can be null when the worker has already closed stdout at this point and thus the - // InputStream is at EOF. - response = WorkResponse.parseDelimitedFrom(recordingStream); - } catch (IOException e) { - // If protobuf couldn't parse the response, try to print whatever the failing worker wrote - // to stdout - it's probably a stack trace or some kind of error message that will help the - // user figure out why the compiler is failing. - recordingStream.readRemaining(); - throw new UserExecException( - ErrorMessage.builder() - .message( - "Worker process returned an unparseable WorkResponse!\n\n" - + "Did you try to print something to stdout? 
Workers aren't allowed to do " - + "this, as it breaks the protocol between Bazel and the worker process.") - .logText(recordingStream.getRecordedDataAsString()) - .exception(e) - .build() - .toString()); + RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream()); + recordingStream.startRecording(4096); + try { + // response can be null when the worker has already closed stdout at this point and thus + // the InputStream is at EOF. + response = WorkResponse.parseDelimitedFrom(recordingStream); + } catch (IOException e) { + // If protobuf couldn't parse the response, try to print whatever the failing worker wrote + // to stdout - it's probably a stack trace or some kind of error message that will help + // the user figure out why the compiler is failing. + recordingStream.readRemaining(); + throw new UserExecException( + ErrorMessage.builder() + .message( + "Worker process returned an unparseable WorkResponse!\n\n" + + "Did you try to print something to stdout? Workers aren't allowed to " + + "do this, as it breaks the protocol between Bazel and the worker " + + "process.") + .logText(recordingStream.getRecordedDataAsString()) + .exception(e) + .build() + .toString()); + } } context.lockOutputFiles(); |