aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/csharp/Grpc.Core/Internal/AtomicCounter.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs37
2 files changed, 34 insertions, 5 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
index 64e16e4c54..20e25f9d88 100644
--- a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
+++ b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
@@ -64,7 +64,7 @@ namespace Grpc.Core.Internal
{
get
{
- return counter;
+ return Interlocked.Read(ref counter);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 19b44c2618..3c94b602c0 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,8 +33,8 @@ namespace Grpc.Core.Internal
internal class GrpcThreadPool
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
- static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
- static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
+ const int FinishContinuationsSleepMillis = 10;
+ const int MaxFinishContinuationsSleepTotalMillis = 10000;
readonly GrpcEnvironment environment;
readonly object myLock = new object();
@@ -42,6 +42,9 @@ namespace Grpc.Core.Internal
readonly int poolSize;
readonly int completionQueueCount;
readonly bool inlineHandlers;
+ readonly WaitCallback runCompletionQueueEventCallbackSuccess;
+ readonly WaitCallback runCompletionQueueEventCallbackFailure;
+ readonly AtomicCounter queuedContinuationCounter = new AtomicCounter();
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
@@ -64,6 +67,9 @@ namespace Grpc.Core.Internal
this.inlineHandlers = inlineHandlers;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
+
+ this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
+ this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
}
public void Start()
@@ -173,7 +179,8 @@ namespace Grpc.Core.Internal
// Use cached delegates to avoid unnecessary allocations
if (!inlineHandlers)
{
- ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback);
+ queuedContinuationCounter.Increment();
+ ThreadPool.QueueUserWorkItem(success ? runCompletionQueueEventCallbackSuccess : runCompletionQueueEventCallbackFailure, callback);
}
else
{
@@ -187,6 +194,24 @@ namespace Grpc.Core.Internal
}
}
while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
+
+ // Continuations are running on default threadpool that consists of background threads.
+ // GrpcThreadPool thread (a foreground thread) will not exit unless all queued work had
+ // been finished to prevent terminating the continuations queued prematurely.
+ int sleepIterations = 0;
+ while (queuedContinuationCounter.Count != 0)
+ {
+ // Only happens on shutdown and having pending continuations shouldn't very common,
+ // so sleeping here for a little bit is fine.
+ if (sleepIterations >= MaxFinishContinuationsSleepTotalMillis / FinishContinuationsSleepMillis)
+ {
+ Logger.Warning("Shutting down gRPC thread [{0}] with unfinished callbacks (Timed out waiting for callbacks to finish).",
+ Thread.CurrentThread.Name);
+ break;
+ }
+ Thread.Sleep(FinishContinuationsSleepMillis);
+ sleepIterations ++;
+ }
}
private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
@@ -200,7 +225,7 @@ namespace Grpc.Core.Internal
return list.AsReadOnly();
}
- private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
+ private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
{
try
{
@@ -210,6 +235,10 @@ namespace Grpc.Core.Internal
{
Logger.Error(e, "Exception occured while invoking completion delegate");
}
+ finally
+ {
+ queuedContinuationCounter.Decrement();
+ }
}
}
}