From 1ee85d13a4d81cfdbfe944d3453ffb4854a0baec Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 26 Apr 2017 10:58:48 +0200 Subject: expose batchcontext.Reset --- src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs | 5 +++++ src/csharp/Grpc.Core/Internal/NativeMethods.cs | 3 +++ 2 files changed, 8 insertions(+) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 1e6f1fba37..be26b341ef 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -104,6 +104,11 @@ namespace Grpc.Core.Internal return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } + public void Reset() + { + Native.grpcsharp_batch_context_reset(this); + } + protected override bool ReleaseHandle() { Native.grpcsharp_batch_context_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index d517252cfe..43acb8f915 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -52,6 +52,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata; public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled; + public readonly Delegates.grpcsharp_batch_context_reset_delegate grpcsharp_batch_context_reset; public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy; public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create; @@ -169,6 +170,7 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate(library); + this.grpcsharp_batch_context_reset = GetMethodDelegate(library); this.grpcsharp_batch_context_destroy = GetMethodDelegate(library); this.grpcsharp_request_call_context_create = GetMethodDelegate(library); @@ -311,6 +313,7 @@ namespace Grpc.Core.Internal public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx); public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx); + public delegate void grpcsharp_batch_context_reset_delegate(BatchContextSafeHandle ctx); public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx); public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate(); -- cgit v1.2.3 From 0f41e496d27ae0adbc4b7017fdc3802fc88ada6e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 20 Nov 2017 16:53:10 +0100 Subject: simple version of batchcontext pooling --- .../Internal/CompletionQueueSafeHandleTest.cs | 2 +- src/csharp/Grpc.Core/GrpcEnvironment.cs | 6 + src/csharp/Grpc.Core/Internal/AsyncCall.cs | 25 +-- .../Grpc.Core/Internal/BatchContextSafeHandle.cs | 21 ++- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 33 ++-- src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs | 3 +- .../Grpc.Core/Internal/CompletionRegistry.cs | 10 +- src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs | 196 +++++++++++++++++++++ src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 2 +- src/csharp/Grpc.Core/Internal/IObjectPool.cs | 35 ++++ src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 3 +- .../CompletionRegistryBenchmark.cs | 6 +- 12 files changed, 293 insertions(+), 49 deletions(-) create mode 100644 src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs create mode 100644 src/csharp/Grpc.Core/Internal/IObjectPool.cs (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index 1d9475a8b8..775c950c8c 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests public void CreateAsyncAndShutdown() { var env = GrpcEnvironment.AddRef(); - var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env)); + var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create())); cq.Shutdown(); var ev = cq.Next(); cq.Dispose(); diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 80031cb7ef..8f69b3a967 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -45,6 +45,7 @@ namespace Grpc.Core static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true); + readonly IObjectPool batchContextPool; readonly GrpcThreadPool threadPool; readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); @@ -249,6 +250,8 @@ namespace Grpc.Core private GrpcEnvironment() { GrpcNativeInit(); + // TODO(jtattermusch): configure params + batchContextPool = new DefaultObjectPool(() => BatchContextSafeHandle.Create(this.batchContextPool), 10000, 64); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } @@ -264,6 +267,8 @@ namespace Grpc.Core } } + internal IObjectPool BatchContextPool => batchContextPool; + internal bool IsAlive { get @@ -325,6 +330,7 @@ namespace Grpc.Core await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false); await threadPool.StopAsync().ConfigureAwait(false); + batchContextPool.Dispose(); GrpcNativeShutdown(); isShutdown = true; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index aa2161267a..9946d1a6cf 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -92,23 +92,28 @@ namespace Grpc.Core.Internal } using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - - var ev = cq.Pluck(ctx.Handle); - - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try + { + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + Logger.Error(e, "Exception occured while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occured while invoking completion delegate."); + ctx.Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index be26b341ef..83385ad7d3 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -38,15 +38,18 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); + IObjectPool ownedByPool; CompletionCallbackData completionCallbackData; private BatchContextSafeHandle() { } - public static BatchContextSafeHandle Create() + public static BatchContextSafeHandle Create(IObjectPool ownedByPool = null) { - return Native.grpcsharp_batch_context_create(); + var ctx = Native.grpcsharp_batch_context_create(); + ctx.ownedByPool = ownedByPool; + return ctx; } public IntPtr Handle @@ -104,9 +107,17 @@ namespace Grpc.Core.Internal return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } - public void Reset() + public void Recycle() { - Native.grpcsharp_batch_context_reset(this); + if (ownedByPool != null) + { + Native.grpcsharp_batch_context_reset(this); + ownedByPool.Return(this); + } + else + { + Dispose(); + } } protected override bool ReleaseHandle() @@ -128,7 +139,7 @@ namespace Grpc.Core.Internal finally { completionCallbackData = default(CompletionCallbackData); - Dispose(); + Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index d6a5ba586b..a3ef3e61ee 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -70,8 +70,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags) .CheckOk(); } @@ -87,8 +86,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -97,8 +95,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk(); } } @@ -107,8 +104,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -117,8 +113,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk(); } } @@ -127,8 +122,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } @@ -138,9 +132,8 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); @@ -151,8 +144,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedMessageCallback, callback); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } @@ -161,8 +153,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedResponseHeadersCallback, callback); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } @@ -171,8 +162,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedCloseOnServerCallback, callback); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } @@ -181,8 +171,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 1eeb0e3d97..cd5f8ed92e 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -66,8 +66,7 @@ namespace Grpc.Core.Internal public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState) { - var ctx = BatchContextSafeHandle.Create(); - cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState); + var ctx = cq.CompletionRegistry.RegisterBatchCompletion(callback, callbackState); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index b68655b33c..cf3f3c0995 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -36,13 +36,15 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); readonly GrpcEnvironment environment; + readonly Func batchContextFactory; readonly Dictionary dict = new Dictionary(new IntPtrComparer()); SpinLock spinLock = new SpinLock(Debugger.IsAttached); IntPtr lastRegisteredKey; // only for testing - public CompletionRegistry(GrpcEnvironment environment) + public CompletionRegistry(GrpcEnvironment environment, Func batchContextFactory) { - this.environment = environment; + this.environment = GrpcPreconditions.CheckNotNull(environment); + this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory); } public void Register(IntPtr key, IOpCompletionCallback callback) @@ -63,10 +65,12 @@ namespace Grpc.Core.Internal } } - public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state) + public BatchContextSafeHandle RegisterBatchCompletion(BatchCompletionDelegate callback, object state) { + var ctx = batchContextFactory(); ctx.SetCompletionCallback(callback, state); Register(ctx.Handle, ctx); + return ctx; } public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs new file mode 100644 index 0000000000..37c9ae06fe --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs @@ -0,0 +1,196 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// + /// Pool of objects that combines a shared pool and a thread local pool. + /// + internal class DefaultObjectPool : IObjectPool + where T : class, IDisposable + { + readonly object myLock = new object(); + readonly Func itemFactory; + + // Queue shared between threads, access needs to be synchronized. + readonly Queue sharedQueue; + readonly int sharedCapacity; + + readonly ThreadLocal threadLocalData; + readonly int threadLocalCapacity; + readonly int rentLimit; + + bool disposed; + + /// + /// Initializes a new instance of DefaultObjectPool with given shared capacity and thread local capacity. + /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately + /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected + /// after the thread that owns them has finished). + /// On average, the shared pool will only be accessed approx. once for every threadLocalCapacity / 2 rent or lease + /// operations. + /// + public DefaultObjectPool(Func itemFactory, int sharedCapacity, int threadLocalCapacity) + { + GrpcPreconditions.CheckArgument(sharedCapacity >= 0); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); + this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); + this.sharedQueue = new Queue(sharedCapacity); + this.sharedCapacity = sharedCapacity; + this.threadLocalData = new ThreadLocal(() => new ThreadLocalData(threadLocalCapacity), false); + this.threadLocalCapacity = threadLocalCapacity; + this.rentLimit = threadLocalCapacity / 2; + } + + /// + /// Leases an item from the pool or creates a new instance if the pool is empty. + /// Attempts to retrieve the item from the thread local pool first. + /// If the thread local pool is empty, the item is taken from the shared pool + /// along with more items that are moved to the thread local pool to avoid + /// prevent acquiring the lock for shared pool too often. + /// The methods should not be called after the pool is disposed, but it won't + /// results in an error to do so (after depleting the items potentially left + /// in the thread local pool, it will continue returning new objects created by the factory). + /// + public T Lease() + { + var localData = threadLocalData.Value; + if (localData.Queue.Count > 0) + { + return localData.Queue.Dequeue(); + } + if (localData.CreateBudget > 0) + { + localData.CreateBudget --; + return itemFactory(); + } + + int itemsMoved = 0; + T leasedItem = null; + lock(myLock) + { + if (sharedQueue.Count > 0) + { + leasedItem = sharedQueue.Dequeue(); + } + while (sharedQueue.Count > 0 && itemsMoved < rentLimit) + { + localData.Queue.Enqueue(sharedQueue.Dequeue()); + itemsMoved ++; + } + } + + // If the shared pool didn't contain all rentLimit items, + // next time we try to lease we will just create those + // instead of trying to grab them from the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.CreateBudget += rentLimit - itemsMoved; + + return leasedItem ?? itemFactory(); + } + + /// + /// Returns an item to the pool. + /// Attempts to add the item to the thread local pool first. + /// If the thread local pool is full, item is added to a shared pool, + /// along with half of the items for the thread local pool, which + /// should prevent acquiring the lock for shared pool too often. + /// If called after the pool is disposed, we make best effort not to + /// add anything to the thread local pool and we guarantee not to add + /// anything to the shared pool (items will be disposed instead). + /// + public void Return(T item) + { + GrpcPreconditions.CheckNotNull(item); + + var localData = threadLocalData.Value; + if (localData.Queue.Count < threadLocalCapacity && !disposed) + { + localData.Queue.Enqueue(item); + return; + } + if (localData.DisposeBudget > 0) + { + localData.DisposeBudget --; + item.Dispose(); + return; + } + + int itemsReturned = 0; + int returnLimit = rentLimit + 1; + lock (myLock) + { + if (sharedQueue.Count < sharedCapacity && !disposed) + { + sharedQueue.Enqueue(item); + itemsReturned ++; + } + while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed) + { + sharedQueue.Enqueue(localData.Queue.Dequeue()); + itemsReturned ++; + } + } + + // If the shared pool could not accomodate all returnLimit items, + // next time we try to return we will just dispose the item + // instead of trying to return them to the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.DisposeBudget += returnLimit - itemsReturned; + + if (itemsReturned == 0) + { + localData.DisposeBudget --; + item.Dispose(); + } + } + + public void Dispose() + { + lock (myLock) + { + if (!disposed) + { + disposed = true; + + while (sharedQueue.Count > 0) + { + sharedQueue.Dequeue().Dispose(); + } + } + } + } + + class ThreadLocalData + { + public ThreadLocalData(int capacity) + { + this.Queue = new Queue(capacity); + } + + public Queue Queue { get; } + public int CreateBudget { get; set; } + public int DisposeBudget { get; set; } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index bd0229a9dd..f1b5a4f9ff 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -219,7 +219,7 @@ namespace Grpc.Core.Internal var list = new List(); for (int i = 0; i < completionQueueCount; i++) { - var completionRegistry = new CompletionRegistry(environment); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry)); } return list.AsReadOnly(); diff --git a/src/csharp/Grpc.Core/Internal/IObjectPool.cs b/src/csharp/Grpc.Core/Internal/IObjectPool.cs new file mode 100644 index 0000000000..f7d6e30a2a --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IObjectPool.cs @@ -0,0 +1,35 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// + /// Pool of objects. + /// + internal interface IObjectPool : IDisposable + where T : class + { + T Lease(); + void Return(T item); + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a308890cde..9b7ea884dd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -64,10 +64,9 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); // TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object, // but server shutdown isn't worth optimizing right now. - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(callback, null); Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } } diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs index 2d1c33e9a0..eefdb50e39 100644 --- a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs @@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks public void Run(int threadCount, int iterations, bool useSharedRegistry) { Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry)); - CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null; + CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null; var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry)); threadedBenchmark.Run(); // TODO: parametrize by number of pending completions @@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry) { - var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment); + var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()); var ctx = BatchContextSafeHandle.Create(); var stopwatch = Stopwatch.StartNew(); @@ -64,7 +64,7 @@ namespace Grpc.Microbenchmarks stopwatch.Stop(); Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); - ctx.Dispose(); + ctx.Recycle(); } private class NopCompletionCallback : IOpCompletionCallback -- cgit v1.2.3 From 2e631706e2b2c2a3190e039ee26781fcd0483b40 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Sat, 25 Nov 2017 19:37:45 +0100 Subject: expose batchcontextpool settings --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 8f69b3a967..2b1b5e32d7 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -33,6 +33,8 @@ namespace Grpc.Core public class GrpcEnvironment { const int MinDefaultThreadPoolSize = 4; + const int DefaultBatchContextPoolSharedCapacity = 10000; + const int DefaultBatchContextPoolThreadLocalCapacity = 64; static object staticLock = new object(); static GrpcEnvironment instance; @@ -40,6 +42,8 @@ namespace Grpc.Core static int? customThreadPoolSize; static int? customCompletionQueueCount; static bool inlineHandlers; + static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity; + static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity; static readonly HashSet registeredChannels = new HashSet(); static readonly HashSet registeredServers = new HashSet(); @@ -187,7 +191,7 @@ namespace Grpc.Core /// /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. - /// Can be only invoke before the GrpcEnviroment is started and cannot be changed afterwards. + /// Can be only invoked before the GrpcEnviroment is started and cannot be changed afterwards. /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. /// Most users should rely on the default value provided by gRPC library. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. @@ -204,7 +208,7 @@ namespace Grpc.Core /// /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. - /// Can be only invoke before the GrpcEnviroment is started and cannot be changed afterwards. + /// Can be only invoked before the GrpcEnviroment is started and cannot be changed afterwards. /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. /// Most users should rely on the default value provided by gRPC library. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. @@ -238,6 +242,26 @@ namespace Grpc.Core } } + /// + /// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances + /// instead of creating a new one for every C core operation helps reducing the GC pressure. + /// Can be only invoked before the GrpcEnviroment is started and cannot be changed afterwards. + /// This is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// + public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number"); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number"); + batchContextPoolSharedCapacity = sharedCapacity; + batchContextPoolThreadLocalCapacity = threadLocalCapacity; + } + } + /// /// Occurs when GrpcEnvironment is about the start the shutdown logic. /// If GrpcEnvironment is later initialized and shutdown, the event will be fired again (unless unregistered first). @@ -250,8 +274,7 @@ namespace Grpc.Core private GrpcEnvironment() { GrpcNativeInit(); - // TODO(jtattermusch): configure params - batchContextPool = new DefaultObjectPool(() => BatchContextSafeHandle.Create(this.batchContextPool), 10000, 64); + batchContextPool = new DefaultObjectPool(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } -- cgit v1.2.3 From 53bbde3c490a6dda7d80aad329df739efb5f1cf5 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 28 Nov 2017 10:27:54 +0100 Subject: address comments --- src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs index 37c9ae06fe..2f030f3e02 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs @@ -59,7 +59,7 @@ namespace Grpc.Core.Internal this.sharedCapacity = sharedCapacity; this.threadLocalData = new ThreadLocal(() => new ThreadLocalData(threadLocalCapacity), false); this.threadLocalCapacity = threadLocalCapacity; - this.rentLimit = threadLocalCapacity / 2; + this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1; } /// @@ -104,7 +104,7 @@ namespace Grpc.Core.Internal // next time we try to lease we will just create those // instead of trying to grab them from the shared queue. // This is to guarantee we won't be accessing the shared queue too often. - localData.CreateBudget += rentLimit - itemsMoved; + localData.CreateBudget = rentLimit - itemsMoved; return leasedItem ?? itemFactory(); } @@ -156,7 +156,7 @@ namespace Grpc.Core.Internal // next time we try to return we will just dispose the item // instead of trying to return them to the shared queue. // This is to guarantee we won't be accessing the shared queue too often. - localData.DisposeBudget += returnLimit - itemsReturned; + localData.DisposeBudget = returnLimit - itemsReturned; if (itemsReturned == 0) { -- cgit v1.2.3