diff options
Diffstat (limited to 'src/csharp/Grpc.Core/GrpcEnvironment.cs')
-rw-r--r-- | src/csharp/Grpc.Core/GrpcEnvironment.cs | 53 |
1 files changed, 40 insertions, 13 deletions
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index bee0ef1d62..18af1099f1 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -32,8 +32,9 @@ #endregion using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; -using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Utils; @@ -51,12 +52,13 @@ namespace Grpc.Core static GrpcEnvironment instance; static int refCount; static int? customThreadPoolSize; + static int? customCompletionQueueCount; static ILogger logger = new ConsoleLogger(); readonly GrpcThreadPool threadPool; - readonly CompletionRegistry completionRegistry; readonly DebugStats debugStats = new DebugStats(); + readonly AtomicCounter cqPickerCounter = new AtomicCounter(); bool isClosed; /// <summary> @@ -141,36 +143,51 @@ namespace Grpc.Core } /// <summary> + /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. + /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetCompletionQueueCount(int completionQueueCount) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number"); + customCompletionQueueCount = completionQueueCount; + } + } + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); - completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); threadPool.Start(); } /// <summary> - /// Gets the completion registry used by this gRPC environment. + /// Gets the completion queues used by this gRPC environment. /// </summary> - internal CompletionRegistry CompletionRegistry + internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues { get { - return this.completionRegistry; + return this.threadPool.CompletionQueues; } } /// <summary> - /// Gets the completion queue used by this gRPC environment. + /// Picks a completion queue in a round-robin fashion. + /// Shouldn't be invoked on a per-call basis (used at per-channel basis). /// </summary> - internal CompletionQueueSafeHandle CompletionQueue + internal CompletionQueueSafeHandle PickCompletionQueue() { - get - { - return this.threadPool.CompletionQueue; - } + var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count); + return this.threadPool.CompletionQueues.ElementAt(cqIndex); } /// <summary> @@ -230,5 +247,15 @@ namespace Grpc.Core // more work, but seems to work reasonably well for a start. return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); } + + private int GetCompletionQueueCountOrDefault() + { + if (customCompletionQueueCount.HasValue) + { + return customCompletionQueueCount.Value; + } + // by default, create a completion queue for each thread + return GetThreadPoolSizeOrDefault(); + } } } |