diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2017-12-08 14:06:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-12-08 14:06:05 -0800 |
commit | 7a64e6964e18c4c9a642134ebea89ccbb5c75c2e (patch) | |
tree | 9651da3926ea8abd638f675efc388ea74c5a9cb4 /src/csharp | |
parent | 5b1a66cef125ef0dcc99c792a52ce76bc380564a (diff) | |
parent | 1aee9d9880cf2d10fc14b884f0bc94a37b0a1162 (diff) |
Merge pull request #13521 from jtattermusch/csharp_requestcallcontext_pooling
C#: Reuse RequestCallContextSafeHandle objects by pooling them.
Diffstat (limited to 'src/csharp')
11 files changed, 72 insertions, 16 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index 775c950c8c..7e4e2975c1 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests public void CreateAsyncAndShutdown() { var env = GrpcEnvironment.AddRef(); - var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create())); + var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create(), () => RequestCallContextSafeHandle.Create())); cq.Shutdown(); var ev = cq.Next(); cq.Dispose(); diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 2b1b5e32d7..7b4342b74b 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -35,6 +35,8 @@ namespace Grpc.Core const int MinDefaultThreadPoolSize = 4; const int DefaultBatchContextPoolSharedCapacity = 10000; const int DefaultBatchContextPoolThreadLocalCapacity = 64; + const int DefaultRequestCallContextPoolSharedCapacity = 10000; + const int DefaultRequestCallContextPoolThreadLocalCapacity = 64; static object staticLock = new object(); static GrpcEnvironment instance; @@ -44,12 +46,15 @@ namespace Grpc.Core static bool inlineHandlers; static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity; static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity; + static int requestCallContextPoolSharedCapacity = DefaultRequestCallContextPoolSharedCapacity; + static int requestCallContextPoolThreadLocalCapacity = DefaultRequestCallContextPoolThreadLocalCapacity; static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); static readonly HashSet<Server> registeredServers = new HashSet<Server>(); static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true); readonly IObjectPool<BatchContextSafeHandle> batchContextPool; + readonly IObjectPool<RequestCallContextSafeHandle> requestCallContextPool; readonly GrpcThreadPool threadPool; readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); @@ -263,6 +268,26 @@ namespace Grpc.Core } /// <summary> + /// Sets the parameters for a pool that caches request call context instances. Reusing request call context instances + /// instead of creating a new one for every requested call in C core helps reducing the GC pressure. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// This is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetRequestCallContextPoolParams(int sharedCapacity, int threadLocalCapacity) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number"); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number"); + requestCallContextPoolSharedCapacity = sharedCapacity; + requestCallContextPoolThreadLocalCapacity = threadLocalCapacity; + } + } + + /// <summary> /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic. /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first). /// </summary> @@ -275,6 +300,7 @@ namespace Grpc.Core { GrpcNativeInit(); batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); + requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(this.requestCallContextPool), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } @@ -292,6 +318,8 @@ namespace Grpc.Core internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool; + internal IObjectPool<RequestCallContextSafeHandle> RequestCallContextPool => requestCallContextPool; + internal bool IsAlive { get diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index cf3f3c0995..79d0c91420 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -37,14 +37,16 @@ namespace Grpc.Core.Internal readonly GrpcEnvironment environment; readonly Func<BatchContextSafeHandle> batchContextFactory; + readonly Func<RequestCallContextSafeHandle> requestCallContextFactory; readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer()); SpinLock spinLock = new SpinLock(Debugger.IsAttached); IntPtr lastRegisteredKey; // only for testing - public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory) + public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory, Func<RequestCallContextSafeHandle> requestCallContextFactory) { this.environment = GrpcPreconditions.CheckNotNull(environment); this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory); + this.requestCallContextFactory = GrpcPreconditions.CheckNotNull(requestCallContextFactory); } public void Register(IntPtr key, IOpCompletionCallback callback) @@ -73,10 +75,12 @@ namespace Grpc.Core.Internal return ctx; } - public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) + public RequestCallContextSafeHandle RegisterRequestCallCompletion(RequestCallCompletionDelegate callback) { + var ctx = requestCallContextFactory(); ctx.CompletionCallback = callback; Register(ctx.Handle, ctx); + return ctx; } public IOpCompletionCallback Extract(IntPtr key) diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f1b5a4f9ff..8ddda9be5c 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -219,7 +219,7 @@ namespace Grpc.Core.Internal var list = new List<CompletionQueueSafeHandle>(); for (int i = 0; i < completionQueueCount; i++) { - var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => environment.RequestCallContextPool.Lease()); list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry)); } return list.AsReadOnly(); diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 43acb8f915..8b15c2690f 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -61,6 +61,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_request_call_context_host_delegate grpcsharp_request_call_context_host; public readonly Delegates.grpcsharp_request_call_context_deadline_delegate grpcsharp_request_call_context_deadline; public readonly Delegates.grpcsharp_request_call_context_request_metadata_delegate grpcsharp_request_call_context_request_metadata; + public readonly Delegates.grpcsharp_request_call_context_reset_delegate grpcsharp_request_call_context_reset; public readonly Delegates.grpcsharp_request_call_context_destroy_delegate grpcsharp_request_call_context_destroy; public readonly Delegates.grpcsharp_composite_call_credentials_create_delegate grpcsharp_composite_call_credentials_create; @@ -179,6 +180,7 @@ namespace Grpc.Core.Internal this.grpcsharp_request_call_context_host = GetMethodDelegate<Delegates.grpcsharp_request_call_context_host_delegate>(library); this.grpcsharp_request_call_context_deadline = GetMethodDelegate<Delegates.grpcsharp_request_call_context_deadline_delegate>(library); this.grpcsharp_request_call_context_request_metadata = GetMethodDelegate<Delegates.grpcsharp_request_call_context_request_metadata_delegate>(library); + this.grpcsharp_request_call_context_reset = GetMethodDelegate<Delegates.grpcsharp_request_call_context_reset_delegate>(library); this.grpcsharp_request_call_context_destroy = GetMethodDelegate<Delegates.grpcsharp_request_call_context_destroy_delegate>(library); this.grpcsharp_composite_call_credentials_create = GetMethodDelegate<Delegates.grpcsharp_composite_call_credentials_create_delegate>(library); @@ -322,6 +324,7 @@ namespace Grpc.Core.Internal public delegate IntPtr grpcsharp_request_call_context_host_delegate(RequestCallContextSafeHandle ctx, out UIntPtr hostLength); public delegate Timespec grpcsharp_request_call_context_deadline_delegate(RequestCallContextSafeHandle ctx); public delegate IntPtr grpcsharp_request_call_context_request_metadata_delegate(RequestCallContextSafeHandle ctx); + public delegate void grpcsharp_request_call_context_reset_delegate(RequestCallContextSafeHandle ctx); public delegate void grpcsharp_request_call_context_destroy_delegate(IntPtr ctx); public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2); diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs index 09f5c3e452..59e9d9b1ab 100644 --- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs @@ -30,14 +30,17 @@ namespace Grpc.Core.Internal { static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>(); + IObjectPool<RequestCallContextSafeHandle> ownedByPool; private RequestCallContextSafeHandle() { } - public static RequestCallContextSafeHandle Create() + public static RequestCallContextSafeHandle Create(IObjectPool<RequestCallContextSafeHandle> ownedByPool = null) { - return Native.grpcsharp_request_call_context_create(); + var ctx = Native.grpcsharp_request_call_context_create(); + ctx.ownedByPool = ownedByPool; + return ctx; } public IntPtr Handle @@ -71,6 +74,19 @@ namespace Grpc.Core.Internal return new ServerRpcNew(server, call, method, host, deadline, metadata); } + public void Recycle() + { + if (ownedByPool != null) + { + Native.grpcsharp_request_call_context_reset(this); + ownedByPool.Return(this); + } + else + { + Dispose(); + } + } + protected override bool ReleaseHandle() { Native.grpcsharp_request_call_context_destroy(handle); @@ -90,7 +106,7 @@ namespace Grpc.Core.Internal finally { CompletionCallback = null; - Dispose(); + Recycle(); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 9b7ea884dd..56dda9cff5 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -75,8 +75,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = RequestCallContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterRequestCallCompletion(ctx, callback); + var ctx = completionQueue.CompletionRegistry.RegisterRequestCallCompletion(callback); Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 71c7f108f3..60dacbf126 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -300,6 +300,7 @@ namespace Grpc.Core { if (!shutdownRequested) { + // TODO(jtattermusch): avoid unnecessary delegate allocation handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); } } diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs index eefdb50e39..bb57a6968f 100644 --- a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs @@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks public void Run(int threadCount, int iterations, bool useSharedRegistry) { Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry)); - CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null; + CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create(), () => RequestCallContextSafeHandle.Create()) : null; var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry)); threadedBenchmark.Run(); // TODO: parametrize by number of pending completions @@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry) { - var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()); + var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => throw new NotImplementedException(), () => throw new NotImplementedException()); var ctx = BatchContextSafeHandle.Create(); var stopwatch = Stopwatch.StartNew(); diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs index da4f35ff96..390c062298 100644 --- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -52,7 +52,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, int payloadSize) { - var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => throw new NotImplementedException()); var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry); var call = CreateFakeCall(cq); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 24d779e1e5..6875d40aa0 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -226,17 +226,22 @@ grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { } GPR_EXPORT void GPR_CALLTYPE -grpcsharp_request_call_context_destroy(grpcsharp_request_call_context* ctx) { - if (!ctx) { - return; - } +grpcsharp_request_call_context_reset(grpcsharp_request_call_context* ctx) { /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is supposed to take its ownership. */ grpc_call_details_destroy(&(ctx->call_details)); grpcsharp_metadata_array_destroy_metadata_only(&(ctx->request_metadata)); + memset(ctx, 0, sizeof(grpcsharp_request_call_context)); +} +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_request_call_context_destroy(grpcsharp_request_call_context* ctx) { + if (!ctx) { + return; + } + grpcsharp_request_call_context_reset(ctx); gpr_free(ctx); } |