// Copyright 2014 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.actions; import static com.google.devtools.build.lib.profiler.AutoProfiler.profiled; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.profiler.AutoProfiler; import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.util.Pair; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; /** * Used to keep track of resources consumed by the Blaze action execution threads and throttle them * when necessary. * *

Threads which are known to consume a significant amount of resources should call * {@link #acquireResources} method. This method will check whether requested resources are * available and will either mark them as used and allow the thread to proceed or will block the * thread until requested resources will become available. When the thread completes its task, it * must release allocated resources by calling {@link #releaseResources} method. * *

Available resources can be calculated using one of three ways: *

    *
  1. They can be preset using {@link #setAvailableResources(ResourceSet)} method. This is used * mainly by the unit tests (however it is possible to provide a future option that would * artificially limit amount of CPU/RAM consumed by the Blaze). *
  2. They can be preset based on the /proc/cpuinfo and /proc/meminfo information. Blaze will * calculate amount of available CPU cores (adjusting for hyperthreading logical cores) and * amount of the total available memory and will limit itself to the number of effective cores * and 2/3 of the available memory. For details, please look at the {@link * LocalHostCapacity#getLocalHostCapacity} method. *
* *

The resource manager also allows a slight overallocation of the resources to account for the * fact that requested resources are usually estimated using a pessimistic approximation. It also * guarantees that at least one thread will always be able to acquire any amount of requested * resources (even if it is greater than amount of available resources). Therefore, assuming that * threads correctly release acquired resources, Blaze will never be fully blocked. */ @ThreadSafe public class ResourceManager { /** * A handle returned by {@link #acquireResources(ActionExecutionMetadata, ResourceSet)} that must * be closed in order to free the resources again. */ public static class ResourceHandle implements AutoCloseable { final ResourceManager rm; final ActionExecutionMetadata actionMetadata; final ResourceSet resourceSet; public ResourceHandle(ResourceManager rm, ActionExecutionMetadata actionMetadata, ResourceSet resources) { this.rm = rm; this.actionMetadata = actionMetadata; this.resourceSet = resources; } /** * Closing the ResourceHandle releases the resources associated with it. */ @Override public void close() { rm.releaseResources(actionMetadata, resourceSet); } } private final ThreadLocal threadLocked = new ThreadLocal() { @Override protected Boolean initialValue() { return false; } }; /** * Singleton reference defined in a separate class to ensure thread-safe lazy * initialization. */ private static class Singleton { static ResourceManager instance = new ResourceManager(); } /** * Returns singleton instance of the resource manager. */ public static ResourceManager instance() { return Singleton.instance; } // Allocated resources are allowed to go "negative", but at least // MIN_AVAILABLE_CPU_RATIO portion of CPU and MIN_AVAILABLE_RAM_RATIO portion // of RAM should be available. // Please note that this value is purely empirical - we assume that generally // requested resources are somewhat pessimistic and thread would end up // using less than requested amount. private static final double MIN_NECESSARY_CPU_RATIO = 0.6; private static final double MIN_NECESSARY_RAM_RATIO = 1.0; private static final double MIN_NECESSARY_IO_RATIO = 1.0; // List of blocked threads. Associated CountDownLatch object will always // be initialized to 1 during creation in the acquire() method. private final List> requestList; // The total amount of resources on the local host. Must be set by // an explicit call to setAvailableResources(), often using // LocalHostCapacity.getLocalHostCapacity() as an argument. private ResourceSet staticResources = null; private ResourceSet availableResources = null; // Used amount of CPU capacity (where 1.0 corresponds to the one fully // occupied CPU core. Corresponds to the CPU resource definition in the // ResourceSet class. private double usedCpu; // Used amount of RAM capacity in MB. Corresponds to the RAM resource // definition in the ResourceSet class. private double usedRam; // Used amount of I/O resources. Corresponds to the I/O resource // definition in the ResourceSet class. private double usedIo; // Used local test count. Corresponds to the local test count definition in the ResourceSet class. private int usedLocalTestCount; // Specifies how much of the RAM in staticResources we should allow to be used. public static final int DEFAULT_RAM_UTILIZATION_PERCENTAGE = 67; private int ramUtilizationPercentage = DEFAULT_RAM_UTILIZATION_PERCENTAGE; private ResourceManager() { requestList = new LinkedList<>(); } @VisibleForTesting public static ResourceManager instanceForTestingOnly() { return new ResourceManager(); } /** * Resets resource manager state and releases all thread locks. * Note - it does not reset available resources. Use separate call to setAvailableResources(). */ public synchronized void resetResourceUsage() { usedCpu = 0; usedRam = 0; usedIo = 0; usedLocalTestCount = 0; for (Pair request : requestList) { // CountDownLatch can be set only to 0 or 1. request.second.countDown(); } requestList.clear(); } /** * Sets available resources using given resource set. Must be called * at least once before using resource manager. */ public synchronized void setAvailableResources(ResourceSet resources) { Preconditions.checkNotNull(resources); staticResources = resources; availableResources = ResourceSet.create( staticResources.getMemoryMb() * this.ramUtilizationPercentage / 100.0, staticResources.getCpuUsage(), staticResources.getIoUsage(), staticResources.getLocalTestCount()); processWaitingThreads(); } /** * Specify how much of the available RAM we should allow to be used. */ public synchronized void setRamUtilizationPercentage(int percentage) { ramUtilizationPercentage = percentage; } /** * Acquires requested resource set. Will block if resource is not available. * NB! This method must be thread-safe! */ public ResourceHandle acquireResources(ActionExecutionMetadata owner, ResourceSet resources) throws InterruptedException { Preconditions.checkNotNull( resources, "acquireResources called with resources == NULL during %s", owner); Preconditions.checkState( !threadHasResources(), "acquireResources with existing resource lock during %s", owner); AutoProfiler p = profiled(owner, ProfilerTask.ACTION_LOCK); CountDownLatch latch = null; try { latch = acquire(resources); if (latch != null) { latch.await(); } } catch (InterruptedException e) { // Synchronize on this to avoid any racing with #processWaitingThreads synchronized (this) { if (latch.getCount() == 0) { // Resources already acquired by other side. Release them, but not inside this // synchronized block to avoid deadlock. release(resources); } else { // Inform other side that resources shouldn't be acquired. latch.countDown(); } } throw e; } threadLocked.set(true); // Profile acquisition only if it waited for resource to become available. if (latch != null) { p.complete(); } return new ResourceHandle(this, owner, resources); } /** * Acquires the given resources if available immediately. Does not block. * * @return a ResourceHandle iff the given resources were locked (all or nothing), null otherwise. */ @VisibleForTesting ResourceHandle tryAcquire(ActionExecutionMetadata owner, ResourceSet resources) { Preconditions.checkNotNull( resources, "tryAcquire called with resources == NULL during %s", owner); Preconditions.checkState( !threadHasResources(), "tryAcquire with existing resource lock during %s", owner); boolean acquired = false; synchronized (this) { if (areResourcesAvailable(resources)) { incrementResources(resources); acquired = true; } } if (acquired) { threadLocked.set(resources != ResourceSet.ZERO); return new ResourceHandle(this, owner, resources); } return null; } private void incrementResources(ResourceSet resources) { usedCpu += resources.getCpuUsage(); usedRam += resources.getMemoryMb(); usedIo += resources.getIoUsage(); usedLocalTestCount += resources.getLocalTestCount(); } /** * Return true if any resources have been claimed through this manager. */ public synchronized boolean inUse() { return usedCpu != 0.0 || usedRam != 0.0 || usedIo != 0.0 || usedLocalTestCount != 0 || !requestList.isEmpty(); } /** * Return true iff this thread has a lock on non-zero resources. */ public boolean threadHasResources() { return threadLocked.get(); } /** * Releases previously requested resource =. * *

NB! This method must be thread-safe! */ @VisibleForTesting void releaseResources(ActionExecutionMetadata owner, ResourceSet resources) { Preconditions.checkNotNull( resources, "releaseResources called with resources == NULL during %s", owner); Preconditions.checkState( threadHasResources(), "releaseResources without resource lock during %s", owner); boolean isConflict = false; AutoProfiler p = profiled(owner, ProfilerTask.ACTION_RELEASE); try { isConflict = release(resources); } finally { threadLocked.set(false); // Profile resource release only if it resolved at least one allocation request. if (isConflict) { p.complete(); } } } private synchronized CountDownLatch acquire(ResourceSet resources) { if (areResourcesAvailable(resources)) { incrementResources(resources); return null; } Pair request = new Pair<>(resources, new CountDownLatch(1)); requestList.add(request); return request.second; } private synchronized boolean release(ResourceSet resources) { usedCpu -= resources.getCpuUsage(); usedRam -= resources.getMemoryMb(); usedIo -= resources.getIoUsage(); usedLocalTestCount -= resources.getLocalTestCount(); // TODO(bazel-team): (2010) rounding error can accumulate and value below can end up being // e.g. 1E-15. So if it is small enough, we set it to 0. But maybe there is a better solution. double epsilon = 0.0001; if (usedCpu < epsilon) { usedCpu = 0; } if (usedRam < epsilon) { usedRam = 0; } if (usedIo < epsilon) { usedIo = 0; } if (!requestList.isEmpty()) { processWaitingThreads(); return true; } return false; } /** * Tries to unblock one or more waiting threads if there are sufficient resources available. */ private synchronized void processWaitingThreads() { Iterator> iterator = requestList.iterator(); while (iterator.hasNext()) { Pair request = iterator.next(); if (request.second.getCount() != 0) { if (areResourcesAvailable(request.first)) { incrementResources(request.first); request.second.countDown(); iterator.remove(); } } else { // Cancelled by other side. iterator.remove(); } } } // Method will return true if all requested resources are considered to be available. private boolean areResourcesAvailable(ResourceSet resources) { Preconditions.checkNotNull(availableResources); // Comparison below is robust, since any calculation errors will be fixed // by the release() method. if (usedCpu == 0.0 && usedRam == 0.0 && usedIo == 0.0 && usedLocalTestCount == 0) { return true; } // Use only MIN_NECESSARY_???_RATIO of the resource value to check for // allocation. This is necessary to account for the fact that most of the // requested resource sets use pessimistic estimations. Note that this // ratio is used only during comparison - for tracking we will actually // mark whole requested amount as used. double cpu = resources.getCpuUsage() * MIN_NECESSARY_CPU_RATIO; double ram = resources.getMemoryMb() * MIN_NECESSARY_RAM_RATIO; double io = resources.getIoUsage() * MIN_NECESSARY_IO_RATIO; int localTestCount = resources.getLocalTestCount(); double availableCpu = availableResources.getCpuUsage(); double availableRam = availableResources.getMemoryMb(); double availableIo = availableResources.getIoUsage(); int availableLocalTestCount = availableResources.getLocalTestCount(); // Resources are considered available if any one of the conditions below is true: // 1) If resource is not requested at all, it is available. // 2) If resource is not used at the moment, it is considered to be // available regardless of how much is requested. This is necessary to // ensure that at any given time, at least one thread is able to acquire // resources even if it requests more than available. // 3) If used resource amount is less than total available resource amount. boolean cpuIsAvailable = cpu == 0.0 || usedCpu == 0.0 || usedCpu + cpu <= availableCpu; boolean ramIsAvailable = ram == 0.0 || usedRam == 0.0 || usedRam + ram <= availableRam; boolean ioIsAvailable = io == 0.0 || usedIo == 0.0 || usedIo + io <= availableIo; boolean localTestCountIsAvailable = localTestCount == 0 || usedLocalTestCount == 0 || usedLocalTestCount + localTestCount <= availableLocalTestCount; return cpuIsAvailable && ramIsAvailable && ioIsAvailable && localTestCountIsAvailable; } @VisibleForTesting synchronized int getWaitCount() { return requestList.size(); } @VisibleForTesting synchronized boolean isAvailable(double ram, double cpu, double io, int localTestCount) { return areResourcesAvailable(ResourceSet.create(ram, cpu, io, localTestCount)); } }