aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs30
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs26
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs14
8 files changed, 98 insertions, 45 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index d350f45da6..24b75d1668 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -47,6 +47,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
+ Channel channel;
+
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -61,8 +63,9 @@ namespace Grpc.Core.Internal
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
{
- var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
- DebugStats.ActiveClientCalls.Increment();
+ this.channel = channel;
+ var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
+ channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
@@ -277,7 +280,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- DebugStats.ActiveClientCalls.Decrement();
+ channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 4f510ba40a..309067ea9d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -48,14 +48,17 @@ namespace Grpc.Core.Internal
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+ readonly GrpcEnvironment environment;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
{
+ this.environment = Preconditions.CheckNotNull(environment);
}
public void Initialize(CallSafeHandle call)
{
- DebugStats.ActiveServerCalls.Increment();
+ call.SetCompletionRegistry(environment.CompletionRegistry);
+ environment.DebugStats.ActiveServerCalls.Increment();
InitializeInternal(call);
}
@@ -114,7 +117,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- DebugStats.ActiveServerCalls.Decrement();
+ environment.DebugStats.ActiveServerCalls.Decrement();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index ef92b44402..3b246ac01b 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -43,6 +43,7 @@ namespace Grpc.Core.Internal
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
const uint GRPC_WRITE_BUFFER_HINT = 1;
+ CompletionRegistry completionRegistry;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
@@ -97,15 +98,22 @@ namespace Grpc.Core.Internal
{
}
- public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline);
+ result.SetCompletionRegistry(registry);
+ return result;
+ }
+
+ public void SetCompletionRegistry(CompletionRegistry completionRegistry)
+ {
+ this.completionRegistry = completionRegistry;
}
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
@@ -119,56 +127,56 @@ namespace Grpc.Core.Internal
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
public void StartServerSide(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
index 80f006ae50..f6d8aa0600 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -45,11 +45,17 @@ namespace Grpc.Core.Internal
internal class CompletionRegistry
{
- readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+ readonly GrpcEnvironment environment;
+ readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+
+ public CompletionRegistry(GrpcEnvironment environment)
+ {
+ this.environment = environment;
+ }
public void Register(IntPtr key, OpCompletionDelegate callback)
{
- DebugStats.PendingBatchCompletions.Increment();
+ environment.DebugStats.PendingBatchCompletions.Increment();
Preconditions.CheckState(dict.TryAdd(key, callback));
}
@@ -63,7 +69,7 @@ namespace Grpc.Core.Internal
{
OpCompletionDelegate value;
Preconditions.CheckState(dict.TryRemove(key, out value));
- DebugStats.PendingBatchCompletions.Decrement();
+ environment.DebugStats.PendingBatchCompletions.Decrement();
return value;
}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index ef9d9afe11..8793450ff3 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -36,12 +36,39 @@ using System.Threading;
namespace Grpc.Core.Internal
{
- internal static class DebugStats
+ internal class DebugStats
{
- public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
+ public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
- public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
+ public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
- public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
+ public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
+
+ /// <summary>
+ /// Checks the debug stats and take action for any inconsistency found.
+ /// </summary>
+ public void CheckOK()
+ {
+ var remainingClientCalls = ActiveClientCalls.Count;
+ if (remainingClientCalls != 0)
+ {
+ DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
+ }
+ var remainingServerCalls = ActiveServerCalls.Count;
+ if (remainingServerCalls != 0)
+ {
+ DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
+ }
+ var pendingBatchCompletions = PendingBatchCompletions.Count;
+ if (pendingBatchCompletions != 0)
+ {
+ DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
+ }
+ }
+
+ private void DebugWarning(string message)
+ {
+ throw new Exception("Shutdown check: " + message);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 89b44a4e2b..b77e893044 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -45,14 +45,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal class GrpcThreadPool
{
+ readonly GrpcEnvironment environment;
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
CompletionQueueSafeHandle cq;
- public GrpcThreadPool(int poolSize)
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
{
+ this.environment = environment;
this.poolSize = poolSize;
}
@@ -80,7 +82,7 @@ namespace Grpc.Core.Internal
{
cq.Shutdown();
- Console.WriteLine("Waiting for GPRC threads to finish.");
+ Console.WriteLine("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
@@ -122,7 +124,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
- var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
+ var callback = environment.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index c0e5bae13f..594e46b159 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
- Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
+ Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -58,11 +58,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -110,11 +111,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -163,11 +165,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -219,11 +222,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -255,11 +259,11 @@ namespace Grpc.Core.Internal
internal class NoSuchMethodCallHandler : IServerCallHandler
{
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload);
+ (payload) => payload, (payload) => payload, environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 83dbb910aa..9e1170e6dd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -91,19 +91,19 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_start(this);
}
-
- public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
+
+ public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx);
+ environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
}
- public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
+ public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_server_request_call(this, cq, ctx).CheckOk();
+ environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()