diff options
author | Nathan Harmata <nharmata@google.com> | 2015-06-15 18:12:48 +0000 |
---|---|---|
committer | Kristina Chodorow <kchodorow@google.com> | 2015-06-16 13:58:55 +0000 |
commit | 6f049bb19941b89d16364b26cca66aae09f9cb42 (patch) | |
tree | 8bbfa340a0f06d2af72929cd8c63f6c6f9e08ac8 /src | |
parent | 354e6d21cb04ce588a25379247ff5fa094321a2a (diff) |
Introduce a simple concurrent Multimap-like data structure with reference counting.
--
MOS_MIGRATED_REVID=96024804
Diffstat (limited to 'src')
4 files changed, 249 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java new file mode 100644 index 0000000000..df49454318 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java @@ -0,0 +1,41 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.devtools.build.lib.concurrent.ThreadSafety.ConditionallyThreadSafe; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; + +import javax.annotation.Nullable; + +/** A map from keys to values, with frequencies. */ +public interface KeyedFrequencyStore<K, V> { + @ConditionallyThreadSafe + /** + * Inserts {@code value} for the given {@code key} with the given non-negative {@code frequency} + * (overwriting any existing value for that key). + * + * <p>Cannot be called concurrently with a call to {@code consume(key)}. + */ + void put(K key, V value, int frequency); + + @Nullable + @ThreadSafe + /** + * Removes for consumption one of the occurrences of the value for {@code key}, if any. + * + * <p>Formally, a call {@code consume(k)} returns {@code v} if it is the {@code f'}th such call + * since a call to {@code put(k, v, f)} (with {@code f' <= f}), and {@code null} otherwise. + */ + V consume(K key); +}
\ No newline at end of file diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java new file mode 100644 index 0000000000..6779e43b0b --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java @@ -0,0 +1,77 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.common.base.Preconditions; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; + +/** + * An implementation of {@link KeyedFrequencyStore} that uses ref counting to efficiently only + * store (key, value) pairs that have positive frequency. + */ +public class RefCountedHashMapKeyedFrequencyStore<K, V> implements KeyedFrequencyStore<K, V> { + private final ConcurrentHashMap<K, ValueWithFrequency<V>> map = new ConcurrentHashMap<>(); + + private static class ValueWithFrequency<V> { + private final V value; + private final AtomicInteger frequency; + + protected ValueWithFrequency(V value, int initialFrequency) { + this.value = value; + this.frequency = new AtomicInteger(initialFrequency); + } + } + + @Override + public void put(K key, V value, int frequency) { + Preconditions.checkState(frequency >= 0, frequency); + if (frequency == 0) { + map.remove(key); + } else { + map.put(key, new ValueWithFrequency<>(value, frequency)); + } + } + + @Override + @Nullable + public V consume(K key) { + ValueWithFrequency<V> vwf = map.get(key); + if (vwf == null) { + // Either the key isn't present or it has already been removed (because it has already + // been consumed). + return null; + } + int oldFrequency = vwf.frequency.getAndDecrement(); + if (oldFrequency <= 0) { + // This can happen as a result of the following race: suppose the current frequency for key K + // is F and T > F threads call consume(K) and all of them see the same object from the + // map.get call above.. F-1 of these consume calls will decrement the frequency all the way + // down to 1, one thread will "win the race" and decrement the frequency to 0 (see below + // code), but the other T-F threads will be left with a "stale" ValueWithFrequency instance. + // Since the value has already been exhaustively consumed, returning null is the appropriate + // behavior here. + return null; + } + if (oldFrequency == 1) { + // We are the final consumer of the key. + map.remove(key); + } + return vwf.value; + } +} + diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java new file mode 100644 index 0000000000..47a34e9226 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java @@ -0,0 +1,105 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.testutil.TestUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** Base class for tests for {@link KeyedFrequencyStore} implementations. */ +public abstract class KeyedFrequencyStoreTest { + private static final int NUM_EXECUTOR_THREADS = 1000; + private ExecutorService executorService; + private ThrowableRecordingRunnableWrapper wrapper; + private KeyedFrequencyStore<String, Object> store; + + protected abstract KeyedFrequencyStore<String, Object> makeFreshStore(); + + @Before + public void setUp_KeyedFrequencyStoreTest() { + store = makeFreshStore(); + executorService = Executors.newFixedThreadPool(NUM_EXECUTOR_THREADS); + wrapper = new ThrowableRecordingRunnableWrapper("KeyedFrequencyStoreTest"); + } + + @After + public void tearDown_KeyedFrequencyStoreTest() { + store = null; + MoreExecutors.shutdownAndAwaitTermination(executorService, TestUtils.WAIT_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + } + + @Test + public void zeroFrequency() { + store.put("a", new Object(), 0); + assertSame(null, store.consume("a")); + } + + @Test + public void simpleSingleThreaded() { + int frequency = 100; + Object objA = new Object(); + store.put("a", objA, frequency); + for (int i = 0; i < frequency; i++) { + assertSame(objA, store.consume("a")); + } + assertSame(null, store.consume("a")); + assertSame(null, store.consume("a")); + } + + @Test + public void simpleMultiThreaded() throws Exception { + int extra = 100; + int frequency = NUM_EXECUTOR_THREADS - extra; + final Object objA = new Object(); + store.put("a", objA, frequency); + final AtomicInteger nullCount = new AtomicInteger(0); + final AtomicInteger nonNullCount = new AtomicInteger(0); + Runnable runnable = new Runnable() { + @Override + public void run() { + Object obj = store.consume("a"); + if (obj == null) { + nullCount.incrementAndGet(); + } else { + assertSame(objA, obj); + nonNullCount.incrementAndGet(); + } + } + }; + for (int i = 0; i < NUM_EXECUTOR_THREADS; i++) { + executorService.submit(wrapper.wrap(runnable)); + } + boolean interrupted = ExecutorShutdownUtil.interruptibleShutdown(executorService); + Throwables.propagateIfPossible(wrapper.getFirstThrownError()); + if (interrupted) { + Thread.currentThread().interrupt(); + throw new InterruptedException(); + } + assertEquals(frequency, nonNullCount.get()); + assertEquals(extra, nullCount.get()); + } +}
\ No newline at end of file diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java new file mode 100644 index 0000000000..0a74cb6208 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java @@ -0,0 +1,26 @@ +// Copyright 2015 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RefCountedHashMapKeyedFrequencyStore}. */ +@RunWith(JUnit4.class) +public class RefCountedHashMapKeyedFrequencyStoreTest extends KeyedFrequencyStoreTest { + @Override + protected KeyedFrequencyStore<String, Object> makeFreshStore() { + return new RefCountedHashMapKeyedFrequencyStore<String, Object>(); + } +}
\ No newline at end of file |