aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
diff options
context:
space:
mode:
authorGravatar Eric Fellheimer <felly@google.com>2015-06-26 20:58:18 +0000
committerGravatar Damien Martin-Guillerez <dmarting@google.com>2015-06-29 16:37:52 +0000
commit20e2f6e9bfd65782f7b7e59c7c77432a226d64d4 (patch)
treed9898d0c737e129b753d3aae4e8c8eb7deb2695a /src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
parentf551f25c99ac4f26b7df78cadfbb0244ac4de814 (diff)
Refactor the legacy globbing thread pool to make use of more modern concurrency abstractions.
Care is taken to maintain the invariant that the glob result returns after all of the work is done, even if there was an exception that cuts the task short. Interruption is an exception to this: In this case, the GlobCache later cancels the task and ensures it is done. -- MOS_MIGRATED_REVID=97000506
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java227
1 files changed, 104 insertions, 123 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
index 5fc39dce8e..6e1a066b73 100644
--- a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
+++ b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
@@ -26,9 +26,11 @@ import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
@@ -40,6 +42,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@@ -478,69 +481,43 @@ public final class UnixGlob {
/**
* Adapts the result of the glob visitation as a Future.
*/
- private static class GlobFuture extends AbstractFuture<List<Path>> {
+ private static class GlobFuture extends ForwardingListenableFuture<List<Path>> {
private final GlobVisitor visitor;
private final boolean checkForInterrupt;
- private final Object completionLock = new Object();
+ private final SettableFuture<List<Path>> delegate = SettableFuture.create();
- public GlobFuture(GlobVisitor visitor, boolean checkForInterrupt) {
+ public GlobFuture(GlobVisitor visitor, boolean interruptible) {
this.visitor = visitor;
- this.checkForInterrupt = checkForInterrupt;
+ this.checkForInterrupt = interruptible;
}
- private List<Path> getSafe() throws InterruptedException, ExecutionException {
- boolean interrupted = false;
- try {
- while (true) {
- try {
- return super.get();
- } catch (InterruptedException e) {
- if (checkForInterrupt) {
- throw e;
- }
- interrupted = true;
- } catch (ExecutionException e) {
- // The checkForInterrupt logic is already handled in
- // GlobVisitor#waitForCompletion().
- Throwables.propagateIfInstanceOf(e.getCause(), InterruptedException.class);
- throw e;
- }
- }
- } finally {
- if (!checkForInterrupt && interrupted) {
- Thread.currentThread().interrupt();
- }
- }
+ @Override
+ public List<Path> get() throws InterruptedException, ExecutionException {
+ return checkForInterrupt ? super.get() : Uninterruptibles.getUninterruptibly(delegate());
}
@Override
- public List<Path> get() throws InterruptedException, ExecutionException {
- synchronized (completionLock) {
- if (isDone()) {
- return getSafe();
- }
+ protected ListenableFuture<List<Path>> delegate() {
+ return delegate;
+ }
- try {
- visitor.waitForCompletion();
- super.set(Lists.newArrayList(visitor.results));
- } catch (Throwable t) {
- super.setException(t);
- }
- List<Path> result = getSafe();
- return result;
- }
+ public void setException(IOException exception) {
+ delegate.setException(exception);
+ }
+
+ public void set(ArrayList<Path> paths) {
+ delegate.set(paths);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- synchronized (completionLock) {
- if (isDone()) {
- return false;
- }
+ // Best-effort interrupt of the in-flight visitation.
+ visitor.cancel();
+ return true;
+ }
- visitor.interrupt();
- return true;
- }
+ public void markCanceled() {
+ super.cancel(true);
}
}
@@ -548,12 +525,8 @@ public final class UnixGlob {
* GlobVisitor executes a glob using parallelism, which is useful when
* the glob() requires many readdir() calls on high latency filesystems.
*/
- private static final class GlobVisitor extends AbstractQueueVisitor {
- // These collections are used across workers and must therefore be
- // thread-safe.
-
- private final static String THREAD_NAME = "GlobVisitor";
-
+ private static final class GlobVisitor {
+ // These collections are used across workers and must therefore be thread-safe.
private final Collection<Path> results =
Collections.synchronizedSet(Sets.<Path>newTreeSet());
private final Cache<String, Pattern> cache = CacheBuilder.newBuilder().build(
@@ -565,20 +538,18 @@ public final class UnixGlob {
});
private final GlobFuture result;
- private final boolean failFastOnInterrupt;
+ private final ThreadPoolExecutor executor;
+ private final AtomicLong pendingOps = new AtomicLong(0);
+ private final AtomicReference<IOException> failure = new AtomicReference<>();
+ private volatile boolean canceled = false;
public GlobVisitor(ThreadPoolExecutor executor, boolean failFastOnInterrupt) {
- super(executor, /*shutdownOnCompletion=*/false, /*failFastOnException=*/true,
- /*failFastOnInterrupt=*/failFastOnInterrupt);
+ this.executor = executor;
this.result = new GlobFuture(this, failFastOnInterrupt);
- this.failFastOnInterrupt = failFastOnInterrupt;
}
public GlobVisitor(boolean failFastOnInterrupt) {
- super(/*concurrent=*/false, 0, 0, 0, null, /*failFastOnException=*/true,
- /*failFastOnInterrupt=*/failFastOnInterrupt, THREAD_NAME);
- this.result = new GlobFuture(this, failFastOnInterrupt);
- this.failFastOnInterrupt = failFastOnInterrupt;
+ this(null, failFastOnInterrupt);
}
/**
@@ -629,38 +600,19 @@ public final class UnixGlob {
// (e.g., readdir calls). In order to optimize, we would need
// to keep track of which patterns shared sub-patterns and which did not
// (for example consider the glob [*/*.java, sub/*.java, */*.txt]).
- for (String[] splitPattern : splitPatterns) {
- queueGlob(base, baseStat.isDirectory(), splitPattern, 0, excludeDirectories,
+ pendingOps.incrementAndGet();
+ try {
+ for (String[] splitPattern : splitPatterns) {
+ queueGlob(base, baseStat.isDirectory(), splitPattern, 0, excludeDirectories,
splitExcludes, 0, results, cache, dirPred, syscalls);
+ }
+ } finally {
+ decrementAndCheckDone();
}
return result;
}
- protected void waitForCompletion() throws IOException, InterruptedException {
- try {
- super.work(failFastOnInterrupt);
- } catch (InterruptedException e) {
- if (failFastOnInterrupt) {
- throw e;
- } else {
- Thread.currentThread().interrupt();
- }
- } catch (IORuntimeException e) {
- if (Thread.interrupted()) {
- // As per the contract of AbstractQueueVisitor#work, if an unchecked exception is thrown
- // and the build is interrupted, the thrown exception is what will be rethrown. Since the
- // user presumably wanted to interrupt the build, we ignore the thrown IORuntimeException
- // (which doesn't indicate a programming bug) and throw an InterruptedException.
- if (failFastOnInterrupt) {
- throw new InterruptedException();
- }
- Thread.currentThread().interrupt();
- }
- throw e.getCauseIOException();
- }
- }
-
private void queueGlob(final Path base, final boolean baseIsDir,
final String[] patternParts, final int idx,
final boolean excludeDirectories,
@@ -668,39 +620,72 @@ public final class UnixGlob {
final int excludeIdx,
final Collection<Path> results, final Cache<String, Pattern> cache,
final Predicate<Path> dirPred, final FilesystemCalls syscalls) throws IOException {
- try {
- enqueue(new Runnable() {
- @Override
- public void run() {
- Profiler.instance().startTask(ProfilerTask.VFS_GLOB, this);
- try {
- reallyGlob(base, baseIsDir, patternParts, idx, excludeDirectories,
- excludePatterns, excludeIdx, results, cache, dirPred, syscalls);
- } catch (IOException e) {
- throw new IORuntimeException(e);
- } catch (InterruptedException e) {
- // When we get to this point, the main thread already knows that the
- // globbing has been interrupted, so we do not need to report the
- // error condition.
- } finally {
- Profiler.instance().completeTask(ProfilerTask.VFS_GLOB);
- }
+ enqueue(new Runnable() {
+ @Override
+ public void run() {
+ Profiler.instance().startTask(ProfilerTask.VFS_GLOB, this);
+ try {
+ reallyGlob(base, baseIsDir, patternParts, idx, excludeDirectories,
+ excludePatterns, excludeIdx, results, cache, dirPred, syscalls);
+ } catch (IOException e) {
+ failure.set(e);
+ } finally {
+ Profiler.instance().completeTask(ProfilerTask.VFS_GLOB);
}
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s glob(include=[%s], exclude=[%s], exclude_directories=%s)",
+ base.getPathString(),
+ "\"" + Joiner.on("\", \"").join(patternParts) + "\"",
+ "\"" + Joiner.on("\", \"").join(excludePatterns) + "\"",
+ excludeDirectories);
+ }
+ });
+ }
+
+ protected void enqueue(final Runnable r) {
+ pendingOps.incrementAndGet();
- @Override
- public String toString() {
- return String.format(
- "%s glob(include=[%s], exclude=[%s], exclude_directories=%s)",
- base.getPathString(),
- "\"" + Joiner.on("\", \"").join(patternParts) + "\"",
- "\"" + Joiner.on("\", \"").join(excludePatterns) + "\"",
- excludeDirectories);
+ Runnable wrapped = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!canceled && failure.get() == null) {
+ r.run();
+ }
+ } finally {
+ decrementAndCheckDone();
}
- });
- } catch (IORuntimeException e) {
- throw e.getCauseIOException();
+ }
+ };
+
+ if (executor == null) {
+ wrapped.run();
+ } else {
+ executor.execute(wrapped);
}
+ }
+ protected void cancel() {
+ this.canceled = true;
+ }
+
+ private void decrementAndCheckDone() {
+ if (pendingOps.decrementAndGet() == 0) {
+ // We get to 0 iff we are done all the relevant work. This is because we always increment
+ // the pending ops count as we're enqueuing, and don't decrement until the task is complete
+ // (which includes accounting for any additional tasks that one enqueues).
+ if (canceled) {
+ result.markCanceled();
+ } else if (failure.get() != null) {
+ result.setException(failure.get());
+ } else {
+ result.set(new ArrayList<>(results));
+ }
+ }
}
/**
@@ -716,11 +701,7 @@ public final class UnixGlob {
int excludeIdx,
Collection<Path> results, Cache<String, Pattern> cache,
Predicate<Path> dirPred,
- FilesystemCalls syscalls) throws IOException, InterruptedException {
- if (failFastOnInterrupt && Thread.interrupted()) {
- throw new InterruptedException();
- }
-
+ FilesystemCalls syscalls) throws IOException {
if (baseIsDir && !dirPred.apply(base)) {
return;
}