diff options
author | Nathan Harmata <nharmata@google.com> | 2016-11-09 22:16:14 +0000 |
---|---|---|
committer | Klaus Aehlig <aehlig@google.com> | 2016-11-10 09:22:18 +0000 |
commit | 1df80e54c3a53efc7f86f7c6da9973c7dd43d5fc (patch) | |
tree | b8b95a9185768000fe94af0a324d0c75a74ea0a0 | |
parent | 4665e709054dcfe34d1e246caefb8847a560e22a (diff) |
Introduce MultisetSemaphore: A concurrency primitive for managing access to at most K unique things at once.
--
MOS_MIGRATED_REVID=138684040
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java | 229 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java | 246 |
2 files changed, 475 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java new file mode 100644 index 0000000000..d1f3ef7a77 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java @@ -0,0 +1,229 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.common.collect.ConcurrentHashMultiset; +import com.google.common.collect.MapMaker; +import com.google.common.collect.Maps; +import com.google.common.collect.Multiset; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.util.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +/** + * A concurrency primitive for managing access to at most K unique things at once, for a fixed K. + * + * <p>You can think of this as a pair of a {@link Semaphore} with K total permits and a + * {@link Multiset}, with permits being doled out and returned based on the current contents of the + * {@link Multiset}. + */ +@ThreadSafe +public abstract class MultisetSemaphore<T> { + /** + * Blocks until permits are available for all the values in {@code valuesToAcquire}, and then + * atomically acquires these permits. + * + * <p>{@code acquireAll(valuesToAcquire)} atomically does the following + * <ol> + * <li>Computes {@code m}, the number of values in {@code valuesToAcquire} that are not + * currently in the backing {@link Multiset}. + * <li>Adds {@code valuesToAcquire} to the backing {@link Multiset}. + * <li>Blocks until {@code m} permits are available from the backing {@link Semaphore}. + * <li>Acquires these permits. + * </ol> + */ + public abstract void acquireAll(Set<T> valuesToAcquire) throws InterruptedException; + + /** + * Atomically releases permits for all the values in {@code valuesToAcquire}. + * + * <p>{@code releaseAll(valuesToRelease)} atomically does the following + * <ol> + * <li>Computes {@code m}, the number of values in {@code valuesToRelease} that are currently in + * the backing {@link Multiset} with multiplicity 1. + * <li>Removes {@code valuesToRelease} from the backing {@link Multiset}. + * <li>Release {@code m} permits from the backing {@link Semaphore}. + * </ol> + * + * <p>Assumes that this {@link MultisetSemaphore} has already given out permits for all the + * values in {@code valuesToAcquire}. + */ + public abstract void releaseAll(Set<T> valuesToRelease); + + /** + * Returns a {@link MultisetSemaphore} with a backing {@link Semaphore} that has an unbounded + * number of permits; that is, {@link #acquireAll} will never block. + */ + public static <T> MultisetSemaphore<T> unbounded() { + return UnboundedMultisetSemaphore.instance(); + } + + /** Builder for {@link MultisetSemaphore} instances. */ + public static class Builder { + private static final int UNSET_INT = -1; + + private int maxNumUniqueValues = UNSET_INT; + private MapMaker mapMaker = new MapMaker(); + + private Builder() { + } + + /** + * Sets the maximum number of unique values for which permits can be held at once in the + * to-be-constructed {@link MultisetSemaphore}. + */ + public Builder maxNumUniqueValues(int maxNumUniqueValues) { + Preconditions.checkState( + maxNumUniqueValues > 0, + "maxNumUniqueValues must be positive (was %d)", + maxNumUniqueValues); + this.maxNumUniqueValues = maxNumUniqueValues; + return this; + } + + /** + * Sets the concurrency level (expected number of concurrent usages) of internal data structures + * of the to-be-constructed {@link MultisetSemaphore}. + * + * <p>This is a hint for tweaking performance and lock contention. + */ + public Builder concurrencyLevel(int concurrencyLevel) { + mapMaker = mapMaker.concurrencyLevel(concurrencyLevel); + return this; + } + + public <T> MultisetSemaphore<T> build() { + Preconditions.checkState( + maxNumUniqueValues != UNSET_INT, + "maxNumUniqueValues(int) must be specified"); + return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker); + } + } + + /** Returns a fresh {@link Builder}. */ + public static Builder newBuilder() { + return new Builder(); + } + + private static class UnboundedMultisetSemaphore<T> extends MultisetSemaphore<T> { + private static final UnboundedMultisetSemaphore<Object> INSTANCE = + new UnboundedMultisetSemaphore<Object>(); + + private UnboundedMultisetSemaphore() { + } + + @SuppressWarnings("unchecked") + private static <T> UnboundedMultisetSemaphore<T> instance() { + return (UnboundedMultisetSemaphore<T>) INSTANCE; + } + + @Override + public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { + } + + @Override + public void releaseAll(Set<T> valuesToRelease) { + } + } + + private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> { + // Implementation strategy: + // + // We have a single Semaphore, access to which is managed by two levels of Multisets, the first + // of which is an approximate accounting of the current multiplicities, and the second of which + // is an accurate accounting of the current multiplicities. The first level is used to decide + // how many permits to acquire from the semaphore on acquireAll and the second level is used to + // decide how many permits to release from the semaphore on releaseAll. The separation between + // these two levels ensure the atomicity of acquireAll and releaseAll. + + // We also have a map of CountDownLatches, used to handle the case where there is a not-empty + // set that is a subset of the set of values for which multiple threads are concurrently trying + // to acquire permits. + + private final Semaphore semaphore; + private final ConcurrentHashMultiset<T> tentativeValues; + private final ConcurrentHashMultiset<T> actualValues; + private final ConcurrentMap<T, CountDownLatch> latches; + + private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) { + this.semaphore = new Semaphore(maxNumUniqueValues); + // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when + // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary + // contention when using these Multisets. + this.tentativeValues = ConcurrentHashMultiset.create(); + this.actualValues = ConcurrentHashMultiset.create(); + this.latches = mapMaker.makeMap(); + } + + @Override + public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { + int numValuesToAcquire = valuesToAcquire.size(); + HashMap<T, CountDownLatch> latchesToCountDownByValue = + Maps.newHashMapWithExpectedSize(numValuesToAcquire); + ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire); + for (T value : valuesToAcquire) { + int oldCount = tentativeValues.add(value, 1); + if (oldCount == 0) { + // The value was just uniquely added by us. + CountDownLatch latch = new CountDownLatch(1); + Preconditions.checkState(latches.put(value, latch) == null, value); + latchesToCountDownByValue.put(value, latch); + } else { + CountDownLatch latch = latches.get(value); + if (latch != null) { + // The value was recently added by another thread, and that thread is still waiting to + // acquire a permit for it. + latchesToAwait.add(latch); + } + } + } + + int numUniqueValuesToAcquire = latchesToCountDownByValue.size(); + semaphore.acquire(numUniqueValuesToAcquire); + for (T value : valuesToAcquire) { + actualValues.add(value); + } + for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) { + T value = entry.getKey(); + CountDownLatch latchToCountDown = entry.getValue(); + latchToCountDown.countDown(); + Preconditions.checkState(latchToCountDown == latches.remove(value), value); + } + for (CountDownLatch latchToAwait : latchesToAwait) { + latchToAwait.await(); + } + } + + @Override + public void releaseAll(Set<T> valuesToRelease) { + int numUniqueValuesToRelease = 0; + for (T value : valuesToRelease) { + int oldCount = actualValues.remove(value, 1); + Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value); + if (oldCount == 1) { + numUniqueValuesToRelease++; + } + tentativeValues.remove(value, 1); + } + + semaphore.release(numUniqueValuesToRelease); + } + } +} diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java new file mode 100644 index 0000000000..4a1abe5a2c --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/concurrent/MultisetSemaphoreTest.java @@ -0,0 +1,246 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.util.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link MultisetSemaphore}. */ +@RunWith(JUnit4.class) +public class MultisetSemaphoreTest { + + @Test + public void testSimple_Serial() throws Exception { + // When we have a MultisetSemaphore + MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() + .concurrencyLevel(1) + // with 3 max num unique values, + .maxNumUniqueValues(3) + .build(); + + // And we serially acquire permits for 3 unique values + multisetSemaphore.acquireAll(ImmutableSet.of("a", "b", "c")); + // And then attempt to acquire permits for 2 of those same unique values, + // Then we don't deadlock. + multisetSemaphore.acquireAll(ImmutableSet.of("b", "c")); + // And then we release one of the permit for one of those unique values, + multisetSemaphore.releaseAll(ImmutableSet.of("c")); + // And then we release the other permit, + multisetSemaphore.releaseAll(ImmutableSet.of("c")); + // We are able to acquire a permit for a 4th unique value. + multisetSemaphore.acquireAll(ImmutableSet.of("d")); + } + + @Test + public void testSimple_Concurrent() throws Exception { + // When we have N and M, with M > N and M|N. + final int n = 10; + int m = n * 2; + Preconditions.checkState(m > n && m % n == 0, "M=%d N=%d", m, n); + // When we have a MultisetSemaphore + final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() + // With a concurrency level of M + .concurrencyLevel(m) + // And N max num unique values, + .maxNumUniqueValues(n) + .build(); + + // And a ExecutorService with M threads, + ExecutorService executorService = Executors.newFixedThreadPool(m); + // And a recorder for thrown exceptions, + ThrowableRecordingRunnableWrapper wrapper = + new ThrowableRecordingRunnableWrapper("testSimple_Concurrent"); + final AtomicInteger numThreadsJustAfterAcquireInFirstRound = new AtomicInteger(0); + final AtomicInteger numThreadsJustAfterAcquireInSecondRound = new AtomicInteger(0); + final AtomicInteger secondRoundCompleted = new AtomicInteger(0); + final int napTimeMs = 42; + for (int i = 0; i < m; i++) { + final String val = "val" + i; + // And we submit M Runnables, each of which + executorService.submit(wrapper.wrap(new Runnable() { + @Override + public void run() { + try { + // Has two rounds + + // Wherein the first round + // The Runnable acquire a permit for a unique value (among M values), + ImmutableSet<String> valSet = ImmutableSet.of(val); + multisetSemaphore.acquireAll(valSet); + assertThat(numThreadsJustAfterAcquireInFirstRound.getAndIncrement()).isLessThan(n); + // And then sleeps, + Thread.sleep(napTimeMs); + numThreadsJustAfterAcquireInFirstRound.decrementAndGet(); + multisetSemaphore.releaseAll(valSet); + + // And wherein the second round + // The Runnable again acquires a permit for its unique value, + multisetSemaphore.acquireAll(valSet); + assertThat(numThreadsJustAfterAcquireInSecondRound.getAndIncrement()).isLessThan(n); + // And then sleeps, + Thread.sleep(napTimeMs); + numThreadsJustAfterAcquireInSecondRound.decrementAndGet(); + // And notes that it has completed the second round, + secondRoundCompleted.incrementAndGet(); + multisetSemaphore.releaseAll(valSet); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + })); + } + // And we wait for all M Runnables to complete (that is, none of them were deadlocked), + boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); + // Then none of our Runnables threw any Exceptions. + assertThat(wrapper.getFirstThrownError()).isNull(); + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + // And the counters we used for sanity checks were correctly reset to 0. + assertThat(numThreadsJustAfterAcquireInFirstRound.get()).isEqualTo(0); + assertThat(numThreadsJustAfterAcquireInSecondRound.get()).isEqualTo(0); + // And all M Runnables completed the second round. + assertThat(secondRoundCompleted.get()).isEqualTo(m); + Set<String> newVals = new HashSet<>(); + for (int i = 0; i < n; i++) { + newVals.add("newval" + i); + } + // And the main test thread is able to acquire permits for N new unique values (indirectly + // confirming that the MultisetSemaphore previously had no outstanding permits). + multisetSemaphore.acquireAll(newVals); + } + + @Test + public void testConcurrentAtomicity() throws Exception { + int n = 100; + // When we have a MultisetSemaphore + final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() + // With a concurrency level of N + .concurrencyLevel(n) + // And 2 max num unique values, + .maxNumUniqueValues(2) + .build(); + // And a ExecutorService with N threads, + ExecutorService executorService = Executors.newFixedThreadPool(n); + // And a recorder for thrown exceptions, + ThrowableRecordingRunnableWrapper wrapper = + new ThrowableRecordingRunnableWrapper("testConcurrentAtomicity"); + final int napTimeMs = 42; + // And a done latch with initial count N, + final CountDownLatch allDoneLatch = new CountDownLatch(n); + final String sameVal = "same-val"; + for (int i = 0; i < n; i++) { + final String differentVal = "different-val" + i; + // And we submit N Runnables, each of which + executorService.submit(wrapper.wrap(new Runnable() { + @Override + public void run() { + try { + Set<String> vals = ImmutableSet.of(sameVal, differentVal); + // Tries to acquire a permit for a set of two values, one of which is the same for all + // the N Runnables and one of which is unique across all N Runnables. + multisetSemaphore.acquireAll(vals); + // And then sleeps + Thread.sleep(napTimeMs); + // And then releases its permits + multisetSemaphore.releaseAll(vals); + // And then counts down the done latch, + allDoneLatch.countDown(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + })); + } + // Then all of our Runnables completed (without deadlock!), as expected, + boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); + // And thus were able to count down the done latch, + allDoneLatch.await(); + // And also none of them threw any Exceptions. + assertThat(wrapper.getFirstThrownError()).isNull(); + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + } + + @Test + public void testConcurrentRace() throws Exception { + // When we have N values + int n = 6; + ArrayList<String> vals = new ArrayList<>(); + for (int i = 0; i < n; i++) { + vals.add("val-" + i); + } + // And we have all permutations of these N values + Collection<List<String>> permutations = Collections2.orderedPermutations(vals); + int numPermutations = permutations.size(); + // And we have a MultisetSemaphore + final MultisetSemaphore<String> multisetSemaphore = MultisetSemaphore.newBuilder() + // With a concurrency level of N! + .concurrencyLevel(numPermutations) + // And with N max num unique values, + .maxNumUniqueValues(n) + .build(); + // And a ExecutorService with N! threads, + ExecutorService executorService = Executors.newFixedThreadPool(numPermutations); + // And a recorder for thrown exceptions, + ThrowableRecordingRunnableWrapper wrapper = + new ThrowableRecordingRunnableWrapper("testConcurrentRace"); + for (List<String> orderedVals : permutations) { + final Set<String> orderedSet = new LinkedHashSet<>(orderedVals); + // And we submit N! Runnables, each of which + executorService.submit(wrapper.wrap(new Runnable() { + @Override + public void run() { + try { + // Tries to acquire a permit for the set of N values, with a unique iteration order + // (across all the N! different permutations) + multisetSemaphore.acquireAll(orderedSet); + // And then immediately releases the permit. + multisetSemaphore.releaseAll(orderedSet); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + })); + } + // Then all of our Runnables completed (without deadlock!), as expected, + boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); + // And also none of them threw any Exceptions. + assertThat(wrapper.getFirstThrownError()).isNull(); + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + } +} + |