diff options
author | 2016-06-27 18:05:39 +0000 | |
---|---|---|
committer | 2016-06-27 20:06:15 +0000 | |
commit | 6cebed66053d369deeb669cc674f5b756778a238 (patch) | |
tree | c337cce83edf5958d258dc9c31149103adecb665 /src/main/java/com/google/devtools/build | |
parent | 8b8142a32ab8c373762464a18cbc7598198cbc80 (diff) |
Fix threadpool leak in SkyQueryEnvironment
Shutdown the SkyQueryEnvironment's threadpool after query evaluation
is complete and the environment is ready for disposal.
--
MOS_MIGRATED_REVID=125975317
Diffstat (limited to 'src/main/java/com/google/devtools/build')
4 files changed, 172 insertions, 140 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java index 6a09138efb..0292f08458 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java @@ -43,10 +43,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; /** - * {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much - * of QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored. + * {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much of + * QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored. */ -public abstract class AbstractBlazeQueryEnvironment<T> implements QueryEnvironment<T> { +public abstract class AbstractBlazeQueryEnvironment<T> + implements QueryEnvironment<T>, AutoCloseable { protected final ErrorSensingEventHandler eventHandler; private final Map<String, Set<T>> letBindings = new HashMap<>(); protected final boolean keepGoing; @@ -144,8 +145,8 @@ public abstract class AbstractBlazeQueryEnvironment<T> implements QueryEnvironme return new QueryEvalResult(!eventHandler.hasErrors(), empty.get()); } - public void afterCommand() { - } + @Override + public abstract void close(); public QueryExpression transformParsedQuery(QueryExpression queryExpression) { return queryExpression; diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java index cde183c790..29a9ea96dd 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java @@ -106,6 +106,13 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> this.labelVisitor = new LabelVisitor(packageProvider, dependencyFilter); } + /** + * Calling close is optional because {@link BlazeQueryEnvironment} has no resources that need + * manual management. + */ + @Override + public void close() {} + @Override public DigraphQueryEvalResult<Target> evaluateQuery(QueryExpression expr, final Callback<Target> callback) throws QueryException, InterruptedException { diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 3a8f9baa67..5f8749d0c3 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -38,6 +38,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; +import com.google.devtools.build.lib.concurrent.ExecutorUtil; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; @@ -119,9 +120,14 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant. private static final int BATCH_CALLBACK_SIZE = 10000; private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); - - protected WalkableGraph graph; - private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier; + private static final Logger LOG = Logger.getLogger(SkyQueryEnvironment.class.getName()); + private static final Function<Target, Label> TARGET_LABEL_FUNCTION = + new Function<Target, Label>() { + @Override + public Label apply(Target target) { + return target.getLabel(); + } + }; private final BlazeTargetAccessor accessor = new BlazeTargetAccessor(this); private final int loadingPhaseThreads; @@ -130,38 +136,18 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> private final String parserPrefix; private final PathPackageLocator pkgPath; - private static final Logger LOG = Logger.getLogger(SkyQueryEnvironment.class.getName()); - - private static final Function<Target, Label> TARGET_LABEL_FUNCTION = - new Function<Target, Label>() { - - @Override - public Label apply(Target target) { - return target.getLabel(); - } - }; - + // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads + // unless work is submitted to it. private final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( DEFAULT_THREAD_COUNT, - new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build())); - private RecursivePackageProviderBackedTargetPatternResolver resolver; - - private static class BlacklistSupplier implements Supplier<ImmutableSet<PathFragment>> { - private final WalkableGraph graph; - - BlacklistSupplier(WalkableGraph graph) { - this.graph = graph; - } + new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build())); - @Override - public ImmutableSet<PathFragment> get() { - return ((BlacklistedPackagePrefixesValue) - graph.getValue(BlacklistedPackagePrefixesValue.key())) - .getPatterns(); - } - } + // The following fields are set in the #beforeEvaluateQuery method. + protected WalkableGraph graph; + private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier; + private RecursivePackageProviderBackedTargetPatternResolver resolver; public SkyQueryEnvironment( boolean keepGoing, @@ -189,17 +175,19 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> "No queries can be performed with an empty universe"); } - private void init() throws InterruptedException { + private void beforeEvaluateQuery() throws InterruptedException { EvaluationResult<SkyValue> result; try (AutoProfiler p = AutoProfiler.logged("evaluation and walkable graph", LOG)) { - result = graphFactory.prepareAndGet(universeScope, parserPrefix, loadingPhaseThreads, - eventHandler); + result = + graphFactory.prepareAndGet( + universeScope, parserPrefix, loadingPhaseThreads, eventHandler); } - graph = result.getWalkableGraph(); + SkyKey universeKey = graphFactory.getUniverseKey(universeScope, parserPrefix); + checkEvaluationResult(result, universeKey); + graph = result.getWalkableGraph(); blacklistPatternsSupplier = Suppliers.memoize(new BlacklistSupplier(graph)); - SkyKey universeKey = graphFactory.getUniverseKey(universeScope, parserPrefix); ImmutableList<TargetPatternKey> universeTargetPatternKeys = PrepareDepsOfPatternsFunction.getTargetPatternKeys( PrepareDepsOfPatternsFunction.getSkyKeys(universeKey, eventHandler)); @@ -211,23 +199,38 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> eventHandler, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, threadPool); + } - // The prepareAndGet call above evaluates a single PrepareDepsOfPatterns SkyKey. - // We expect to see either a single successfully evaluated value or a cycle in the result. + /** + * The {@link EvaluationResult} is from the evaluation of a single PrepareDepsOfPatterns node. We + * expect to see either a single successfully evaluated value or a cycle in the result. + */ + private void checkEvaluationResult(EvaluationResult<SkyValue> result, SkyKey universeKey) { Collection<SkyValue> values = result.values(); if (!values.isEmpty()) { - Preconditions.checkState(values.size() == 1, "Universe query \"%s\" returned multiple" - + " values unexpectedly (%s values in result)", universeScope, values.size()); + Preconditions.checkState( + values.size() == 1, + "Universe query \"%s\" returned multiple values unexpectedly (%s values in result)", + universeScope, + values.size()); Preconditions.checkNotNull(result.get(universeKey), result); } else { // No values in the result, so there must be an error. We expect the error to be a cycle. boolean foundCycle = !Iterables.isEmpty(result.getError().getCycleInfo()); - Preconditions.checkState(foundCycle, "Universe query \"%s\" failed with non-cycle error: %s", - universeScope, result.getError()); + Preconditions.checkState( + foundCycle, + "Universe query \"%s\" failed with non-cycle error: %s", + universeScope, + result.getError()); } } @Override + public void close() { + ExecutorUtil.interruptibleShutdown(threadPool); + } + + @Override public QueryExpression transformParsedQuery(QueryExpression queryExpression) { // Transform each occurrence of an expressions of the form 'rdeps(<universeScope>, <T>)' to // 'allrdeps(<T>)'. The latter is more efficient. @@ -270,7 +273,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> // result is set to have an error iff there were errors emitted during the query, so we reset // errors here. eventHandler.resetErrors(); - init(); + beforeEvaluateQuery(); // SkyQueryEnvironment batches callback invocations using a BatchStreamedCallback, created here // so that there's one per top-level evaluateQuery call. The batch size is large enough that @@ -651,8 +654,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> protected void preloadOrThrow(QueryExpression caller, Collection<String> patterns) throws QueryException, TargetParsingException { // SkyQueryEnvironment directly evaluates target patterns in #getTarget and similar methods - // using its graph, which is prepopulated using the universeScope (see #init), so no - // preloading of target patterns is necessary. + // using its graph, which is prepopulated using the universeScope (see #beforeEvaluateQuery), + // so no preloading of target patterns is necessary. } private static final Function<SkyKey, Label> SKYKEY_TO_LABEL = new Function<SkyKey, Label>() { @@ -872,6 +875,21 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> .build(); } + private static class BlacklistSupplier implements Supplier<ImmutableSet<PathFragment>> { + private final WalkableGraph graph; + + BlacklistSupplier(WalkableGraph graph) { + this.graph = graph; + } + + @Override + public ImmutableSet<PathFragment> get() { + return ((BlacklistedPackagePrefixesValue) + graph.getValue(BlacklistedPackagePrefixesValue.key())) + .getPatterns(); + } + } + @ThreadSafe private static class ConcurrentUniquifier implements Uniquifier<Target> { diff --git a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java index 7f467295ef..3dd0f4d7c8 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java @@ -133,109 +133,115 @@ public final class QueryCommand implements BlazeCommand { Set<Setting> settings = queryOptions.toSettings(); boolean streamResults = QueryOutputUtils.shouldStreamResults(queryOptions, formatter); - AbstractBlazeQueryEnvironment<Target> queryEnv = newQueryEnvironment( - env, - queryOptions.keepGoing, - !streamResults, - queryOptions.universeScope, queryOptions.loadingPhaseThreads, - settings); + QueryEvalResult result; + try (AbstractBlazeQueryEnvironment<Target> queryEnv = + newQueryEnvironment( + env, + queryOptions.keepGoing, + !streamResults, + queryOptions.universeScope, + queryOptions.loadingPhaseThreads, + settings)) { - // 1. Parse and transform query: - QueryExpression expr; - try { - expr = QueryExpression.parse(query, queryEnv); - } catch (QueryException e) { - env.getReporter().handle(Event.error( - null, "Error while parsing '" + query + "': " + e.getMessage())); - return ExitCode.COMMAND_LINE_ERROR; - } - expr = queryEnv.transformParsedQuery(expr); + // 1. Parse and transform query: + QueryExpression expr; + try { + expr = QueryExpression.parse(query, queryEnv); + } catch (QueryException e) { + env.getReporter() + .handle(Event.error(null, "Error while parsing '" + query + "': " + e.getMessage())); + return ExitCode.COMMAND_LINE_ERROR; + } + expr = queryEnv.transformParsedQuery(expr); - QueryEvalResult result; - PrintStream output = null; - OutputFormatterCallback<Target> callback; - if (streamResults) { - disableAnsiCharactersFiltering(env); - output = new PrintStream(env.getReporter().getOutErr().getOutputStream()); - // 2. Evaluate expression: - StreamedFormatter streamedFormatter = ((StreamedFormatter) formatter); - streamedFormatter.setOptions(queryOptions, queryOptions.aspectDeps.createResolver( - env.getPackageManager(), env.getReporter())); - callback = streamedFormatter.createStreamCallback(output); - } else { - callback = new AggregateAllOutputFormatterCallback<>(); - } - boolean catastrophe = true; - try { - callback.start(); - result = queryEnv.evaluateQuery(expr, callback); - catastrophe = false; - } catch (QueryException e) { - catastrophe = false; - // Keep consistent with reportBuildFileError() - env.getReporter() - // TODO(bazel-team): this is a kludge to fix a bug observed in the wild. We should make - // sure no null error messages ever get in. - .handle(Event.error(e.getMessage() == null ? e.toString() : e.getMessage())); - return ExitCode.ANALYSIS_FAILURE; - } catch (InterruptedException e) { - catastrophe = false; - IOException ioException = callback.getIoException(); - if (ioException == null || ioException instanceof ClosedByInterruptException) { - env.getReporter().handle(Event.error("query interrupted")); - return ExitCode.INTERRUPTED; + PrintStream output = null; + OutputFormatterCallback<Target> callback; + if (streamResults) { + disableAnsiCharactersFiltering(env); + output = new PrintStream(env.getReporter().getOutErr().getOutputStream()); + // 2. Evaluate expression: + StreamedFormatter streamedFormatter = ((StreamedFormatter) formatter); + streamedFormatter.setOptions( + queryOptions, + queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter())); + callback = streamedFormatter.createStreamCallback(output); } else { - env.getReporter().handle(Event.error("I/O error: " + e.getMessage())); - return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; + callback = new AggregateAllOutputFormatterCallback<>(); } - } catch (IOException e) { - catastrophe = false; - env.getReporter().handle(Event.error("I/O error: " + e.getMessage())); - return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; - } finally { - if (!catastrophe) { - if (streamResults) { - output.flush(); - queryEnv.afterCommand(); - } - try { - callback.close(); - } catch (IOException e) { + boolean catastrophe = true; + try { + callback.start(); + result = queryEnv.evaluateQuery(expr, callback); + catastrophe = false; + } catch (QueryException e) { + catastrophe = false; + // Keep consistent with reportBuildFileError() + env.getReporter() + // TODO(bazel-team): this is a kludge to fix a bug observed in the wild. We should make + // sure no null error messages ever get in. + .handle(Event.error(e.getMessage() == null ? e.toString() : e.getMessage())); + return ExitCode.ANALYSIS_FAILURE; + } catch (InterruptedException e) { + catastrophe = false; + IOException ioException = callback.getIoException(); + if (ioException == null || ioException instanceof ClosedByInterruptException) { + env.getReporter().handle(Event.error("query interrupted")); + return ExitCode.INTERRUPTED; + } else { env.getReporter().handle(Event.error("I/O error: " + e.getMessage())); return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; } - } - } - - env.getEventBus().post(new NoBuildEvent()); - if (!streamResults) { - disableAnsiCharactersFiltering(env); - output = new PrintStream(env.getReporter().getOutErr().getOutputStream()); - - // 3. Output results: - try { - Set<Target> targets = ((AggregateAllOutputFormatterCallback<Target>) callback).getOutput(); - QueryOutputUtils.output(queryOptions, result, - targets, formatter, output, - queryOptions.aspectDeps.createResolver( - env.getPackageManager(), env.getReporter())); - } catch (ClosedByInterruptException | InterruptedException e) { - env.getReporter().handle(Event.error("query interrupted")); - return ExitCode.INTERRUPTED; } catch (IOException e) { + catastrophe = false; env.getReporter().handle(Event.error("I/O error: " + e.getMessage())); return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; } finally { - queryEnv.afterCommand(); - // Note that PrintStream#checkError first flushes and then returns whether any - // error was ever encountered. - if (output.checkError()) { - // Unfortunately, there's no way to check the current error status of PrintStream - // without forcing a flush, so we don't know whether this error happened before or after - // timewise the one we already have from above. Neither choice is always correct, so we - // arbitrarily choose the exit code corresponding to the PrintStream's error. - env.getReporter().handle(Event.error("I/O error while writing query output")); + if (!catastrophe) { + if (streamResults) { + output.flush(); + } + try { + callback.close(); + } catch (IOException e) { + env.getReporter().handle(Event.error("I/O error: " + e.getMessage())); + return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; + } + } + } + + env.getEventBus().post(new NoBuildEvent()); + if (!streamResults) { + disableAnsiCharactersFiltering(env); + output = new PrintStream(env.getReporter().getOutErr().getOutputStream()); + + // 3. Output results: + try { + Set<Target> targets = + ((AggregateAllOutputFormatterCallback<Target>) callback).getOutput(); + QueryOutputUtils.output( + queryOptions, + result, + targets, + formatter, + output, + queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter())); + } catch (ClosedByInterruptException | InterruptedException e) { + env.getReporter().handle(Event.error("query interrupted")); + return ExitCode.INTERRUPTED; + } catch (IOException e) { + env.getReporter().handle(Event.error("I/O error: " + e.getMessage())); return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; + } finally { + // Note that PrintStream#checkError first flushes and then returns whether any + // error was ever encountered. + if (output.checkError()) { + // Unfortunately, there's no way to check the current error status of PrintStream + // without forcing a flush, so we don't know whether this error happened before or after + // timewise the one we already have from above. Neither choice is always correct, so we + // arbitrarily choose the exit code corresponding to the PrintStream's error. + env.getReporter().handle(Event.error("I/O error while writing query output")); + return ExitCode.LOCAL_ENVIRONMENTAL_ERROR; + } } } } |