diff options
author | 2015-03-07 02:21:20 +0000 | |
---|---|---|
committer | 2015-03-10 13:58:41 +0000 | |
commit | 9b370024faa05f44d13dee1a42b35f7907afa991 (patch) | |
tree | 32ebac1b14c55bb9afe497438238c7ffe47539eb | |
parent | 539f7ad83cba45b2286f7366ab3bd34cc435fe5a (diff) |
Introduce KeyedLocker, a nice concurrency abstraction for managing lots of mutexes, and RefCountedMultisetKeyedLocker, an efficient implementation of this abstraction.
--
MOS_MIGRATED_REVID=88000985
4 files changed, 340 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java new file mode 100644 index 0000000000..8228f5b236 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedLocker.java @@ -0,0 +1,49 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import javax.annotation.concurrent.ThreadSafe; + +/** A keyed store of locks. */ +@ThreadSafe +public interface KeyedLocker<K> { + /** Used to yield access to the implicit lock granted by {@link #lock}. */ + @ThreadSafe + interface AutoUnlocker extends AutoCloseable { + /** + * If this was returned by {@code lock(k)}, yields exclusive access to {@code k}. + * + * <p>This method should be called at most once, and may only be called by the same thread that + * acquired the {@link AutoUnlocker} via {@link #lock}. Implementations are free to do anything + * if this is violated. + */ + @Override + void close(); + } + + /** + * Blocks the current thread until it has exclusive access to do things with {@code k} and + * returns a {@link AutoUnlocker} instance for yielding the implicit lock. The intended usage + * is: + * + * <pre> + * {@code + * try (AutoUnlocker unlocker = locker.lock(k)) { + * // Your code here. + * } + * } + * </pre> + */ + AutoUnlocker lock(K key); +}
\ No newline at end of file diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java new file mode 100644 index 0000000000..4efa90f567 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLocker.java @@ -0,0 +1,98 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ConcurrentHashMultiset; +import com.google.common.util.concurrent.Striped; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An implementation of {@link KeyedLocker} that uses ref counting to efficiently only store locks + * that are live. + */ +public class RefCountedMultisetKeyedLocker<K> implements KeyedLocker<K> { + // Multiset of keys that have threads waiting on a lock or using a lock. + private final ConcurrentHashMultiset<K> waiters = ConcurrentHashMultiset.<K>create(); + + private static final int NUM_STRIPES = 256; + // For each key, gives the striped lock to use for atomically managing the waiters on that key + // internally. + private final Striped<Lock> waitersLocks = Striped.lazyWeakLock(NUM_STRIPES); + + // Map of key to current lock, for keys that have at least one waiting or live thread. + private final ConcurrentMap<K, RefCountedLockImpl> locks = new ConcurrentHashMap<>(); + + private class RefCountedLockImpl extends ReentrantLock implements AutoUnlocker { + private final K key; + + private RefCountedLockImpl(K key) { + this.key = key; + } + + @Override + public void close() { + Preconditions.checkState(isHeldByCurrentThread(), "For key %s, 'close' can be called at most " + + "once and the calling thread must be the one that acquired the AutoUnlocker", key); + try { + Lock waitersLock = waitersLocks.get(key); + try { + waitersLock.lock(); + // Note that ConcurrentHashMultiset automatically removes removes entries for keys whose + // count is 0. + waiters.remove(key); + if (waiters.count(key) == 0) { + // No other thread is waiting to access this key, so we garbage collect the lock. + Preconditions.checkState(locks.remove(key, this), key); + } + } finally { + waitersLock.unlock(); + } + } finally { + unlock(); + } + } + } + + @Override + public AutoUnlocker lock(K key) { + RefCountedLockImpl newLock = new RefCountedLockImpl(key); + // Pre-lock our fresh lock, in case we win the race to get access to 'key'. + newLock.lock(); + Lock waitersLock = waitersLocks.get(key); + try { + waitersLock.lock(); + // Add us to the set of waiters, so that in case we lose the race to access 'key', the winner + // will know that we are waiting. + waiters.add(key); + } finally { + waitersLock.unlock(); + } + RefCountedLockImpl lock; + lock = locks.putIfAbsent(key, newLock); + if (lock != null) { + // Another thread won the race to get access to 'key', so we wait for our turn. + Preconditions.checkState(lock != newLock); + newLock.unlock(); + lock.lock(); + return lock; + } + // We won the race, so the current lock for 'key' is the one we locked and inserted. + return newLock; + } +} diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java new file mode 100644 index 0000000000..98862ee061 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedLockerTest.java @@ -0,0 +1,167 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.concurrent.KeyedLocker.AutoUnlocker; +import com.google.devtools.build.lib.testutil.TestUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** Base class for tests for {@link KeyedLocker} implementations. */ +public abstract class KeyedLockerTest { + private static final int NUM_EXECUTOR_THREADS = 1000; + private KeyedLocker<String> locker; + private ExecutorService executorService; + + protected abstract KeyedLocker<String> makeFreshLocker(); + + @Before + public void setUp() { + locker = makeFreshLocker(); + executorService = Executors.newFixedThreadPool(NUM_EXECUTOR_THREADS); + } + + @After + public void tearDown() { + locker = null; + MoreExecutors.shutdownAndAwaitTermination(executorService, TestUtils.WAIT_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + } + + @Test + public void simpleSingleThreaded() { + locker.lock("cat"); + locker.lock("dog"); + locker.lock("cat"); + locker.lock("dog"); + } + + @Test + public void doubleUnlock() { + AutoUnlocker unlocker = locker.lock("cat"); + unlocker.close(); + try { + unlocker.close(); + fail(); + } catch (IllegalStateException e) { + String expectedMessage = "'close' can be called at most once"; + assertThat(e.getMessage()).contains(expectedMessage); + } + } + + @Test + public void unlockOnOtherThread() throws Exception { + final AtomicReference<AutoUnlocker> unlockerRef = new AtomicReference<>(); + final CountDownLatch unlockerRefSetLatch = new CountDownLatch(1); + final AtomicBoolean runnableInterrupted = new AtomicBoolean(false); + final AtomicBoolean runnable2Executed = new AtomicBoolean(false); + Runnable runnable1 = new Runnable() { + @Override + public void run() { + unlockerRef.set(locker.lock("cat")); + unlockerRefSetLatch.countDown(); + } + }; + Runnable runnable2 = new Runnable() { + @Override + public void run() { + try { + unlockerRefSetLatch.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + runnableInterrupted.set(true); + } + try { + Preconditions.checkNotNull(unlockerRef.get()).close(); + fail(); + } catch (IllegalStateException e) { + String expectedMessage = "the calling thread must be the one that acquired the " + + "AutoUnlocker"; + assertThat(e.getMessage()).contains(expectedMessage); + runnable2Executed.set(true); + } + } + }; + executorService.submit(runnable1); + executorService.submit(runnable2); + boolean interrupted = ExecutorShutdownUtil.interruptibleShutdown(executorService); + if (interrupted || runnableInterrupted.get()) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + assertTrue(runnable2Executed.get()); + } + + @Test + public void refCountingSanity() { + Set<AutoUnlocker> unlockers = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + try (AutoUnlocker unlocker = locker.lock("cat")) { + assertTrue(unlockers.add(unlocker)); + } + } + } + + @Test + public void simpleMultiThreaded_MutualExclusion() throws InterruptedException { + final CountDownLatch runnableLatch = new CountDownLatch(NUM_EXECUTOR_THREADS); + final AtomicInteger mutexCounter = new AtomicInteger(0); + final AtomicInteger runnableCounter = new AtomicInteger(0); + final AtomicBoolean runnableInterrupted = new AtomicBoolean(false); + Runnable runnable = new Runnable() { + @Override + public void run() { + runnableLatch.countDown(); + try { + // Wait until all the Runnables are ready to try to acquire the lock all at once. + runnableLatch.await(TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + runnableInterrupted.set(true); + } + try (AutoUnlocker unlocker = locker.lock("cat")) { + runnableCounter.incrementAndGet(); + assertEquals(1, mutexCounter.incrementAndGet()); + assertEquals(0, mutexCounter.decrementAndGet()); + } + } + }; + for (int i = 0; i < NUM_EXECUTOR_THREADS; i++) { + executorService.submit(runnable); + } + boolean interrupted = ExecutorShutdownUtil.interruptibleShutdown(executorService); + if (interrupted || runnableInterrupted.get()) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + assertEquals(NUM_EXECUTOR_THREADS, runnableCounter.get()); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java new file mode 100644 index 0000000000..a48cda517e --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedMultisetKeyedLockerTest.java @@ -0,0 +1,26 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RefCountedMultisetKeyedLocker}. */ +@RunWith(JUnit4.class) +public class RefCountedMultisetKeyedLockerTest extends KeyedLockerTest { + @Override + protected KeyedLocker<String> makeFreshLocker() { + return new RefCountedMultisetKeyedLocker<>(); + } +} |