diff options
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java | 103 |
1 files changed, 20 insertions, 83 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java index d1f3ef7a77..dae49c9e46 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java @@ -13,18 +13,11 @@ // limitations under the License. package com.google.devtools.build.lib.concurrent; -import com.google.common.collect.ConcurrentHashMultiset; -import com.google.common.collect.MapMaker; -import com.google.common.collect.Maps; +import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.Preconditions; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; /** @@ -80,7 +73,6 @@ public abstract class MultisetSemaphore<T> { private static final int UNSET_INT = -1; private int maxNumUniqueValues = UNSET_INT; - private MapMaker mapMaker = new MapMaker(); private Builder() { } @@ -98,22 +90,11 @@ public abstract class MultisetSemaphore<T> { return this; } - /** - * Sets the concurrency level (expected number of concurrent usages) of internal data structures - * of the to-be-constructed {@link MultisetSemaphore}. - * - * <p>This is a hint for tweaking performance and lock contention. - */ - public Builder concurrencyLevel(int concurrencyLevel) { - mapMaker = mapMaker.concurrencyLevel(concurrencyLevel); - return this; - } - public <T> MultisetSemaphore<T> build() { Preconditions.checkState( maxNumUniqueValues != UNSET_INT, "maxNumUniqueValues(int) must be specified"); - return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker); + return new NaiveMultisetSemaphore<>(maxNumUniqueValues); } } @@ -143,86 +124,42 @@ public abstract class MultisetSemaphore<T> { } } - private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> { - // Implementation strategy: - // - // We have a single Semaphore, access to which is managed by two levels of Multisets, the first - // of which is an approximate accounting of the current multiplicities, and the second of which - // is an accurate accounting of the current multiplicities. The first level is used to decide - // how many permits to acquire from the semaphore on acquireAll and the second level is used to - // decide how many permits to release from the semaphore on releaseAll. The separation between - // these two levels ensure the atomicity of acquireAll and releaseAll. - - // We also have a map of CountDownLatches, used to handle the case where there is a not-empty - // set that is a subset of the set of values for which multiple threads are concurrently trying - // to acquire permits. - + private static class NaiveMultisetSemaphore<T> extends MultisetSemaphore<T> { private final Semaphore semaphore; - private final ConcurrentHashMultiset<T> tentativeValues; - private final ConcurrentHashMultiset<T> actualValues; - private final ConcurrentMap<T, CountDownLatch> latches; + private final Object lock = new Object(); + // Protected by 'lock'. + private final HashMultiset<T> actualValues; - private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) { + private NaiveMultisetSemaphore(int maxNumUniqueValues) { this.semaphore = new Semaphore(maxNumUniqueValues); - // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when - // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary - // contention when using these Multisets. - this.tentativeValues = ConcurrentHashMultiset.create(); - this.actualValues = ConcurrentHashMultiset.create(); - this.latches = mapMaker.makeMap(); + actualValues = HashMultiset.create(); } @Override public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { - int numValuesToAcquire = valuesToAcquire.size(); - HashMap<T, CountDownLatch> latchesToCountDownByValue = - Maps.newHashMapWithExpectedSize(numValuesToAcquire); - ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire); - for (T value : valuesToAcquire) { - int oldCount = tentativeValues.add(value, 1); - if (oldCount == 0) { - // The value was just uniquely added by us. - CountDownLatch latch = new CountDownLatch(1); - Preconditions.checkState(latches.put(value, latch) == null, value); - latchesToCountDownByValue.put(value, latch); - } else { - CountDownLatch latch = latches.get(value); - if (latch != null) { - // The value was recently added by another thread, and that thread is still waiting to - // acquire a permit for it. - latchesToAwait.add(latch); + int numUniqueValuesToAcquire = 0; + synchronized (lock) { + for (T value : valuesToAcquire) { + int oldCount = actualValues.add(value, 1); + if (oldCount == 0) { + numUniqueValuesToAcquire++; } } } - - int numUniqueValuesToAcquire = latchesToCountDownByValue.size(); semaphore.acquire(numUniqueValuesToAcquire); - for (T value : valuesToAcquire) { - actualValues.add(value); - } - for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) { - T value = entry.getKey(); - CountDownLatch latchToCountDown = entry.getValue(); - latchToCountDown.countDown(); - Preconditions.checkState(latchToCountDown == latches.remove(value), value); - } - for (CountDownLatch latchToAwait : latchesToAwait) { - latchToAwait.await(); - } } @Override public void releaseAll(Set<T> valuesToRelease) { int numUniqueValuesToRelease = 0; - for (T value : valuesToRelease) { - int oldCount = actualValues.remove(value, 1); - Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value); - if (oldCount == 1) { - numUniqueValuesToRelease++; + synchronized (lock) { + for (T value : valuesToRelease) { + int oldCount = actualValues.remove(value, 1); + if (oldCount == 1) { + numUniqueValuesToRelease++; + } } - tentativeValues.remove(value, 1); } - semaphore.release(numUniqueValuesToRelease); } } |