aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2016-11-09 22:16:14 +0000
committerGravatar Klaus Aehlig <aehlig@google.com>2016-11-10 09:22:18 +0000
commit1df80e54c3a53efc7f86f7c6da9973c7dd43d5fc (patch)
treeb8b95a9185768000fe94af0a324d0c75a74ea0a0
parent4665e709054dcfe34d1e246caefb8847a560e22a (diff)
Introduce MultisetSemaphore: A concurrency primitive for managing access to at most K unique things at once.
-- MOS_MIGRATED_REVID=138684040
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java229
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java246
2 files changed, 475 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
new file mode 100644
index 0000000000..d1f3ef7a77
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -0,0 +1,229 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.util.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A concurrency primitive for managing access to at most K unique things at once, for a fixed K.
+ *
+ * <p>You can think of this as a pair of a {@link Semaphore} with K total permits and a
+ * {@link Multiset}, with permits being doled out and returned based on the current contents of the
+ * {@link Multiset}.
+ */
+@ThreadSafe
+public abstract class MultisetSemaphore<T> {
+ /**
+ * Blocks until permits are available for all the values in {@code valuesToAcquire}, and then
+ * atomically acquires these permits.
+ *
+ * <p>{@code acquireAll(valuesToAcquire)} atomically does the following
+ * <ol>
+ * <li>Computes {@code m}, the number of values in {@code valuesToAcquire} that are not
+ * currently in the backing {@link Multiset}.
+ * <li>Adds {@code valuesToAcquire} to the backing {@link Multiset}.
+ * <li>Blocks until {@code m} permits are available from the backing {@link Semaphore}.
+ * <li>Acquires these permits.
+ * </ol>
+ */
+ public abstract void acquireAll(Set<T> valuesToAcquire) throws InterruptedException;
+
+ /**
+ * Atomically releases permits for all the values in {@code valuesToAcquire}.
+ *
+ * <p>{@code releaseAll(valuesToRelease)} atomically does the following
+ * <ol>
+ * <li>Computes {@code m}, the number of values in {@code valuesToRelease} that are currently in
+ * the backing {@link Multiset} with multiplicity 1.
+ * <li>Removes {@code valuesToRelease} from the backing {@link Multiset}.
+ * <li>Release {@code m} permits from the backing {@link Semaphore}.
+ * </ol>
+ *
+ * <p>Assumes that this {@link MultisetSemaphore} has already given out permits for all the
+ * values in {@code valuesToAcquire}.
+ */
+ public abstract void releaseAll(Set<T> valuesToRelease);
+
+ /**
+ * Returns a {@link MultisetSemaphore} with a backing {@link Semaphore} that has an unbounded
+ * number of permits; that is, {@link #acquireAll} will never block.
+ */
+ public static <T> MultisetSemaphore<T> unbounded() {
+ return UnboundedMultisetSemaphore.instance();
+ }
+
+ /** Builder for {@link MultisetSemaphore} instances. */
+ public static class Builder {
+ private static final int UNSET_INT = -1;
+
+ private int maxNumUniqueValues = UNSET_INT;
+ private MapMaker mapMaker = new MapMaker();
+
+ private Builder() {
+ }
+
+ /**
+ * Sets the maximum number of unique values for which permits can be held at once in the
+ * to-be-constructed {@link MultisetSemaphore}.
+ */
+ public Builder maxNumUniqueValues(int maxNumUniqueValues) {
+ Preconditions.checkState(
+ maxNumUniqueValues > 0,
+ "maxNumUniqueValues must be positive (was %d)",
+ maxNumUniqueValues);
+ this.maxNumUniqueValues = maxNumUniqueValues;
+ return this;
+ }
+
+ /**
+ * Sets the concurrency level (expected number of concurrent usages) of internal data structures
+ * of the to-be-constructed {@link MultisetSemaphore}.
+ *
+ * <p>This is a hint for tweaking performance and lock contention.
+ */
+ public Builder concurrencyLevel(int concurrencyLevel) {
+ mapMaker = mapMaker.concurrencyLevel(concurrencyLevel);
+ return this;
+ }
+
+ public <T> MultisetSemaphore<T> build() {
+ Preconditions.checkState(
+ maxNumUniqueValues != UNSET_INT,
+ "maxNumUniqueValues(int) must be specified");
+ return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker);
+ }
+ }
+
+ /** Returns a fresh {@link Builder}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private static class UnboundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
+ private static final UnboundedMultisetSemaphore<Object> INSTANCE =
+ new UnboundedMultisetSemaphore<Object>();
+
+ private UnboundedMultisetSemaphore() {
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> UnboundedMultisetSemaphore<T> instance() {
+ return (UnboundedMultisetSemaphore<T>) INSTANCE;
+ }
+
+ @Override
+ public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
+ }
+
+ @Override
+ public void releaseAll(Set<T> valuesToRelease) {
+ }
+ }
+
+ private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
+ // Implementation strategy:
+ //
+ // We have a single Semaphore, access to which is managed by two levels of Multisets, the first
+ // of which is an approximate accounting of the current multiplicities, and the second of which
+ // is an accurate accounting of the current multiplicities. The first level is used to decide
+ // how many permits to acquire from the semaphore on acquireAll and the second level is used to
+ // decide how many permits to release from the semaphore on releaseAll. The separation between
+ // these two levels ensure the atomicity of acquireAll and releaseAll.
+
+ // We also have a map of CountDownLatches, used to handle the case where there is a not-empty
+ // set that is a subset of the set of values for which multiple threads are concurrently trying
+ // to acquire permits.
+
+ private final Semaphore semaphore;
+ private final ConcurrentHashMultiset<T> tentativeValues;
+ private final ConcurrentHashMultiset<T> actualValues;
+ private final ConcurrentMap<T, CountDownLatch> latches;
+
+ private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) {
+ this.semaphore = new Semaphore(maxNumUniqueValues);
+ // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when
+ // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary
+ // contention when using these Multisets.
+ this.tentativeValues = ConcurrentHashMultiset.create();
+ this.actualValues = ConcurrentHashMultiset.create();
+ this.latches = mapMaker.makeMap();
+ }
+
+ @Override
+ public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
+ int numValuesToAcquire = valuesToAcquire.size();
+ HashMap<T, CountDownLatch> latchesToCountDownByValue =
+ Maps.newHashMapWithExpectedSize(numValuesToAcquire);
+ ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire);
+ for (T value : valuesToAcquire) {
+ int oldCount = tentativeValues.add(value, 1);
+ if (oldCount == 0) {
+ // The value was just uniquely added by us.
+ CountDownLatch latch = new CountDownLatch(1);
+ Preconditions.checkState(latches.put(value, latch) == null, value);
+ latchesToCountDownByValue.put(value, latch);
+ } else {
+ CountDownLatch latch = latches.get(value);
+ if (latch != null) {
+ // The value was recently added by another thread, and that thread is still waiting to
+ // acquire a permit for it.
+ latchesToAwait.add(latch);
+ }
+ }
+ }
+
+ int numUniqueValuesToAcquire = latchesToCountDownByValue.size();
+ semaphore.acquire(numUniqueValuesToAcquire);
+ for (T value : valuesToAcquire) {
+ actualValues.add(value);
+ }
+ for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) {
+ T value = entry.getKey();
+ CountDownLatch latchToCountDown = entry.getValue();
+ latchToCountDown.countDown();
+ Preconditions.checkState(latchToCountDown == latches.remove(value), value);
+ }
+ for (CountDownLatch latchToAwait : latchesToAwait) {
+ latchToAwait.await();
+ }
+ }
+
+ @Override
+ public void releaseAll(Set<T> valuesToRelease) {
+ int numUniqueValuesToRelease = 0;
+ for (T value : valuesToRelease) {
+ int oldCount = actualValues.remove(value, 1);
+ Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value);
+ if (oldCount == 1) {
+ numUniqueValuesToRelease++;
+ }
+ tentativeValues.remove(value, 1);
+ }
+
+ semaphore.release(numUniqueValuesToRelease);
+ }
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
new file mode 100644
index 0000000000..4a1abe5a2c
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java
@@ -0,0 +1,246 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableSet;
+import com.google.devtools.build.lib.util.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MultisetSemaphore}. */
+@RunWith(JUnit4.class)
+public class MultisetSemaphoreTest {
+
+ @Test
+ public void testSimple_Serial() throws Exception {
+ // When we have a MultisetSemaphore
+ MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
+ .concurrencyLevel(1)
+ // with 3 max num unique values,
+ .maxNumUniqueValues(3)
+ .build();
+
+ // And we serially acquire permits for 3 unique values
+ multisetSemaphore.acquireAll(ImmutableSet.of("a", "b", "c"));
+ // And then attempt to acquire permits for 2 of those same unique values,
+ // Then we don't deadlock.
+ multisetSemaphore.acquireAll(ImmutableSet.of("b", "c"));
+ // And then we release one of the permit for one of those unique values,
+ multisetSemaphore.releaseAll(ImmutableSet.of("c"));
+ // And then we release the other permit,
+ multisetSemaphore.releaseAll(ImmutableSet.of("c"));
+ // We are able to acquire a permit for a 4th unique value.
+ multisetSemaphore.acquireAll(ImmutableSet.of("d"));
+ }
+
+ @Test
+ public void testSimple_Concurrent() throws Exception {
+ // When we have N and M, with M > N and M|N.
+ final int n = 10;
+ int m = n * 2;
+ Preconditions.checkState(m > n && m % n == 0, "M=%d N=%d", m, n);
+ // When we have a MultisetSemaphore
+ final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
+ // With a concurrency level of M
+ .concurrencyLevel(m)
+ // And N max num unique values,
+ .maxNumUniqueValues(n)
+ .build();
+
+ // And a ExecutorService with M threads,
+ ExecutorService executorService = Executors.newFixedThreadPool(m);
+ // And a recorder for thrown exceptions,
+ ThrowableRecordingRunnableWrapper wrapper =
+ new ThrowableRecordingRunnableWrapper("testSimple_Concurrent");
+ final AtomicInteger numThreadsJustAfterAcquireInFirstRound = new AtomicInteger(0);
+ final AtomicInteger numThreadsJustAfterAcquireInSecondRound = new AtomicInteger(0);
+ final AtomicInteger secondRoundCompleted = new AtomicInteger(0);
+ final int napTimeMs = 42;
+ for (int i = 0; i < m; i++) {
+ final String val = "val" + i;
+ // And we submit M Runnables, each of which
+ executorService.submit(wrapper.wrap(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Has two rounds
+
+ // Wherein the first round
+ // The Runnable acquire a permit for a unique value (among M values),
+ ImmutableSet<String> valSet = ImmutableSet.of(val);
+ multisetSemaphore.acquireAll(valSet);
+ assertThat(numThreadsJustAfterAcquireInFirstRound.getAndIncrement()).isLessThan(n);
+ // And then sleeps,
+ Thread.sleep(napTimeMs);
+ numThreadsJustAfterAcquireInFirstRound.decrementAndGet();
+ multisetSemaphore.releaseAll(valSet);
+
+ // And wherein the second round
+ // The Runnable again acquires a permit for its unique value,
+ multisetSemaphore.acquireAll(valSet);
+ assertThat(numThreadsJustAfterAcquireInSecondRound.getAndIncrement()).isLessThan(n);
+ // And then sleeps,
+ Thread.sleep(napTimeMs);
+ numThreadsJustAfterAcquireInSecondRound.decrementAndGet();
+ // And notes that it has completed the second round,
+ secondRoundCompleted.incrementAndGet();
+ multisetSemaphore.releaseAll(valSet);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }));
+ }
+ // And we wait for all M Runnables to complete (that is, none of them were deadlocked),
+ boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
+ // Then none of our Runnables threw any Exceptions.
+ assertThat(wrapper.getFirstThrownError()).isNull();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ // And the counters we used for sanity checks were correctly reset to 0.
+ assertThat(numThreadsJustAfterAcquireInFirstRound.get()).isEqualTo(0);
+ assertThat(numThreadsJustAfterAcquireInSecondRound.get()).isEqualTo(0);
+ // And all M Runnables completed the second round.
+ assertThat(secondRoundCompleted.get()).isEqualTo(m);
+ Set<String> newVals = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ newVals.add("newval" + i);
+ }
+ // And the main test thread is able to acquire permits for N new unique values (indirectly
+ // confirming that the MultisetSemaphore previously had no outstanding permits).
+ multisetSemaphore.acquireAll(newVals);
+ }
+
+ @Test
+ public void testConcurrentAtomicity() throws Exception {
+ int n = 100;
+ // When we have a MultisetSemaphore
+ final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
+ // With a concurrency level of N
+ .concurrencyLevel(n)
+ // And 2 max num unique values,
+ .maxNumUniqueValues(2)
+ .build();
+ // And a ExecutorService with N threads,
+ ExecutorService executorService = Executors.newFixedThreadPool(n);
+ // And a recorder for thrown exceptions,
+ ThrowableRecordingRunnableWrapper wrapper =
+ new ThrowableRecordingRunnableWrapper("testConcurrentAtomicity");
+ final int napTimeMs = 42;
+ // And a done latch with initial count N,
+ final CountDownLatch allDoneLatch = new CountDownLatch(n);
+ final String sameVal = "same-val";
+ for (int i = 0; i < n; i++) {
+ final String differentVal = "different-val" + i;
+ // And we submit N Runnables, each of which
+ executorService.submit(wrapper.wrap(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Set<String> vals = ImmutableSet.of(sameVal, differentVal);
+ // Tries to acquire a permit for a set of two values, one of which is the same for all
+ // the N Runnables and one of which is unique across all N Runnables.
+ multisetSemaphore.acquireAll(vals);
+ // And then sleeps
+ Thread.sleep(napTimeMs);
+ // And then releases its permits
+ multisetSemaphore.releaseAll(vals);
+ // And then counts down the done latch,
+ allDoneLatch.countDown();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }));
+ }
+ // Then all of our Runnables completed (without deadlock!), as expected,
+ boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
+ // And thus were able to count down the done latch,
+ allDoneLatch.await();
+ // And also none of them threw any Exceptions.
+ assertThat(wrapper.getFirstThrownError()).isNull();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ }
+
+ @Test
+ public void testConcurrentRace() throws Exception {
+ // When we have N values
+ int n = 6;
+ ArrayList<String> vals = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ vals.add("val-" + i);
+ }
+ // And we have all permutations of these N values
+ Collection<List<String>> permutations = Collections2.orderedPermutations(vals);
+ int numPermutations = permutations.size();
+ // And we have a MultisetSemaphore
+ final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder()
+ // With a concurrency level of N!
+ .concurrencyLevel(numPermutations)
+ // And with N max num unique values,
+ .maxNumUniqueValues(n)
+ .build();
+ // And a ExecutorService with N! threads,
+ ExecutorService executorService = Executors.newFixedThreadPool(numPermutations);
+ // And a recorder for thrown exceptions,
+ ThrowableRecordingRunnableWrapper wrapper =
+ new ThrowableRecordingRunnableWrapper("testConcurrentRace");
+ for (List<String> orderedVals : permutations) {
+ final Set<String> orderedSet = new LinkedHashSet<>(orderedVals);
+ // And we submit N! Runnables, each of which
+ executorService.submit(wrapper.wrap(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Tries to acquire a permit for the set of N values, with a unique iteration order
+ // (across all the N! different permutations)
+ multisetSemaphore.acquireAll(orderedSet);
+ // And then immediately releases the permit.
+ multisetSemaphore.releaseAll(orderedSet);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }));
+ }
+ // Then all of our Runnables completed (without deadlock!), as expected,
+ boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService);
+ // And also none of them threw any Exceptions.
+ assertThat(wrapper.getFirstThrownError()).isNull();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ }
+}
+