aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2015-03-07 02:21:20 +0000
committerGravatar Damien Martin-Guillerez <dmarting@google.com>2015-03-10 13:58:41 +0000
commit9b370024faa05f44d13dee1a42b35f7907afa991 (patch)
tree32ebac1b14c55bb9afe497438238c7ffe47539eb
parent539f7ad83cba45b2286f7366ab3bd34cc435fe5a (diff)
Introduce KeyedLocker, a nice concurrency abstraction for managing lots of mutexes, and RefCountedMultisetKeyedLocker, an efficient implementation of this abstraction.
-- MOS_MIGRATED_REVID=88000985
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java49
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java98
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java167
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java26
4 files changed, 340 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java
new file mode 100644
index 0000000000..8228f5b236
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java
@@ -0,0 +1,49 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/** A keyed store of locks. */
+@ThreadSafe
+public interface KeyedLocker<K> {
+ /** Used to yield access to the implicit lock granted by {@link #lock}. */
+ @ThreadSafe
+ interface AutoUnlocker extends AutoCloseable {
+ /**
+ * If this was returned by {@code lock(k)}, yields exclusive access to {@code k}.
+ *
+ * <p>This method should be called at most once, and may only be called by the same thread that
+ * acquired the {@link AutoUnlocker} via {@link #lock}. Implementations are free to do anything
+ * if this is violated.
+ */
+ @Override
+ void close();
+ }
+
+ /**
+ * Blocks the current thread until it has exclusive access to do things with {@code k} and
+ * returns a {@link AutoUnlocker} instance for yielding the implicit lock. The intended usage
+ * is:
+ *
+ * <pre>
+ * {@code
+ * try (AutoUnlocker unlocker = locker.lock(k)) {
+ * // Your code here.
+ * }
+ * }
+ * </pre>
+ */
+ AutoUnlocker lock(K key);
+} \ No newline at end of file
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java
new file mode 100644
index 0000000000..4efa90f567
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java
@@ -0,0 +1,98 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.Striped;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * An implementation of {@link KeyedLocker} that uses ref counting to efficiently only store locks
+ * that are live.
+ */
+public class RefCountedMultisetKeyedLocker<K> implements KeyedLocker<K> {
+ // Multiset of keys that have threads waiting on a lock or using a lock.
+ private final ConcurrentHashMultiset<K> waiters = ConcurrentHashMultiset.<K>create();
+
+ private static final int NUM_STRIPES = 256;
+ // For each key, gives the striped lock to use for atomically managing the waiters on that key
+ // internally.
+ private final Striped<Lock> waitersLocks = Striped.lazyWeakLock(NUM_STRIPES);
+
+ // Map of key to current lock, for keys that have at least one waiting or live thread.
+ private final ConcurrentMap<K, RefCountedLockImpl> locks = new ConcurrentHashMap<>();
+
+ private class RefCountedLockImpl extends ReentrantLock implements AutoUnlocker {
+ private final K key;
+
+ private RefCountedLockImpl(K key) {
+ this.key = key;
+ }
+
+ @Override
+ public void close() {
+ Preconditions.checkState(isHeldByCurrentThread(), "For key %s, 'close' can be called at most "
+ + "once and the calling thread must be the one that acquired the AutoUnlocker", key);
+ try {
+ Lock waitersLock = waitersLocks.get(key);
+ try {
+ waitersLock.lock();
+ // Note that ConcurrentHashMultiset automatically removes removes entries for keys whose
+ // count is 0.
+ waiters.remove(key);
+ if (waiters.count(key) == 0) {
+ // No other thread is waiting to access this key, so we garbage collect the lock.
+ Preconditions.checkState(locks.remove(key, this), key);
+ }
+ } finally {
+ waitersLock.unlock();
+ }
+ } finally {
+ unlock();
+ }
+ }
+ }
+
+ @Override
+ public AutoUnlocker lock(K key) {
+ RefCountedLockImpl newLock = new RefCountedLockImpl(key);
+ // Pre-lock our fresh lock, in case we win the race to get access to 'key'.
+ newLock.lock();
+ Lock waitersLock = waitersLocks.get(key);
+ try {
+ waitersLock.lock();
+ // Add us to the set of waiters, so that in case we lose the race to access 'key', the winner
+ // will know that we are waiting.
+ waiters.add(key);
+ } finally {
+ waitersLock.unlock();
+ }
+ RefCountedLockImpl lock;
+ lock = locks.putIfAbsent(key, newLock);
+ if (lock != null) {
+ // Another thread won the race to get access to 'key', so we wait for our turn.
+ Preconditions.checkState(lock != newLock);
+ newLock.unlock();
+ lock.lock();
+ return lock;
+ }
+ // We won the race, so the current lock for 'key' is the one we locked and inserted.
+ return newLock;
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java
new file mode 100644
index 0000000000..98862ee061
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java
@@ -0,0 +1,167 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.lib.concurrent.KeyedLocker.AutoUnlocker;
+import com.google.devtools.build.lib.testutil.TestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Base class for tests for {@link KeyedLocker} implementations. */
+public abstract class KeyedLockerTest {
+ private static final int NUM_EXECUTOR_THREADS = 1000;
+ private KeyedLocker<String> locker;
+ private ExecutorService executorService;
+
+ protected abstract KeyedLocker<String> makeFreshLocker();
+
+ @Before
+ public void setUp() {
+ locker = makeFreshLocker();
+ executorService = Executors.newFixedThreadPool(NUM_EXECUTOR_THREADS);
+ }
+
+ @After
+ public void tearDown() {
+ locker = null;
+ MoreExecutors.shutdownAndAwaitTermination(executorService, TestUtils.WAIT_TIMEOUT_SECONDS,
+ TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void simpleSingleThreaded() {
+ locker.lock("cat");
+ locker.lock("dog");
+ locker.lock("cat");
+ locker.lock("dog");
+ }
+
+ @Test
+ public void doubleUnlock() {
+ AutoUnlocker unlocker = locker.lock("cat");
+ unlocker.close();
+ try {
+ unlocker.close();
+ fail();
+ } catch (IllegalStateException e) {
+ String expectedMessage = "'close' can be called at most once";
+ assertThat(e.getMessage()).contains(expectedMessage);
+ }
+ }
+
+ @Test
+ public void unlockOnOtherThread() throws Exception {
+ final AtomicReference<AutoUnlocker> unlockerRef = new AtomicReference<>();
+ final CountDownLatch unlockerRefSetLatch = new CountDownLatch(1);
+ final AtomicBoolean runnableInterrupted = new AtomicBoolean(false);
+ final AtomicBoolean runnable2Executed = new AtomicBoolean(false);
+ Runnable runnable1 = new Runnable() {
+ @Override
+ public void run() {
+ unlockerRef.set(locker.lock("cat"));
+ unlockerRefSetLatch.countDown();
+ }
+ };
+ Runnable runnable2 = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ unlockerRefSetLatch.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ runnableInterrupted.set(true);
+ }
+ try {
+ Preconditions.checkNotNull(unlockerRef.get()).close();
+ fail();
+ } catch (IllegalStateException e) {
+ String expectedMessage = "the calling thread must be the one that acquired the "
+ + "AutoUnlocker";
+ assertThat(e.getMessage()).contains(expectedMessage);
+ runnable2Executed.set(true);
+ }
+ }
+ };
+ executorService.submit(runnable1);
+ executorService.submit(runnable2);
+ boolean interrupted = ExecutorShutdownUtil.interruptibleShutdown(executorService);
+ if (interrupted || runnableInterrupted.get()) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ assertTrue(runnable2Executed.get());
+ }
+
+ @Test
+ public void refCountingSanity() {
+ Set<AutoUnlocker> unlockers = new HashSet<>();
+ for (int i = 0; i < 1000; i++) {
+ try (AutoUnlocker unlocker = locker.lock("cat")) {
+ assertTrue(unlockers.add(unlocker));
+ }
+ }
+ }
+
+ @Test
+ public void simpleMultiThreaded_MutualExclusion() throws InterruptedException {
+ final CountDownLatch runnableLatch = new CountDownLatch(NUM_EXECUTOR_THREADS);
+ final AtomicInteger mutexCounter = new AtomicInteger(0);
+ final AtomicInteger runnableCounter = new AtomicInteger(0);
+ final AtomicBoolean runnableInterrupted = new AtomicBoolean(false);
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ runnableLatch.countDown();
+ try {
+ // Wait until all the Runnables are ready to try to acquire the lock all at once.
+ runnableLatch.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ runnableInterrupted.set(true);
+ }
+ try (AutoUnlocker unlocker = locker.lock("cat")) {
+ runnableCounter.incrementAndGet();
+ assertEquals(1, mutexCounter.incrementAndGet());
+ assertEquals(0, mutexCounter.decrementAndGet());
+ }
+ }
+ };
+ for (int i = 0; i < NUM_EXECUTOR_THREADS; i++) {
+ executorService.submit(runnable);
+ }
+ boolean interrupted = ExecutorShutdownUtil.interruptibleShutdown(executorService);
+ if (interrupted || runnableInterrupted.get()) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ assertEquals(NUM_EXECUTOR_THREADS, runnableCounter.get());
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java
new file mode 100644
index 0000000000..a48cda517e
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java
@@ -0,0 +1,26 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RefCountedMultisetKeyedLocker}. */
+@RunWith(JUnit4.class)
+public class RefCountedMultisetKeyedLockerTest extends KeyedLockerTest {
+ @Override
+ protected KeyedLocker<String> makeFreshLocker() {
+ return new RefCountedMultisetKeyedLocker<>();
+ }
+}