diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/vfs')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java | 227 |
1 files changed, 104 insertions, 123 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java index 5fc39dce8e..6e1a066b73 100644 --- a/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java +++ b/src/main/java/com/google/devtools/build/lib/vfs/UnixGlob.java @@ -26,9 +26,11 @@ import com.google.common.cache.CacheLoader; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.Futures; -import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.ProfilerTask; @@ -40,6 +42,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -478,69 +481,43 @@ public final class UnixGlob { /** * Adapts the result of the glob visitation as a Future. */ - private static class GlobFuture extends AbstractFuture<List<Path>> { + private static class GlobFuture extends ForwardingListenableFuture<List<Path>> { private final GlobVisitor visitor; private final boolean checkForInterrupt; - private final Object completionLock = new Object(); + private final SettableFuture<List<Path>> delegate = SettableFuture.create(); - public GlobFuture(GlobVisitor visitor, boolean checkForInterrupt) { + public GlobFuture(GlobVisitor visitor, boolean interruptible) { this.visitor = visitor; - this.checkForInterrupt = checkForInterrupt; + this.checkForInterrupt = interruptible; } - private List<Path> getSafe() throws InterruptedException, ExecutionException { - boolean interrupted = false; - try { - while (true) { - try { - return super.get(); - } catch (InterruptedException e) { - if (checkForInterrupt) { - throw e; - } - interrupted = true; - } catch (ExecutionException e) { - // The checkForInterrupt logic is already handled in - // GlobVisitor#waitForCompletion(). - Throwables.propagateIfInstanceOf(e.getCause(), InterruptedException.class); - throw e; - } - } - } finally { - if (!checkForInterrupt && interrupted) { - Thread.currentThread().interrupt(); - } - } + @Override + public List<Path> get() throws InterruptedException, ExecutionException { + return checkForInterrupt ? super.get() : Uninterruptibles.getUninterruptibly(delegate()); } @Override - public List<Path> get() throws InterruptedException, ExecutionException { - synchronized (completionLock) { - if (isDone()) { - return getSafe(); - } + protected ListenableFuture<List<Path>> delegate() { + return delegate; + } - try { - visitor.waitForCompletion(); - super.set(Lists.newArrayList(visitor.results)); - } catch (Throwable t) { - super.setException(t); - } - List<Path> result = getSafe(); - return result; - } + public void setException(IOException exception) { + delegate.setException(exception); + } + + public void set(ArrayList<Path> paths) { + delegate.set(paths); } @Override public boolean cancel(boolean mayInterruptIfRunning) { - synchronized (completionLock) { - if (isDone()) { - return false; - } + // Best-effort interrupt of the in-flight visitation. + visitor.cancel(); + return true; + } - visitor.interrupt(); - return true; - } + public void markCanceled() { + super.cancel(true); } } @@ -548,12 +525,8 @@ public final class UnixGlob { * GlobVisitor executes a glob using parallelism, which is useful when * the glob() requires many readdir() calls on high latency filesystems. */ - private static final class GlobVisitor extends AbstractQueueVisitor { - // These collections are used across workers and must therefore be - // thread-safe. - - private final static String THREAD_NAME = "GlobVisitor"; - + private static final class GlobVisitor { + // These collections are used across workers and must therefore be thread-safe. private final Collection<Path> results = Collections.synchronizedSet(Sets.<Path>newTreeSet()); private final Cache<String, Pattern> cache = CacheBuilder.newBuilder().build( @@ -565,20 +538,18 @@ public final class UnixGlob { }); private final GlobFuture result; - private final boolean failFastOnInterrupt; + private final ThreadPoolExecutor executor; + private final AtomicLong pendingOps = new AtomicLong(0); + private final AtomicReference<IOException> failure = new AtomicReference<>(); + private volatile boolean canceled = false; public GlobVisitor(ThreadPoolExecutor executor, boolean failFastOnInterrupt) { - super(executor, /*shutdownOnCompletion=*/false, /*failFastOnException=*/true, - /*failFastOnInterrupt=*/failFastOnInterrupt); + this.executor = executor; this.result = new GlobFuture(this, failFastOnInterrupt); - this.failFastOnInterrupt = failFastOnInterrupt; } public GlobVisitor(boolean failFastOnInterrupt) { - super(/*concurrent=*/false, 0, 0, 0, null, /*failFastOnException=*/true, - /*failFastOnInterrupt=*/failFastOnInterrupt, THREAD_NAME); - this.result = new GlobFuture(this, failFastOnInterrupt); - this.failFastOnInterrupt = failFastOnInterrupt; + this(null, failFastOnInterrupt); } /** @@ -629,38 +600,19 @@ public final class UnixGlob { // (e.g., readdir calls). In order to optimize, we would need // to keep track of which patterns shared sub-patterns and which did not // (for example consider the glob [*/*.java, sub/*.java, */*.txt]). - for (String[] splitPattern : splitPatterns) { - queueGlob(base, baseStat.isDirectory(), splitPattern, 0, excludeDirectories, + pendingOps.incrementAndGet(); + try { + for (String[] splitPattern : splitPatterns) { + queueGlob(base, baseStat.isDirectory(), splitPattern, 0, excludeDirectories, splitExcludes, 0, results, cache, dirPred, syscalls); + } + } finally { + decrementAndCheckDone(); } return result; } - protected void waitForCompletion() throws IOException, InterruptedException { - try { - super.work(failFastOnInterrupt); - } catch (InterruptedException e) { - if (failFastOnInterrupt) { - throw e; - } else { - Thread.currentThread().interrupt(); - } - } catch (IORuntimeException e) { - if (Thread.interrupted()) { - // As per the contract of AbstractQueueVisitor#work, if an unchecked exception is thrown - // and the build is interrupted, the thrown exception is what will be rethrown. Since the - // user presumably wanted to interrupt the build, we ignore the thrown IORuntimeException - // (which doesn't indicate a programming bug) and throw an InterruptedException. - if (failFastOnInterrupt) { - throw new InterruptedException(); - } - Thread.currentThread().interrupt(); - } - throw e.getCauseIOException(); - } - } - private void queueGlob(final Path base, final boolean baseIsDir, final String[] patternParts, final int idx, final boolean excludeDirectories, @@ -668,39 +620,72 @@ public final class UnixGlob { final int excludeIdx, final Collection<Path> results, final Cache<String, Pattern> cache, final Predicate<Path> dirPred, final FilesystemCalls syscalls) throws IOException { - try { - enqueue(new Runnable() { - @Override - public void run() { - Profiler.instance().startTask(ProfilerTask.VFS_GLOB, this); - try { - reallyGlob(base, baseIsDir, patternParts, idx, excludeDirectories, - excludePatterns, excludeIdx, results, cache, dirPred, syscalls); - } catch (IOException e) { - throw new IORuntimeException(e); - } catch (InterruptedException e) { - // When we get to this point, the main thread already knows that the - // globbing has been interrupted, so we do not need to report the - // error condition. - } finally { - Profiler.instance().completeTask(ProfilerTask.VFS_GLOB); - } + enqueue(new Runnable() { + @Override + public void run() { + Profiler.instance().startTask(ProfilerTask.VFS_GLOB, this); + try { + reallyGlob(base, baseIsDir, patternParts, idx, excludeDirectories, + excludePatterns, excludeIdx, results, cache, dirPred, syscalls); + } catch (IOException e) { + failure.set(e); + } finally { + Profiler.instance().completeTask(ProfilerTask.VFS_GLOB); } + } + + @Override + public String toString() { + return String.format( + "%s glob(include=[%s], exclude=[%s], exclude_directories=%s)", + base.getPathString(), + "\"" + Joiner.on("\", \"").join(patternParts) + "\"", + "\"" + Joiner.on("\", \"").join(excludePatterns) + "\"", + excludeDirectories); + } + }); + } + + protected void enqueue(final Runnable r) { + pendingOps.incrementAndGet(); - @Override - public String toString() { - return String.format( - "%s glob(include=[%s], exclude=[%s], exclude_directories=%s)", - base.getPathString(), - "\"" + Joiner.on("\", \"").join(patternParts) + "\"", - "\"" + Joiner.on("\", \"").join(excludePatterns) + "\"", - excludeDirectories); + Runnable wrapped = new Runnable() { + @Override + public void run() { + try { + if (!canceled && failure.get() == null) { + r.run(); + } + } finally { + decrementAndCheckDone(); } - }); - } catch (IORuntimeException e) { - throw e.getCauseIOException(); + } + }; + + if (executor == null) { + wrapped.run(); + } else { + executor.execute(wrapped); } + } + protected void cancel() { + this.canceled = true; + } + + private void decrementAndCheckDone() { + if (pendingOps.decrementAndGet() == 0) { + // We get to 0 iff we are done all the relevant work. This is because we always increment + // the pending ops count as we're enqueuing, and don't decrement until the task is complete + // (which includes accounting for any additional tasks that one enqueues). + if (canceled) { + result.markCanceled(); + } else if (failure.get() != null) { + result.setException(failure.get()); + } else { + result.set(new ArrayList<>(results)); + } + } } /** @@ -716,11 +701,7 @@ public final class UnixGlob { int excludeIdx, Collection<Path> results, Cache<String, Pattern> cache, Predicate<Path> dirPred, - FilesystemCalls syscalls) throws IOException, InterruptedException { - if (failFastOnInterrupt && Thread.interrupted()) { - throw new InterruptedException(); - } - + FilesystemCalls syscalls) throws IOException { if (baseIsDir && !dirPred.apply(base)) { return; } |