aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar nharmata <nharmata@google.com>2018-08-13 16:11:16 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-08-13 16:12:22 -0700
commitce170540ba5401e926a5433e6e35b1d22426e525 (patch)
tree01e63404c573168d3793200b935193089acaf591
parent74321cffb64adbecb7dff11c273a8d86b70ad210 (diff)
Fix MultisetSemaphore.
We use a fixed version of the previous algorithm. See the comments for details. Fancier algorithms exist. I thought of a cool one that makes use of BatchKeyedLocker (would give me an excuse to revive it, heh), but fancy algorithms would be overkill. As noted in the initial commit of NaiveMultisetSemaphore, performance isn't critical. RELNOTES: None PiperOrigin-RevId: 208560559
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java48
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java73
2 files changed, 100 insertions, 21 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
index 8a0bb93bde..94611b5ef3 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -146,30 +146,50 @@ public abstract class MultisetSemaphore<T> {
@Override
public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
- int numUniqueValuesToAcquire = 0;
+ int oldNumNeededPermits;
synchronized (lock) {
- for (T value : valuesToAcquire) {
- int oldCount = actualValues.add(value, 1);
- if (oldCount == 0) {
- numUniqueValuesToAcquire++;
+ oldNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire);
+ }
+ while (true) {
+ semaphore.acquire(oldNumNeededPermits);
+ synchronized (lock) {
+ int newNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire);
+ if (newNumNeededPermits != oldNumNeededPermits) {
+ // While we were doing 'acquire' above, another thread won the race to acquire the first
+ // usage of one of the values in 'valuesToAcquire' or release the last usage of one of
+ // the values. This means we either acquired too many or too few permits, respectively,
+ // above. Release the permits we did acquire, in order to restore the accuracy of the
+ // semaphore's current count, and then try again.
+ semaphore.release(oldNumNeededPermits);
+ oldNumNeededPermits = newNumNeededPermits;
+ continue;
+ } else {
+ // Our modification to the semaphore was correct, so it's sound to update the multiset.
+ valuesToAcquire.forEach(actualValues::add);
+ return;
}
}
}
- semaphore.acquire(numUniqueValuesToAcquire);
+ }
+
+ private int computeNumNeededPermitsLocked(Set<T> valuesToAcquire) {
+ // We need a permit for each value that is not already in the multiset.
+ return (int) valuesToAcquire.stream()
+ .filter(v -> actualValues.count(v) == 0)
+ .count();
}
@Override
public void releaseAll(Set<T> valuesToRelease) {
- int numUniqueValuesToRelease = 0;
synchronized (lock) {
- for (T value : valuesToRelease) {
- int oldCount = actualValues.remove(value, 1);
- if (oldCount == 1) {
- numUniqueValuesToRelease++;
- }
- }
+ // We need to release a permit for each value that currently has multiplicity 1.
+ int numPermitsToRelease =
+ valuesToRelease
+ .stream()
+ .mapToInt(v -> actualValues.remove(v, 1) == 1 ? 1 : 0)
+ .sum();
+ semaphore.release(numPermitsToRelease);
}
- semaphore.release(numUniqueValuesToRelease);
}
@Override
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
index 56729af005..905709d623 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
@@ -17,7 +17,9 @@ import static com.google.common.truth.Truth.assertThat;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.google.devtools.build.lib.testutil.TestThread;
import com.google.devtools.build.lib.testutil.TestUtils;
import java.util.ArrayList;
@@ -32,7 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -202,7 +203,7 @@ public class MultisetSemaphoreTest {
public void run() {
try {
Set<String> vals = ImmutableSet.of(sameVal, differentVal);
- // Tries to acquire a permit for a set of two values, one of which is the
+ // Tries to acquire permits for a set of two values, one of which is the
// same for all the N Runnables and one of which is unique across all N
// Runnables,
multisetSemaphore.acquireAll(vals);
@@ -231,7 +232,7 @@ public class MultisetSemaphoreTest {
}
@Test
- public void testConcurrentRace() throws Exception {
+ public void testConcurrentRace_AllPermuations() throws Exception {
// When we have N values
int n = 6;
ArrayList<String> vals = new ArrayList<>();
@@ -250,7 +251,7 @@ public class MultisetSemaphoreTest {
ExecutorService executorService = Executors.newFixedThreadPool(numPermutations);
// And a recorder for thrown exceptions,
ThrowableRecordingRunnableWrapper wrapper =
- new ThrowableRecordingRunnableWrapper("testConcurrentRace");
+ new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations");
for (List<String> orderedVals : permutations) {
final Set<String> orderedSet = new LinkedHashSet<>(orderedVals);
// And we submit N! Runnables, each of which
@@ -262,10 +263,10 @@ public class MultisetSemaphoreTest {
@Override
public void run() {
try {
- // Tries to acquire a permit for the set of N values, with a unique
+ // Tries to acquire permits for the set of N values, with a unique
// iteration order (across all the N! different permutations),
multisetSemaphore.acquireAll(orderedSet);
- // And then immediately releases the permit.
+ // And then immediately releases the permits.
multisetSemaphore.releaseAll(orderedSet);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
@@ -284,7 +285,65 @@ public class MultisetSemaphoreTest {
}
@Test
- @Ignore("Fails due to exposing actual deadlock bug (b/112412784)")
+ public void testConcurrentRace_AllSameSizedCombinations() throws Exception {
+ // When we have n values
+ int n = 10;
+ ImmutableSet.Builder<String> valsBuilder = ImmutableSet.builder();
+ for (int i = 0; i < n; i++) {
+ valsBuilder.add("val-" + i);
+ }
+ ImmutableSet<String> vals = valsBuilder.build();
+ int k = 5;
+ // And we have all combinations of size k of these n values
+ Set<Set<String>> combinations = Sets.combinations(vals, k);
+ int numCombinations = combinations.size();
+ // And we have a MultisetSemaphore
+ final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
+ // with K max num unique values,
+ .maxNumUniqueValues(k)
+ .build();
+ // And a ExecutorService with nCk threads,
+ ExecutorService executorService = Executors.newFixedThreadPool(numCombinations);
+ // And a recorder for thrown exceptions,
+ ThrowableRecordingRunnableWrapper wrapper =
+ new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations");
+ // And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves,
+ ConcurrentHashMultiset<String> counts = ConcurrentHashMultiset.create();
+ for (Set<String> combination : combinations) {
+ // And, for each of the nCk combinations, we submit a Runnable, that
+ @SuppressWarnings("unused")
+ Future<?> possiblyIgnoredError =
+ executorService.submit(
+ wrapper.wrap(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Tries to acquire permits for its set of k values,
+ multisetSemaphore.acquireAll(combination);
+ // And then verifies that the multiplicities are as expected,
+ combination.forEach(counts::add);
+ assertThat(counts.entrySet().size()).isAtMost(k);
+ combination.forEach(counts::remove);
+ // And then releases the permits.
+ multisetSemaphore.releaseAll(combination);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }));
+ }
+ // Then all of our Runnables completed (without deadlock!), as expected,
+ boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
+ // And also none of them threw any Exceptions.
+ assertThat(wrapper.getFirstThrownError()).isNull();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ }
+
+ @Test
public void testSimpleDeadlock() throws Exception {
final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
.maxNumUniqueValues(2)