diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2016-06-01 14:08:26 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2016-06-06 15:04:05 -0700 |
commit | 739ee1b159cd0925cbc448c4b95728926f1a0e60 (patch) | |
tree | 49fa5c4485e4fabac8928c2c51191f3bed232ed4 | |
parent | 63386a1064f9b27b0590c1e10f7176a45b0a3f36 (diff) |
support GrpcEnvironment.KillServersAsync
-rw-r--r-- | src/csharp/Grpc.Core/GrpcEnvironment.cs | 40 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 15 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 26 |
3 files changed, 80 insertions, 1 deletions
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index c25022a5d4..ceaa2ec439 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -55,6 +55,7 @@ namespace Grpc.Core static int? customThreadPoolSize; static int? customCompletionQueueCount; static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); + static readonly HashSet<Server> registeredServers = new HashSet<Server>(); static ILogger logger = new ConsoleLogger(); @@ -131,6 +132,24 @@ namespace Grpc.Core } } + internal static void RegisterServer(Server server) + { + lock (staticLock) + { + GrpcPreconditions.CheckNotNull(server); + registeredServers.Add(server); + } + } + + internal static void UnregisterServer(Server server) + { + lock (staticLock) + { + GrpcPreconditions.CheckNotNull(server); + GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set."); + } + } + /// <summary> /// Requests shutdown of all channels created by the current process. /// </summary> @@ -145,6 +164,19 @@ namespace Grpc.Core } /// <summary> + /// Requests immediate shutdown of all servers created by the current process. + /// </summary> + public static Task KillServersAsync() + { + HashSet<Server> snapshot = null; + lock (staticLock) + { + snapshot = new HashSet<Server>(registeredServers); + } + return Task.WhenAll(snapshot.Select((server) => server.KillAsync())); + } + + /// <summary> /// Gets application-wide logger used by gRPC. /// </summary> /// <value>The logger.</value> @@ -220,6 +252,14 @@ namespace Grpc.Core } } + internal bool IsAlive + { + get + { + return this.threadPool.IsAlive; + } + } + /// <summary> /// Picks a completion queue in a round-robin fashion. /// Shouldn't be invoked on a per-call basis (used at per-channel basis). diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 8643abf536..a446c1f99f 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -114,6 +114,21 @@ namespace Grpc.Core.Internal }); } + /// <summary> + /// Returns true if there is at least one thread pool thread that hasn't + /// already stopped. + /// Threads can either stop because all completion queues shut down or + /// because all foreground threads have already shutdown and process is + /// going to exit. + /// </summary> + internal bool IsAlive + { + get + { + return threads.Any(t => t.ThreadState != ThreadState.Stopped); + } + } + internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues { get diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 88045a51c8..e3468ee842 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -86,6 +86,7 @@ namespace Grpc.Core { this.handle.RegisterCompletionQueue(cq); } + GrpcEnvironment.RegisterServer(this); } /// <summary> @@ -198,6 +199,7 @@ namespace Grpc.Core GrpcPreconditions.CheckState(!shutdownRequested); shutdownRequested = true; } + GrpcEnvironment.UnregisterServer(this); var cq = environment.CompletionQueues.First(); // any cq will do handle.ShutdownAndNotify(HandleServerShutdown, cq); @@ -205,13 +207,35 @@ namespace Grpc.Core { handle.CancelAllCalls(); } - await shutdownTcs.Task.ConfigureAwait(false); + + await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false); + DisposeHandle(); await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); } /// <summary> + /// In case the environment's threadpool becomes dead, the shutdown completion will + /// never be delivered, but we need to release the environment's handle anyway. + /// </summary> + private async Task ShutdownCompleteOrEnvironmentDeadAsync() + { + while (true) + { + var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false); + if (shutdownTcs.Task == task) + { + return; + } + if (!environment.IsAlive) + { + return; + } + } + } + + /// <summary> /// Adds a service definition. /// </summary> private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition) |