aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-06-01 14:08:26 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-06-06 15:04:05 -0700
commit739ee1b159cd0925cbc448c4b95728926f1a0e60 (patch)
tree49fa5c4485e4fabac8928c2c51191f3bed232ed4
parent63386a1064f9b27b0590c1e10f7176a45b0a3f36 (diff)
support GrpcEnvironment.KillServersAsync
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs40
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs15
-rw-r--r--src/csharp/Grpc.Core/Server.cs26
3 files changed, 80 insertions, 1 deletions
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index c25022a5d4..ceaa2ec439 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -55,6 +55,7 @@ namespace Grpc.Core
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
+ static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new ConsoleLogger();
@@ -131,6 +132,24 @@ namespace Grpc.Core
}
}
+ internal static void RegisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ registeredServers.Add(server);
+ }
+ }
+
+ internal static void UnregisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
+ }
+ }
+
/// <summary>
/// Requests shutdown of all channels created by the current process.
/// </summary>
@@ -145,6 +164,19 @@ namespace Grpc.Core
}
/// <summary>
+ /// Requests immediate shutdown of all servers created by the current process.
+ /// </summary>
+ public static Task KillServersAsync()
+ {
+ HashSet<Server> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Server>(registeredServers);
+ }
+ return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
+ }
+
+ /// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
/// <value>The logger.</value>
@@ -220,6 +252,14 @@ namespace Grpc.Core
}
}
+ internal bool IsAlive
+ {
+ get
+ {
+ return this.threadPool.IsAlive;
+ }
+ }
+
/// <summary>
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 8643abf536..a446c1f99f 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -114,6 +114,21 @@ namespace Grpc.Core.Internal
});
}
+ /// <summary>
+ /// Returns true if there is at least one thread pool thread that hasn't
+ /// already stopped.
+ /// Threads can either stop because all completion queues shut down or
+ /// because all foreground threads have already shutdown and process is
+ /// going to exit.
+ /// </summary>
+ internal bool IsAlive
+ {
+ get
+ {
+ return threads.Any(t => t.ThreadState != ThreadState.Stopped);
+ }
+ }
+
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 88045a51c8..e3468ee842 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -86,6 +86,7 @@ namespace Grpc.Core
{
this.handle.RegisterCompletionQueue(cq);
}
+ GrpcEnvironment.RegisterServer(this);
}
/// <summary>
@@ -198,6 +199,7 @@ namespace Grpc.Core
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
+ GrpcEnvironment.UnregisterServer(this);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
@@ -205,13 +207,35 @@ namespace Grpc.Core
{
handle.CancelAllCalls();
}
- await shutdownTcs.Task.ConfigureAwait(false);
+
+ await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
+
DisposeHandle();
await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
}
/// <summary>
+ /// In case the environment's threadpool becomes dead, the shutdown completion will
+ /// never be delivered, but we need to release the environment's handle anyway.
+ /// </summary>
+ private async Task ShutdownCompleteOrEnvironmentDeadAsync()
+ {
+ while (true)
+ {
+ var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
+ if (shutdownTcs.Task == task)
+ {
+ return;
+ }
+ if (!environment.IsAlive)
+ {
+ return;
+ }
+ }
+ }
+
+ /// <summary>
/// Adds a service definition.
/// </summary>
private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)