diff options
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java | 48 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java | 73 |
2 files changed, 100 insertions, 21 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java index 8a0bb93bde..94611b5ef3 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java @@ -146,30 +146,50 @@ public abstract class MultisetSemaphore<T> { @Override public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { - int numUniqueValuesToAcquire = 0; + int oldNumNeededPermits; synchronized (lock) { - for (T value : valuesToAcquire) { - int oldCount = actualValues.add(value, 1); - if (oldCount == 0) { - numUniqueValuesToAcquire++; + oldNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire); + } + while (true) { + semaphore.acquire(oldNumNeededPermits); + synchronized (lock) { + int newNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire); + if (newNumNeededPermits != oldNumNeededPermits) { + // While we were doing 'acquire' above, another thread won the race to acquire the first + // usage of one of the values in 'valuesToAcquire' or release the last usage of one of + // the values. This means we either acquired too many or too few permits, respectively, + // above. Release the permits we did acquire, in order to restore the accuracy of the + // semaphore's current count, and then try again. + semaphore.release(oldNumNeededPermits); + oldNumNeededPermits = newNumNeededPermits; + continue; + } else { + // Our modification to the semaphore was correct, so it's sound to update the multiset. + valuesToAcquire.forEach(actualValues::add); + return; } } } - semaphore.acquire(numUniqueValuesToAcquire); + } + + private int computeNumNeededPermitsLocked(Set<T> valuesToAcquire) { + // We need a permit for each value that is not already in the multiset. + return (int) valuesToAcquire.stream() + .filter(v -> actualValues.count(v) == 0) + .count(); } @Override public void releaseAll(Set<T> valuesToRelease) { - int numUniqueValuesToRelease = 0; synchronized (lock) { - for (T value : valuesToRelease) { - int oldCount = actualValues.remove(value, 1); - if (oldCount == 1) { - numUniqueValuesToRelease++; - } - } + // We need to release a permit for each value that currently has multiplicity 1. + int numPermitsToRelease = + valuesToRelease + .stream() + .mapToInt(v -> actualValues.remove(v, 1) == 1 ? 1 : 0) + .sum(); + semaphore.release(numPermitsToRelease); } - semaphore.release(numUniqueValuesToRelease); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java index 56729af005..905709d623 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java @@ -17,7 +17,9 @@ import static com.google.common.truth.Truth.assertThat; import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; +import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.testutil.TestThread; import com.google.devtools.build.lib.testutil.TestUtils; import java.util.ArrayList; @@ -32,7 +34,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -202,7 +203,7 @@ public class MultisetSemaphoreTest { public void run() { try { Set<String> vals = ImmutableSet.of(sameVal, differentVal); - // Tries to acquire a permit for a set of two values, one of which is the + // Tries to acquire permits for a set of two values, one of which is the // same for all the N Runnables and one of which is unique across all N // Runnables, multisetSemaphore.acquireAll(vals); @@ -231,7 +232,7 @@ public class MultisetSemaphoreTest { } @Test - public void testConcurrentRace() throws Exception { + public void testConcurrentRace_AllPermuations() throws Exception { // When we have N values int n = 6; ArrayList<String> vals = new ArrayList<>(); @@ -250,7 +251,7 @@ public class MultisetSemaphoreTest { ExecutorService executorService = Executors.newFixedThreadPool(numPermutations); // And a recorder for thrown exceptions, ThrowableRecordingRunnableWrapper wrapper = - new ThrowableRecordingRunnableWrapper("testConcurrentRace"); + new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations"); for (List<String> orderedVals : permutations) { final Set<String> orderedSet = new LinkedHashSet<>(orderedVals); // And we submit N! Runnables, each of which @@ -262,10 +263,10 @@ public class MultisetSemaphoreTest { @Override public void run() { try { - // Tries to acquire a permit for the set of N values, with a unique + // Tries to acquire permits for the set of N values, with a unique // iteration order (across all the N! different permutations), multisetSemaphore.acquireAll(orderedSet); - // And then immediately releases the permit. + // And then immediately releases the permits. multisetSemaphore.releaseAll(orderedSet); } catch (InterruptedException e) { throw new IllegalStateException(e); @@ -284,7 +285,65 @@ public class MultisetSemaphoreTest { } @Test - @Ignore("Fails due to exposing actual deadlock bug (b/112412784)") + public void testConcurrentRace_AllSameSizedCombinations() throws Exception { + // When we have n values + int n = 10; + ImmutableSet.Builder<String> valsBuilder = ImmutableSet.builder(); + for (int i = 0; i < n; i++) { + valsBuilder.add("val-" + i); + } + ImmutableSet<String> vals = valsBuilder.build(); + int k = 5; + // And we have all combinations of size k of these n values + Set<Set<String>> combinations = Sets.combinations(vals, k); + int numCombinations = combinations.size(); + // And we have a MultisetSemaphore + final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() + // with K max num unique values, + .maxNumUniqueValues(k) + .build(); + // And a ExecutorService with nCk threads, + ExecutorService executorService = Executors.newFixedThreadPool(numCombinations); + // And a recorder for thrown exceptions, + ThrowableRecordingRunnableWrapper wrapper = + new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations"); + // And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves, + ConcurrentHashMultiset<String> counts = ConcurrentHashMultiset.create(); + for (Set<String> combination : combinations) { + // And, for each of the nCk combinations, we submit a Runnable, that + @SuppressWarnings("unused") + Future<?> possiblyIgnoredError = + executorService.submit( + wrapper.wrap( + new Runnable() { + @Override + public void run() { + try { + // Tries to acquire permits for its set of k values, + multisetSemaphore.acquireAll(combination); + // And then verifies that the multiplicities are as expected, + combination.forEach(counts::add); + assertThat(counts.entrySet().size()).isAtMost(k); + combination.forEach(counts::remove); + // And then releases the permits. + multisetSemaphore.releaseAll(combination); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + })); + } + // Then all of our Runnables completed (without deadlock!), as expected, + boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); + // And also none of them threw any Exceptions. + assertThat(wrapper.getFirstThrownError()).isNull(); + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + } + + @Test public void testSimpleDeadlock() throws Exception { final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() .maxNumUniqueValues(2) |