diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java | 48 |
1 files changed, 34 insertions, 14 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java index 8a0bb93bde..94611b5ef3 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java @@ -146,30 +146,50 @@ public abstract class MultisetSemaphore<T> { @Override public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException { - int numUniqueValuesToAcquire = 0; + int oldNumNeededPermits; synchronized (lock) { - for (T value : valuesToAcquire) { - int oldCount = actualValues.add(value, 1); - if (oldCount == 0) { - numUniqueValuesToAcquire++; + oldNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire); + } + while (true) { + semaphore.acquire(oldNumNeededPermits); + synchronized (lock) { + int newNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire); + if (newNumNeededPermits != oldNumNeededPermits) { + // While we were doing 'acquire' above, another thread won the race to acquire the first + // usage of one of the values in 'valuesToAcquire' or release the last usage of one of + // the values. This means we either acquired too many or too few permits, respectively, + // above. Release the permits we did acquire, in order to restore the accuracy of the + // semaphore's current count, and then try again. + semaphore.release(oldNumNeededPermits); + oldNumNeededPermits = newNumNeededPermits; + continue; + } else { + // Our modification to the semaphore was correct, so it's sound to update the multiset. + valuesToAcquire.forEach(actualValues::add); + return; } } } - semaphore.acquire(numUniqueValuesToAcquire); + } + + private int computeNumNeededPermitsLocked(Set<T> valuesToAcquire) { + // We need a permit for each value that is not already in the multiset. + return (int) valuesToAcquire.stream() + .filter(v -> actualValues.count(v) == 0) + .count(); } @Override public void releaseAll(Set<T> valuesToRelease) { - int numUniqueValuesToRelease = 0; synchronized (lock) { - for (T value : valuesToRelease) { - int oldCount = actualValues.remove(value, 1); - if (oldCount == 1) { - numUniqueValuesToRelease++; - } - } + // We need to release a permit for each value that currently has multiplicity 1. + int numPermitsToRelease = + valuesToRelease + .stream() + .mapToInt(v -> actualValues.remove(v, 1) == 1 ? 1 : 0) + .sum(); + semaphore.release(numPermitsToRelease); } - semaphore.release(numUniqueValuesToRelease); } @Override |