aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
diff options
context:
space:
mode:
authorGravatar Jorge Canizales <jcanizales@google.com>2016-06-30 15:37:11 -0700
committerGravatar Jorge Canizales <jcanizales@google.com>2016-06-30 15:37:11 -0700
commitc93d6a66a12110f82882807d95ad5dcb02370151 (patch)
treeefb5e632e8ad237e05d63af14e54c2a8d83ad87b /src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
parent095172c3a52a11c42aed0150eb8dbb47186fd2a0 (diff)
parenta5596db1a53723789d7c90c23d9cbfbb8207f949 (diff)
Merge master into merge-0.14-into-master
Conflicts: - gRPC.podspec - Only had non-trivial changes in the core file list, which will need to be regenerated (in gRPC-Core.podspec). - src/objective-c/BoringSSL.podspec - Had trivial conflicts in the version. - src/objective-c/examples/RemoteTestClient/RemoteTest.podspec - Trivial conflicts in quoting. - src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj and src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj - The master version is used, pending testing. The 0.14 version had emoji and some unneeded entries. - src/objective-c/tests/Podfile - Added CronetFramework pod, and warning silencing from master. - templates/gRPC.podspec.template - Deleted. - third_party/protobuf - Using master commit, but need to verify if it works for frameworks.
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs98
1 files changed, 75 insertions, 23 deletions
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 4b7124ee74..a446c1f99f 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,15 +33,16 @@
using System;
using System.Collections.Generic;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Pool of threads polling on the same completion queue.
+ /// Pool of threads polling on a set of completions queues.
/// </summary>
internal class GrpcThreadPool
{
@@ -51,25 +52,33 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
+ readonly int completionQueueCount;
- CompletionQueueSafeHandle cq;
+ bool stopRequested;
- public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
+ IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
+
+ /// <summary>
+ /// Creates a thread pool threads polling on a set of completions queues.
+ /// </summary>
+ /// <param name="environment">Environment.</param>
+ /// <param name="poolSize">Pool size.</param>
+ /// <param name="completionQueueCount">Completion queue count.</param>
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
+ this.completionQueueCount = completionQueueCount;
+ GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
+ "Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
- if (cq != null)
- {
- throw new InvalidOperationException("Already started.");
- }
-
- cq = CompletionQueueSafeHandle.Create();
+ GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
+ completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@@ -78,53 +87,85 @@ namespace Grpc.Core.Internal
}
}
- public void Stop()
+ public Task StopAsync()
{
lock (myLock)
{
- cq.Shutdown();
+ GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
+ stopRequested = true;
+
+ foreach (var cq in completionQueues)
+ {
+ cq.Shutdown();
+ }
+ }
+
+ return Task.Run(() =>
+ {
foreach (var thread in threads)
{
thread.Join();
}
- cq.Dispose();
+ foreach (var cq in completionQueues)
+ {
+ cq.Dispose();
+ }
+ });
+ }
+
+ /// <summary>
+ /// Returns true if there is at least one thread pool thread that hasn't
+ /// already stopped.
+ /// Threads can either stop because all completion queues shut down or
+ /// because all foreground threads have already shutdown and process is
+ /// going to exit.
+ /// </summary>
+ internal bool IsAlive
+ {
+ get
+ {
+ return threads.Any(t => t.ThreadState != ThreadState.Stopped);
}
}
- internal CompletionQueueSafeHandle CompletionQueue
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return cq;
+ return completionQueues;
}
}
- private Thread CreateAndStartThread(int i)
+ private Thread CreateAndStartThread(int threadIndex)
{
- var thread = new Thread(new ThreadStart(RunHandlerLoop));
- thread.IsBackground = false;
+ var cqIndex = threadIndex % completionQueues.Count;
+ var cq = completionQueues.ElementAt(cqIndex);
+
+ var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
+ thread.IsBackground = true;
+ thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
- thread.Name = "grpc " + i;
+
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
- private void RunHandlerLoop()
+ private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
{
ev = cq.Next();
- if (ev.type == GRPCCompletionType.OpComplete)
+ if (ev.type == CompletionQueueEvent.CompletionType.OpComplete)
{
bool success = (ev.success != 0);
IntPtr tag = ev.tag;
try
{
- var callback = environment.CompletionRegistry.Extract(tag);
+ var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
@@ -133,7 +174,18 @@ namespace Grpc.Core.Internal
}
}
}
- while (ev.type != GRPCCompletionType.Shutdown);
+ while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
+ }
+
+ private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
+ {
+ var list = new List<CompletionQueueSafeHandle>();
+ for (int i = 0; i < completionQueueCount; i++)
+ {
+ var completionRegistry = new CompletionRegistry(environment);
+ list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
+ }
+ return list.AsReadOnly();
}
}
}