aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Michael Lumish <mlumish@google.com>2015-07-14 16:13:17 -0700
committerGravatar Michael Lumish <mlumish@google.com>2015-07-14 16:13:17 -0700
commit177503c508a1df88de1d6fd8407764eaf78cbc8c (patch)
tree37d699e5a0360a7a271151c61b8a5a7012c390b6
parente53f593d142baec148d1f0355314a87ad175cdcb (diff)
parent04eb89ca26164cb545bc75e6d585ac396b580d84 (diff)
Merge pull request #2332 from jtattermusch/implicit_initialize_rebased
Get rid of explicit GrpcEnvironment.Initialize
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs6
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs17
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs12
-rw-r--r--src/csharp/Grpc.Core.Tests/ServerTest.cs3
-rw-r--r--src/csharp/Grpc.Core/Calls.cs16
-rw-r--r--src/csharp/Grpc.Core/Channel.cs30
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs110
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs30
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs26
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Server.cs17
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs2
-rw-r--r--src/csharp/Grpc.Examples.MathServer/MathServer.cs2
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropServer.cs2
22 files changed, 202 insertions, 167 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 21f94d3cf5..e797dd82f2 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -73,12 +73,6 @@ namespace Grpc.Core.Tests
Server server;
Channel channel;
- [TestFixtureSetUp]
- public void InitClass()
- {
- GrpcEnvironment.Initialize();
- }
-
[SetUp]
public void Init()
{
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 6a132a5b22..9ae12776f3 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -43,16 +43,17 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
- GrpcEnvironment.Initialize();
- Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue);
+ var env = GrpcEnvironment.GetInstance();
+ Assert.IsNotNull(env.CompletionQueue);
GrpcEnvironment.Shutdown();
}
[Test]
public void SubsequentInvocations()
{
- GrpcEnvironment.Initialize();
- GrpcEnvironment.Initialize();
+ var env1 = GrpcEnvironment.GetInstance();
+ var env2 = GrpcEnvironment.GetInstance();
+ Assert.IsTrue(object.ReferenceEquals(env1, env2));
GrpcEnvironment.Shutdown();
GrpcEnvironment.Shutdown();
}
@@ -60,15 +61,13 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAfterShutdown()
{
- GrpcEnvironment.Initialize();
- var tp1 = GrpcEnvironment.ThreadPool;
+ var env1 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
- GrpcEnvironment.Initialize();
- var tp2 = GrpcEnvironment.ThreadPool;
+ var env2 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
- Assert.IsFalse(object.ReferenceEquals(tp1, tp2));
+ Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 8b3c910251..714c2f7494 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -53,18 +53,6 @@ namespace Grpc.Core.Tests
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
- [TestFixtureSetUp]
- public void Init()
- {
- GrpcEnvironment.Initialize();
- }
-
- [TestFixtureTearDown]
- public void Cleanup()
- {
- GrpcEnvironment.Shutdown();
- }
-
/// <summary>
/// (~1.26us .NET Windows)
/// </summary>
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index 02c773c9cc..1119aa370e 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -44,13 +44,10 @@ namespace Grpc.Core.Tests
[Test]
public void StartAndShutdownServer()
{
- GrpcEnvironment.Initialize();
-
Server server = new Server();
server.AddListeningPort("localhost", Server.PickUnusedPort);
server.Start();
server.ShutdownAsync().Wait();
-
GrpcEnvironment.Shutdown();
}
}
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 9f8baac684..750282258f 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -58,7 +58,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
return await asyncResult;
@@ -69,7 +69,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
@@ -81,7 +81,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
@@ -93,7 +93,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartDuplexStreamingCall(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
@@ -108,13 +108,5 @@ namespace Grpc.Core
token.Register(() => asyncCall.Cancel());
}
}
-
- /// <summary>
- /// Gets shared completion queue used for async calls.
- /// </summary>
- private static CompletionQueueSafeHandle GetCompletionQueue()
- {
- return GrpcEnvironment.ThreadPool.CompletionQueue;
- }
}
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index d6bfbb7bc4..5baf260003 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -42,8 +42,10 @@ namespace Grpc.Core
/// </summary>
public class Channel : IDisposable
{
+ readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly string target;
+ bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.
@@ -54,6 +56,7 @@ namespace Grpc.Core
/// <param name="options">Channel options.</param>
public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null)
{
+ this.environment = GrpcEnvironment.GetInstance();
using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options))
{
if (credentials != null)
@@ -105,10 +108,35 @@ namespace Grpc.Core
}
}
+ internal CompletionQueueSafeHandle CompletionQueue
+ {
+ get
+ {
+ return this.environment.CompletionQueue;
+ }
+ }
+
+ internal CompletionRegistry CompletionRegistry
+ {
+ get
+ {
+ return this.environment.CompletionRegistry;
+ }
+ }
+
+ internal GrpcEnvironment Environment
+ {
+ get
+ {
+ return this.environment;
+ }
+ }
+
protected virtual void Dispose(bool disposing)
{
- if (handle != null && !handle.IsInvalid)
+ if (disposing && handle != null && !disposed)
{
+ disposed = true;
handle.Dispose();
}
}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 30ff289714..47d1651aab 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -33,7 +33,9 @@
using System;
using System.Runtime.InteropServices;
+using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -51,20 +53,18 @@ namespace Grpc.Core
static extern void grpcsharp_shutdown();
static object staticLock = new object();
- static volatile GrpcEnvironment instance;
+ static GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
+ readonly DebugStats debugStats = new DebugStats();
bool isClosed;
/// <summary>
- /// Makes sure GRPC environment is initialized. Subsequent invocations don't have any
- /// effect unless you call Shutdown first.
- /// Although normal use cases assume you will call this just once in your application's
- /// lifetime (and call Shutdown once you're done), for the sake of easier testing it's
- /// allowed to initialize the environment again after it has been successfully shutdown.
+ /// Returns an instance of initialized gRPC environment.
+ /// Subsequent invocations return the same instance unless Shutdown has been called first.
/// </summary>
- public static void Initialize()
+ internal static GrpcEnvironment GetInstance()
{
lock (staticLock)
{
@@ -72,12 +72,13 @@ namespace Grpc.Core
{
instance = new GrpcEnvironment();
}
+ return instance;
}
}
/// <summary>
- /// Shuts down the GRPC environment if it was initialized before.
- /// Repeated invocations have no effect.
+ /// Shuts down the gRPC environment if it was initialized before.
+ /// Blocks until the environment has been fully shutdown.
/// </summary>
public static void Shutdown()
{
@@ -87,50 +88,55 @@ namespace Grpc.Core
{
instance.Close();
instance = null;
-
- CheckDebugStats();
}
}
}
- internal static GrpcThreadPool ThreadPool
+ /// <summary>
+ /// Creates gRPC environment.
+ /// </summary>
+ private GrpcEnvironment()
+ {
+ GrpcLog.RedirectNativeLogs(Console.Error);
+ grpcsharp_init();
+ completionRegistry = new CompletionRegistry(this);
+ threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
+ threadPool.Start();
+ // TODO: use proper logging here
+ Console.WriteLine("GRPC initialized.");
+ }
+
+ /// <summary>
+ /// Gets the completion registry used by this gRPC environment.
+ /// </summary>
+ internal CompletionRegistry CompletionRegistry
{
get
{
- var inst = instance;
- if (inst == null)
- {
- throw new InvalidOperationException("GRPC environment not initialized");
- }
- return inst.threadPool;
+ return this.completionRegistry;
}
}
- internal static CompletionRegistry CompletionRegistry
+ /// <summary>
+ /// Gets the completion queue used by this gRPC environment.
+ /// </summary>
+ internal CompletionQueueSafeHandle CompletionQueue
{
get
{
- var inst = instance;
- if (inst == null)
- {
- throw new InvalidOperationException("GRPC environment not initialized");
- }
- return inst.completionRegistry;
+ return this.threadPool.CompletionQueue;
}
}
/// <summary>
- /// Creates gRPC environment.
+ /// Gets the completion queue used by this gRPC environment.
/// </summary>
- private GrpcEnvironment()
+ internal DebugStats DebugStats
{
- GrpcLog.RedirectNativeLogs(Console.Error);
- grpcsharp_init();
- completionRegistry = new CompletionRegistry();
- threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
- threadPool.Start();
- // TODO: use proper logging here
- Console.WriteLine("GRPC initialized.");
+ get
+ {
+ return this.debugStats;
+ }
}
/// <summary>
@@ -146,32 +152,28 @@ namespace Grpc.Core
grpcsharp_shutdown();
isClosed = true;
+ debugStats.CheckOK();
+
// TODO: use proper logging here
Console.WriteLine("GRPC shutdown.");
}
- private static void CheckDebugStats()
+ /// <summary>
+ /// Shuts down this environment asynchronously.
+ /// </summary>
+ private Task CloseAsync()
{
- var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
- if (remainingClientCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
- }
- var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
- if (remainingServerCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
- }
- var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
- if (pendingBatchCompletions != 0)
+ return Task.Run(() =>
{
- DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
- }
- }
-
- private static void DebugWarning(string message)
- {
- throw new Exception("Shutdown check: " + message);
+ try
+ {
+ Close();
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Error occured while shutting down GrpcEnvironment: " + e);
+ }
+ });
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index d350f45da6..24b75d1668 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -47,6 +47,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
+ Channel channel;
+
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -61,8 +63,9 @@ namespace Grpc.Core.Internal
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
{
- var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
- DebugStats.ActiveClientCalls.Increment();
+ this.channel = channel;
+ var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
+ channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
@@ -277,7 +280,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- DebugStats.ActiveClientCalls.Decrement();
+ channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 4f510ba40a..309067ea9d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -48,14 +48,17 @@ namespace Grpc.Core.Internal
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+ readonly GrpcEnvironment environment;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
{
+ this.environment = Preconditions.CheckNotNull(environment);
}
public void Initialize(CallSafeHandle call)
{
- DebugStats.ActiveServerCalls.Increment();
+ call.SetCompletionRegistry(environment.CompletionRegistry);
+ environment.DebugStats.ActiveServerCalls.Increment();
InitializeInternal(call);
}
@@ -114,7 +117,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- DebugStats.ActiveServerCalls.Decrement();
+ environment.DebugStats.ActiveServerCalls.Decrement();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index ef92b44402..3b246ac01b 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -43,6 +43,7 @@ namespace Grpc.Core.Internal
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
const uint GRPC_WRITE_BUFFER_HINT = 1;
+ CompletionRegistry completionRegistry;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
@@ -97,15 +98,22 @@ namespace Grpc.Core.Internal
{
}
- public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline);
+ result.SetCompletionRegistry(registry);
+ return result;
+ }
+
+ public void SetCompletionRegistry(CompletionRegistry completionRegistry)
+ {
+ this.completionRegistry = completionRegistry;
}
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
@@ -119,56 +127,56 @@ namespace Grpc.Core.Internal
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
public void StartServerSide(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
index 80f006ae50..f6d8aa0600 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -45,11 +45,17 @@ namespace Grpc.Core.Internal
internal class CompletionRegistry
{
- readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+ readonly GrpcEnvironment environment;
+ readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+
+ public CompletionRegistry(GrpcEnvironment environment)
+ {
+ this.environment = environment;
+ }
public void Register(IntPtr key, OpCompletionDelegate callback)
{
- DebugStats.PendingBatchCompletions.Increment();
+ environment.DebugStats.PendingBatchCompletions.Increment();
Preconditions.CheckState(dict.TryAdd(key, callback));
}
@@ -63,7 +69,7 @@ namespace Grpc.Core.Internal
{
OpCompletionDelegate value;
Preconditions.CheckState(dict.TryRemove(key, out value));
- DebugStats.PendingBatchCompletions.Decrement();
+ environment.DebugStats.PendingBatchCompletions.Decrement();
return value;
}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index ef9d9afe11..8793450ff3 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -36,12 +36,39 @@ using System.Threading;
namespace Grpc.Core.Internal
{
- internal static class DebugStats
+ internal class DebugStats
{
- public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
+ public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
- public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
+ public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
- public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
+ public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
+
+ /// <summary>
+ /// Checks the debug stats and take action for any inconsistency found.
+ /// </summary>
+ public void CheckOK()
+ {
+ var remainingClientCalls = ActiveClientCalls.Count;
+ if (remainingClientCalls != 0)
+ {
+ DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
+ }
+ var remainingServerCalls = ActiveServerCalls.Count;
+ if (remainingServerCalls != 0)
+ {
+ DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
+ }
+ var pendingBatchCompletions = PendingBatchCompletions.Count;
+ if (pendingBatchCompletions != 0)
+ {
+ DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
+ }
+ }
+
+ private void DebugWarning(string message)
+ {
+ throw new Exception("Shutdown check: " + message);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 89b44a4e2b..b77e893044 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -45,14 +45,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal class GrpcThreadPool
{
+ readonly GrpcEnvironment environment;
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
CompletionQueueSafeHandle cq;
- public GrpcThreadPool(int poolSize)
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
{
+ this.environment = environment;
this.poolSize = poolSize;
}
@@ -80,7 +82,7 @@ namespace Grpc.Core.Internal
{
cq.Shutdown();
- Console.WriteLine("Waiting for GPRC threads to finish.");
+ Console.WriteLine("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
@@ -122,7 +124,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
- var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
+ var callback = environment.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index c0e5bae13f..594e46b159 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
- Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
+ Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -58,11 +58,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -110,11 +111,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -163,11 +165,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -219,11 +222,12 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
- method.RequestMarshaller.Deserializer);
+ method.RequestMarshaller.Deserializer,
+ environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -255,11 +259,11 @@ namespace Grpc.Core.Internal
internal class NoSuchMethodCallHandler : IServerCallHandler
{
- public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload);
+ (payload) => payload, (payload) => payload, environment);
asyncCall.Initialize(call);
var finishedTask = asyncCall.ServerSideCallAsync();
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 83dbb910aa..9e1170e6dd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -91,19 +91,19 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_start(this);
}
-
- public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
+
+ public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx);
+ environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
}
- public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
+ public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
{
var ctx = BatchContextSafeHandle.Create();
- GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_server_request_call(this, cq, ctx).CheckOk();
+ environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 8e818885d1..cbf77196cf 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -52,6 +52,7 @@ namespace Grpc.Core
/// </summary>
public const int PickUnusedPort = 0;
+ readonly GrpcEnvironment environment;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@@ -67,9 +68,10 @@ namespace Grpc.Core
/// <param name="options">Channel options.</param>
public Server(IEnumerable<ChannelOption> options = null)
{
+ this.environment = GrpcEnvironment.GetInstance();
using (var channelArgs = ChannelOptions.CreateChannelArgs(options))
{
- this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs);
+ this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
}
}
@@ -144,7 +146,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
+ handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
handle.Dispose();
}
@@ -173,7 +175,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
+ handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
handle.Dispose();
@@ -208,7 +210,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
- handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
+ handle.RequestCall(HandleNewServerRpc, environment);
}
}
}
@@ -225,7 +227,7 @@ namespace Grpc.Core
{
callHandler = new NoSuchMethodCallHandler();
}
- await callHandler.HandleCall(method, call, GetCompletionQueue());
+ await callHandler.HandleCall(method, call, environment);
}
catch (Exception e)
{
@@ -259,10 +261,5 @@ namespace Grpc.Core
{
shutdownTcs.SetResult(null);
}
-
- private static CompletionQueueSafeHandle GetCompletionQueue()
- {
- return GrpcEnvironment.ThreadPool.CompletionQueue;
- }
}
}
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index 360fe928dd..b763721460 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -39,8 +39,6 @@ namespace math
{
public static void Main(string[] args)
{
- GrpcEnvironment.Initialize();
-
using (Channel channel = new Channel("127.0.0.1", 23456))
{
Math.IMathClient stub = new Math.MathClient(channel);
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index d05e3f2808..f440985112 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -42,8 +42,6 @@ namespace math
{
string host = "0.0.0.0";
- GrpcEnvironment.Initialize();
-
Server server = new Server();
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, 23456);
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index aadd49f795..10dceb60aa 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -54,8 +54,6 @@ namespace math.Tests
[TestFixtureSetUp]
public void Init()
{
- GrpcEnvironment.Initialize();
-
server = new Server();
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort);
@@ -75,7 +73,6 @@ namespace math.Tests
public void Cleanup()
{
channel.Dispose();
-
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index f0be522bc6..bdcb2c505c 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -102,8 +102,6 @@ namespace Grpc.IntegrationTesting
private void Run()
{
- GrpcEnvironment.Initialize();
-
Credentials credentials = null;
if (options.useTls)
{
@@ -135,7 +133,6 @@ namespace Grpc.IntegrationTesting
TestService.ITestServiceClient client = new TestService.TestServiceClient(channel, stubConfig);
RunTestCase(options.testCase, client);
}
-
GrpcEnvironment.Shutdown();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 1a733450c1..6c2da9d2ee 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -55,8 +55,6 @@ namespace Grpc.IntegrationTesting
[TestFixtureSetUp]
public void Init()
{
- GrpcEnvironment.Initialize();
-
server = new Server();
server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials());
@@ -74,7 +72,6 @@ namespace Grpc.IntegrationTesting
public void Cleanup()
{
channel.Dispose();
-
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index 87c3cbe1d4..9475e66c40 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -88,8 +88,6 @@ namespace Grpc.IntegrationTesting
private void Run()
{
- GrpcEnvironment.Initialize();
-
var server = new Server();
server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));