diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2017-11-20 16:53:10 +0100 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2017-11-25 19:57:08 +0100 |
commit | 0f41e496d27ae0adbc4b7017fdc3802fc88ada6e (patch) | |
tree | 66f202baae816d7f80fcdcdbe57c08c3534f37a9 /src/csharp/Grpc.Core/Internal | |
parent | 1ee85d13a4d81cfdbfe944d3453ffb4854a0baec (diff) |
simple version of batchcontext pooling
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 25 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs | 21 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 33 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs | 3 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CompletionRegistry.cs | 10 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs | 196 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/IObjectPool.cs | 35 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 3 |
9 files changed, 283 insertions, 45 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index aa2161267a..9946d1a6cf 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -92,23 +92,28 @@ namespace Grpc.Core.Internal } using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - - var ev = cq.Pluck(ctx.Handle); - - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try + { + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + Logger.Error(e, "Exception occured while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occured while invoking completion delegate."); + ctx.Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index be26b341ef..83385ad7d3 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -38,15 +38,18 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>(); + IObjectPool<BatchContextSafeHandle> ownedByPool; CompletionCallbackData completionCallbackData; private BatchContextSafeHandle() { } - public static BatchContextSafeHandle Create() + public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null) { - return Native.grpcsharp_batch_context_create(); + var ctx = Native.grpcsharp_batch_context_create(); + ctx.ownedByPool = ownedByPool; + return ctx; } public IntPtr Handle @@ -104,9 +107,17 @@ namespace Grpc.Core.Internal return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } - public void Reset() + public void Recycle() { - Native.grpcsharp_batch_context_reset(this); + if (ownedByPool != null) + { + Native.grpcsharp_batch_context_reset(this); + ownedByPool.Return(this); + } + else + { + Dispose(); + } } protected override bool ReleaseHandle() @@ -128,7 +139,7 @@ namespace Grpc.Core.Internal finally { completionCallbackData = default(CompletionCallbackData); - Dispose(); + Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index d6a5ba586b..a3ef3e61ee 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -70,8 +70,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags) .CheckOk(); } @@ -87,8 +86,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -97,8 +95,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk(); } } @@ -107,8 +104,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -117,8 +113,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk(); } } @@ -127,8 +122,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } @@ -138,9 +132,8 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); @@ -151,8 +144,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedMessageCallback, callback); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } @@ -161,8 +153,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedResponseHeadersCallback, callback); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } @@ -171,8 +162,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedCloseOnServerCallback, callback); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } @@ -181,8 +171,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 1eeb0e3d97..cd5f8ed92e 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -66,8 +66,7 @@ namespace Grpc.Core.Internal public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState) { - var ctx = BatchContextSafeHandle.Create(); - cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState); + var ctx = cq.CompletionRegistry.RegisterBatchCompletion(callback, callbackState); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index b68655b33c..cf3f3c0995 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -36,13 +36,15 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>(); readonly GrpcEnvironment environment; + readonly Func<BatchContextSafeHandle> batchContextFactory; readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer()); SpinLock spinLock = new SpinLock(Debugger.IsAttached); IntPtr lastRegisteredKey; // only for testing - public CompletionRegistry(GrpcEnvironment environment) + public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory) { - this.environment = environment; + this.environment = GrpcPreconditions.CheckNotNull(environment); + this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory); } public void Register(IntPtr key, IOpCompletionCallback callback) @@ -63,10 +65,12 @@ namespace Grpc.Core.Internal } } - public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state) + public BatchContextSafeHandle RegisterBatchCompletion(BatchCompletionDelegate callback, object state) { + var ctx = batchContextFactory(); ctx.SetCompletionCallback(callback, state); Register(ctx.Handle, ctx); + return ctx; } public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs new file mode 100644 index 0000000000..37c9ae06fe --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs @@ -0,0 +1,196 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of objects that combines a shared pool and a thread local pool. + /// </summary> + internal class DefaultObjectPool<T> : IObjectPool<T> + where T : class, IDisposable + { + readonly object myLock = new object(); + readonly Func<T> itemFactory; + + // Queue shared between threads, access needs to be synchronized. + readonly Queue<T> sharedQueue; + readonly int sharedCapacity; + + readonly ThreadLocal<ThreadLocalData> threadLocalData; + readonly int threadLocalCapacity; + readonly int rentLimit; + + bool disposed; + + /// <summary> + /// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity. + /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately + /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected + /// after the thread that owns them has finished). + /// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease + /// operations. + /// </summary> + public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity) + { + GrpcPreconditions.CheckArgument(sharedCapacity >= 0); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); + this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); + this.sharedQueue = new Queue<T>(sharedCapacity); + this.sharedCapacity = sharedCapacity; + this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false); + this.threadLocalCapacity = threadLocalCapacity; + this.rentLimit = threadLocalCapacity / 2; + } + + /// <summary> + /// Leases an item from the pool or creates a new instance if the pool is empty. + /// Attempts to retrieve the item from the thread local pool first. + /// If the thread local pool is empty, the item is taken from the shared pool + /// along with more items that are moved to the thread local pool to avoid + /// prevent acquiring the lock for shared pool too often. + /// The methods should not be called after the pool is disposed, but it won't + /// results in an error to do so (after depleting the items potentially left + /// in the thread local pool, it will continue returning new objects created by the factory). + /// </summary> + public T Lease() + { + var localData = threadLocalData.Value; + if (localData.Queue.Count > 0) + { + return localData.Queue.Dequeue(); + } + if (localData.CreateBudget > 0) + { + localData.CreateBudget --; + return itemFactory(); + } + + int itemsMoved = 0; + T leasedItem = null; + lock(myLock) + { + if (sharedQueue.Count > 0) + { + leasedItem = sharedQueue.Dequeue(); + } + while (sharedQueue.Count > 0 && itemsMoved < rentLimit) + { + localData.Queue.Enqueue(sharedQueue.Dequeue()); + itemsMoved ++; + } + } + + // If the shared pool didn't contain all rentLimit items, + // next time we try to lease we will just create those + // instead of trying to grab them from the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.CreateBudget += rentLimit - itemsMoved; + + return leasedItem ?? itemFactory(); + } + + /// <summary> + /// Returns an item to the pool. + /// Attempts to add the item to the thread local pool first. + /// If the thread local pool is full, item is added to a shared pool, + /// along with half of the items for the thread local pool, which + /// should prevent acquiring the lock for shared pool too often. + /// If called after the pool is disposed, we make best effort not to + /// add anything to the thread local pool and we guarantee not to add + /// anything to the shared pool (items will be disposed instead). + /// </summary> + public void Return(T item) + { + GrpcPreconditions.CheckNotNull(item); + + var localData = threadLocalData.Value; + if (localData.Queue.Count < threadLocalCapacity && !disposed) + { + localData.Queue.Enqueue(item); + return; + } + if (localData.DisposeBudget > 0) + { + localData.DisposeBudget --; + item.Dispose(); + return; + } + + int itemsReturned = 0; + int returnLimit = rentLimit + 1; + lock (myLock) + { + if (sharedQueue.Count < sharedCapacity && !disposed) + { + sharedQueue.Enqueue(item); + itemsReturned ++; + } + while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed) + { + sharedQueue.Enqueue(localData.Queue.Dequeue()); + itemsReturned ++; + } + } + + // If the shared pool could not accomodate all returnLimit items, + // next time we try to return we will just dispose the item + // instead of trying to return them to the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.DisposeBudget += returnLimit - itemsReturned; + + if (itemsReturned == 0) + { + localData.DisposeBudget --; + item.Dispose(); + } + } + + public void Dispose() + { + lock (myLock) + { + if (!disposed) + { + disposed = true; + + while (sharedQueue.Count > 0) + { + sharedQueue.Dequeue().Dispose(); + } + } + } + } + + class ThreadLocalData + { + public ThreadLocalData(int capacity) + { + this.Queue = new Queue<T>(capacity); + } + + public Queue<T> Queue { get; } + public int CreateBudget { get; set; } + public int DisposeBudget { get; set; } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index bd0229a9dd..f1b5a4f9ff 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -219,7 +219,7 @@ namespace Grpc.Core.Internal var list = new List<CompletionQueueSafeHandle>(); for (int i = 0; i < completionQueueCount; i++) { - var completionRegistry = new CompletionRegistry(environment); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry)); } return list.AsReadOnly(); diff --git a/src/csharp/Grpc.Core/Internal/IObjectPool.cs b/src/csharp/Grpc.Core/Internal/IObjectPool.cs new file mode 100644 index 0000000000..f7d6e30a2a --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IObjectPool.cs @@ -0,0 +1,35 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of objects. + /// </summary> + internal interface IObjectPool<T> : IDisposable + where T : class + { + T Lease(); + void Return(T item); + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a308890cde..9b7ea884dd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -64,10 +64,9 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); // TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object, // but server shutdown isn't worth optimizing right now. - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(callback, null); Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } } |