using System; using Google.GRPC.Core.Internal; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; namespace Google.GRPC.Core.Internal { /// /// Pool of threads polling on the same completion queue. /// internal class GrpcThreadPool { readonly object myLock = new object(); readonly List threads = new List(); readonly int poolSize; readonly Action eventHandler; CompletionQueueSafeHandle cq; public GrpcThreadPool(int poolSize) { this.poolSize = poolSize; } internal GrpcThreadPool(int poolSize, Action eventHandler) { this.poolSize = poolSize; this.eventHandler = eventHandler; } public void Start() { lock (myLock) { if (cq != null) { throw new InvalidOperationException("Already started."); } cq = CompletionQueueSafeHandle.Create(); for (int i = 0; i < poolSize; i++) { threads.Add(CreateAndStartThread(i)); } } } public void Stop() { lock (myLock) { cq.Shutdown(); Console.WriteLine("Waiting for GPRC threads to finish."); foreach (var thread in threads) { thread.Join(); } cq.Dispose(); } } internal CompletionQueueSafeHandle CompletionQueue { get { return cq; } } private Thread CreateAndStartThread(int i) { Action body; if (eventHandler != null) { body = ThreadBodyWithHandler; } else { body = ThreadBodyNoHandler; } var thread = new Thread(new ThreadStart(body)); thread.IsBackground = false; thread.Start(); if (eventHandler != null) { thread.Name = "grpc_server_newrpc " + i; } else { thread.Name = "grpc " + i; } return thread; } /// /// Body of the polling thread. /// private void ThreadBodyNoHandler() { GRPCCompletionType completionType; do { completionType = cq.NextWithCallback(); } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); } /// /// Body of the polling thread. /// private void ThreadBodyWithHandler() { GRPCCompletionType completionType; do { using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) { completionType = ev.GetCompletionType(); eventHandler(ev); } } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); } } }