diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildtool/SkyframeBuilder.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/buildtool/SkyframeBuilder.java | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildtool/SkyframeBuilder.java b/src/main/java/com/google/devtools/build/lib/buildtool/SkyframeBuilder.java new file mode 100644 index 0000000000..779515a28b --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildtool/SkyframeBuilder.java @@ -0,0 +1,355 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.buildtool; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.eventbus.EventBus; +import com.google.devtools.build.lib.actions.Action; +import com.google.devtools.build.lib.actions.ActionCacheChecker; +import com.google.devtools.build.lib.actions.ActionExecutionStatusReporter; +import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.devtools.build.lib.actions.Artifact; +import com.google.devtools.build.lib.actions.BuildFailedException; +import com.google.devtools.build.lib.actions.BuilderUtils; +import com.google.devtools.build.lib.actions.Executor; +import com.google.devtools.build.lib.actions.ResourceManager; +import com.google.devtools.build.lib.actions.TestExecException; +import com.google.devtools.build.lib.analysis.ConfiguredTarget; +import com.google.devtools.build.lib.analysis.TargetCompleteEvent; +import com.google.devtools.build.lib.rules.test.TestProvider; +import com.google.devtools.build.lib.skyframe.ActionExecutionInactivityWatchdog; +import com.google.devtools.build.lib.skyframe.ActionExecutionValue; +import com.google.devtools.build.lib.skyframe.Builder; +import com.google.devtools.build.lib.skyframe.SkyFunctions; +import com.google.devtools.build.lib.skyframe.SkyframeActionExecutor; +import com.google.devtools.build.lib.skyframe.SkyframeExecutor; +import com.google.devtools.build.lib.skyframe.TargetCompletionValue; +import com.google.devtools.build.lib.util.AbruptExitException; +import com.google.devtools.build.lib.util.BlazeClock; +import com.google.devtools.build.skyframe.CycleInfo; +import com.google.devtools.build.skyframe.ErrorInfo; +import com.google.devtools.build.skyframe.EvaluationProgressReceiver; +import com.google.devtools.build.skyframe.EvaluationResult; +import com.google.devtools.build.skyframe.SkyFunctionName; +import com.google.devtools.build.skyframe.SkyKey; +import com.google.devtools.build.skyframe.SkyValue; + +import java.text.NumberFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A {@link Builder} implementation driven by Skyframe. + */ +@VisibleForTesting +public class SkyframeBuilder implements Builder { + + private final SkyframeExecutor skyframeExecutor; + private final boolean keepGoing; + private final int numJobs; + private final boolean checkOutputFiles; + private final ActionInputFileCache fileCache; + private final ActionCacheChecker actionCacheChecker; + private final int progressReportInterval; + + @VisibleForTesting + public SkyframeBuilder(SkyframeExecutor skyframeExecutor, ActionCacheChecker actionCacheChecker, + boolean keepGoing, int numJobs, boolean checkOutputFiles, + ActionInputFileCache fileCache, int progressReportInterval) { + this.skyframeExecutor = skyframeExecutor; + this.actionCacheChecker = actionCacheChecker; + this.keepGoing = keepGoing; + this.numJobs = numJobs; + this.checkOutputFiles = checkOutputFiles; + this.fileCache = fileCache; + this.progressReportInterval = progressReportInterval; + } + + @Override + public void buildArtifacts(Set<Artifact> artifacts, + Set<ConfiguredTarget> parallelTests, + Set<ConfiguredTarget> exclusiveTests, + Collection<ConfiguredTarget> targetsToBuild, + Executor executor, + Set<ConfiguredTarget> builtTargets, + boolean explain) + throws BuildFailedException, AbruptExitException, TestExecException, InterruptedException { + skyframeExecutor.prepareExecution(checkOutputFiles); + skyframeExecutor.setFileCache(fileCache); + // Note that executionProgressReceiver accesses builtTargets concurrently (after wrapping in a + // synchronized collection), so unsynchronized access to this variable is unsafe while it runs. + ExecutionProgressReceiver executionProgressReceiver = + new ExecutionProgressReceiver(Preconditions.checkNotNull(builtTargets), + countTestActions(exclusiveTests), skyframeExecutor.getEventBus()); + ResourceManager.instance().setEventBus(skyframeExecutor.getEventBus()); + + boolean success = false; + EvaluationResult<?> result; + + ActionExecutionStatusReporter statusReporter = ActionExecutionStatusReporter.create( + skyframeExecutor.getReporter(), executor, skyframeExecutor.getEventBus()); + + AtomicBoolean isBuildingExclusiveArtifacts = new AtomicBoolean(false); + ActionExecutionInactivityWatchdog watchdog = new ActionExecutionInactivityWatchdog( + executionProgressReceiver.createInactivityMonitor(statusReporter), + executionProgressReceiver.createInactivityReporter(statusReporter, + isBuildingExclusiveArtifacts), progressReportInterval); + + skyframeExecutor.setActionExecutionProgressReportingObjects(executionProgressReceiver, + executionProgressReceiver, statusReporter); + watchdog.start(); + + try { + result = skyframeExecutor.buildArtifacts(executor, artifacts, targetsToBuild, parallelTests, + /*exclusiveTesting=*/false, keepGoing, explain, numJobs, actionCacheChecker, + executionProgressReceiver); + // progressReceiver is finished, so unsynchronized access to builtTargets is now safe. + success = processResult(result, keepGoing, skyframeExecutor); + + Preconditions.checkState( + !success || result.keyNames().size() + == (artifacts.size() + targetsToBuild.size() + parallelTests.size()), + "Build reported as successful but not all artifacts and targets built: %s, %s", + result, artifacts); + + // Run exclusive tests: either tagged as "exclusive" or is run in an invocation with + // --test_output=streamed. + isBuildingExclusiveArtifacts.set(true); + for (ConfiguredTarget exclusiveTest : exclusiveTests) { + // Since only one artifact is being built at a time, we don't worry about an artifact being + // built and then the build being interrupted. + result = skyframeExecutor.buildArtifacts(executor, ImmutableSet.<Artifact>of(), + targetsToBuild, ImmutableSet.of(exclusiveTest), /*exclusiveTesting=*/true, keepGoing, + explain, numJobs, actionCacheChecker, null); + boolean exclusiveSuccess = processResult(result, keepGoing, skyframeExecutor); + Preconditions.checkState(!exclusiveSuccess || !result.keyNames().isEmpty(), + "Build reported as successful but test %s not executed: %s", + exclusiveTest, result); + success &= exclusiveSuccess; + } + } finally { + watchdog.stop(); + ResourceManager.instance().unsetEventBus(); + skyframeExecutor.setActionExecutionProgressReportingObjects(null, null, null); + statusReporter.unregisterFromEventBus(); + } + + if (!success) { + throw new BuildFailedException(); + } + } + + private static boolean resultHasCatastrophicError(EvaluationResult<?> result) { + for (ErrorInfo errorInfo : result.errorMap().values()) { + if (errorInfo.isCatastrophic()) { + return true; + } + } + // An unreported catastrophe manifests with hasError() being true but no errors visible. + return result.hasError() && result.errorMap().isEmpty(); + } + + /** + * Process the Skyframe update, taking into account the keepGoing setting. + * + * Returns false if the update() failed, but we should continue. Returns true on success. + * Throws on fail-fast failures. + */ + private static boolean processResult(EvaluationResult<?> result, boolean keepGoing, + SkyframeExecutor skyframeExecutor) throws BuildFailedException, TestExecException { + if (result.hasError()) { + boolean hasCycles = false; + for (Map.Entry<SkyKey, ErrorInfo> entry : result.errorMap().entrySet()) { + Iterable<CycleInfo> cycles = entry.getValue().getCycleInfo(); + skyframeExecutor.reportCycles(cycles, entry.getKey()); + hasCycles |= !Iterables.isEmpty(cycles); + } + if (keepGoing && !resultHasCatastrophicError(result)) { + return false; + } + if (hasCycles || result.errorMap().isEmpty()) { + // error map may be empty in the case of a catastrophe. + throw new BuildFailedException(); + } else { + // Need to wrap exception for rethrowCause. + BuilderUtils.rethrowCause( + new Exception(Preconditions.checkNotNull(result.getError().getException()))); + } + } + return true; + } + + private static int countTestActions(Iterable<ConfiguredTarget> testTargets) { + int count = 0; + for (ConfiguredTarget testTarget : testTargets) { + count += TestProvider.getTestStatusArtifacts(testTarget).size(); + } + return count; + } + + /** + * Listener for executed actions and built artifacts. We use a listener so that we have an + * accurate set of successfully run actions and built artifacts, even if the build is interrupted. + */ + private static final class ExecutionProgressReceiver implements EvaluationProgressReceiver, + SkyframeActionExecutor.ProgressSupplier, SkyframeActionExecutor.ActionCompletedReceiver { + private static final NumberFormat PROGRESS_MESSAGE_NUMBER_FORMATTER; + + // Must be thread-safe! + private final Set<ConfiguredTarget> builtTargets; + private final Set<SkyKey> enqueuedActions = Sets.newConcurrentHashSet(); + private final Set<Action> completedActions = Sets.newConcurrentHashSet(); + private final Object activityIndicator = new Object(); + /** Number of exclusive tests. To be accounted for in progress messages. */ + private final int exclusiveTestsCount; + private final EventBus eventBus; + + static { + PROGRESS_MESSAGE_NUMBER_FORMATTER = NumberFormat.getIntegerInstance(Locale.ENGLISH); + PROGRESS_MESSAGE_NUMBER_FORMATTER.setGroupingUsed(true); + } + + /** + * {@code builtTargets} is accessed through a synchronized set, and so no other access to it + * is permitted while this receiver is active. + */ + ExecutionProgressReceiver(Set<ConfiguredTarget> builtTargets, int exclusiveTestsCount, + EventBus eventBus) { + this.builtTargets = Collections.synchronizedSet(builtTargets); + this.exclusiveTestsCount = exclusiveTestsCount; + this.eventBus = eventBus; + } + + @Override + public void invalidated(SkyValue node, InvalidationState state) {} + + @Override + public void enqueueing(SkyKey skyKey) { + if (ActionExecutionValue.isReportWorthyAction(skyKey)) { + // Remember all enqueued actions for the benefit of progress reporting. + // We discover most actions early in the build, well before we start executing them. + // Some of these will be cache hits and won't be executed, so we'll need to account for them + // in the evaluated method too. + enqueuedActions.add(skyKey); + } + } + + @Override + public void evaluated(SkyKey skyKey, SkyValue node, EvaluationState state) { + SkyFunctionName type = skyKey.functionName(); + if (type == SkyFunctions.TARGET_COMPLETION) { + TargetCompletionValue val = (TargetCompletionValue) node; + ConfiguredTarget target = val.getConfiguredTarget(); + builtTargets.add(target); + eventBus.post(TargetCompleteEvent.createSuccessful(target)); + } else if (type == SkyFunctions.ACTION_EXECUTION) { + // Remember all completed actions, regardless of having been cached or really executed. + actionCompleted((Action) skyKey.argument()); + } + } + + /** + * {@inheritDoc} + * + * <p>This method adds the action to {@link #completedActions} and notifies the + * {@link #activityIndicator}. + * + * <p>We could do this only in the {@link #evaluated} method too, but as it happens the action + * executor tells the reporter about the completed action before the node is inserted into the + * graph, so the reporter would find out about the completed action sooner than we could + * have updated {@link #completedActions}, which would result in incorrect numbers on the + * progress messages. However we have to store completed actions in {@link #evaluated} too, + * because that's the only place we get notified about completed cached actions. + */ + @Override + public void actionCompleted(Action a) { + if (ActionExecutionValue.isReportWorthyAction(a)) { + completedActions.add(a); + synchronized (activityIndicator) { + activityIndicator.notifyAll(); + } + } + } + + @Override + public String getProgressString() { + return String.format("[%s / %s]", + PROGRESS_MESSAGE_NUMBER_FORMATTER.format(completedActions.size()), + PROGRESS_MESSAGE_NUMBER_FORMATTER.format(exclusiveTestsCount + enqueuedActions.size())); + } + + ActionExecutionInactivityWatchdog.InactivityMonitor createInactivityMonitor( + final ActionExecutionStatusReporter statusReporter) { + return new ActionExecutionInactivityWatchdog.InactivityMonitor() { + + @Override + public boolean hasStarted() { + return !enqueuedActions.isEmpty(); + } + + @Override + public int getPending() { + return statusReporter.getCount(); + } + + @Override + public int waitForNextCompletion(int timeoutMilliseconds) throws InterruptedException { + synchronized (activityIndicator) { + int before = completedActions.size(); + long startTime = BlazeClock.instance().currentTimeMillis(); + while (true) { + activityIndicator.wait(timeoutMilliseconds); + + int completed = completedActions.size() - before; + long now = 0; + if (completed > 0 || (startTime + timeoutMilliseconds) <= (now = BlazeClock.instance() + .currentTimeMillis())) { + // Some actions completed, or timeout fully elapsed. + return completed; + } else { + // Spurious Wakeup -- no actions completed and there's still time to wait. + timeoutMilliseconds -= now - startTime; // account for elapsed wait time + startTime = now; + } + } + } + } + }; + } + + ActionExecutionInactivityWatchdog.InactivityReporter createInactivityReporter( + final ActionExecutionStatusReporter statusReporter, + final AtomicBoolean isBuildingExclusiveArtifacts) { + return new ActionExecutionInactivityWatchdog.InactivityReporter() { + @Override + public void maybeReportInactivity() { + // Do not report inactivity if we are currently running an exclusive test or a streaming + // action (in practice only tests can stream and it implicitly makes them exclusive). + if (!isBuildingExclusiveArtifacts.get()) { + statusReporter.showCurrentlyExecutingActions( + ExecutionProgressReceiver.this.getProgressString() + " "); + } + } + }; + } + } +} |