diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2017-07-28 10:34:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-28 10:34:53 +0200 |
commit | a02d8c882d75dc2739e2763f5957ab608ee9fb2e (patch) | |
tree | 1500bbc1c7f6d2e8e52b43761cbe5bf9baee66d5 | |
parent | 02f3b8604e4fccc382231259c12f4093f7d73468 (diff) | |
parent | 75e9eaed4410b54226d113bcb27aff7ba396f2fa (diff) |
Merge pull request #11930 from jtattermusch/csharp_safe_threadpool
C#: offload work from GrpcThreadPool by default.
-rw-r--r-- | src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs | 98 | ||||
-rwxr-xr-x | src/csharp/Grpc.Core/Grpc.Core.csproj | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/GrpcEnvironment.cs | 22 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 31 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/QpsWorker.cs | 5 | ||||
-rw-r--r-- | src/csharp/tests.json | 1 |
6 files changed, 149 insertions, 9 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs new file mode 100644 index 0000000000..fb18198945 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using NUnit.Framework; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Tests +{ + public class ThreadingModelTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public void BlockingCallInServerHandlerDoesNotDeadlock() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + int recursionDepth = int.Parse(request); + if (recursionDepth <= 0) { + return "SUCCESS"; + } + return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString()); + }); + + int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool + Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString())); + } + + [Test] + public void HandlerDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + if (IsRunningOnGrpcThreadPool()) { + return "Server handler should not run on gRPC threadpool thread."; + } + return request; + }); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); + } + + [Test] + public async Task ContinuationDoesNotRunOnGrpcThread() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + + await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"); + Assert.IsFalse(IsRunningOnGrpcThreadPool()); + } + + private static bool IsRunningOnGrpcThreadPool() + { + var threadName = Thread.CurrentThread.Name ?? ""; + return threadName.Contains("grpc"); + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 50358298f4..e32711c520 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -64,6 +64,7 @@ <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' "> <PackageReference Include="System.Runtime.Loader" Version="4.0.0" /> <PackageReference Include="System.Threading.Thread" Version="4.0.0" /> + <PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" /> </ItemGroup> <Import Project="NativeDeps.csproj.include" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 8d0c66aa5b..0663ee9215 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -39,6 +39,7 @@ namespace Grpc.Core static int refCount; static int? customThreadPoolSize; static int? customCompletionQueueCount; + static bool inlineHandlers; static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); static readonly HashSet<Server> registeredServers = new HashSet<Server>(); @@ -218,12 +219,31 @@ namespace Grpc.Core } /// <summary> + /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>). + /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to + /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations, + /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks). + /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier. + /// </summary> + public static void SetHandlerInlining(bool inlineHandlers) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcEnvironment.inlineHandlers = inlineHandlers; + } + } + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f9ae77c74e..19b44c2618 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,12 +33,15 @@ namespace Grpc.Core.Internal internal class GrpcThreadPool { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>(); + static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); + static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); readonly GrpcEnvironment environment; readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; readonly int completionQueueCount; + readonly bool inlineHandlers; readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads @@ -52,11 +55,13 @@ namespace Grpc.Core.Internal /// <param name="environment">Environment.</param> /// <param name="poolSize">Pool size.</param> /// <param name="completionQueueCount">Completion queue count.</param> - public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) + /// <param name="inlineHandlers">Handler inlining.</param> + public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers) { this.environment = environment; this.poolSize = poolSize; this.completionQueueCount = completionQueueCount; + this.inlineHandlers = inlineHandlers; GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); } @@ -165,11 +170,19 @@ namespace Grpc.Core.Internal try { var callback = cq.CompletionRegistry.Extract(tag); - callback(success); + // Use cached delegates to avoid unnecessary allocations + if (!inlineHandlers) + { + ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); + } + else + { + RunCompletionQueueEventCallback(callback, success); + } } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking completion delegate"); + Logger.Error(e, "Exception occured while extracting event from completion registry."); } } } @@ -186,5 +199,17 @@ namespace Grpc.Core.Internal } return list.AsReadOnly(); } + + private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + { + try + { + callback(success); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking completion delegate"); + } + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs index 7009a93b18..a579fb8040 100644 --- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs +++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs @@ -63,11 +63,6 @@ namespace Grpc.IntegrationTesting private async Task RunAsync() { - // (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks - // and doesn't seem to harm performance even when server and client - // are running on the same machine. - GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount); - string host = "0.0.0.0"; int port = options.DriverPort; diff --git a/src/csharp/tests.json b/src/csharp/tests.json index bc6adbbfe8..7841051052 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -31,6 +31,7 @@ "Grpc.Core.Tests.ShutdownHookPendingCallTest", "Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownTest", + "Grpc.Core.Tests.ThreadingModelTest", "Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.UserAgentStringTest" ], |