aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/MultisetSemaphore.java
blob: 8a0bb93bde450cfab35f0564cd401b51ab5f0c10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import java.util.Set;
import java.util.concurrent.Semaphore;

/**
 * A concurrency primitive for managing access to at most K unique things at once, for a fixed K.
 *
 * <p>You can think of this as a pair of a {@link Semaphore} with K total permits and a
 * {@link Multiset}, with permits being doled out and returned based on the current contents of the
 * {@link Multiset}.
 */
@ThreadSafe
public abstract class MultisetSemaphore<T> {
  /**
   * Blocks until permits are available for all the values in {@code valuesToAcquire}, and then
   * atomically acquires these permits.
   *
   * <p>{@code acquireAll(valuesToAcquire)} atomically does the following
   * <ol>
   *   <li>Computes {@code m}, the number of values in {@code valuesToAcquire} that are not
   *   currently in the backing {@link Multiset}.
   *   <li>Adds {@code valuesToAcquire} to the backing {@link Multiset}.
   *   <li>Blocks until {@code m} permits are available from the backing {@link Semaphore}.
   *   <li>Acquires these permits.
   * </ol>
   */
  public abstract void acquireAll(Set<T> valuesToAcquire) throws InterruptedException;

  /**
   * Atomically releases permits for all the values in {@code valuesToAcquire}.
   *
   * <p>{@code releaseAll(valuesToRelease)} atomically does the following
   * <ol>
   *   <li>Computes {@code m}, the number of values in {@code valuesToRelease} that are currently in
   *   the backing {@link Multiset} with multiplicity 1.
   *   <li>Removes {@code valuesToRelease} from the backing {@link Multiset}.
   *   <li>Release {@code m} permits from the backing {@link Semaphore}.
   * </ol>
   *
   * <p>Assumes that this {@link MultisetSemaphore} has already given out permits for all the
   * values in {@code valuesToAcquire}.
   */
  public abstract void releaseAll(Set<T> valuesToRelease);

  public abstract int estimateCurrentNumUniqueValues();

  /**
   * Returns a {@link MultisetSemaphore} with a backing {@link Semaphore} that has an unbounded
   * number of permits; that is, {@link #acquireAll} will never block.
   */
  public static <T> MultisetSemaphore<T> unbounded() {
    return UnboundedMultisetSemaphore.instance();
  }

  /** Builder for {@link MultisetSemaphore} instances. */
  public static class Builder {
    private static final int UNSET_INT = -1;

    private int maxNumUniqueValues = UNSET_INT;

    private Builder() {
    }

    /**
     * Sets the maximum number of unique values for which permits can be held at once in the
     * to-be-constructed {@link MultisetSemaphore}.
     */
    public Builder maxNumUniqueValues(int maxNumUniqueValues) {
      Preconditions.checkState(
          maxNumUniqueValues > 0,
          "maxNumUniqueValues must be positive (was %d)",
          maxNumUniqueValues);
      this.maxNumUniqueValues = maxNumUniqueValues;
      return this;
    }

    public <T> MultisetSemaphore<T> build() {
      Preconditions.checkState(
          maxNumUniqueValues != UNSET_INT,
          "maxNumUniqueValues(int) must be specified");
      return new NaiveMultisetSemaphore<>(maxNumUniqueValues);
    }
  }

  /** Returns a fresh {@link Builder}. */
  public static Builder newBuilder() {
    return new Builder();
  }

  private static class UnboundedMultisetSemaphore<T> extends MultisetSemaphore<T> {
    private static final UnboundedMultisetSemaphore<Object> INSTANCE =
        new UnboundedMultisetSemaphore<Object>();

    private UnboundedMultisetSemaphore() {
    }

    @SuppressWarnings("unchecked")
    private static <T> UnboundedMultisetSemaphore<T> instance() {
      return (UnboundedMultisetSemaphore<T>) INSTANCE;
    }

    @Override
    public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
    }

    @Override
    public void releaseAll(Set<T> valuesToRelease) {
    }

    @Override
    public int estimateCurrentNumUniqueValues() {
      // We can't give a good estimate since we don't track values at all.
      return 0;
    }
  }

  private static class NaiveMultisetSemaphore<T> extends MultisetSemaphore<T> {
    private final int maxNumUniqueValues;
    private final Semaphore semaphore;
    private final Object lock = new Object();
    // Protected by 'lock'.
    private final HashMultiset<T> actualValues = HashMultiset.create();

    private NaiveMultisetSemaphore(int maxNumUniqueValues) {
      this.maxNumUniqueValues = maxNumUniqueValues;
      this.semaphore = new Semaphore(maxNumUniqueValues);
    }

    @Override
    public void acquireAll(Set<T> valuesToAcquire) throws InterruptedException {
      int numUniqueValuesToAcquire = 0;
      synchronized (lock) {
        for (T value : valuesToAcquire) {
          int oldCount = actualValues.add(value, 1);
          if (oldCount == 0) {
            numUniqueValuesToAcquire++;
          }
        }
      }
      semaphore.acquire(numUniqueValuesToAcquire);
    }

    @Override
    public void releaseAll(Set<T> valuesToRelease) {
      int numUniqueValuesToRelease = 0;
      synchronized (lock) {
        for (T value : valuesToRelease) {
          int oldCount = actualValues.remove(value, 1);
          if (oldCount == 1) {
            numUniqueValuesToRelease++;
          }
        }
      }
      semaphore.release(numUniqueValuesToRelease);
    }

    @Override
    public int estimateCurrentNumUniqueValues() {
      return maxNumUniqueValues - semaphore.availablePermits();
    }
  }
}