aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/vfs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/vfs')
-rw-r--r--src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java227
1 files changed, 104 insertions, 123 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
index 5fc39dce8e..6e1a066b73 100644
--- a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
+++ b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java
@@ -26,9 +26,11 @@ import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
@@ -40,6 +42,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@@ -478,69 +481,43 @@ public final class UnixGlob {
/**
* Adapts the result of the glob visitation as a Future.
*/
- private static class GlobFuture extends AbstractFuture<List<Path>> {
+ private static class GlobFuture extends ForwardingListenableFuture<List<Path>> {
private final GlobVisitor visitor;
private final boolean checkForInterrupt;
- private final Object completionLock = new Object();
+ private final SettableFuture<List<Path>> delegate = SettableFuture.create();
- public GlobFuture(GlobVisitor visitor, boolean checkForInterrupt) {
+ public GlobFuture(GlobVisitor visitor, boolean interruptible) {
this.visitor = visitor;
- this.checkForInterrupt = checkForInterrupt;
+ this.checkForInterrupt = interruptible;
}
- private List<Path> getSafe() throws InterruptedException, ExecutionException {
- boolean interrupted = false;
- try {
- while (true) {
- try {
- return super.get();
- } catch (InterruptedException e) {
- if (checkForInterrupt) {
- throw e;
- }
- interrupted = true;
- } catch (ExecutionException e) {
- // The checkForInterrupt logic is already handled in
- // GlobVisitor#waitForCompletion().
- Throwables.propagateIfInstanceOf(e.getCause(), InterruptedException.class);
- throw e;
- }
- }
- } finally {
- if (!checkForInterrupt && interrupted) {
- Thread.currentThread().interrupt();
- }
- }
+ @Override
+ public List<Path> get() throws InterruptedException, ExecutionException {
+ return checkForInterrupt ? super.get() : Uninterruptibles.getUninterruptibly(delegate());
}
@Override
- public List<Path> get() throws InterruptedException, ExecutionException {
- synchronized (completionLock) {
- if (isDone()) {
- return getSafe();
- }
+ protected ListenableFuture<List<Path>> delegate() {
+ return delegate;
+ }
- try {
- visitor.waitForCompletion();
- super.set(Lists.newArrayList(visitor.results));
- } catch (Throwable t) {
- super.setException(t);
- }
- List<Path> result = getSafe();
- return result;
- }
+ public void setException(IOException exception) {
+ delegate.setException(exception);
+ }
+
+ public void set(ArrayList<Path> paths) {
+ delegate.set(paths);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- synchronized (completionLock) {
- if (isDone()) {
- return false;
- }
+ // Best-effort interrupt of the in-flight visitation.
+ visitor.cancel();
+ return true;
+ }
- visitor.interrupt();
- return true;
- }
+ public void markCanceled() {
+ super.cancel(true);
}
}
@@ -548,12 +525,8 @@ public final class UnixGlob {
* GlobVisitor executes a glob using parallelism, which is useful when
* the glob() requires many readdir() calls on high latency filesystems.
*/
- private static final class GlobVisitor extends AbstractQueueVisitor {
- // These collections are used across workers and must therefore be
- // thread-safe.
-
- private final static String THREAD_NAME = "GlobVisitor";
-
+ private static final class GlobVisitor {
+ // These collections are used across workers and must therefore be thread-safe.
private final Collection<Path> results =
Collections.synchronizedSet(Sets.<Path>newTreeSet());
private final Cache<String, Pattern> cache = CacheBuilder.newBuilder().build(
@@ -565,20 +538,18 @@ public final class UnixGlob {
});
private final GlobFuture result;
- private final boolean failFastOnInterrupt;
+ private final ThreadPoolExecutor executor;
+ private final AtomicLong pendingOps = new AtomicLong(0);
+ private final AtomicReference<IOException> failure = new AtomicReference<>();
+ private volatile boolean canceled = false;
public GlobVisitor(ThreadPoolExecutor executor, boolean failFastOnInterrupt) {
- super(executor, /*shutdownOnCompletion=*/false, /*failFastOnException=*/true,
- /*failFastOnInterrupt=*/failFastOnInterrupt);
+ this.executor = executor;
this.result = new GlobFuture(this, failFastOnInterrupt);
- this.failFastOnInterrupt = failFastOnInterrupt;
}
public GlobVisitor(boolean failFastOnInterrupt) {
- super(/*concurrent=*/false, 0, 0, 0, null, /*failFastOnException=*/true,
- /*failFastOnInterrupt=*/failFastOnInterrupt, THREAD_NAME);
- this.result = new GlobFuture(this, failFastOnInterrupt);
- this.failFastOnInterrupt = failFastOnInterrupt;
+ this(null, failFastOnInterrupt);
}
/**
@@ -629,38 +600,19 @@ public final class UnixGlob {
// (e.g., readdir calls). In order to optimize, we would need
// to keep track of which patterns shared sub-patterns and which did not
// (for example consider the glob [*/*.java, sub/*.java, */*.txt]).
- for (String[] splitPattern : splitPatterns) {
- queueGlob(base, baseStat.isDirectory(), splitPattern, 0, excludeDirectories,
+ pendingOps.incrementAndGet();
+ try {
+ for (String[] splitPattern : splitPatterns) {
+ queueGlob(base, baseStat.isDirectory(), splitPattern, 0, excludeDirectories,
splitExcludes, 0, results, cache, dirPred, syscalls);
+ }
+ } finally {
+ decrementAndCheckDone();
}
return result;
}
- protected void waitForCompletion() throws IOException, InterruptedException {
- try {
- super.work(failFastOnInterrupt);
- } catch (InterruptedException e) {
- if (failFastOnInterrupt) {
- throw e;
- } else {
- Thread.currentThread().interrupt();
- }
- } catch (IORuntimeException e) {
- if (Thread.interrupted()) {
- // As per the contract of AbstractQueueVisitor#work, if an unchecked exception is thrown
- // and the build is interrupted, the thrown exception is what will be rethrown. Since the
- // user presumably wanted to interrupt the build, we ignore the thrown IORuntimeException
- // (which doesn't indicate a programming bug) and throw an InterruptedException.
- if (failFastOnInterrupt) {
- throw new InterruptedException();
- }
- Thread.currentThread().interrupt();
- }
- throw e.getCauseIOException();
- }
- }
-
private void queueGlob(final Path base, final boolean baseIsDir,
final String[] patternParts, final int idx,
final boolean excludeDirectories,
@@ -668,39 +620,72 @@ public final class UnixGlob {
final int excludeIdx,
final Collection<Path> results, final Cache<String, Pattern> cache,
final Predicate<Path> dirPred, final FilesystemCalls syscalls) throws IOException {
- try {
- enqueue(new Runnable() {
- @Override
- public void run() {
- Profiler.instance().startTask(ProfilerTask.VFS_GLOB, this);
- try {
- reallyGlob(base, baseIsDir, patternParts, idx, excludeDirectories,
- excludePatterns, excludeIdx, results, cache, dirPred, syscalls);
- } catch (IOException e) {
- throw new IORuntimeException(e);
- } catch (InterruptedException e) {
- // When we get to this point, the main thread already knows that the
- // globbing has been interrupted, so we do not need to report the
- // error condition.
- } finally {
- Profiler.instance().completeTask(ProfilerTask.VFS_GLOB);
- }
+ enqueue(new Runnable() {
+ @Override
+ public void run() {
+ Profiler.instance().startTask(ProfilerTask.VFS_GLOB, this);
+ try {
+ reallyGlob(base, baseIsDir, patternParts, idx, excludeDirectories,
+ excludePatterns, excludeIdx, results, cache, dirPred, syscalls);
+ } catch (IOException e) {
+ failure.set(e);
+ } finally {
+ Profiler.instance().completeTask(ProfilerTask.VFS_GLOB);
}
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s glob(include=[%s], exclude=[%s], exclude_directories=%s)",
+ base.getPathString(),
+ "\"" + Joiner.on("\", \"").join(patternParts) + "\"",
+ "\"" + Joiner.on("\", \"").join(excludePatterns) + "\"",
+ excludeDirectories);
+ }
+ });
+ }
+
+ protected void enqueue(final Runnable r) {
+ pendingOps.incrementAndGet();
- @Override
- public String toString() {
- return String.format(
- "%s glob(include=[%s], exclude=[%s], exclude_directories=%s)",
- base.getPathString(),
- "\"" + Joiner.on("\", \"").join(patternParts) + "\"",
- "\"" + Joiner.on("\", \"").join(excludePatterns) + "\"",
- excludeDirectories);
+ Runnable wrapped = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (!canceled && failure.get() == null) {
+ r.run();
+ }
+ } finally {
+ decrementAndCheckDone();
}
- });
- } catch (IORuntimeException e) {
- throw e.getCauseIOException();
+ }
+ };
+
+ if (executor == null) {
+ wrapped.run();
+ } else {
+ executor.execute(wrapped);
}
+ }
+ protected void cancel() {
+ this.canceled = true;
+ }
+
+ private void decrementAndCheckDone() {
+ if (pendingOps.decrementAndGet() == 0) {
+ // We get to 0 iff we are done all the relevant work. This is because we always increment
+ // the pending ops count as we're enqueuing, and don't decrement until the task is complete
+ // (which includes accounting for any additional tasks that one enqueues).
+ if (canceled) {
+ result.markCanceled();
+ } else if (failure.get() != null) {
+ result.setException(failure.get());
+ } else {
+ result.set(new ArrayList<>(results));
+ }
+ }
}
/**
@@ -716,11 +701,7 @@ public final class UnixGlob {
int excludeIdx,
Collection<Path> results, Cache<String, Pattern> cache,
Predicate<Path> dirPred,
- FilesystemCalls syscalls) throws IOException, InterruptedException {
- if (failFastOnInterrupt && Thread.interrupted()) {
- throw new InterruptedException();
- }
-
+ FilesystemCalls syscalls) throws IOException {
if (baseIsDir && !dirPred.apply(base)) {
return;
}