aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/actions/ResourceManager.java
blob: da62312a1a740da84ee452892f1e944c8be1b096 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.actions;

import static com.google.devtools.build.lib.profiler.AutoProfiler.profiled;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.eventbus.EventBus;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.util.Pair;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Used to keep track of resources consumed by the Blaze action execution threads and throttle them
 * when necessary.
 *
 * <p>Threads which are known to consume a significant amount of resources should call
 * {@link #acquireResources} method. This method will check whether requested resources are
 * available and will either mark them as used and allow the thread to proceed or will block the
 * thread until requested resources will become available. When the thread completes its task, it
 * must release allocated resources by calling {@link #releaseResources} method.
 *
 * <p>Available resources can be calculated using one of three ways:
 * <ol>
 * <li>They can be preset using {@link #setAvailableResources(ResourceSet)} method. This is used
 *     mainly by the unit tests (however it is possible to provide a future option that would
 *     artificially limit amount of CPU/RAM consumed by the Blaze).
 * <li>They can be preset based on the /proc/cpuinfo and /proc/meminfo information. Blaze will
 *     calculate amount of available CPU cores (adjusting for hyperthreading logical cores) and
 *     amount of the total available memory and will limit itself to the number of effective cores
 *     and 2/3 of the available memory. For details, please look at the {@link
 *     LocalHostCapacity#getLocalHostCapacity} method.
 * </ol>
 *
 * <p>The resource manager also allows a slight overallocation of the resources to account for the
 * fact that requested resources are usually estimated using a pessimistic approximation. It also
 * guarantees that at least one thread will always be able to acquire any amount of requested
 * resources (even if it is greater than amount of available resources). Therefore, assuming that
 * threads correctly release acquired resources, Blaze will never be fully blocked.
 */
@ThreadSafe
public class ResourceManager {

  private EventBus eventBus;

  private final ThreadLocal<Boolean> threadLocked = new ThreadLocal<Boolean>() {
    @Override
    protected Boolean initialValue() {
      return false;
    }
  };

  /**
   * Singleton reference defined in a separate class to ensure thread-safe lazy
   * initialization.
   */
  private static class Singleton {
    static ResourceManager instance = new ResourceManager();
  }

  /**
   * Returns singleton instance of the resource manager.
   */
  public static ResourceManager instance() {
    return Singleton.instance;
  }

  // Allocated resources are allowed to go "negative", but at least
  // MIN_AVAILABLE_CPU_RATIO portion of CPU and MIN_AVAILABLE_RAM_RATIO portion
  // of RAM should be available.
  // Please note that this value is purely empirical - we assume that generally
  // requested resources are somewhat pessimistic and thread would end up
  // using less than requested amount.
  private static final double MIN_NECESSARY_CPU_RATIO = 0.6;
  private static final double MIN_NECESSARY_RAM_RATIO = 1.0;
  private static final double MIN_NECESSARY_IO_RATIO = 1.0;

  // List of blocked threads. Associated CountDownLatch object will always
  // be initialized to 1 during creation in the acquire() method.
  private final List<Pair<ResourceSet, CountDownLatch>> requestList;

  // The total amount of resources on the local host. Must be set by
  // an explicit call to setAvailableResources(), often using
  // LocalHostCapacity.getLocalHostCapacity() as an argument.
  private ResourceSet staticResources = null;

  private ResourceSet availableResources = null;

  // Used amount of CPU capacity (where 1.0 corresponds to the one fully
  // occupied CPU core. Corresponds to the CPU resource definition in the
  // ResourceSet class.
  private double usedCpu;

  // Used amount of RAM capacity in MB. Corresponds to the RAM resource
  // definition in the ResourceSet class.
  private double usedRam;

  // Used amount of I/O resources. Corresponds to the I/O resource
  // definition in the ResourceSet class.
  private double usedIo;

  // Used local test count. Corresponds to the local test count definition in the ResourceSet class.
  private int usedLocalTestCount;

  // Specifies how much of the RAM in staticResources we should allow to be used.
  public static final int DEFAULT_RAM_UTILIZATION_PERCENTAGE = 67;
  private int ramUtilizationPercentage = DEFAULT_RAM_UTILIZATION_PERCENTAGE;

  private ResourceManager() {
    requestList = new LinkedList<>();
  }

  @VisibleForTesting public static ResourceManager instanceForTestingOnly() {
    return new ResourceManager();
  }

  /**
   * Resets resource manager state and releases all thread locks.
   * Note - it does not reset available resources. Use
   * separate call to setAvailableResoures().
   */
  public synchronized void resetResourceUsage() {
    usedCpu = 0;
    usedRam = 0;
    usedIo = 0;
    usedLocalTestCount = 0;
    for (Pair<ResourceSet, CountDownLatch> request : requestList) {
      // CountDownLatch can be set only to 0 or 1.
      request.second.countDown();
    }
    requestList.clear();
  }

  /**
   * Sets available resources using given resource set. Must be called
   * at least once before using resource manager.
   */
  public synchronized void setAvailableResources(ResourceSet resources) {
    Preconditions.checkNotNull(resources);
    staticResources = resources;
    availableResources = ResourceSet.create(
        staticResources.getMemoryMb() * this.ramUtilizationPercentage / 100.0,
        staticResources.getCpuUsage(),
        staticResources.getIoUsage(),
        staticResources.getLocalTestCount());
    processWaitingThreads();
  }

  /**
   * Specify how much of the available RAM we should allow to be used.
   * This has no effect if autosensing is enabled.
   */
  public synchronized void setRamUtilizationPercentage(int percentage) {
    ramUtilizationPercentage = percentage;
  }

  /**
   * Acquires requested resource set. Will block if resource is not available.
   * NB! This method must be thread-safe!
   */
  public void acquireResources(ActionMetadata owner, ResourceSet resources)
      throws InterruptedException {
    Preconditions.checkNotNull(resources);
    AutoProfiler p = profiled(owner, ProfilerTask.ACTION_LOCK);
    CountDownLatch latch = null;
    try {
      waiting(owner);
      latch = acquire(resources);
      if (latch != null) {
        latch.await();
      }
    } finally {
      threadLocked.set(resources.getCpuUsage() != 0 || resources.getMemoryMb() != 0
          || resources.getIoUsage() != 0 || resources.getLocalTestCount() != 0);
      acquired(owner);

      // Profile acquisition only if it waited for resource to become available.
      if (latch != null) {
        p.complete();
      }
    }
  }

  /**
   * Acquires the given resources if available immediately. Does not block.
   * @return true iff the given resources were locked (all or nothing).
   */
  public boolean tryAcquire(ActionMetadata owner, ResourceSet resources) {
    boolean acquired = false;
    synchronized (this) {
      if (areResourcesAvailable(resources)) {
        incrementResources(resources);
        acquired = true;
      }
    }

    if (acquired) {
      threadLocked.set(resources.getCpuUsage() != 0 || resources.getMemoryMb() != 0
          || resources.getIoUsage() != 0 || resources.getLocalTestCount() != 0);
      acquired(owner);
    }

    return acquired;
  }

  private void incrementResources(ResourceSet resources) {
    usedCpu += resources.getCpuUsage();
    usedRam += resources.getMemoryMb();
    usedIo += resources.getIoUsage();
    usedLocalTestCount += resources.getLocalTestCount();
  }

  /**
   * Return true if any resources have been claimed through this manager.
   */
  public synchronized boolean inUse() {
    return usedCpu != 0.0 || usedRam != 0.0 || usedIo != 0.0 || usedLocalTestCount != 0
        || !requestList.isEmpty();
  }


  /**
   * Return true iff this thread has a lock on non-zero resources.
   */
  public boolean threadHasResources() {
    return threadLocked.get();
  }

  public void setEventBus(EventBus eventBus) {
    Preconditions.checkState(this.eventBus == null);
    this.eventBus = Preconditions.checkNotNull(eventBus);
  }

  public void unsetEventBus() {
    Preconditions.checkState(this.eventBus != null);
    this.eventBus = null;
  }

  private void waiting(ActionMetadata owner) {
    if (eventBus != null) {
      // Null only in tests.
      eventBus.post(ActionStatusMessage.schedulingStrategy(owner));
    }
  }

  private void acquired(ActionMetadata owner) {
    if (eventBus != null) {
      // Null only in tests.
      eventBus.post(ActionStatusMessage.runningStrategy(owner));
    }
  }

  /**
   * Releases previously requested resource =.
   *
   * <p>NB! This method must be thread-safe!
   */
  public void releaseResources(ActionMetadata owner, ResourceSet resources) {
    boolean isConflict = false;
    AutoProfiler p = profiled(owner, ProfilerTask.ACTION_RELEASE);
    try {
      isConflict = release(resources);
    } finally {
      threadLocked.set(false);

      // Profile resource release only if it resolved at least one allocation request.
      if (isConflict) {
        p.complete();
      }
    }
  }

  private synchronized CountDownLatch acquire(ResourceSet resources) {
    if (areResourcesAvailable(resources)) {
      incrementResources(resources);
      return null;
    }
    Pair<ResourceSet, CountDownLatch> request =
      new Pair<>(resources, new CountDownLatch(1));
    requestList.add(request);
    return request.second;
  }

  private synchronized boolean release(ResourceSet resources) {
    usedCpu -= resources.getCpuUsage();
    usedRam -= resources.getMemoryMb();
    usedIo -= resources.getIoUsage();
    usedLocalTestCount -= resources.getLocalTestCount();

    // TODO(bazel-team): (2010) rounding error can accumulate and value below can end up being
    // e.g. 1E-15. So if it is small enough, we set it to 0. But maybe there is a better solution.
    double epsilon = 0.0001;
    if (usedCpu < epsilon) {
      usedCpu = 0;
    }
    if (usedRam < epsilon) {
      usedRam = 0;
    }
    if (usedIo < epsilon) {
      usedIo = 0;
    }
    if (!requestList.isEmpty()) {
      processWaitingThreads();
      return true;
    }
    return false;
  }


  /**
   * Tries to unblock one or more waiting threads if there are sufficient resources available.
   */
  private synchronized void processWaitingThreads() {
    Iterator<Pair<ResourceSet, CountDownLatch>> iterator = requestList.iterator();
    while (iterator.hasNext()) {
      Pair<ResourceSet, CountDownLatch> request = iterator.next();
      if (areResourcesAvailable(request.first)) {
        incrementResources(request.first);
        request.second.countDown();
        iterator.remove();
      }
    }
  }

  // Method will return true if all requested resources are considered to be available.
  private boolean areResourcesAvailable(ResourceSet resources) {
    Preconditions.checkNotNull(availableResources);
    // Comparison below is robust, since any calculation errors will be fixed
    // by the release() method.
    if (usedCpu == 0.0 && usedRam == 0.0 && usedIo == 0.0 && usedLocalTestCount == 0) {
      return true;
    }
    // Use only MIN_NECESSARY_???_RATIO of the resource value to check for
    // allocation. This is necessary to account for the fact that most of the
    // requested resource sets use pessimistic estimations. Note that this
    // ratio is used only during comparison - for tracking we will actually
    // mark whole requested amount as used.
    double cpu = resources.getCpuUsage() * MIN_NECESSARY_CPU_RATIO;
    double ram = resources.getMemoryMb() * MIN_NECESSARY_RAM_RATIO;
    double io = resources.getIoUsage() * MIN_NECESSARY_IO_RATIO;
    int localTestCount = resources.getLocalTestCount();

    double availableCpu = availableResources.getCpuUsage();
    double availableRam = availableResources.getMemoryMb();
    double availableIo = availableResources.getIoUsage();
    int availableLocalTestCount = availableResources.getLocalTestCount();

    // Resources are considered available if any one of the conditions below is true:
    // 1) If resource is not requested at all, it is available.
    // 2) If resource is not used at the moment, it is considered to be
    // available regardless of how much is requested. This is necessary to
    // ensure that at any given time, at least one thread is able to acquire
    // resources even if it requests more than available.
    // 3) If used resource amount is less than total available resource amount.
    boolean cpuIsAvailable = cpu == 0.0 || usedCpu == 0.0 || usedCpu + cpu <= availableCpu;
    boolean ramIsAvailable = ram == 0.0 || usedRam == 0.0 || usedRam + ram <= availableRam;
    boolean ioIsAvailable = io == 0.0 || usedIo == 0.0 || usedIo + io <= availableIo;
    boolean localTestCountIsAvailable = localTestCount == 0 || usedLocalTestCount == 0
        || usedLocalTestCount + localTestCount <= availableLocalTestCount;
    return cpuIsAvailable && ramIsAvailable && ioIsAvailable && localTestCountIsAvailable;
  }


  @VisibleForTesting
  synchronized int getWaitCount() {
    return requestList.size();
  }

  @VisibleForTesting
  synchronized boolean isAvailable(double ram, double cpu, double io, int localTestCount) {
    return areResourcesAvailable(ResourceSet.create(ram, cpu, io, localTestCount));
  }
}