aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2016-11-09 22:16:14 +0000
committerGravatar Klaus Aehlig <aehlig@google.com>2016-11-10 09:22:18 +0000
commit1df80e54c3a53efc7f86f7c6da9973c7dd43d5fc (patch)
treeb8b95a9185768000fe94af0a324d0c75a74ea0a0 /src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
parent4665e709054dcfe34d1e246caefb8847a560e22a (diff)
Introduce MultisetSemaphore: A concurrency primitive for managing access to at most K unique things at once.
-- MOS_MIGRATED_REVID=138684040
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java229
1 files changed, 229 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
new file mode 100644
index 0000000000..d1f3ef7a77
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -0,0 +1,229 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.util.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A concurrency primitive for managing access to at most K unique things at once, for a fixed K.
+ *
+ * <p>You can think of this as a pair of a {@link Semaphore} with K total permits and a
+ * {@link Multiset}, with permits being doled out and returned based on the current contents of the
+ * {@link Multiset}.
+ */
+@ThreadSafe
+public abstract class MultisetSemaphore<T> {
+ /**
+ * Blocks until permits are available for all the values in {@code valuesToAcquire}, and then
+ * atomically acquires these permits.
+ *
+ * <p>{@code acquireAll(valuesToAcquire)} atomically does the following
+ * <ol>
+ * <li>Computes {@code m}, the number of values in {@code valuesToAcquire} that are not
+ * currently in the backing {@link Multiset}.
+ * <li>Adds {@code valuesToAcquire} to the backing {@link Multiset}.
+ * <li>Blocks until {@code m} permits are available from the backing {@link Semaphore}.
+ * <li>Acquires these permits.
+ * </ol>
+ */
+ public abstract void acquireAll(Set<T> valuesToAcquire) throws InterruptedException;
+
+ /**
+ * Atomically releases permits for all the values in {@code valuesToAcquire}.
+ *
+ * <p>{@code releaseAll(valuesToRelease)} atomically does the following
+ * <ol>
+ * <li>Computes {@code m}, the number of values in {@code valuesToRelease} that are currently in
+ * the backing {@link Multiset} with multiplicity 1.
+ * <li>Removes {@code valuesToRelease} from the backing {@link Multiset}.
+ * <li>Release {@code m} permits from the backing {@link Semaphore}.
+ * </ol>
+ *
+ * <p>Assumes that this {@link MultisetSemaphore} has already given out permits for all the
+ * values in {@code valuesToAcquire}.
+ */
+ public abstract void releaseAll(Set<T> valuesToRelease);
+
+ /**
+ * Returns a {@link MultisetSemaphore} with a backing {@link Semaphore} that has an unbounded
+ * number of permits; that is, {@link #acquireAll} will never block.
+ */
+ public static <T> MultisetSemaphore<T> unbounded() {
+ return UnboundedMultisetSemaphore.instance();
+ }
+
+ /** Builder for {@link MultisetSemaphore} instances. */
+ public static class Builder {
+ private static final int UNSET_INT = -1;
+
+ private int maxNumUniqueValues = UNSET_INT;
+ private MapMaker mapMaker = new MapMaker();
+
+ private Builder() {
+ }
+
+ /**
+ * Sets the maximum number of unique values for which permits can be held at once in the
+ * to-be-constructed {@link MultisetSemaphore}.
+ */
+ public Builder maxNumUniqueValues(int maxNumUniqueValues) {
+ Preconditions.checkState(
+ maxNumUniqueValues > 0,
+ "maxNumUniqueValues must be positive (was %d)",
+ maxNumUniqueValues);
+ this.maxNumUniqueValues = maxNumUniqueValues;
+ return this;
+ }
+
+ /**
+ * Sets the concurrency level (expected number of concurrent usages) of internal data structures
+ * of the to-be-constructed {@link MultisetSemaphore}.
+ *
+ * <p>This is a hint for tweaking performance and lock contention.
+ */
+ public Builder concurrencyLevel(int concurrencyLevel) {
+ mapMaker = mapMaker.concurrencyLevel(concurrencyLevel);
+ return this;
+ }
+
+ public <T> MultisetSemaphore<T> build() {
+ Preconditions.checkState(
+ maxNumUniqueValues != UNSET_INT,
+ "maxNumUniqueValues(int) must be specified");
+ return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker);
+ }
+ }
+
+ /** Returns a fresh {@link Builder}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private static class UnboundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
+ private static final UnboundedMultisetSemaphore<Object> INSTANCE =
+ new UnboundedMultisetSemaphore<Object>();
+
+ private UnboundedMultisetSemaphore() {
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> UnboundedMultisetSemaphore<T> instance() {
+ return (UnboundedMultisetSemaphore<T>) INSTANCE;
+ }
+
+ @Override
+ public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
+ }
+
+ @Override
+ public void releaseAll(Set<T> valuesToRelease) {
+ }
+ }
+
+ private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
+ // Implementation strategy:
+ //
+ // We have a single Semaphore, access to which is managed by two levels of Multisets, the first
+ // of which is an approximate accounting of the current multiplicities, and the second of which
+ // is an accurate accounting of the current multiplicities. The first level is used to decide
+ // how many permits to acquire from the semaphore on acquireAll and the second level is used to
+ // decide how many permits to release from the semaphore on releaseAll. The separation between
+ // these two levels ensure the atomicity of acquireAll and releaseAll.
+
+ // We also have a map of CountDownLatches, used to handle the case where there is a not-empty
+ // set that is a subset of the set of values for which multiple threads are concurrently trying
+ // to acquire permits.
+
+ private final Semaphore semaphore;
+ private final ConcurrentHashMultiset<T> tentativeValues;
+ private final ConcurrentHashMultiset<T> actualValues;
+ private final ConcurrentMap<T, CountDownLatch> latches;
+
+ private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) {
+ this.semaphore = new Semaphore(maxNumUniqueValues);
+ // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when
+ // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary
+ // contention when using these Multisets.
+ this.tentativeValues = ConcurrentHashMultiset.create();
+ this.actualValues = ConcurrentHashMultiset.create();
+ this.latches = mapMaker.makeMap();
+ }
+
+ @Override
+ public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
+ int numValuesToAcquire = valuesToAcquire.size();
+ HashMap<T, CountDownLatch> latchesToCountDownByValue =
+ Maps.newHashMapWithExpectedSize(numValuesToAcquire);
+ ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire);
+ for (T value : valuesToAcquire) {
+ int oldCount = tentativeValues.add(value, 1);
+ if (oldCount == 0) {
+ // The value was just uniquely added by us.
+ CountDownLatch latch = new CountDownLatch(1);
+ Preconditions.checkState(latches.put(value, latch) == null, value);
+ latchesToCountDownByValue.put(value, latch);
+ } else {
+ CountDownLatch latch = latches.get(value);
+ if (latch != null) {
+ // The value was recently added by another thread, and that thread is still waiting to
+ // acquire a permit for it.
+ latchesToAwait.add(latch);
+ }
+ }
+ }
+
+ int numUniqueValuesToAcquire = latchesToCountDownByValue.size();
+ semaphore.acquire(numUniqueValuesToAcquire);
+ for (T value : valuesToAcquire) {
+ actualValues.add(value);
+ }
+ for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) {
+ T value = entry.getKey();
+ CountDownLatch latchToCountDown = entry.getValue();
+ latchToCountDown.countDown();
+ Preconditions.checkState(latchToCountDown == latches.remove(value), value);
+ }
+ for (CountDownLatch latchToAwait : latchesToAwait) {
+ latchToAwait.await();
+ }
+ }
+
+ @Override
+ public void releaseAll(Set<T> valuesToRelease) {
+ int numUniqueValuesToRelease = 0;
+ for (T value : valuesToRelease) {
+ int oldCount = actualValues.remove(value, 1);
+ Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value);
+ if (oldCount == 1) {
+ numUniqueValuesToRelease++;
+ }
+ tentativeValues.remove(value, 1);
+ }
+
+ semaphore.release(numUniqueValuesToRelease);
+ }
+ }
+}