diff options
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r-- | src/csharp/Grpc.Core/Calls.cs | 16 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Channel.cs | 30 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.csproj | 9 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.nuspec | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/GrpcEnvironment.cs | 110 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 9 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 9 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 30 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CompletionRegistry.cs | 12 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/DebugStats.cs | 35 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 26 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 14 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Properties/AssemblyInfo.cs | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 17 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Version.cs | 6 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/packages.config | 2 |
17 files changed, 211 insertions, 131 deletions
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 9f8baac684..750282258f 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -58,7 +58,7 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers); RegisterCancellationCallback(asyncCall, token); return await asyncResult; @@ -69,7 +69,7 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); @@ -81,7 +81,7 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); @@ -93,7 +93,7 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); asyncCall.StartDuplexStreamingCall(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); @@ -108,13 +108,5 @@ namespace Grpc.Core token.Register(() => asyncCall.Cancel()); } } - - /// <summary> - /// Gets shared completion queue used for async calls. - /// </summary> - private static CompletionQueueSafeHandle GetCompletionQueue() - { - return GrpcEnvironment.ThreadPool.CompletionQueue; - } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index d6bfbb7bc4..5baf260003 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -42,8 +42,10 @@ namespace Grpc.Core /// </summary> public class Channel : IDisposable { + readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; readonly string target; + bool disposed; /// <summary> /// Creates a channel that connects to a specific host. @@ -54,6 +56,7 @@ namespace Grpc.Core /// <param name="options">Channel options.</param> public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null) { + this.environment = GrpcEnvironment.GetInstance(); using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options)) { if (credentials != null) @@ -105,10 +108,35 @@ namespace Grpc.Core } } + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return this.environment.CompletionQueue; + } + } + + internal CompletionRegistry CompletionRegistry + { + get + { + return this.environment.CompletionRegistry; + } + } + + internal GrpcEnvironment Environment + { + get + { + return this.environment; + } + } + protected virtual void Dispose(bool disposing) { - if (handle != null && !handle.IsInvalid) + if (disposing && handle != null && !disposed) { + disposed = true; handle.Dispose(); } } diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index a36a6a5acc..cde42c3b7e 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -33,8 +33,9 @@ </PropertyGroup> <ItemGroup> <Reference Include="System" /> - <Reference Include="System.Collections.Immutable"> - <HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> + <Reference Include="System.Collections.Immutable, Version=1.1.36.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> </Reference> <Reference Include="System.Interactive.Async"> <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> @@ -48,6 +49,7 @@ <Compile Include="IAsyncStreamWriter.cs" /> <Compile Include="IAsyncStreamReader.cs" /> <Compile Include="Internal\GrpcLog.cs" /> + <Compile Include="Version.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="RpcException.cs" /> <Compile Include="Calls.cs" /> @@ -103,6 +105,7 @@ <Compile Include="ChannelOptions.cs" /> </ItemGroup> <ItemGroup> + <None Include="Grpc.Core.nuspec" /> <None Include="packages.config" /> </ItemGroup> <Choose> @@ -130,4 +133,4 @@ </Target> <Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" /> <Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" /> -</Project> +</Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index 629b978fdf..5ace6dcf89 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -5,19 +5,19 @@ <title>gRPC C# Core</title> <summary>Core C# implementation of gRPC - an RPC library and framework</summary> <description>Core C# implementation of gRPC - an RPC library and framework. See project site for more info.</description> - <version>0.5.1</version> + <version>$version$</version> <authors>Google Inc.</authors> <owners>grpc-packages</owners> <licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl> <projectUrl>https://github.com/grpc/grpc</projectUrl> <requireLicenseAcceptance>false</requireLicenseAcceptance> - <releaseNotes>Release 0.5.1 of gRPC C#</releaseNotes> + <releaseNotes>Release $version$ of gRPC C#</releaseNotes> <copyright>Copyright 2015, Google Inc.</copyright> <tags>gRPC RPC Protocol HTTP/2</tags> <dependencies> - <dependency id="Microsoft.Bcl.Immutable" version="1.0.34" /> + <dependency id="System.Collections.Immutable" version="1.1.36" /> <dependency id="Ix-Async" version="1.2.3" /> - <dependency id="grpc.native.csharp_ext" version="0.9.1" /> + <dependency id="grpc.native.csharp_ext" version="$GrpcNativeCsharpExtVersion$" /> </dependencies> </metadata> <files> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 30ff289714..47d1651aab 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -33,7 +33,9 @@ using System; using System.Runtime.InteropServices; +using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -51,20 +53,18 @@ namespace Grpc.Core static extern void grpcsharp_shutdown(); static object staticLock = new object(); - static volatile GrpcEnvironment instance; + static GrpcEnvironment instance; readonly GrpcThreadPool threadPool; readonly CompletionRegistry completionRegistry; + readonly DebugStats debugStats = new DebugStats(); bool isClosed; /// <summary> - /// Makes sure GRPC environment is initialized. Subsequent invocations don't have any - /// effect unless you call Shutdown first. - /// Although normal use cases assume you will call this just once in your application's - /// lifetime (and call Shutdown once you're done), for the sake of easier testing it's - /// allowed to initialize the environment again after it has been successfully shutdown. + /// Returns an instance of initialized gRPC environment. + /// Subsequent invocations return the same instance unless Shutdown has been called first. /// </summary> - public static void Initialize() + internal static GrpcEnvironment GetInstance() { lock (staticLock) { @@ -72,12 +72,13 @@ namespace Grpc.Core { instance = new GrpcEnvironment(); } + return instance; } } /// <summary> - /// Shuts down the GRPC environment if it was initialized before. - /// Repeated invocations have no effect. + /// Shuts down the gRPC environment if it was initialized before. + /// Blocks until the environment has been fully shutdown. /// </summary> public static void Shutdown() { @@ -87,50 +88,55 @@ namespace Grpc.Core { instance.Close(); instance = null; - - CheckDebugStats(); } } } - internal static GrpcThreadPool ThreadPool + /// <summary> + /// Creates gRPC environment. + /// </summary> + private GrpcEnvironment() + { + GrpcLog.RedirectNativeLogs(Console.Error); + grpcsharp_init(); + completionRegistry = new CompletionRegistry(this); + threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); + threadPool.Start(); + // TODO: use proper logging here + Console.WriteLine("GRPC initialized."); + } + + /// <summary> + /// Gets the completion registry used by this gRPC environment. + /// </summary> + internal CompletionRegistry CompletionRegistry { get { - var inst = instance; - if (inst == null) - { - throw new InvalidOperationException("GRPC environment not initialized"); - } - return inst.threadPool; + return this.completionRegistry; } } - internal static CompletionRegistry CompletionRegistry + /// <summary> + /// Gets the completion queue used by this gRPC environment. + /// </summary> + internal CompletionQueueSafeHandle CompletionQueue { get { - var inst = instance; - if (inst == null) - { - throw new InvalidOperationException("GRPC environment not initialized"); - } - return inst.completionRegistry; + return this.threadPool.CompletionQueue; } } /// <summary> - /// Creates gRPC environment. + /// Gets the completion queue used by this gRPC environment. /// </summary> - private GrpcEnvironment() + internal DebugStats DebugStats { - GrpcLog.RedirectNativeLogs(Console.Error); - grpcsharp_init(); - completionRegistry = new CompletionRegistry(); - threadPool = new GrpcThreadPool(THREAD_POOL_SIZE); - threadPool.Start(); - // TODO: use proper logging here - Console.WriteLine("GRPC initialized."); + get + { + return this.debugStats; + } } /// <summary> @@ -146,32 +152,28 @@ namespace Grpc.Core grpcsharp_shutdown(); isClosed = true; + debugStats.CheckOK(); + // TODO: use proper logging here Console.WriteLine("GRPC shutdown."); } - private static void CheckDebugStats() + /// <summary> + /// Shuts down this environment asynchronously. + /// </summary> + private Task CloseAsync() { - var remainingClientCalls = DebugStats.ActiveClientCalls.Count; - if (remainingClientCalls != 0) - { - DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls)); - } - var remainingServerCalls = DebugStats.ActiveServerCalls.Count; - if (remainingServerCalls != 0) - { - DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls)); - } - var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count; - if (pendingBatchCompletions != 0) + return Task.Run(() => { - DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions)); - } - } - - private static void DebugWarning(string message) - { - throw new Exception("Shutdown check: " + message); + try + { + Close(); + } + catch (Exception e) + { + Console.WriteLine("Error occured while shutting down GrpcEnvironment: " + e); + } + }); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index d350f45da6..24b75d1668 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -47,6 +47,8 @@ namespace Grpc.Core.Internal /// </summary> internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> { + Channel channel; + // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -61,8 +63,9 @@ namespace Grpc.Core.Internal public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName) { - var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); - DebugStats.ActiveClientCalls.Increment(); + this.channel = channel; + var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture); + channel.Environment.DebugStats.ActiveClientCalls.Increment(); InitializeInternal(call); } @@ -277,7 +280,7 @@ namespace Grpc.Core.Internal protected override void OnReleaseResources() { - DebugStats.ActiveClientCalls.Decrement(); + channel.Environment.DebugStats.ActiveClientCalls.Decrement(); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 4f510ba40a..309067ea9d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -48,14 +48,17 @@ namespace Grpc.Core.Internal internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); + readonly GrpcEnvironment environment; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) { + this.environment = Preconditions.CheckNotNull(environment); } public void Initialize(CallSafeHandle call) { - DebugStats.ActiveServerCalls.Increment(); + call.SetCompletionRegistry(environment.CompletionRegistry); + environment.DebugStats.ActiveServerCalls.Increment(); InitializeInternal(call); } @@ -114,7 +117,7 @@ namespace Grpc.Core.Internal protected override void OnReleaseResources() { - DebugStats.ActiveServerCalls.Decrement(); + environment.DebugStats.ActiveServerCalls.Decrement(); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index ef92b44402..3b246ac01b 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -43,6 +43,7 @@ namespace Grpc.Core.Internal internal class CallSafeHandle : SafeHandleZeroIsInvalid { const uint GRPC_WRITE_BUFFER_HINT = 1; + CompletionRegistry completionRegistry; [DllImport("grpc_csharp_ext.dll")] static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); @@ -97,15 +98,22 @@ namespace Grpc.Core.Internal { } - public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - return grpcsharp_channel_create_call(channel, cq, method, host, deadline); + var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline); + result.SetCompletionRegistry(registry); + return result; + } + + public void SetCompletionRegistry(CompletionRegistry completionRegistry) + { + this.completionRegistry = completionRegistry; } public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) .CheckOk(); } @@ -119,56 +127,56 @@ namespace Grpc.Core.Internal public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk(); } public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk(); } public void StartSendCloseFromClient(BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_recv_message(this, ctx).CheckOk(); } public void StartServerSide(BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, callback); grpcsharp_call_start_serverside(this, ctx).CheckOk(); } diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index 80f006ae50..f6d8aa0600 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -45,11 +45,17 @@ namespace Grpc.Core.Internal internal class CompletionRegistry { - readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>(); + readonly GrpcEnvironment environment; + readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>(); + + public CompletionRegistry(GrpcEnvironment environment) + { + this.environment = environment; + } public void Register(IntPtr key, OpCompletionDelegate callback) { - DebugStats.PendingBatchCompletions.Increment(); + environment.DebugStats.PendingBatchCompletions.Increment(); Preconditions.CheckState(dict.TryAdd(key, callback)); } @@ -63,7 +69,7 @@ namespace Grpc.Core.Internal { OpCompletionDelegate value; Preconditions.CheckState(dict.TryRemove(key, out value)); - DebugStats.PendingBatchCompletions.Decrement(); + environment.DebugStats.PendingBatchCompletions.Decrement(); return value; } diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs index ef9d9afe11..8793450ff3 100644 --- a/src/csharp/Grpc.Core/Internal/DebugStats.cs +++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs @@ -36,12 +36,39 @@ using System.Threading; namespace Grpc.Core.Internal { - internal static class DebugStats + internal class DebugStats { - public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); + public readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); - public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); + public readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); - public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); + public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); + + /// <summary> + /// Checks the debug stats and take action for any inconsistency found. + /// </summary> + public void CheckOK() + { + var remainingClientCalls = ActiveClientCalls.Count; + if (remainingClientCalls != 0) + { + DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls)); + } + var remainingServerCalls = ActiveServerCalls.Count; + if (remainingServerCalls != 0) + { + DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls)); + } + var pendingBatchCompletions = PendingBatchCompletions.Count; + if (pendingBatchCompletions != 0) + { + DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions)); + } + } + + private void DebugWarning(string message) + { + throw new Exception("Shutdown check: " + message); + } } } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 89b44a4e2b..b77e893044 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -45,14 +45,16 @@ namespace Grpc.Core.Internal /// </summary> internal class GrpcThreadPool { + readonly GrpcEnvironment environment; readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; CompletionQueueSafeHandle cq; - public GrpcThreadPool(int poolSize) + public GrpcThreadPool(GrpcEnvironment environment, int poolSize) { + this.environment = environment; this.poolSize = poolSize; } @@ -80,7 +82,7 @@ namespace Grpc.Core.Internal { cq.Shutdown(); - Console.WriteLine("Waiting for GPRC threads to finish."); + Console.WriteLine("Waiting for GRPC threads to finish."); foreach (var thread in threads) { thread.Join(); @@ -122,7 +124,7 @@ namespace Grpc.Core.Internal IntPtr tag = ev.tag; try { - var callback = GrpcEnvironment.CompletionRegistry.Extract(tag); + var callback = environment.CompletionRegistry.Extract(tag); callback(success); } catch (Exception e) diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index c0e5bae13f..594e46b159 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -42,7 +42,7 @@ namespace Grpc.Core.Internal { internal interface IServerCallHandler { - Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment); } internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -58,11 +58,12 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer); + method.RequestMarshaller.Deserializer, + environment); asyncCall.Initialize(call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -110,11 +111,12 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer); + method.RequestMarshaller.Deserializer, + environment); asyncCall.Initialize(call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -163,11 +165,12 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer); + method.RequestMarshaller.Deserializer, + environment); asyncCall.Initialize(call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -219,11 +222,12 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, - method.RequestMarshaller.Deserializer); + method.RequestMarshaller.Deserializer, + environment); asyncCall.Initialize(call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -255,11 +259,11 @@ namespace Grpc.Core.Internal internal class NoSuchMethodCallHandler : IServerCallHandler { - public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload); + (payload) => payload, (payload) => payload, environment); asyncCall.Initialize(call); var finishedTask = asyncCall.ServerSideCallAsync(); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 83dbb910aa..9e1170e6dd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -91,19 +91,19 @@ namespace Grpc.Core.Internal { grpcsharp_server_start(this); } - - public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) + + public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx); + environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx); } - public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) + public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment) { var ctx = BatchContextSafeHandle.Create(); - GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_server_request_call(this, cq, ctx).CheckOk(); + environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk(); } protected override bool ReleaseHandle() diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs index 03b682181a..2b3d7530f2 100644 --- a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs +++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs @@ -9,6 +9,5 @@ using System.Runtime.CompilerServices; [assembly: AssemblyCopyright("Google Inc. All rights reserved.")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] -[assembly: AssemblyVersion("0.5.*")] [assembly: InternalsVisibleTo("Grpc.Core.Tests")] diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 8e818885d1..cbf77196cf 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -52,6 +52,7 @@ namespace Grpc.Core /// </summary> public const int PickUnusedPort = 0; + readonly GrpcEnvironment environment; readonly ServerSafeHandle handle; readonly object myLock = new object(); @@ -67,9 +68,10 @@ namespace Grpc.Core /// <param name="options">Channel options.</param> public Server(IEnumerable<ChannelOption> options = null) { + this.environment = GrpcEnvironment.GetInstance(); using (var channelArgs = ChannelOptions.CreateChannelArgs(options)) { - this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs); + this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); } } @@ -144,7 +146,7 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown); + handle.ShutdownAndNotify(HandleServerShutdown, environment); await shutdownTcs.Task; handle.Dispose(); } @@ -173,7 +175,7 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown); + handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.CancelAllCalls(); await shutdownTcs.Task; handle.Dispose(); @@ -208,7 +210,7 @@ namespace Grpc.Core { if (!shutdownRequested) { - handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc); + handle.RequestCall(HandleNewServerRpc, environment); } } } @@ -225,7 +227,7 @@ namespace Grpc.Core { callHandler = new NoSuchMethodCallHandler(); } - await callHandler.HandleCall(method, call, GetCompletionQueue()); + await callHandler.HandleCall(method, call, environment); } catch (Exception e) { @@ -259,10 +261,5 @@ namespace Grpc.Core { shutdownTcs.SetResult(null); } - - private static CompletionQueueSafeHandle GetCompletionQueue() - { - return GrpcEnvironment.ThreadPool.CompletionQueue; - } } } diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs new file mode 100644 index 0000000000..972f495bd7 --- /dev/null +++ b/src/csharp/Grpc.Core/Version.cs @@ -0,0 +1,6 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + +// The current version of gRPC C#. +[assembly: AssemblyVersion("0.6.0.*")] + diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index fb7eaaeeda..6cdcdf2656 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -3,5 +3,5 @@ <package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" /> <package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" /> <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> - <package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" /> + <package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" /> </packages>
\ No newline at end of file |