// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.concurrent; import static com.google.common.truth.Truth.assertThat; import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; import com.google.common.collect.ConcurrentHashMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.devtools.build.lib.testutil.TestThread; import com.google.devtools.build.lib.testutil.TestUtils; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Tests for {@link MultisetSemaphore}. */ @RunWith(JUnit4.class) public class MultisetSemaphoreTest { @Test public void testSimple_Serial() throws Exception { // When we have a MultisetSemaphore MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() // with 3 max num unique values, .maxNumUniqueValues(3) .build(); // Then it initially has 0 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(0); // And then when we serially acquire permits for 3 unique values, multisetSemaphore.acquireAll(ImmutableSet.of("a", "b", "c")); // Then the MultisetSemaphore thinks it currently has 3 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); // And then when we attempt to acquire permits for 2 of those same unique values, we don't block // forever, multisetSemaphore.acquireAll(ImmutableSet.of("b", "c")); // And the MultisetSemaphore still thinks it currently has 3 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); // And then when we release one of the permit for one of those unique values, multisetSemaphore.releaseAll(ImmutableSet.of("c")); // The MultisetSemaphore still thinks it currently has 3 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); // And then we release the final permit for that unique value, multisetSemaphore.releaseAll(ImmutableSet.of("c")); // The MultisetSemaphore thinks it currently has 2 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(2); // And then we attempt to acquire a permit for a 4th unique value, we don't block forever, multisetSemaphore.acquireAll(ImmutableSet.of("d")); // And the MultisetSemaphore thinks it currently has 3 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(3); // And then we release one permit each for the remaining 3 that unique values, multisetSemaphore.releaseAll(ImmutableSet.of("a", "b", "d")); // The MultisetSemaphore thinks it currently has 1 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(1); // And then we release the final permit for the remaining unique value, multisetSemaphore.releaseAll(ImmutableSet.of("b")); // The MultisetSemaphore thinks it currently has 0 unique values. assertThat(multisetSemaphore.estimateCurrentNumUniqueValues()).isEqualTo(0); } @Test public void testSimple_Concurrent() throws Exception { // When we have N and M, with M > N and M|N. final int n = 10; int m = n * 2; Preconditions.checkState(m > n && m % n == 0, "M=%d N=%d", m, n); // When we have a MultisetSemaphore final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() // with N max num unique values, .maxNumUniqueValues(n) .build(); // And a ExecutorService with M threads, ExecutorService executorService = Executors.newFixedThreadPool(m); // And a recorder for thrown exceptions, ThrowableRecordingRunnableWrapper wrapper = new ThrowableRecordingRunnableWrapper("testSimple_Concurrent"); final AtomicInteger numThreadsJustAfterAcquireInFirstRound = new AtomicInteger(0); final AtomicInteger numThreadsJustAfterAcquireInSecondRound = new AtomicInteger(0); final AtomicInteger secondRoundCompleted = new AtomicInteger(0); final int napTimeMs = 42; for (int i = 0; i < m; i++) { final String val = "val" + i; // And we submit M Runnables, each of which @SuppressWarnings("unused") Future possiblyIgnoredError = executorService.submit( wrapper.wrap( new Runnable() { @Override public void run() { try { // Has two rounds // Wherein the first round // The Runnable acquire a permit for a unique value (among M values), ImmutableSet valSet = ImmutableSet.of(val); multisetSemaphore.acquireAll(valSet); assertThat(numThreadsJustAfterAcquireInFirstRound.getAndIncrement()) .isLessThan(n); // And then sleeps, Thread.sleep(napTimeMs); numThreadsJustAfterAcquireInFirstRound.decrementAndGet(); multisetSemaphore.releaseAll(valSet); // And wherein the second round // The Runnable again acquires a permit for its unique value, multisetSemaphore.acquireAll(valSet); assertThat(numThreadsJustAfterAcquireInSecondRound.getAndIncrement()) .isLessThan(n); // And then sleeps, Thread.sleep(napTimeMs); numThreadsJustAfterAcquireInSecondRound.decrementAndGet(); // And notes that it has completed the second round, secondRoundCompleted.incrementAndGet(); multisetSemaphore.releaseAll(valSet); } catch (InterruptedException e) { throw new IllegalStateException(e); } } })); } // And we wait for all M Runnables to complete (that is, none of them were deadlocked), boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); // Then none of our Runnables threw any Exceptions. assertThat(wrapper.getFirstThrownError()).isNull(); if (interrupted) { Thread.currentThread().interrupt(); throw new InterruptedException(); } // And the counters we used for sanity checks were correctly reset to 0. assertThat(numThreadsJustAfterAcquireInFirstRound.get()).isEqualTo(0); assertThat(numThreadsJustAfterAcquireInSecondRound.get()).isEqualTo(0); // And all M Runnables completed the second round. assertThat(secondRoundCompleted.get()).isEqualTo(m); Set newVals = new HashSet<>(); for (int i = 0; i < n; i++) { newVals.add("newval" + i); } // And the main test thread is able to acquire permits for N new unique values (indirectly // confirming that the MultisetSemaphore previously had no outstanding permits). multisetSemaphore.acquireAll(newVals); } @Test public void testConcurrentAtomicity() throws Exception { int n = 100; // When we have a MultisetSemaphore final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() // with 2 max num unique values, .maxNumUniqueValues(2) .build(); // And a ExecutorService with N threads, ExecutorService executorService = Executors.newFixedThreadPool(n); // And a recorder for thrown exceptions, ThrowableRecordingRunnableWrapper wrapper = new ThrowableRecordingRunnableWrapper("testConcurrentAtomicity"); final int napTimeMs = 42; // And a done latch with initial count N, final CountDownLatch allDoneLatch = new CountDownLatch(n); final String sameVal = "same-val"; for (int i = 0; i < n; i++) { final String differentVal = "different-val" + i; // And we submit N Runnables, each of which @SuppressWarnings("unused") Future possiblyIgnoredError = executorService.submit( wrapper.wrap( new Runnable() { @Override public void run() { try { Set vals = ImmutableSet.of(sameVal, differentVal); // Tries to acquire permits for a set of two values, one of which is the // same for all the N Runnables and one of which is unique across all N // Runnables, multisetSemaphore.acquireAll(vals); // And then sleeps, Thread.sleep(napTimeMs); // And then releases its permits, multisetSemaphore.releaseAll(vals); // And then counts down the done latch, allDoneLatch.countDown(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } })); } // Then all of our Runnables completed (without deadlock!), as expected, boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); // And thus were able to count down the done latch, allDoneLatch.await(); // And also none of them threw any Exceptions. assertThat(wrapper.getFirstThrownError()).isNull(); if (interrupted) { Thread.currentThread().interrupt(); throw new InterruptedException(); } } @Test public void testConcurrentRace_AllPermuations() throws Exception { // When we have N values int n = 6; ArrayList vals = new ArrayList<>(); for (int i = 0; i < n; i++) { vals.add("val-" + i); } // And we have all permutations of these N values Collection> permutations = Collections2.orderedPermutations(vals); int numPermutations = permutations.size(); // And we have a MultisetSemaphore final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() // with N max num unique values, .maxNumUniqueValues(n) .build(); // And a ExecutorService with N! threads, ExecutorService executorService = Executors.newFixedThreadPool(numPermutations); // And a recorder for thrown exceptions, ThrowableRecordingRunnableWrapper wrapper = new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllPermuations"); for (List orderedVals : permutations) { final Set orderedSet = new LinkedHashSet<>(orderedVals); // And we submit N! Runnables, each of which @SuppressWarnings("unused") Future possiblyIgnoredError = executorService.submit( wrapper.wrap( new Runnable() { @Override public void run() { try { // Tries to acquire permits for the set of N values, with a unique // iteration order (across all the N! different permutations), multisetSemaphore.acquireAll(orderedSet); // And then immediately releases the permits. multisetSemaphore.releaseAll(orderedSet); } catch (InterruptedException e) { throw new IllegalStateException(e); } } })); } // Then all of our Runnables completed (without deadlock!), as expected, boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); // And also none of them threw any Exceptions. assertThat(wrapper.getFirstThrownError()).isNull(); if (interrupted) { Thread.currentThread().interrupt(); throw new InterruptedException(); } } @Test public void testConcurrentRace_AllSameSizedCombinations() throws Exception { // When we have n values int n = 10; ImmutableSet.Builder valsBuilder = ImmutableSet.builder(); for (int i = 0; i < n; i++) { valsBuilder.add("val-" + i); } ImmutableSet vals = valsBuilder.build(); int k = 5; // And we have all combinations of size k of these n values Set> combinations = Sets.combinations(vals, k); int numCombinations = combinations.size(); // And we have a MultisetSemaphore final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() // with K max num unique values, .maxNumUniqueValues(k) .build(); // And a ExecutorService with nCk threads, ExecutorService executorService = Executors.newFixedThreadPool(numCombinations); // And a recorder for thrown exceptions, ThrowableRecordingRunnableWrapper wrapper = new ThrowableRecordingRunnableWrapper("testConcurrentRace_AllSameSizedCombinations"); // And a ConcurrentHashMultiset for counting the multiplicities of the values ourselves, ConcurrentHashMultiset counts = ConcurrentHashMultiset.create(); for (Set combination : combinations) { // And, for each of the nCk combinations, we submit a Runnable, that @SuppressWarnings("unused") Future possiblyIgnoredError = executorService.submit( wrapper.wrap( new Runnable() { @Override public void run() { try { // Tries to acquire permits for its set of k values, multisetSemaphore.acquireAll(combination); // And then verifies that the multiplicities are as expected, combination.forEach(counts::add); assertThat(counts.entrySet().size()).isAtMost(k); combination.forEach(counts::remove); // And then releases the permits. multisetSemaphore.releaseAll(combination); } catch (InterruptedException e) { throw new IllegalStateException(e); } } })); } // Then all of our Runnables completed (without deadlock!), as expected, boolean interrupted = ExecutorUtil.interruptibleShutdown(executorService); // And also none of them threw any Exceptions. assertThat(wrapper.getFirstThrownError()).isNull(); if (interrupted) { Thread.currentThread().interrupt(); throw new InterruptedException(); } } @Test public void testSimpleDeadlock() throws Exception { final MultisetSemaphore multisetSemaphore = MultisetSemaphore.newBuilder() .maxNumUniqueValues(2) .build(); CountDownLatch thread1AcquiredLatch = new CountDownLatch(1); CountDownLatch thread2AboutToAcquireLatch = new CountDownLatch(1); CountDownLatch thread3AboutToAcquireLatch = new CountDownLatch(1); TestThread thread1 = new TestThread() { @Override public void runTest() throws InterruptedException { multisetSemaphore.acquireAll(ImmutableSet.of("a", "b")); thread1AcquiredLatch.countDown(); thread2AboutToAcquireLatch.await( TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); thread3AboutToAcquireLatch.await( TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); Thread.sleep(1000); multisetSemaphore.releaseAll(ImmutableSet.of("a", "b")); } }; thread1.setName("Thread1"); TestThread thread2 = new TestThread() { @Override public void runTest() throws InterruptedException { thread1AcquiredLatch.await( TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); thread2AboutToAcquireLatch.countDown(); multisetSemaphore.acquireAll(ImmutableSet.of("b", "c")); multisetSemaphore.releaseAll(ImmutableSet.of("b", "c")); } }; thread2.setName("Thread2"); TestThread thread3 = new TestThread() { @Override public void runTest() throws InterruptedException { thread2AboutToAcquireLatch.await( TestUtils.WAIT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS); Thread.sleep(1000); thread3AboutToAcquireLatch.countDown(); multisetSemaphore.acquireAll(ImmutableSet.of("a", "d")); multisetSemaphore.releaseAll(ImmutableSet.of("a", "d")); } }; thread3.setName("Thread3"); thread1.start(); thread2.start(); thread3.start(); thread1.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); thread2.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); thread3.joinAndAssertState(TestUtils.WAIT_TIMEOUT_MILLISECONDS); } }