aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/GrpcEnvironment.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/GrpcEnvironment.cs')
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs177
1 files changed, 159 insertions, 18 deletions
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index bee0ef1d62..eeed699712 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -32,6 +32,8 @@
#endregion
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -45,18 +47,24 @@ namespace Grpc.Core
/// </summary>
public class GrpcEnvironment
{
+ const LogLevel DefaultLogLevel = LogLevel.Info;
const int MinDefaultThreadPoolSize = 4;
static object staticLock = new object();
static GrpcEnvironment instance;
static int refCount;
static int? customThreadPoolSize;
+ static int? customCompletionQueueCount;
+ static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
+ static readonly HashSet<Server> registeredServers = new HashSet<Server>();
- static ILogger logger = new ConsoleLogger();
+ static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), DefaultLogLevel);
+ readonly object myLock = new object();
readonly GrpcThreadPool threadPool;
- readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
+ readonly AtomicCounter cqPickerCounter = new AtomicCounter();
+
bool isClosed;
/// <summary>
@@ -65,6 +73,8 @@ namespace Grpc.Core
/// </summary>
internal static GrpcEnvironment AddRef()
{
+ ShutdownHooks.Register();
+
lock (staticLock)
{
refCount++;
@@ -77,21 +87,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
- /// (and blocks until the environment has been fully shutdown).
+ /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
/// </summary>
- internal static void Release()
+ internal static async Task ReleaseAsync()
{
+ GrpcEnvironment instanceToShutdown = null;
lock (staticLock)
{
GrpcPreconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{
- instance.Close();
+ instanceToShutdown = instance;
instance = null;
}
}
+
+ if (instanceToShutdown != null)
+ {
+ await instanceToShutdown.ShutdownAsync().ConfigureAwait(false);
+ }
}
internal static int GetRefCount()
@@ -102,6 +117,68 @@ namespace Grpc.Core
}
}
+ internal static void RegisterChannel(Channel channel)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(channel);
+ registeredChannels.Add(channel);
+ }
+ }
+
+ internal static void UnregisterChannel(Channel channel)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(channel);
+ GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set.");
+ }
+ }
+
+ internal static void RegisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ registeredServers.Add(server);
+ }
+ }
+
+ internal static void UnregisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
+ }
+ }
+
+ /// <summary>
+ /// Requests shutdown of all channels created by the current process.
+ /// </summary>
+ public static Task ShutdownChannelsAsync()
+ {
+ HashSet<Channel> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Channel>(registeredChannels);
+ }
+ return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
+ }
+
+ /// <summary>
+ /// Requests immediate shutdown of all servers created by the current process.
+ /// </summary>
+ public static Task KillServersAsync()
+ {
+ HashSet<Server> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Server>(registeredServers);
+ }
+ return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
+ }
+
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@@ -141,39 +218,62 @@ namespace Grpc.Core
}
/// <summary>
+ /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
+ /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// </summary>
+ public static void SetCompletionQueueCount(int completionQueueCount)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
+ customCompletionQueueCount = completionQueueCount;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
- completionRegistry = new CompletionRegistry(this);
- threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
threadPool.Start();
}
/// <summary>
- /// Gets the completion registry used by this gRPC environment.
+ /// Gets the completion queues used by this gRPC environment.
/// </summary>
- internal CompletionRegistry CompletionRegistry
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return this.completionRegistry;
+ return this.threadPool.CompletionQueues;
}
}
- /// <summary>
- /// Gets the completion queue used by this gRPC environment.
- /// </summary>
- internal CompletionQueueSafeHandle CompletionQueue
+ internal bool IsAlive
{
get
{
- return this.threadPool.CompletionQueue;
+ return this.threadPool.IsAlive;
}
}
/// <summary>
+ /// Picks a completion queue in a round-robin fashion.
+ /// Shouldn't be invoked on a per-call basis (used at per-channel basis).
+ /// </summary>
+ internal CompletionQueueSafeHandle PickCompletionQueue()
+ {
+ var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
+ return this.threadPool.CompletionQueues.ElementAt(cqIndex);
+ }
+
+ /// <summary>
/// Gets the completion queue used by this gRPC environment.
/// </summary>
internal DebugStats DebugStats
@@ -206,13 +306,13 @@ namespace Grpc.Core
/// <summary>
/// Shuts down this environment.
/// </summary>
- private void Close()
+ private async Task ShutdownAsync()
{
if (isClosed)
{
throw new InvalidOperationException("Close has already been called");
}
- threadPool.Stop();
+ await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown();
isClosed = true;
@@ -230,5 +330,46 @@ namespace Grpc.Core
// more work, but seems to work reasonably well for a start.
return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
+
+ private int GetCompletionQueueCountOrDefault()
+ {
+ if (customCompletionQueueCount.HasValue)
+ {
+ return customCompletionQueueCount.Value;
+ }
+ // by default, create a completion queue for each thread
+ return GetThreadPoolSizeOrDefault();
+ }
+
+ private static class ShutdownHooks
+ {
+ static object staticLock = new object();
+ static bool hooksRegistered;
+
+ public static void Register()
+ {
+ lock (staticLock)
+ {
+ if (!hooksRegistered)
+ {
+ // TODO(jtattermusch): register shutdownhooks for CoreCLR as well
+#if !NETSTANDARD1_5
+
+ AppDomain.CurrentDomain.ProcessExit += ShutdownHookHandler;
+ AppDomain.CurrentDomain.DomainUnload += ShutdownHookHandler;
+#endif
+ }
+ hooksRegistered = true;
+ }
+ }
+
+ /// <summary>
+ /// Handler for AppDomain.DomainUnload and AppDomain.ProcessExit hooks.
+ /// </summary>
+ private static void ShutdownHookHandler(object sender, EventArgs e)
+ {
+ Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
+ }
+ }
}
}