aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/GrpcEnvironment.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/GrpcEnvironment.cs')
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs53
1 files changed, 40 insertions, 13 deletions
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index bee0ef1d62..18af1099f1 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -32,8 +32,9 @@
#endregion
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Runtime.InteropServices;
-using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@@ -51,12 +52,13 @@ namespace Grpc.Core
static GrpcEnvironment instance;
static int refCount;
static int? customThreadPoolSize;
+ static int? customCompletionQueueCount;
static ILogger logger = new ConsoleLogger();
readonly GrpcThreadPool threadPool;
- readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
+ readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
/// <summary>
@@ -141,36 +143,51 @@ namespace Grpc.Core
}
/// <summary>
+ /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
+ /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// </summary>
+ public static void SetCompletionQueueCount(int completionQueueCount)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
+ customCompletionQueueCount = completionQueueCount;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
- completionRegistry = new CompletionRegistry(this);
- threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
threadPool.Start();
}
/// <summary>
- /// Gets the completion registry used by this gRPC environment.
+ /// Gets the completion queues used by this gRPC environment.
/// </summary>
- internal CompletionRegistry CompletionRegistry
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return this.completionRegistry;
+ return this.threadPool.CompletionQueues;
}
}
/// <summary>
- /// Gets the completion queue used by this gRPC environment.
+ /// Picks a completion queue in a round-robin fashion.
+ /// Shouldn't be invoked on a per-call basis (used at per-channel basis).
/// </summary>
- internal CompletionQueueSafeHandle CompletionQueue
+ internal CompletionQueueSafeHandle PickCompletionQueue()
{
- get
- {
- return this.threadPool.CompletionQueue;
- }
+ var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
+ return this.threadPool.CompletionQueues.ElementAt(cqIndex);
}
/// <summary>
@@ -230,5 +247,15 @@ namespace Grpc.Core
// more work, but seems to work reasonably well for a start.
return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
+
+ private int GetCompletionQueueCountOrDefault()
+ {
+ if (customCompletionQueueCount.HasValue)
+ {
+ return customCompletionQueueCount.Value;
+ }
+ // by default, create a completion queue for each thread
+ return GetThreadPoolSizeOrDefault();
+ }
}
}