diff options
author | Nathan Harmata <nharmata@google.com> | 2016-11-09 22:16:14 +0000 |
---|---|---|
committer | Klaus Aehlig <aehlig@google.com> | 2016-11-10 09:22:18 +0000 |
commit | 1df80e54c3a53efc7f86f7c6da9973c7dd43d5fc (patch) | |
tree | b8b95a9185768000fe94af0a324d0c75a74ea0a0 /src/main/java | |
parent | 4665e709054dcfe34d1e246caefb8847a560e22a (diff) |
Introduce MultisetSemaphore: A concurrency primitive for managing access to at most K unique things at once.
--
MOS_MIGRATED_REVID=138684040
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java new file mode 100644 index 0000000000..d1f3ef7a77 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java @@ -0,0 +1,229 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.common.collect.ConcurrentHashMultiset; +import com.google.common.collect.MapMaker; +import com.google.common.collect.Maps; +import com.google.common.collect.Multiset; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.util.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; + +/** + * A concurrency primitive for managing access to at most K unique things at once, for a fixed K. + * + * <p>You can think of this as a pair of a {@link Semaphore} with K total permits and a + * {@link Multiset}, with permits being doled out and returned based on the current contents of the + * {@link Multiset}. + */ +@ThreadSafe +public abstract class MultisetSemaphore<T> { + /** + * Blocks until permits are available for all the values in {@code valuesToAcquire}, and then + * atomically acquires these permits. + * + * <p>{@code acquireAll(valuesToAcquire)} atomically does the following + * <ol> + * <li>Computes {@code m}, the number of values in {@code valuesToAcquire} that are not + * currently in the backing {@link Multiset}. + * <li>Adds {@code valuesToAcquire} to the backing {@link Multiset}. + * <li>Blocks until {@code m} permits are available from the backing {@link Semaphore}. + * <li>Acquires these permits. + * </ol> + */ + public abstract void acquireAll(Set<T> valuesToAcquire) throws InterruptedException; + + /** + * Atomically releases permits for all the values in {@code valuesToAcquire}. + * + * <p>{@code releaseAll(valuesToRelease)} atomically does the following + * <ol> + * <li>Computes {@code m}, the number of values in {@code valuesToRelease} that are currently in + * the backing {@link Multiset} with multiplicity 1. + * <li>Removes {@code valuesToRelease} from the backing {@link Multiset}. + * <li>Release {@code m} permits from the backing {@link Semaphore}. + * </ol> + * + * <p>Assumes that this {@link MultisetSemaphore} has already given out permits for all the + * values in {@code valuesToAcquire}. + */ + public abstract void releaseAll(Set<T> valuesToRelease); + + /** + * Returns a {@link MultisetSemaphore} with a backing {@link Semaphore} that has an unbounded + * number of permits; that is, {@link #acquireAll} will never block. + */ + public static <T> MultisetSemaphore<T> unbounded() { + return UnboundedMultisetSemaphore.instance(); + } + + /** Builder for {@link MultisetSemaphore} instances. */ + public static class Builder { + private static final int UNSET_INT = -1; + + private int maxNumUniqueValues = UNSET_INT; + private MapMaker mapMaker = new MapMaker(); + + private Builder() { + } + + /** + * Sets the maximum number of unique values for which permits can be held at once in the + * to-be-constructed {@link MultisetSemaphore}. + */ + public Builder maxNumUniqueValues(int maxNumUniqueValues) { + Preconditions.checkState( + maxNumUniqueValues > 0, + "maxNumUniqueValues must be positive (was %d)", + maxNumUniqueValues); + this.maxNumUniqueValues = maxNumUniqueValues; + return this; + } + + /** + * Sets the concurrency level (expected number of concurrent usages) of internal data structures + * of the to-be-constructed {@link MultisetSemaphore}. + * + * <p>This is a hint for tweaking performance and lock contention. + */ + public Builder concurrencyLevel(int concurrencyLevel) { + mapMaker = mapMaker.concurrencyLevel(concurrencyLevel); + return this; + } + + public <T> MultisetSemaphore<T> build() { + Preconditions.checkState( + maxNumUniqueValues != UNSET_INT, + "maxNumUniqueValues(int) must be specified"); + return new BoundedMultisetSemaphore<>(maxNumUniqueValues, mapMaker); + } + } + + /** Returns a fresh {@link Builder}. */ + public static Builder newBuilder() { + return new Builder(); + } + + private static class UnboundedMultisetSemaphore<T> extends MultisetSemaphore<T> { + private static final UnboundedMultisetSemaphore<Object> INSTANCE = + new UnboundedMultisetSemaphore<Object>(); + + private UnboundedMultisetSemaphore() { + } + + @SuppressWarnings("unchecked") + private static <T> UnboundedMultisetSemaphore<T> instance() { + return (UnboundedMultisetSemaphore<T>) INSTANCE; + } + + @Override + public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { + } + + @Override + public void releaseAll(Set<T> valuesToRelease) { + } + } + + private static class BoundedMultisetSemaphore<T> extends MultisetSemaphore<T> { + // Implementation strategy: + // + // We have a single Semaphore, access to which is managed by two levels of Multisets, the first + // of which is an approximate accounting of the current multiplicities, and the second of which + // is an accurate accounting of the current multiplicities. The first level is used to decide + // how many permits to acquire from the semaphore on acquireAll and the second level is used to + // decide how many permits to release from the semaphore on releaseAll. The separation between + // these two levels ensure the atomicity of acquireAll and releaseAll. + + // We also have a map of CountDownLatches, used to handle the case where there is a not-empty + // set that is a subset of the set of values for which multiple threads are concurrently trying + // to acquire permits. + + private final Semaphore semaphore; + private final ConcurrentHashMultiset<T> tentativeValues; + private final ConcurrentHashMultiset<T> actualValues; + private final ConcurrentMap<T, CountDownLatch> latches; + + private BoundedMultisetSemaphore(int maxNumUniqueValues, MapMaker mapMaker) { + this.semaphore = new Semaphore(maxNumUniqueValues); + // TODO(nharmata): Use ConcurrentHashMultiset#create(ConcurrentMap<E, AtomicInteger>) when + // Bazel is switched to use a more recent version of Guava. Until then we'll have unnecessary + // contention when using these Multisets. + this.tentativeValues = ConcurrentHashMultiset.create(); + this.actualValues = ConcurrentHashMultiset.create(); + this.latches = mapMaker.makeMap(); + } + + @Override + public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { + int numValuesToAcquire = valuesToAcquire.size(); + HashMap<T, CountDownLatch> latchesToCountDownByValue = + Maps.newHashMapWithExpectedSize(numValuesToAcquire); + ArrayList<CountDownLatch> latchesToAwait = new ArrayList<>(numValuesToAcquire); + for (T value : valuesToAcquire) { + int oldCount = tentativeValues.add(value, 1); + if (oldCount == 0) { + // The value was just uniquely added by us. + CountDownLatch latch = new CountDownLatch(1); + Preconditions.checkState(latches.put(value, latch) == null, value); + latchesToCountDownByValue.put(value, latch); + } else { + CountDownLatch latch = latches.get(value); + if (latch != null) { + // The value was recently added by another thread, and that thread is still waiting to + // acquire a permit for it. + latchesToAwait.add(latch); + } + } + } + + int numUniqueValuesToAcquire = latchesToCountDownByValue.size(); + semaphore.acquire(numUniqueValuesToAcquire); + for (T value : valuesToAcquire) { + actualValues.add(value); + } + for (Map.Entry<T, CountDownLatch> entry : latchesToCountDownByValue.entrySet()) { + T value = entry.getKey(); + CountDownLatch latchToCountDown = entry.getValue(); + latchToCountDown.countDown(); + Preconditions.checkState(latchToCountDown == latches.remove(value), value); + } + for (CountDownLatch latchToAwait : latchesToAwait) { + latchToAwait.await(); + } + } + + @Override + public void releaseAll(Set<T> valuesToRelease) { + int numUniqueValuesToRelease = 0; + for (T value : valuesToRelease) { + int oldCount = actualValues.remove(value, 1); + Preconditions.checkState(oldCount >= 0, "%d %s", oldCount, value); + if (oldCount == 1) { + numUniqueValuesToRelease++; + } + tentativeValues.remove(value, 1); + } + + semaphore.release(numUniqueValuesToRelease); + } + } +} |