aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2017-02-22 22:53:02 +0000
committerGravatar Yue Gan <yueg@google.com>2017-02-23 11:30:47 +0000
commit8b2e2459c61c5a112cd8328e97c40da224e9c9ed (patch)
treeec162545304e89b16b4eff7df5eaa084f3953ea2 /src/main/java/com
parent9c018255a72293585cd4232c917f4ed640626d98 (diff)
Replace the fancy, lockless, and incorrect BoundedMultisetSemaphore and with a boring naive synchronized implementation.
There's a race condition in my design of the lockless data structure. I haven't been able to come up with a lockless algorithm that actually works, and the naive one seems to be fine. In Blaze's usage, performance actually isn't super important so the naive implementation is fine. Consider three threads and a MultisetSemaphore with 2 max unique values. T1: acquireAll({a, b}) T2: acquireAll({a, c}) T3: acquireAll({a, d}) For the for-loop before the 'acquire' call, suppose: -T1 wins the race to acquire 'a' [1] and also wants to acquire 'b' [1] -T2 loses the race to acquire 'a' [2] and also wants to acquire 'c' [1] -T3 loses the race to acquire 'a' [2] and also wants to acquire 'd' [1] So then in [3] we have: -T1 tries to acquire 2 permits -T2 tries to acquire 1 permit -T3 tries to acquire 1 permit Suppose the execution order in [3] is T2, T3, T1. So that means we then have T1 still at [3] and both T2 and T3 at [4], which is a deadlock. [1] https://github.com/bazelbuild/bazel/blob/fa96b04f6c8ca6b6b3464727672267133e852959/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java#L184 [2] https://github.com/bazelbuild/bazel/blob/fa96b04f6c8ca6b6b3464727672267133e852959/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java#L191 [3] https://github.com/bazelbuild/bazel/blob/fa96b04f6c8ca6b6b3464727672267133e852959/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java#L199 [4] https://github.com/bazelbuild/bazel/blob/fa96b04f6c8ca6b6b3464727672267133e852959/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java#L210 -- PiperOrigin-RevId: 148272171 MOS_MIGRATED_REVID=148272171
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java103
1 files changed, 20 insertions, 83 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
index d1f3ef7a77..dae49c9e46 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -13,18 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
-import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
+import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
/**
@@ -80,7 +73,6 @@ public abstract class MultisetSemaphore<T> {
private static final int UNSET_INT = -1;
private int maxNumUniqueValues = UNSET_INT;
- private MapMaker mapMaker = new MapMaker();
private Builder() {
}
@@ -98,22 +90,11 @@ public abstract class MultisetSemaphore<T> {
return this;
}
- /**
- * Sets the concurrency level (expected number of concurrent usages) of internal data structures
- * of the to-be-constructed {@link MultisetSemaphore}.
- *
- * <p>This is a hint for tweaking performance and lock contention.
- */
- public Builder concurrencyLevel(int concurrencyLevel) {
- mapMaker = mapMaker.concurrencyLevel(concurrencyLevel);
- return this;
- }
-
public <T> MultisetSemaphore<T> build() {
Preconditions.checkState(
maxNumUniqueValues != UNSET_INT,
"maxNumUniqueValues(int) must be specified");
- return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker);
+ return new NaiveMultisetSemaphore<>(maxNumUniqueValues);
}
}
@@ -143,86 +124,42 @@ public abstract class MultisetSemaphore<T> {
}
}
- private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
- // Implementation strategy:
- //
- // We have a single Semaphore, access to which is managed by two levels of Multisets, the first
- // of which is an approximate accounting of the current multiplicities, and the second of which
- // is an accurate accounting of the current multiplicities. The first level is used to decide
- // how many permits to acquire from the semaphore on acquireAll and the second level is used to
- // decide how many permits to release from the semaphore on releaseAll. The separation between
- // these two levels ensure the atomicity of acquireAll and releaseAll.
-
- // We also have a map of CountDownLatches, used to handle the case where there is a not-empty
- // set that is a subset of the set of values for which multiple threads are concurrently trying
- // to acquire permits.
-
+ private static class NaiveMultisetSemaphore<T> extends MultisetSemaphore<T> {
private final Semaphore semaphore;
- private final ConcurrentHashMultiset<T> tentativeValues;
- private final ConcurrentHashMultiset<T> actualValues;
- private final ConcurrentMap<T, CountDownLatch> latches;
+ private final Object lock = new Object();
+ // Protected by 'lock'.
+ private final HashMultiset<T> actualValues;
- private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) {
+ private NaiveMultisetSemaphore(int maxNumUniqueValues) {
this.semaphore = new Semaphore(maxNumUniqueValues);
- // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when
- // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary
- // contention when using these Multisets.
- this.tentativeValues = ConcurrentHashMultiset.create();
- this.actualValues = ConcurrentHashMultiset.create();
- this.latches = mapMaker.makeMap();
+ actualValues = HashMultiset.create();
}
@Override
public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
- int numValuesToAcquire = valuesToAcquire.size();
- HashMap<T, CountDownLatch> latchesToCountDownByValue =
- Maps.newHashMapWithExpectedSize(numValuesToAcquire);
- ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire);
- for (T value : valuesToAcquire) {
- int oldCount = tentativeValues.add(value, 1);
- if (oldCount == 0) {
- // The value was just uniquely added by us.
- CountDownLatch latch = new CountDownLatch(1);
- Preconditions.checkState(latches.put(value, latch) == null, value);
- latchesToCountDownByValue.put(value, latch);
- } else {
- CountDownLatch latch = latches.get(value);
- if (latch != null) {
- // The value was recently added by another thread, and that thread is still waiting to
- // acquire a permit for it.
- latchesToAwait.add(latch);
+ int numUniqueValuesToAcquire = 0;
+ synchronized (lock) {
+ for (T value : valuesToAcquire) {
+ int oldCount = actualValues.add(value, 1);
+ if (oldCount == 0) {
+ numUniqueValuesToAcquire++;
}
}
}
-
- int numUniqueValuesToAcquire = latchesToCountDownByValue.size();
semaphore.acquire(numUniqueValuesToAcquire);
- for (T value : valuesToAcquire) {
- actualValues.add(value);
- }
- for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) {
- T value = entry.getKey();
- CountDownLatch latchToCountDown = entry.getValue();
- latchToCountDown.countDown();
- Preconditions.checkState(latchToCountDown == latches.remove(value), value);
- }
- for (CountDownLatch latchToAwait : latchesToAwait) {
- latchToAwait.await();
- }
}
@Override
public void releaseAll(Set<T> valuesToRelease) {
int numUniqueValuesToRelease = 0;
- for (T value : valuesToRelease) {
- int oldCount = actualValues.remove(value, 1);
- Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value);
- if (oldCount == 1) {
- numUniqueValuesToRelease++;
+ synchronized (lock) {
+ for (T value : valuesToRelease) {
+ int oldCount = actualValues.remove(value, 1);
+ if (oldCount == 1) {
+ numUniqueValuesToRelease++;
+ }
}
- tentativeValues.remove(value, 1);
}
-
semaphore.release(numUniqueValuesToRelease);
}
}