aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2017-07-28 10:34:53 +0200
committerGravatar GitHub <noreply@github.com>2017-07-28 10:34:53 +0200
commita02d8c882d75dc2739e2763f5957ab608ee9fb2e (patch)
tree1500bbc1c7f6d2e8e52b43761cbe5bf9baee66d5
parent02f3b8604e4fccc382231259c12f4093f7d73468 (diff)
parent75e9eaed4410b54226d113bcb27aff7ba396f2fa (diff)
Merge pull request #11930 from jtattermusch/csharp_safe_threadpool
C#: offload work from GrpcThreadPool by default.
-rw-r--r--src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs98
-rwxr-xr-xsrc/csharp/Grpc.Core/Grpc.Core.csproj1
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs22
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs31
-rw-r--r--src/csharp/Grpc.IntegrationTesting/QpsWorker.cs5
-rw-r--r--src/csharp/tests.json1
6 files changed, 149 insertions, 9 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs
new file mode 100644
index 0000000000..fb18198945
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ThreadingModelTest.cs
@@ -0,0 +1,98 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using NUnit.Framework;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Grpc.Core.Tests
+{
+ public class ThreadingModelTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+
+ [Test]
+ public void BlockingCallInServerHandlerDoesNotDeadlock()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ int recursionDepth = int.Parse(request);
+ if (recursionDepth <= 0) {
+ return "SUCCESS";
+ }
+ return Calls.BlockingUnaryCall(helper.CreateUnaryCall(), (recursionDepth - 1).ToString());
+ });
+
+ int maxRecursionDepth = Environment.ProcessorCount * 2; // make sure we have more pending blocking calls than threads in GrpcThreadPool
+ Assert.AreEqual("SUCCESS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), maxRecursionDepth.ToString()));
+ }
+
+ [Test]
+ public void HandlerDoesNotRunOnGrpcThread()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ if (IsRunningOnGrpcThreadPool()) {
+ return "Server handler should not run on gRPC threadpool thread.";
+ }
+ return request;
+ });
+
+ Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
+ }
+
+ [Test]
+ public async Task ContinuationDoesNotRunOnGrpcThread()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC");
+ Assert.IsFalse(IsRunningOnGrpcThreadPool());
+ }
+
+ private static bool IsRunningOnGrpcThreadPool()
+ {
+ var threadName = Thread.CurrentThread.Name ?? "";
+ return threadName.Contains("grpc");
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 50358298f4..e32711c520 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -64,6 +64,7 @@
<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard1.5' ">
<PackageReference Include="System.Runtime.Loader" Version="4.0.0" />
<PackageReference Include="System.Threading.Thread" Version="4.0.0" />
+ <PackageReference Include="System.Threading.ThreadPool" Version="4.0.0" />
</ItemGroup>
<Import Project="NativeDeps.csproj.include" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 8d0c66aa5b..0663ee9215 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -39,6 +39,7 @@ namespace Grpc.Core
static int refCount;
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
+ static bool inlineHandlers;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
@@ -218,12 +219,31 @@ namespace Grpc.Core
}
/// <summary>
+ /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>).
+ /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to
+ /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations,
+ /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks).
+ /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
+ /// </summary>
+ public static void SetHandlerInlining(bool inlineHandlers)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcEnvironment.inlineHandlers = inlineHandlers;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
- threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start();
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f9ae77c74e..19b44c2618 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,12 +33,15 @@ namespace Grpc.Core.Internal
internal class GrpcThreadPool
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
+ static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
+ static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
readonly GrpcEnvironment environment;
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly int completionQueueCount;
+ readonly bool inlineHandlers;
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
@@ -52,11 +55,13 @@ namespace Grpc.Core.Internal
/// <param name="environment">Environment.</param>
/// <param name="poolSize">Pool size.</param>
/// <param name="completionQueueCount">Completion queue count.</param>
- public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
+ /// <param name="inlineHandlers">Handler inlining.</param>
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount, bool inlineHandlers)
{
this.environment = environment;
this.poolSize = poolSize;
this.completionQueueCount = completionQueueCount;
+ this.inlineHandlers = inlineHandlers;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
}
@@ -165,11 +170,19 @@ namespace Grpc.Core.Internal
try
{
var callback = cq.CompletionRegistry.Extract(tag);
- callback(success);
+ // Use cached delegates to avoid unnecessary allocations
+ if (!inlineHandlers)
+ {
+ ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback);
+ }
+ else
+ {
+ RunCompletionQueueEventCallback(callback, success);
+ }
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured while invoking completion delegate");
+ Logger.Error(e, "Exception occured while extracting event from completion registry.");
}
}
}
@@ -186,5 +199,17 @@ namespace Grpc.Core.Internal
}
return list.AsReadOnly();
}
+
+ private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
+ {
+ try
+ {
+ callback(success);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured while invoking completion delegate");
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
index 7009a93b18..a579fb8040 100644
--- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
+++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
@@ -63,11 +63,6 @@ namespace Grpc.IntegrationTesting
private async Task RunAsync()
{
- // (ThreadPoolSize == ProcessorCount) gives best throughput in benchmarks
- // and doesn't seem to harm performance even when server and client
- // are running on the same machine.
- GrpcEnvironment.SetThreadPoolSize(Environment.ProcessorCount);
-
string host = "0.0.0.0";
int port = options.DriverPort;
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index bc6adbbfe8..7841051052 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -31,6 +31,7 @@
"Grpc.Core.Tests.ShutdownHookPendingCallTest",
"Grpc.Core.Tests.ShutdownHookServerTest",
"Grpc.Core.Tests.ShutdownTest",
+ "Grpc.Core.Tests.ThreadingModelTest",
"Grpc.Core.Tests.TimeoutsTest",
"Grpc.Core.Tests.UserAgentStringTest"
],