aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java48
1 files changed, 34 insertions, 14 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
index 8a0bb93bde..94611b5ef3 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
@@ -146,30 +146,50 @@ public abstract class MultisetSemaphore<T> {
@Override
public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
- int numUniqueValuesToAcquire = 0;
+ int oldNumNeededPermits;
synchronized (lock) {
- for (T value : valuesToAcquire) {
- int oldCount = actualValues.add(value, 1);
- if (oldCount == 0) {
- numUniqueValuesToAcquire++;
+ oldNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire);
+ }
+ while (true) {
+ semaphore.acquire(oldNumNeededPermits);
+ synchronized (lock) {
+ int newNumNeededPermits = computeNumNeededPermitsLocked(valuesToAcquire);
+ if (newNumNeededPermits != oldNumNeededPermits) {
+ // While we were doing 'acquire' above, another thread won the race to acquire the first
+ // usage of one of the values in 'valuesToAcquire' or release the last usage of one of
+ // the values. This means we either acquired too many or too few permits, respectively,
+ // above. Release the permits we did acquire, in order to restore the accuracy of the
+ // semaphore's current count, and then try again.
+ semaphore.release(oldNumNeededPermits);
+ oldNumNeededPermits = newNumNeededPermits;
+ continue;
+ } else {
+ // Our modification to the semaphore was correct, so it's sound to update the multiset.
+ valuesToAcquire.forEach(actualValues::add);
+ return;
}
}
}
- semaphore.acquire(numUniqueValuesToAcquire);
+ }
+
+ private int computeNumNeededPermitsLocked(Set<T> valuesToAcquire) {
+ // We need a permit for each value that is not already in the multiset.
+ return (int) valuesToAcquire.stream()
+ .filter(v -> actualValues.count(v) == 0)
+ .count();
}
@Override
public void releaseAll(Set<T> valuesToRelease) {
- int numUniqueValuesToRelease = 0;
synchronized (lock) {
- for (T value : valuesToRelease) {
- int oldCount = actualValues.remove(value, 1);
- if (oldCount == 1) {
- numUniqueValuesToRelease++;
- }
- }
+ // We need to release a permit for each value that currently has multiplicity 1.
+ int numPermitsToRelease =
+ valuesToRelease
+ .stream()
+ .mapToInt(v -> actualValues.remove(v, 1) == 1 ? 1 : 0)
+ .sum();
+ semaphore.release(numPermitsToRelease);
}
- semaphore.release(numUniqueValuesToRelease);
}
@Override