1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
#region Copyright notice and license
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Collections.Generic;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of objects that combines a shared pool and a thread local pool.
/// </summary>
internal class DefaultObjectPool<T> : IObjectPool<T>
where T : class, IDisposable
{
readonly object myLock = new object();
readonly Func<T> itemFactory;
// Queue shared between threads, access needs to be synchronized.
readonly Queue<T> sharedQueue;
readonly int sharedCapacity;
readonly ThreadLocal<ThreadLocalData> threadLocalData;
readonly int threadLocalCapacity;
readonly int rentLimit;
bool disposed;
/// <summary>
/// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity.
/// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately
/// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected
/// after the thread that owns them has finished).
/// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease
/// operations.
/// </summary>
public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)
{
GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
this.sharedQueue = new Queue<T>(sharedCapacity);
this.sharedCapacity = sharedCapacity;
this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false);
this.threadLocalCapacity = threadLocalCapacity;
this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1;
}
/// <summary>
/// Leases an item from the pool or creates a new instance if the pool is empty.
/// Attempts to retrieve the item from the thread local pool first.
/// If the thread local pool is empty, the item is taken from the shared pool
/// along with more items that are moved to the thread local pool to avoid
/// prevent acquiring the lock for shared pool too often.
/// The methods should not be called after the pool is disposed, but it won't
/// results in an error to do so (after depleting the items potentially left
/// in the thread local pool, it will continue returning new objects created by the factory).
/// </summary>
public T Lease()
{
var localData = threadLocalData.Value;
if (localData.Queue.Count > 0)
{
return localData.Queue.Dequeue();
}
if (localData.CreateBudget > 0)
{
localData.CreateBudget --;
return itemFactory();
}
int itemsMoved = 0;
T leasedItem = null;
lock(myLock)
{
if (sharedQueue.Count > 0)
{
leasedItem = sharedQueue.Dequeue();
}
while (sharedQueue.Count > 0 && itemsMoved < rentLimit)
{
localData.Queue.Enqueue(sharedQueue.Dequeue());
itemsMoved ++;
}
}
// If the shared pool didn't contain all rentLimit items,
// next time we try to lease we will just create those
// instead of trying to grab them from the shared queue.
// This is to guarantee we won't be accessing the shared queue too often.
localData.CreateBudget = rentLimit - itemsMoved;
return leasedItem ?? itemFactory();
}
/// <summary>
/// Returns an item to the pool.
/// Attempts to add the item to the thread local pool first.
/// If the thread local pool is full, item is added to a shared pool,
/// along with half of the items for the thread local pool, which
/// should prevent acquiring the lock for shared pool too often.
/// If called after the pool is disposed, we make best effort not to
/// add anything to the thread local pool and we guarantee not to add
/// anything to the shared pool (items will be disposed instead).
/// </summary>
public void Return(T item)
{
GrpcPreconditions.CheckNotNull(item);
var localData = threadLocalData.Value;
if (localData.Queue.Count < threadLocalCapacity && !disposed)
{
localData.Queue.Enqueue(item);
return;
}
if (localData.DisposeBudget > 0)
{
localData.DisposeBudget --;
item.Dispose();
return;
}
int itemsReturned = 0;
int returnLimit = rentLimit + 1;
lock (myLock)
{
if (sharedQueue.Count < sharedCapacity && !disposed)
{
sharedQueue.Enqueue(item);
itemsReturned ++;
}
while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed)
{
sharedQueue.Enqueue(localData.Queue.Dequeue());
itemsReturned ++;
}
}
// If the shared pool could not accomodate all returnLimit items,
// next time we try to return we will just dispose the item
// instead of trying to return them to the shared queue.
// This is to guarantee we won't be accessing the shared queue too often.
localData.DisposeBudget = returnLimit - itemsReturned;
if (itemsReturned == 0)
{
localData.DisposeBudget --;
item.Dispose();
}
}
public void Dispose()
{
lock (myLock)
{
if (!disposed)
{
disposed = true;
while (sharedQueue.Count > 0)
{
sharedQueue.Dequeue().Dispose();
}
}
}
}
class ThreadLocalData
{
public ThreadLocalData(int capacity)
{
this.Queue = new Queue<T>(capacity);
}
public Queue<T> Queue { get; }
public int CreateBudget { get; set; }
public int DisposeBudget { get; set; }
}
}
}
|