aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java103
1 files changed, 20 insertions, 83 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
index d1f3ef7a77..dae49c9e46 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -13,18 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
-import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
+import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
/**
@@ -80,7 +73,6 @@ public abstract class MultisetSemaphore<T> {
private static final int UNSET_INT = -1;
private int maxNumUniqueValues = UNSET_INT;
- private MapMaker mapMaker = new MapMaker();
private Builder() {
}
@@ -98,22 +90,11 @@ public abstract class MultisetSemaphore<T> {
return this;
}
- /**
- * Sets the concurrency level (expected number of concurrent usages) of internal data structures
- * of the to-be-constructed {@link MultisetSemaphore}.
- *
- * <p>This is a hint for tweaking performance and lock contention.
- */
- public Builder concurrencyLevel(int concurrencyLevel) {
- mapMaker = mapMaker.concurrencyLevel(concurrencyLevel);
- return this;
- }
-
public <T> MultisetSemaphore<T> build() {
Preconditions.checkState(
maxNumUniqueValues != UNSET_INT,
"maxNumUniqueValues(int) must be specified");
- return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker);
+ return new NaiveMultisetSemaphore<>(maxNumUniqueValues);
}
}
@@ -143,86 +124,42 @@ public abstract class MultisetSemaphore<T> {
}
}
- private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
- // Implementation strategy:
- //
- // We have a single Semaphore, access to which is managed by two levels of Multisets, the first
- // of which is an approximate accounting of the current multiplicities, and the second of which
- // is an accurate accounting of the current multiplicities. The first level is used to decide
- // how many permits to acquire from the semaphore on acquireAll and the second level is used to
- // decide how many permits to release from the semaphore on releaseAll. The separation between
- // these two levels ensure the atomicity of acquireAll and releaseAll.
-
- // We also have a map of CountDownLatches, used to handle the case where there is a not-empty
- // set that is a subset of the set of values for which multiple threads are concurrently trying
- // to acquire permits.
-
+ private static class NaiveMultisetSemaphore<T> extends MultisetSemaphore<T> {
private final Semaphore semaphore;
- private final ConcurrentHashMultiset<T> tentativeValues;
- private final ConcurrentHashMultiset<T> actualValues;
- private final ConcurrentMap<T, CountDownLatch> latches;
+ private final Object lock = new Object();
+ // Protected by 'lock'.
+ private final HashMultiset<T> actualValues;
- private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) {
+ private NaiveMultisetSemaphore(int maxNumUniqueValues) {
this.semaphore = new Semaphore(maxNumUniqueValues);
- // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when
- // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary
- // contention when using these Multisets.
- this.tentativeValues = ConcurrentHashMultiset.create();
- this.actualValues = ConcurrentHashMultiset.create();
- this.latches = mapMaker.makeMap();
+ actualValues = HashMultiset.create();
}
@Override
public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
- int numValuesToAcquire = valuesToAcquire.size();
- HashMap<T, CountDownLatch> latchesToCountDownByValue =
- Maps.newHashMapWithExpectedSize(numValuesToAcquire);
- ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire);
- for (T value : valuesToAcquire) {
- int oldCount = tentativeValues.add(value, 1);
- if (oldCount == 0) {
- // The value was just uniquely added by us.
- CountDownLatch latch = new CountDownLatch(1);
- Preconditions.checkState(latches.put(value, latch) == null, value);
- latchesToCountDownByValue.put(value, latch);
- } else {
- CountDownLatch latch = latches.get(value);
- if (latch != null) {
- // The value was recently added by another thread, and that thread is still waiting to
- // acquire a permit for it.
- latchesToAwait.add(latch);
+ int numUniqueValuesToAcquire = 0;
+ synchronized (lock) {
+ for (T value : valuesToAcquire) {
+ int oldCount = actualValues.add(value, 1);
+ if (oldCount == 0) {
+ numUniqueValuesToAcquire++;
}
}
}
-
- int numUniqueValuesToAcquire = latchesToCountDownByValue.size();
semaphore.acquire(numUniqueValuesToAcquire);
- for (T value : valuesToAcquire) {
- actualValues.add(value);
- }
- for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) {
- T value = entry.getKey();
- CountDownLatch latchToCountDown = entry.getValue();
- latchToCountDown.countDown();
- Preconditions.checkState(latchToCountDown == latches.remove(value), value);
- }
- for (CountDownLatch latchToAwait : latchesToAwait) {
- latchToAwait.await();
- }
}
@Override
public void releaseAll(Set<T> valuesToRelease) {
int numUniqueValuesToRelease = 0;
- for (T value : valuesToRelease) {
- int oldCount = actualValues.remove(value, 1);
- Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value);
- if (oldCount == 1) {
- numUniqueValuesToRelease++;
+ synchronized (lock) {
+ for (T value : valuesToRelease) {
+ int oldCount = actualValues.remove(value, 1);
+ if (oldCount == 1) {
+ numUniqueValuesToRelease++;
+ }
}
- tentativeValues.remove(value, 1);
}
-
semaphore.release(numUniqueValuesToRelease);
}
}