aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2016-06-27 18:05:39 +0000
committerGravatar Dmitry Lomov <dslomov@google.com>2016-06-27 20:06:15 +0000
commit6cebed66053d369deeb669cc674f5b756778a238 (patch)
treec337cce83edf5958d258dc9c31149103adecb665 /src/main/java/com/google/devtools/build
parent8b8142a32ab8c373762464a18cbc7598198cbc80 (diff)
Fix threadpool leak in SkyQueryEnvironment
Shutdown the SkyQueryEnvironment's threadpool after query evaluation is complete and the environment is ready for disposal. -- MOS_MIGRATED_REVID=125975317
Diffstat (limited to 'src/main/java/com/google/devtools/build')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java11
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java7
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java106
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java188
4 files changed, 172 insertions, 140 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
index 6a09138efb..0292f08458 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
@@ -43,10 +43,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
- * {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much
- * of QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored.
+ * {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much of
+ * QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored.
*/
-public abstract class AbstractBlazeQueryEnvironment<T> implements QueryEnvironment<T> {
+public abstract class AbstractBlazeQueryEnvironment<T>
+ implements QueryEnvironment<T>, AutoCloseable {
protected final ErrorSensingEventHandler eventHandler;
private final Map<String, Set<T>> letBindings = new HashMap<>();
protected final boolean keepGoing;
@@ -144,8 +145,8 @@ public abstract class AbstractBlazeQueryEnvironment<T> implements QueryEnvironme
return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
}
- public void afterCommand() {
- }
+ @Override
+ public abstract void close();
public QueryExpression transformParsedQuery(QueryExpression queryExpression) {
return queryExpression;
diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
index cde183c790..29a9ea96dd 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
@@ -106,6 +106,13 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
this.labelVisitor = new LabelVisitor(packageProvider, dependencyFilter);
}
+ /**
+ * Calling close is optional because {@link BlazeQueryEnvironment} has no resources that need
+ * manual management.
+ */
+ @Override
+ public void close() {}
+
@Override
public DigraphQueryEvalResult<Target> evaluateQuery(QueryExpression expr,
final Callback<Target> callback) throws QueryException, InterruptedException {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 3a8f9baa67..5f8749d0c3 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -38,6 +38,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.ExecutorUtil;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
@@ -119,9 +120,14 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;
private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
-
- protected WalkableGraph graph;
- private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
+ private static final Logger LOG = Logger.getLogger(SkyQueryEnvironment.class.getName());
+ private static final Function<Target, Label> TARGET_LABEL_FUNCTION =
+ new Function<Target, Label>() {
+ @Override
+ public Label apply(Target target) {
+ return target.getLabel();
+ }
+ };
private final BlazeTargetAccessor accessor = new BlazeTargetAccessor(this);
private final int loadingPhaseThreads;
@@ -130,38 +136,18 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
private final String parserPrefix;
private final PathPackageLocator pkgPath;
- private static final Logger LOG = Logger.getLogger(SkyQueryEnvironment.class.getName());
-
- private static final Function<Target, Label> TARGET_LABEL_FUNCTION =
- new Function<Target, Label>() {
-
- @Override
- public Label apply(Target target) {
- return target.getLabel();
- }
- };
-
+ // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads
+ // unless work is submitted to it.
private final ListeningExecutorService threadPool =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
DEFAULT_THREAD_COUNT,
- new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build()));
- private RecursivePackageProviderBackedTargetPatternResolver resolver;
-
- private static class BlacklistSupplier implements Supplier<ImmutableSet<PathFragment>> {
- private final WalkableGraph graph;
-
- BlacklistSupplier(WalkableGraph graph) {
- this.graph = graph;
- }
+ new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build()));
- @Override
- public ImmutableSet<PathFragment> get() {
- return ((BlacklistedPackagePrefixesValue)
- graph.getValue(BlacklistedPackagePrefixesValue.key()))
- .getPatterns();
- }
- }
+ // The following fields are set in the #beforeEvaluateQuery method.
+ protected WalkableGraph graph;
+ private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
+ private RecursivePackageProviderBackedTargetPatternResolver resolver;
public SkyQueryEnvironment(
boolean keepGoing,
@@ -189,17 +175,19 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
"No queries can be performed with an empty universe");
}
- private void init() throws InterruptedException {
+ private void beforeEvaluateQuery() throws InterruptedException {
EvaluationResult<SkyValue> result;
try (AutoProfiler p = AutoProfiler.logged("evaluation and walkable graph", LOG)) {
- result = graphFactory.prepareAndGet(universeScope, parserPrefix, loadingPhaseThreads,
- eventHandler);
+ result =
+ graphFactory.prepareAndGet(
+ universeScope, parserPrefix, loadingPhaseThreads, eventHandler);
}
- graph = result.getWalkableGraph();
+ SkyKey universeKey = graphFactory.getUniverseKey(universeScope, parserPrefix);
+ checkEvaluationResult(result, universeKey);
+ graph = result.getWalkableGraph();
blacklistPatternsSupplier = Suppliers.memoize(new BlacklistSupplier(graph));
- SkyKey universeKey = graphFactory.getUniverseKey(universeScope, parserPrefix);
ImmutableList<TargetPatternKey> universeTargetPatternKeys =
PrepareDepsOfPatternsFunction.getTargetPatternKeys(
PrepareDepsOfPatternsFunction.getSkyKeys(universeKey, eventHandler));
@@ -211,23 +199,38 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
eventHandler,
TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
threadPool);
+ }
- // The prepareAndGet call above evaluates a single PrepareDepsOfPatterns SkyKey.
- // We expect to see either a single successfully evaluated value or a cycle in the result.
+ /**
+ * The {@link EvaluationResult} is from the evaluation of a single PrepareDepsOfPatterns node. We
+ * expect to see either a single successfully evaluated value or a cycle in the result.
+ */
+ private void checkEvaluationResult(EvaluationResult<SkyValue> result, SkyKey universeKey) {
Collection<SkyValue> values = result.values();
if (!values.isEmpty()) {
- Preconditions.checkState(values.size() == 1, "Universe query \"%s\" returned multiple"
- + " values unexpectedly (%s values in result)", universeScope, values.size());
+ Preconditions.checkState(
+ values.size() == 1,
+ "Universe query \"%s\" returned multiple values unexpectedly (%s values in result)",
+ universeScope,
+ values.size());
Preconditions.checkNotNull(result.get(universeKey), result);
} else {
// No values in the result, so there must be an error. We expect the error to be a cycle.
boolean foundCycle = !Iterables.isEmpty(result.getError().getCycleInfo());
- Preconditions.checkState(foundCycle, "Universe query \"%s\" failed with non-cycle error: %s",
- universeScope, result.getError());
+ Preconditions.checkState(
+ foundCycle,
+ "Universe query \"%s\" failed with non-cycle error: %s",
+ universeScope,
+ result.getError());
}
}
@Override
+ public void close() {
+ ExecutorUtil.interruptibleShutdown(threadPool);
+ }
+
+ @Override
public QueryExpression transformParsedQuery(QueryExpression queryExpression) {
// Transform each occurrence of an expressions of the form 'rdeps(<universeScope>, <T>)' to
// 'allrdeps(<T>)'. The latter is more efficient.
@@ -270,7 +273,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
// result is set to have an error iff there were errors emitted during the query, so we reset
// errors here.
eventHandler.resetErrors();
- init();
+ beforeEvaluateQuery();
// SkyQueryEnvironment batches callback invocations using a BatchStreamedCallback, created here
// so that there's one per top-level evaluateQuery call. The batch size is large enough that
@@ -651,8 +654,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
protected void preloadOrThrow(QueryExpression caller, Collection<String> patterns)
throws QueryException, TargetParsingException {
// SkyQueryEnvironment directly evaluates target patterns in #getTarget and similar methods
- // using its graph, which is prepopulated using the universeScope (see #init), so no
- // preloading of target patterns is necessary.
+ // using its graph, which is prepopulated using the universeScope (see #beforeEvaluateQuery),
+ // so no preloading of target patterns is necessary.
}
private static final Function<SkyKey, Label> SKYKEY_TO_LABEL = new Function<SkyKey, Label>() {
@@ -872,6 +875,21 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
.build();
}
+ private static class BlacklistSupplier implements Supplier<ImmutableSet<PathFragment>> {
+ private final WalkableGraph graph;
+
+ BlacklistSupplier(WalkableGraph graph) {
+ this.graph = graph;
+ }
+
+ @Override
+ public ImmutableSet<PathFragment> get() {
+ return ((BlacklistedPackagePrefixesValue)
+ graph.getValue(BlacklistedPackagePrefixesValue.key()))
+ .getPatterns();
+ }
+ }
+
@ThreadSafe
private static class ConcurrentUniquifier implements Uniquifier<Target> {
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
index 7f467295ef..3dd0f4d7c8 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
@@ -133,109 +133,115 @@ public final class QueryCommand implements BlazeCommand {
Set<Setting> settings = queryOptions.toSettings();
boolean streamResults = QueryOutputUtils.shouldStreamResults(queryOptions, formatter);
- AbstractBlazeQueryEnvironment<Target> queryEnv = newQueryEnvironment(
- env,
- queryOptions.keepGoing,
- !streamResults,
- queryOptions.universeScope, queryOptions.loadingPhaseThreads,
- settings);
+ QueryEvalResult result;
+ try (AbstractBlazeQueryEnvironment<Target> queryEnv =
+ newQueryEnvironment(
+ env,
+ queryOptions.keepGoing,
+ !streamResults,
+ queryOptions.universeScope,
+ queryOptions.loadingPhaseThreads,
+ settings)) {
- // 1. Parse and transform query:
- QueryExpression expr;
- try {
- expr = QueryExpression.parse(query, queryEnv);
- } catch (QueryException e) {
- env.getReporter().handle(Event.error(
- null, "Error while parsing '" + query + "': " + e.getMessage()));
- return ExitCode.COMMAND_LINE_ERROR;
- }
- expr = queryEnv.transformParsedQuery(expr);
+ // 1. Parse and transform query:
+ QueryExpression expr;
+ try {
+ expr = QueryExpression.parse(query, queryEnv);
+ } catch (QueryException e) {
+ env.getReporter()
+ .handle(Event.error(null, "Error while parsing '" + query + "': " + e.getMessage()));
+ return ExitCode.COMMAND_LINE_ERROR;
+ }
+ expr = queryEnv.transformParsedQuery(expr);
- QueryEvalResult result;
- PrintStream output = null;
- OutputFormatterCallback<Target> callback;
- if (streamResults) {
- disableAnsiCharactersFiltering(env);
- output = new PrintStream(env.getReporter().getOutErr().getOutputStream());
- // 2. Evaluate expression:
- StreamedFormatter streamedFormatter = ((StreamedFormatter) formatter);
- streamedFormatter.setOptions(queryOptions, queryOptions.aspectDeps.createResolver(
- env.getPackageManager(), env.getReporter()));
- callback = streamedFormatter.createStreamCallback(output);
- } else {
- callback = new AggregateAllOutputFormatterCallback<>();
- }
- boolean catastrophe = true;
- try {
- callback.start();
- result = queryEnv.evaluateQuery(expr, callback);
- catastrophe = false;
- } catch (QueryException e) {
- catastrophe = false;
- // Keep consistent with reportBuildFileError()
- env.getReporter()
- // TODO(bazel-team): this is a kludge to fix a bug observed in the wild. We should make
- // sure no null error messages ever get in.
- .handle(Event.error(e.getMessage() == null ? e.toString() : e.getMessage()));
- return ExitCode.ANALYSIS_FAILURE;
- } catch (InterruptedException e) {
- catastrophe = false;
- IOException ioException = callback.getIoException();
- if (ioException == null || ioException instanceof ClosedByInterruptException) {
- env.getReporter().handle(Event.error("query interrupted"));
- return ExitCode.INTERRUPTED;
+ PrintStream output = null;
+ OutputFormatterCallback<Target> callback;
+ if (streamResults) {
+ disableAnsiCharactersFiltering(env);
+ output = new PrintStream(env.getReporter().getOutErr().getOutputStream());
+ // 2. Evaluate expression:
+ StreamedFormatter streamedFormatter = ((StreamedFormatter) formatter);
+ streamedFormatter.setOptions(
+ queryOptions,
+ queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter()));
+ callback = streamedFormatter.createStreamCallback(output);
} else {
- env.getReporter().handle(Event.error("I/O error: " + e.getMessage()));
- return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
+ callback = new AggregateAllOutputFormatterCallback<>();
}
- } catch (IOException e) {
- catastrophe = false;
- env.getReporter().handle(Event.error("I/O error: " + e.getMessage()));
- return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
- } finally {
- if (!catastrophe) {
- if (streamResults) {
- output.flush();
- queryEnv.afterCommand();
- }
- try {
- callback.close();
- } catch (IOException e) {
+ boolean catastrophe = true;
+ try {
+ callback.start();
+ result = queryEnv.evaluateQuery(expr, callback);
+ catastrophe = false;
+ } catch (QueryException e) {
+ catastrophe = false;
+ // Keep consistent with reportBuildFileError()
+ env.getReporter()
+ // TODO(bazel-team): this is a kludge to fix a bug observed in the wild. We should make
+ // sure no null error messages ever get in.
+ .handle(Event.error(e.getMessage() == null ? e.toString() : e.getMessage()));
+ return ExitCode.ANALYSIS_FAILURE;
+ } catch (InterruptedException e) {
+ catastrophe = false;
+ IOException ioException = callback.getIoException();
+ if (ioException == null || ioException instanceof ClosedByInterruptException) {
+ env.getReporter().handle(Event.error("query interrupted"));
+ return ExitCode.INTERRUPTED;
+ } else {
env.getReporter().handle(Event.error("I/O error: " + e.getMessage()));
return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
}
- }
- }
-
- env.getEventBus().post(new NoBuildEvent());
- if (!streamResults) {
- disableAnsiCharactersFiltering(env);
- output = new PrintStream(env.getReporter().getOutErr().getOutputStream());
-
- // 3. Output results:
- try {
- Set<Target> targets = ((AggregateAllOutputFormatterCallback<Target>) callback).getOutput();
- QueryOutputUtils.output(queryOptions, result,
- targets, formatter, output,
- queryOptions.aspectDeps.createResolver(
- env.getPackageManager(), env.getReporter()));
- } catch (ClosedByInterruptException | InterruptedException e) {
- env.getReporter().handle(Event.error("query interrupted"));
- return ExitCode.INTERRUPTED;
} catch (IOException e) {
+ catastrophe = false;
env.getReporter().handle(Event.error("I/O error: " + e.getMessage()));
return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
} finally {
- queryEnv.afterCommand();
- // Note that PrintStream#checkError first flushes and then returns whether any
- // error was ever encountered.
- if (output.checkError()) {
- // Unfortunately, there's no way to check the current error status of PrintStream
- // without forcing a flush, so we don't know whether this error happened before or after
- // timewise the one we already have from above. Neither choice is always correct, so we
- // arbitrarily choose the exit code corresponding to the PrintStream's error.
- env.getReporter().handle(Event.error("I/O error while writing query output"));
+ if (!catastrophe) {
+ if (streamResults) {
+ output.flush();
+ }
+ try {
+ callback.close();
+ } catch (IOException e) {
+ env.getReporter().handle(Event.error("I/O error: " + e.getMessage()));
+ return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
+ }
+ }
+ }
+
+ env.getEventBus().post(new NoBuildEvent());
+ if (!streamResults) {
+ disableAnsiCharactersFiltering(env);
+ output = new PrintStream(env.getReporter().getOutErr().getOutputStream());
+
+ // 3. Output results:
+ try {
+ Set<Target> targets =
+ ((AggregateAllOutputFormatterCallback<Target>) callback).getOutput();
+ QueryOutputUtils.output(
+ queryOptions,
+ result,
+ targets,
+ formatter,
+ output,
+ queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter()));
+ } catch (ClosedByInterruptException | InterruptedException e) {
+ env.getReporter().handle(Event.error("query interrupted"));
+ return ExitCode.INTERRUPTED;
+ } catch (IOException e) {
+ env.getReporter().handle(Event.error("I/O error: " + e.getMessage()));
return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
+ } finally {
+ // Note that PrintStream#checkError first flushes and then returns whether any
+ // error was ever encountered.
+ if (output.checkError()) {
+ // Unfortunately, there's no way to check the current error status of PrintStream
+ // without forcing a flush, so we don't know whether this error happened before or after
+ // timewise the one we already have from above. Neither choice is always correct, so we
+ // arbitrarily choose the exit code corresponding to the PrintStream's error.
+ env.getReporter().handle(Event.error("I/O error while writing query output"));
+ return ExitCode.LOCAL_ENVIRONMENTAL_ERROR;
+ }
}
}
}