From ce170540ba5401e926a5433e6e35b1d22426e525 Mon Sep 17 00:00:00 2001 From: nharmata Date: Mon, 13 Aug 2018 16:11:16 -0700 Subject: Fix MultisetSemaphore. We use a fixed version of the previous algorithm. See the comments for details. Fancier algorithms exist. I thought of a cool one that makes use of BatchKeyedLocker (would give me an excuse to revive it, heh), but fancy algorithms would be overkill. As noted in the initial commit of NaiveMultisetSemaphore, performance isn't critical. RELNOTES: None PiperOrigin-RevId: 208560559 --- .../build/lib/concurrent/MultisetSemaphore.java | 48 +++++++++----- .../lib/concurrent/MultisetSemaphoreTest.java | 73 +++++++++++++++++++--- 2 files changed, 100 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java index 8a0bb93bde..94611b5ef3 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java @@ -146,30 +146,50 @@ public abstract class MultisetSemaphore { @Override public void acquireAll(Set valuesToAcquire) throws InterruptedException { - int numUniqueValuesToAcquire = 0; + int oldNumNeededPermits; synchronized (lock) { - for (T value : valuesToAcquire) { - int oldCount = actualValues.add(value, 1); - if (oldCount == 0) { - numUniqueValuesToAcquire++; + oldNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire); + } + while (true) { + semaphore.acquire(oldNumNeededPermits); + synchronized (lock) { + int newNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire); + if (newNumNeededPermits != oldNumNeededPermits) { + // While we were doing 'acquire' above, another thread won the race to acquire the first + // usage of one of the values in 'valuesToAcquire' or release the last usage of one of + // the values. This means we either acquired too many or too few permits, respectively, + // above. Release the permits we did acquire, in order to restore the accuracy of the + // semaphore's current count, and then try again. + semaphore.release(oldNumNeededPermits); + oldNumNeededPermits = newNumNeededPermits; + continue; + } else { + // Our modification to the semaphore was correct, so it's sound to update the multiset. + valuesToAcquire.forEach(actualValues::add); + return; } } } - semaphore.acquire(numUniqueValuesToAcquire); + } + + private int computeNumNeededPermitsLocked(Set valuesToAcquire) { + // We need a permit for each value that is not already in the multiset. + return (int) valuesToAcquire.stream() + .filter(v -> actualValues.count(v) == 0) + .count(); } @Override public void releaseAll(Set valuesToRelease) { - int numUniqueValuesToRelease = 0; synchronized (lock) { - for (T value : valuesToRelease) { - int oldCount = actualValues.remove(value, 1); - if (oldCount == 1) { - numUniqueValuesToRelease++; - } - } + // We need to release a permit for each value that currently has multiplicity 1. + int numPermitsToRelease = + valuesToRelease + .stream() + .mapToInt(v -> actualValues.remove(v, 1) == 1 ? 1 : 0) + .sum(); + semaphore.release(numPermitsToRelease); } - semaphore.release(numUniqueValuesToRelease); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java index 56729af005..905709d623 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java @@ -17,7 +17,9 @@ import static com.google.common.truth.Truth.assertThat; import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; +import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.testutil.TestThread; import com.google.devtools.build.lib.testutil.TestUtils; import java.util.ArrayList; @@ -32,7 +34,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -202,7 +203,7 @@ public class MultisetSemaphoreTest { public void run() { try { Set vals = ImmutableSet.of(sameVal, differentVal); - // Tries to acquire a permit for a set of two values, one of which is the + // Tries to acquire permits for a set of two values, one of which is the // same for all the N Runnables and one of which is unique across all N // Runnables, multisetSemaphore.acquireAll(vals); @@ -231,7 +232,7 @@ public class MultisetSemaphoreTest { } @Test - public void testConcurrentRace() throws Exception { + public void testConcurrentRace_AllPermuations() throws Exception { // When we have N values int n = 6; ArrayList vals = new ArrayList<>(); @@ -250,7 +251,7 @@ public class MultisetSemaphoreTest { ExecutorService executorService = Executors.newFixedThreadPool(numPermutations); // And a recorder for thrown exceptions, ThrowableRecordingRunnableWrapper wrapper = - new ThrowableRecordingRunnableWrapper("testConcurrentRace"); + new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations"); for (List orderedVals : permutations) { final Set orderedSet = new LinkedHashSet<>(orderedVals); // And we submit N! Runnables, each of which @@ -262,10 +263,10 @@ public class MultisetSemaphoreTest { @Override public void run() { try { - // Tries to acquire a permit for the set of N values, with a unique + // Tries to acquire permits for the set of N values, with a unique // iteration order (across all the N! different permutations), multisetSemaphore.acquireAll(orderedSet); - // And then immediately releases the permit. + // And then immediately releases the permits. multisetSemaphore.releaseAll(orderedSet); } catch (InterruptedException e) { throw new IllegalStateException(e); @@ -284,7 +285,65 @@ public class MultisetSemaphoreTest { } @Test - @Ignore("Fails due to exposing actual deadlock bug (b/112412784)") + public void testConcurrentRace_AllSameSizedCombinations() throws Exception { + // When we have n values + int n = 10; + ImmutableSet.Builder valsBuilder = ImmutableSet.builder(); + for (int i = 0; i < n; i++) { + valsBuilder.add("val-" + i); + } + ImmutableSet vals = valsBuilder.build(); + int k = 5; + // And we have all combinations of size k of these n values + Set> combinations = Sets.combinations(vals, k); + int numCombinations = combinations.size(); + // And we have a MultisetSemaphore + final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() + // with K max num unique values, + .maxNumUniqueValues(k) + .build(); + // And a ExecutorService with nCk threads, + ExecutorService executorService = Executors.newFixedThreadPool(numCombinations); + // And a recorder for thrown exceptions, + ThrowableRecordingRunnableWrapper wrapper = + new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations"); + // And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves, + ConcurrentHashMultiset counts = ConcurrentHashMultiset.create(); + for (Set combination : combinations) { + // And, for each of the nCk combinations, we submit a Runnable, that + @SuppressWarnings("unused") + Future possiblyIgnoredError = + executorService.submit( + wrapper.wrap( + new Runnable() { + @Override + public void run() { + try { + // Tries to acquire permits for its set of k values, + multisetSemaphore.acquireAll(combination); + // And then verifies that the multiplicities are as expected, + combination.forEach(counts::add); + assertThat(counts.entrySet().size()).isAtMost(k); + combination.forEach(counts::remove); + // And then releases the permits. + multisetSemaphore.releaseAll(combination); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + })); + } + // Then all of our Runnables completed (without deadlock!), as expected, + boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); + // And also none of them threw any Exceptions. + assertThat(wrapper.getFirstThrownError()).isNull(); + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + } + + @Test public void testSimpleDeadlock() throws Exception { final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() .maxNumUniqueValues(2) -- cgit v1.2.3