aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2015-06-15 18:12:48 +0000
committerGravatar Kristina Chodorow <kchodorow@google.com>2015-06-16 13:58:55 +0000
commit6f049bb19941b89d16364b26cca66aae09f9cb42 (patch)
tree8bbfa340a0f06d2af72929cd8c63f6c6f9e08ac8 /src
parent354e6d21cb04ce588a25379247ff5fa094321a2a (diff)
Introduce a simple concurrent Multimap-like data structure with reference counting.
-- MOS_MIGRATED_REVID=96024804
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java41
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java77
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java105
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java26
4 files changed, 249 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java
new file mode 100644
index 0000000000..df49454318
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStore.java
@@ -0,0 +1,41 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ConditionallyThreadSafe;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
+import javax.annotation.Nullable;
+
+/** A map from keys to values, with frequencies. */
+public interface KeyedFrequencyStore<K, V> {
+ @ConditionallyThreadSafe
+ /**
+ * Inserts {@code value} for the given {@code key} with the given non-negative {@code frequency}
+ * (overwriting any existing value for that key).
+ *
+ * <p>Cannot be called concurrently with a call to {@code consume(key)}.
+ */
+ void put(K key, V value, int frequency);
+
+ @Nullable
+ @ThreadSafe
+ /**
+ * Removes for consumption one of the occurrences of the value for {@code key}, if any.
+ *
+ * <p>Formally, a call {@code consume(k)} returns {@code v} if it is the {@code f'}th such call
+ * since a call to {@code put(k, v, f)} (with {@code f' <= f}), and {@code null} otherwise.
+ */
+ V consume(K key);
+} \ No newline at end of file
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java
new file mode 100644
index 0000000000..6779e43b0b
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStore.java
@@ -0,0 +1,77 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link KeyedFrequencyStore} that uses ref counting to efficiently only
+ * store (key, value) pairs that have positive frequency.
+ */
+public class RefCountedHashMapKeyedFrequencyStore<K, V> implements KeyedFrequencyStore<K, V> {
+ private final ConcurrentHashMap<K, ValueWithFrequency<V>> map = new ConcurrentHashMap<>();
+
+ private static class ValueWithFrequency<V> {
+ private final V value;
+ private final AtomicInteger frequency;
+
+ protected ValueWithFrequency(V value, int initialFrequency) {
+ this.value = value;
+ this.frequency = new AtomicInteger(initialFrequency);
+ }
+ }
+
+ @Override
+ public void put(K key, V value, int frequency) {
+ Preconditions.checkState(frequency >= 0, frequency);
+ if (frequency == 0) {
+ map.remove(key);
+ } else {
+ map.put(key, new ValueWithFrequency<>(value, frequency));
+ }
+ }
+
+ @Override
+ @Nullable
+ public V consume(K key) {
+ ValueWithFrequency<V> vwf = map.get(key);
+ if (vwf == null) {
+ // Either the key isn't present or it has already been removed (because it has already
+ // been consumed).
+ return null;
+ }
+ int oldFrequency = vwf.frequency.getAndDecrement();
+ if (oldFrequency <= 0) {
+ // This can happen as a result of the following race: suppose the current frequency for key K
+ // is F and T > F threads call consume(K) and all of them see the same object from the
+ // map.get call above.. F-1 of these consume calls will decrement the frequency all the way
+ // down to 1, one thread will "win the race" and decrement the frequency to 0 (see below
+ // code), but the other T-F threads will be left with a "stale" ValueWithFrequency instance.
+ // Since the value has already been exhaustively consumed, returning null is the appropriate
+ // behavior here.
+ return null;
+ }
+ if (oldFrequency == 1) {
+ // We are the final consumer of the key.
+ map.remove(key);
+ }
+ return vwf.value;
+ }
+}
+
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java
new file mode 100644
index 0000000000..47a34e9226
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/KeyedFrequencyStoreTest.java
@@ -0,0 +1,105 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.lib.testutil.TestUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Base class for tests for {@link KeyedFrequencyStore} implementations. */
+public abstract class KeyedFrequencyStoreTest {
+ private static final int NUM_EXECUTOR_THREADS = 1000;
+ private ExecutorService executorService;
+ private ThrowableRecordingRunnableWrapper wrapper;
+ private KeyedFrequencyStore<String, Object> store;
+
+ protected abstract KeyedFrequencyStore<String, Object> makeFreshStore();
+
+ @Before
+ public void setUp_KeyedFrequencyStoreTest() {
+ store = makeFreshStore();
+ executorService = Executors.newFixedThreadPool(NUM_EXECUTOR_THREADS);
+ wrapper = new ThrowableRecordingRunnableWrapper("KeyedFrequencyStoreTest");
+ }
+
+ @After
+ public void tearDown_KeyedFrequencyStoreTest() {
+ store = null;
+ MoreExecutors.shutdownAndAwaitTermination(executorService, TestUtils.WAIT_TIMEOUT_SECONDS,
+ TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void zeroFrequency() {
+ store.put("a", new Object(), 0);
+ assertSame(null, store.consume("a"));
+ }
+
+ @Test
+ public void simpleSingleThreaded() {
+ int frequency = 100;
+ Object objA = new Object();
+ store.put("a", objA, frequency);
+ for (int i = 0; i < frequency; i++) {
+ assertSame(objA, store.consume("a"));
+ }
+ assertSame(null, store.consume("a"));
+ assertSame(null, store.consume("a"));
+ }
+
+ @Test
+ public void simpleMultiThreaded() throws Exception {
+ int extra = 100;
+ int frequency = NUM_EXECUTOR_THREADS - extra;
+ final Object objA = new Object();
+ store.put("a", objA, frequency);
+ final AtomicInteger nullCount = new AtomicInteger(0);
+ final AtomicInteger nonNullCount = new AtomicInteger(0);
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ Object obj = store.consume("a");
+ if (obj == null) {
+ nullCount.incrementAndGet();
+ } else {
+ assertSame(objA, obj);
+ nonNullCount.incrementAndGet();
+ }
+ }
+ };
+ for (int i = 0; i < NUM_EXECUTOR_THREADS; i++) {
+ executorService.submit(wrapper.wrap(runnable));
+ }
+ boolean interrupted = ExecutorShutdownUtil.interruptibleShutdown(executorService);
+ Throwables.propagateIfPossible(wrapper.getFirstThrownError());
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedException();
+ }
+ assertEquals(frequency, nonNullCount.get());
+ assertEquals(extra, nullCount.get());
+ }
+} \ No newline at end of file
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java
new file mode 100644
index 0000000000..0a74cb6208
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/RefCountedHashMapKeyedFrequencyStoreTest.java
@@ -0,0 +1,26 @@
+// Copyright 2015 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RefCountedHashMapKeyedFrequencyStore}. */
+@RunWith(JUnit4.class)
+public class RefCountedHashMapKeyedFrequencyStoreTest extends KeyedFrequencyStoreTest {
+ @Override
+ protected KeyedFrequencyStore<String, Object> makeFreshStore() {
+ return new RefCountedHashMapKeyedFrequencyStore<String, Object>();
+ }
+} \ No newline at end of file