#region Copyright notice and license // Copyright 2017 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #endregion using System; using System.Threading; using System.Collections.Generic; using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// /// Pool of objects that combines a shared pool and a thread local pool. /// internal class DefaultObjectPool : IObjectPool where T : class, IPooledObject { readonly object myLock = new object(); readonly Action returnAction; readonly Func itemFactory; // Queue shared between threads, access needs to be synchronized. readonly Queue sharedQueue; readonly int sharedCapacity; readonly ThreadLocal threadLocalData; readonly int threadLocalCapacity; readonly int rentLimit; bool disposed; /// /// Initializes a new instance of DefaultObjectPool with given shared capacity and thread local capacity. /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected /// after the thread that owns them has finished). /// On average, the shared pool will only be accessed approx. once for every threadLocalCapacity / 2 rent or lease /// operations. /// public DefaultObjectPool(Func itemFactory, int sharedCapacity, int threadLocalCapacity) { GrpcPreconditions.CheckArgument(sharedCapacity >= 0); GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); this.returnAction = Return; this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); this.sharedQueue = new Queue(sharedCapacity); this.sharedCapacity = sharedCapacity; this.threadLocalData = new ThreadLocal(() => new ThreadLocalData(threadLocalCapacity), false); this.threadLocalCapacity = threadLocalCapacity; this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1; } /// /// Leases an item from the pool or creates a new instance if the pool is empty. /// Attempts to retrieve the item from the thread local pool first. /// If the thread local pool is empty, the item is taken from the shared pool /// along with more items that are moved to the thread local pool to avoid /// prevent acquiring the lock for shared pool too often. /// The methods should not be called after the pool is disposed, but it won't /// results in an error to do so (after depleting the items potentially left /// in the thread local pool, it will continue returning new objects created by the factory). /// public T Lease() { var item = LeaseInternal(); item.SetReturnToPoolAction(returnAction); return item; } private T LeaseInternal() { var localData = threadLocalData.Value; if (localData.Queue.Count > 0) { return localData.Queue.Dequeue(); } if (localData.CreateBudget > 0) { localData.CreateBudget --; return itemFactory(); } int itemsMoved = 0; T leasedItem = null; lock(myLock) { if (sharedQueue.Count > 0) { leasedItem = sharedQueue.Dequeue(); } while (sharedQueue.Count > 0 && itemsMoved < rentLimit) { localData.Queue.Enqueue(sharedQueue.Dequeue()); itemsMoved ++; } } // If the shared pool didn't contain all rentLimit items, // next time we try to lease we will just create those // instead of trying to grab them from the shared queue. // This is to guarantee we won't be accessing the shared queue too often. localData.CreateBudget = rentLimit - itemsMoved; return leasedItem ?? itemFactory(); } /// /// Returns an item to the pool. /// Attempts to add the item to the thread local pool first. /// If the thread local pool is full, item is added to a shared pool, /// along with half of the items for the thread local pool, which /// should prevent acquiring the lock for shared pool too often. /// If called after the pool is disposed, we make best effort not to /// add anything to the thread local pool and we guarantee not to add /// anything to the shared pool (items will be disposed instead). /// public void Return(T item) { GrpcPreconditions.CheckNotNull(item); var localData = threadLocalData.Value; if (localData.Queue.Count < threadLocalCapacity && !disposed) { localData.Queue.Enqueue(item); return; } if (localData.DisposeBudget > 0) { localData.DisposeBudget --; item.Dispose(); return; } int itemsReturned = 0; int returnLimit = rentLimit + 1; lock (myLock) { if (sharedQueue.Count < sharedCapacity && !disposed) { sharedQueue.Enqueue(item); itemsReturned ++; } while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed) { sharedQueue.Enqueue(localData.Queue.Dequeue()); itemsReturned ++; } } // If the shared pool could not accomodate all returnLimit items, // next time we try to return we will just dispose the item // instead of trying to return them to the shared queue. // This is to guarantee we won't be accessing the shared queue too often. localData.DisposeBudget = returnLimit - itemsReturned; if (itemsReturned == 0) { localData.DisposeBudget --; item.Dispose(); } } public void Dispose() { lock (myLock) { if (!disposed) { disposed = true; while (sharedQueue.Count > 0) { sharedQueue.Dequeue().Dispose(); } } } } class ThreadLocalData { public ThreadLocalData(int capacity) { this.Queue = new Queue(capacity); } public Queue Queue { get; } public int CreateBudget { get; set; } public int DisposeBudget { get; set; } } } }