From 633434aed2b7db1e206c884257b198e04f2bf60e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 24 Jul 2017 17:11:35 +0200 Subject: dont run user handlers on grpc threadpool threads --- src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs | 98 ++++++++++++++++++++++++ src/csharp/Grpc.Core/Grpc.Core.csproj | 1 + src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 19 ++++- src/csharp/tests.json | 1 + 4 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs new file mode 100644 index 0000000000..fb18198945 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using NUnit.Framework; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Tests +{ + public class ThreadingModelTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public void BlockingCallInServerHandlerDoesNotDeadlock() + { + helper.UnaryHandler = new UnaryServerMethod(async (request, context) => + { + int recursionDepth = int.Parse(request); + if (recursionDepth <= 0) { + return "SUCCESS"; + } + return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString()); + }); + + int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool + Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString())); + } + + [Test] + public void HandlerDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod(async (request, context) => + { + if (IsRunningOnGrpcThreadPool()) { + return "Server handler should not run on gRPC threadpool thread."; + } + return request; + }); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); + } + + [Test] + public async Task ContinuationDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod(async (request, context) => + { + return request; + }); + + await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"); + Assert.IsFalse(IsRunningOnGrpcThreadPool()); + } + + private static bool IsRunningOnGrpcThreadPool() + { + var threadName = Thread.CurrentThread.Name ?? ""; + return threadName.Contains("grpc"); + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 50358298f4..e32711c520 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -64,6 +64,7 @@ + diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f9ae77c74e..8640058b0c 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,6 +33,8 @@ namespace Grpc.Core.Internal internal class GrpcThreadPool { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); + static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); + static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); readonly GrpcEnvironment environment; readonly object myLock = new object(); @@ -165,11 +167,12 @@ namespace Grpc.Core.Internal try { var callback = cq.CompletionRegistry.Extract(tag); - callback(success); + // Use cached delegates to avoid unnecessary allocations + ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking completion delegate"); + Logger.Error(e, "Exception occured while extracting event from completion registry."); } } } @@ -186,5 +189,17 @@ namespace Grpc.Core.Internal } return list.AsReadOnly(); } + + private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + { + try + { + callback(success); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking completion delegate"); + } + } } } diff --git a/src/csharp/tests.json b/src/csharp/tests.json index bc6adbbfe8..7841051052 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -31,6 +31,7 @@ "Grpc.Core.Tests.ShutdownHookPendingCallTest", "Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownTest", + "Grpc.Core.Tests.ThreadingModelTest", "Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.UserAgentStringTest" ], -- cgit v1.2.3 From 7dbd72497ceee212a33db09c51e0ad3c50a90b7e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 25 Jul 2017 14:24:58 +0200 Subject: introduce inlineHandlers setting --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 22 +++++++++++++++++++++- src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 14 ++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 8d0c66aa5b..0663ee9215 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -39,6 +39,7 @@ namespace Grpc.Core static int refCount; static int? customThreadPoolSize; static int? customCompletionQueueCount; + static bool inlineHandlers; static readonly HashSet registeredChannels = new HashSet(); static readonly HashSet registeredServers = new HashSet(); @@ -217,13 +218,32 @@ namespace Grpc.Core } } + /// + /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (inlineHandlers=false). + /// Setting inlineHandlers to true will allow scheduling the event handlers directly to + /// GrpcThreadPool internal threads. That can lead to significant performance gains in some situations, + /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks). + /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// Note: inlineHandlers=true was the default in gRPC C# v1.4.x and earlier. + /// + public static void SetHandlerInlining(bool inlineHandlers) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcEnvironment.inlineHandlers = inlineHandlers; + } + } + /// /// Creates gRPC environment. /// private GrpcEnvironment() { GrpcNativeInit(); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 8640058b0c..19b44c2618 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -41,6 +41,7 @@ namespace Grpc.Core.Internal readonly List threads = new List(); readonly int poolSize; readonly int completionQueueCount; + readonly bool inlineHandlers; readonly List threadProfilers = new List(); // profilers assigned to threadpool threads @@ -54,11 +55,13 @@ namespace Grpc.Core.Internal /// Environment. /// Pool size. /// Completion queue count. - public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) + /// Handler inlining. + public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers) { this.environment = environment; this.poolSize = poolSize; this.completionQueueCount = completionQueueCount; + this.inlineHandlers = inlineHandlers; GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); } @@ -168,7 +171,14 @@ namespace Grpc.Core.Internal { var callback = cq.CompletionRegistry.Extract(tag); // Use cached delegates to avoid unnecessary allocations - ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); + if (!inlineHandlers) + { + ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); + } + else + { + RunCompletionQueueEventCallback(callback, success); + } } catch (Exception e) { -- cgit v1.2.3 From 75e9eaed4410b54226d113bcb27aff7ba396f2fa Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 25 Jul 2017 12:23:28 +0200 Subject: reduce threadpool size for qps benchmarks --- src/csharp/Grpc.IntegrationTesting/QpsWorker.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs index 7009a93b18..a579fb8040 100644 --- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs +++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs @@ -63,11 +63,6 @@ namespace Grpc.IntegrationTesting private async Task RunAsync() { - // (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks - // and doesn't seem to harm performance even when server and client - // are running on the same machine. - GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount); - string host = "0.0.0.0"; int port = options.DriverPort; -- cgit v1.2.3