aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java48
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java73
2 files changed, 100 insertions, 21 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
index 8a0bb93bde..94611b5ef3 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -146,30 +146,50 @@ public abstract class MultisetSemaphore<T> {
@Override
public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
- int numUniqueValuesToAcquire = 0;
+ int oldNumNeededPermits;
synchronized (lock) {
- for (T value : valuesToAcquire) {
- int oldCount = actualValues.add(value, 1);
- if (oldCount == 0) {
- numUniqueValuesToAcquire++;
+ oldNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire);
+ }
+ while (true) {
+ semaphore.acquire(oldNumNeededPermits);
+ synchronized (lock) {
+ int newNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire);
+ if (newNumNeededPermits != oldNumNeededPermits) {
+ // While we were doing 'acquire' above, another thread won the race to acquire the first
+ // usage of one of the values in 'valuesToAcquire' or release the last usage of one of
+ // the values. This means we either acquired too many or too few permits, respectively,
+ // above. Release the permits we did acquire, in order to restore the accuracy of the
+ // semaphore's current count, and then try again.
+ semaphore.release(oldNumNeededPermits);
+ oldNumNeededPermits = newNumNeededPermits;
+ continue;
+ } else {
+ // Our modification to the semaphore was correct, so it's sound to update the multiset.
+ valuesToAcquire.forEach(actualValues::add);
+ return;
}
}
}
- semaphore.acquire(numUniqueValuesToAcquire);
+ }
+
+ private int computeNumNeededPermitsLocked(Set<T> valuesToAcquire) {
+ // We need a permit for each value that is not already in the multiset.
+ return (int) valuesToAcquire.stream()
+ .filter(v -> actualValues.count(v) == 0)
+ .count();
}
@Override
public void releaseAll(Set<T> valuesToRelease) {
- int numUniqueValuesToRelease = 0;
synchronized (lock) {
- for (T value : valuesToRelease) {
- int oldCount = actualValues.remove(value, 1);
- if (oldCount == 1) {
- numUniqueValuesToRelease++;
- }
- }
+ // We need to release a permit for each value that currently has multiplicity 1.
+ int numPermitsToRelease =
+ valuesToRelease
+ .stream()
+ .mapToInt(v -> actualValues.remove(v, 1) == 1 ? 1 : 0)
+ .sum();
+ semaphore.release(numPermitsToRelease);
}
- semaphore.release(numUniqueValuesToRelease);
}
@Override
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
index 56729af005..905709d623 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
@@ -17,7 +17,9 @@ import static com.google.common.truth.Truth.assertThat;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.google.devtools.build.lib.testutil.TestThread;
import com.google.devtools.build.lib.testutil.TestUtils;
import java.util.ArrayList;
@@ -32,7 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -202,7 +203,7 @@ public class MultisetSemaphoreTest {
public void run() {
try {
Set<String> vals = ImmutableSet.of(sameVal, differentVal);
- // Tries to acquire a permit for a set of two values, one of which is the
+ // Tries to acquire permits for a set of two values, one of which is the
// same for all the N Runnables and one of which is unique across all N
// Runnables,
multisetSemaphore.acquireAll(vals);
@@ -231,7 +232,7 @@ public class MultisetSemaphoreTest {
}
@Test
- public void testConcurrentRace() throws Exception {
+ public void testConcurrentRace_AllPermuations() throws Exception {
// When we have N values
int n = 6;
ArrayList<String> vals = new ArrayList<>();
@@ -250,7 +251,7 @@ public class MultisetSemaphoreTest {
ExecutorService executorService = Executors.newFixedThreadPool(numPermutations);
// And a recorder for thrown exceptions,
ThrowableRecordingRunnableWrapper wrapper =
- new ThrowableRecordingRunnableWrapper("testConcurrentRace");
+ new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations");
for (List<String> orderedVals : permutations) {
final Set<String> orderedSet = new LinkedHashSet<>(orderedVals);
// And we submit N! Runnables, each of which
@@ -262,10 +263,10 @@ public class MultisetSemaphoreTest {
@Override
public void run() {
try {
- // Tries to acquire a permit for the set of N values, with a unique
+ // Tries to acquire permits for the set of N values, with a unique
// iteration order (across all the N! different permutations),
multisetSemaphore.acquireAll(orderedSet);
- // And then immediately releases the permit.
+ // And then immediately releases the permits.
multisetSemaphore.releaseAll(orderedSet);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
@@ -284,7 +285,65 @@ public class MultisetSemaphoreTest {
}
@Test
- @Ignore("Fails due to exposing actual deadlock bug (b/112412784)")
+ public void testConcurrentRace_AllSameSizedCombinations() throws Exception {
+ // When we have n values
+ int n = 10;
+ ImmutableSet.Builder<String> valsBuilder = ImmutableSet.builder();
+ for (int i = 0; i < n; i++) {
+ valsBuilder.add("val-" + i);
+ }
+ ImmutableSet<String> vals = valsBuilder.build();
+ int k = 5;
+ // And we have all combinations of size k of these n values
+ Set<Set<String>> combinations = Sets.combinations(vals, k);
+ int numCombinations = combinations.size();
+ // And we have a MultisetSemaphore
+ final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
+ // with K max num unique values,
+ .maxNumUniqueValues(k)
+ .build();
+ // And a ExecutorService with nCk threads,
+ ExecutorService executorService = Executors.newFixedThreadPool(numCombinations);
+ // And a recorder for thrown exceptions,
+ ThrowableRecordingRunnableWrapper wrapper =
+ new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations");
+ // And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves,
+ ConcurrentHashMultiset<String> counts = ConcurrentHashMultiset.create();
+ for (Set<String> combination : combinations) {
+ // And, for each of the nCk combinations, we submit a Runnable, that
+ @SuppressWarnings("unused")
+ Future<?> possiblyIgnoredError =
+ executorService.submit(
+ wrapper.wrap(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Tries to acquire permits for its set of k values,
+ multisetSemaphore.acquireAll(combination);
+ // And then verifies that the multiplicities are as expected,
+ combination.forEach(counts::add);
+ assertThat(counts.entrySet().size()).isAtMost(k);
+ combination.forEach(counts::remove);
+ // And then releases the permits.
+ multisetSemaphore.releaseAll(combination);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }));
+ }
+ // Then all of our Runnables completed (without deadlock!), as expected,
+ boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
+ // And also none of them threw any Exceptions.
+ assertThat(wrapper.getFirstThrownError()).isNull();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ }
+
+ @Test
public void testSimpleDeadlock() throws Exception {
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
.maxNumUniqueValues(2)