diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-07-05 21:08:33 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-07-05 21:08:33 -0700 |
commit | 27166a6f65581a9ee820882e5d7506d6bd07ade6 (patch) | |
tree | 0e89c5e09a955b2a074a8b34e0f3fe0a4135f346 /src/csharp/Grpc.Core/Internal | |
parent | 4efb6966bdfb62c725c6614b0d85ea374250bb51 (diff) | |
parent | d1dd3a68a2d4af56f1409327c197590dac6968cb (diff) |
Merge github.com:grpc/grpc into flow-like-lava-to-a-barnyard
Conflicts:
src/core/surface/call.c
src/core/transport/chttp2_transport.c
src/core/transport/transport.h
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 35 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 43 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 9 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs (renamed from src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs) | 44 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 102 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs | 60 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs | 14 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CompletionRegistry.cs | 89 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/DebugStats.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/Enums.cs | 40 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 20 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 31 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/Timespec.cs | 2 |
15 files changed, 347 insertions, 154 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 9bb918d53d..d350f45da6 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -47,9 +47,6 @@ namespace Grpc.Core.Internal /// </summary> internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> { - readonly CompletionCallbackDelegate unaryResponseHandler; - readonly CompletionCallbackDelegate finishedHandler; - // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -60,8 +57,6 @@ namespace Grpc.Core.Internal public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) { - this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse); - this.finishedHandler = CreateBatchCompletionCallback(HandleFinished); } public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName) @@ -96,7 +91,21 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray); + using (var ctx = BatchContextSafeHandle.Create()) + { + call.StartUnary(payload, ctx, metadataArray); + var ev = cq.Pluck(ctx.Handle); + + bool success = (ev.success != 0); + try + { + HandleUnaryResponse(success, ctx); + } + catch (Exception e) + { + Console.WriteLine("Exception occured while invoking completion delegate: " + e); + } + } } try @@ -129,7 +138,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource<TResponse>(); using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartUnary(payload, unaryResponseHandler, metadataArray); + call.StartUnary(payload, HandleUnaryResponse, metadataArray); } return unaryResponseTcs.Task; } @@ -151,7 +160,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource<TResponse>(); using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartClientStreaming(unaryResponseHandler, metadataArray); + call.StartClientStreaming(HandleUnaryResponse, metadataArray); } return unaryResponseTcs.Task; @@ -175,7 +184,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartServerStreaming(payload, finishedHandler, metadataArray); + call.StartServerStreaming(payload, HandleFinished, metadataArray); } } } @@ -194,7 +203,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartDuplexStreaming(finishedHandler, metadataArray); + call.StartDuplexStreaming(HandleFinished, metadataArray); } } } @@ -229,7 +238,7 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendCloseFromClient(halfclosedHandler); + call.StartSendCloseFromClient(HandleHalfclosed); halfcloseRequested = true; sendCompletionDelegate = completionDelegate; @@ -274,7 +283,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handler for unary response completion. /// </summary> - private void HandleUnaryResponse(bool success, BatchContextSafeHandleNotOwned ctx) + private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) { lock (myLock) { @@ -307,7 +316,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> - private void HandleFinished(bool success, BatchContextSafeHandleNotOwned ctx) + private void HandleFinished(bool success, BatchContextSafeHandle ctx) { var status = ctx.GetReceivedStatus(); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index b4f4edb17a..64713c8c52 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -51,13 +51,8 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; - protected readonly CompletionCallbackDelegate sendFinishedHandler; - protected readonly CompletionCallbackDelegate readFinishedHandler; - protected readonly CompletionCallbackDelegate halfclosedHandler; - protected readonly object myLock = new object(); - protected GCHandle gchandle; protected CallSafeHandle call; protected bool disposed; @@ -77,10 +72,6 @@ namespace Grpc.Core.Internal { this.serializer = Preconditions.CheckNotNull(serializer); this.deserializer = Preconditions.CheckNotNull(deserializer); - - this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished); - this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished); - this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed); } /// <summary> @@ -121,9 +112,6 @@ namespace Grpc.Core.Internal { lock (myLock) { - // Make sure this object and the delegated held by it will not be garbage collected - // before we release this handle. - gchandle = GCHandle.Alloc(this); this.call = call; } } @@ -141,7 +129,7 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendMessage(payload, sendFinishedHandler); + call.StartSendMessage(payload, HandleSendFinished); sendCompletionDelegate = completionDelegate; } } @@ -157,7 +145,7 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckReadingAllowed(); - call.StartReceiveMessage(readFinishedHandler); + call.StartReceiveMessage(HandleReadFinished); readCompletionDelegate = completionDelegate; } } @@ -197,7 +185,6 @@ namespace Grpc.Core.Internal { call.Dispose(); } - gchandle.Free(); disposed = true; } @@ -282,29 +269,9 @@ namespace Grpc.Core.Internal } /// <summary> - /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to - /// prevent propagating exceptions accross managed/unmanaged boundary. - /// </summary> - protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler) - { - return new CompletionCallbackDelegate((success, batchContextPtr) => - { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - handler(success, ctx); - } - catch (Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } - }); - } - - /// <summary> /// Handles send completion. /// </summary> - private void HandleSendFinished(bool success, BatchContextSafeHandleNotOwned ctx) + protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) @@ -328,7 +295,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles halfclose completion. /// </summary> - private void HandleHalfclosed(bool success, BatchContextSafeHandleNotOwned ctx) + protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) @@ -353,7 +320,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles streaming read completion. /// </summary> - private void HandleReadFinished(bool success, BatchContextSafeHandleNotOwned ctx) + protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx) { var payload = ctx.GetReceivedMessage(); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 1f0335e4e6..4f510ba40a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -47,12 +47,10 @@ namespace Grpc.Core.Internal /// </summary> internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> { - readonly CompletionCallbackDelegate finishedServersideHandler; readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer) { - this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside); } public void Initialize(CallSafeHandle call) @@ -72,7 +70,7 @@ namespace Grpc.Core.Internal started = true; - call.StartServerSide(finishedServersideHandler); + call.StartServerSide(HandleFinishedServerside); return finishedServersideTcs.Task; } } @@ -107,8 +105,9 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendStatusFromServer(status, halfclosedHandler); + call.StartSendStatusFromServer(status, HandleHalfclosed); halfcloseRequested = true; + readingDone = true; sendCompletionDelegate = completionDelegate; } } @@ -121,7 +120,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles the server side close completion. /// </summary> - private void HandleFinishedServerside(bool success, BatchContextSafeHandleNotOwned ctx) + private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx) { bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index b562abaa7a..861cbbe4c6 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -41,32 +41,50 @@ namespace Grpc.Core.Internal /// Not owned version of /// grpcsharp_batch_context /// </summary> - internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid + internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid { [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx); + static extern BatchContextSafeHandle grpcsharp_batch_context_create(); [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen); + static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx); + static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen); [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char* + static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx); + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char* [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx); + static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char* - public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) + [DllImport("grpc_csharp_ext.dll")] + static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_batch_context_destroy(IntPtr ctx); + + private BatchContextSafeHandle() + { + } + + public static BatchContextSafeHandle Create() { - SetHandle(handle); + return grpcsharp_batch_context_create(); + } + + public IntPtr Handle + { + get + { + return handle; + } } public Status GetReceivedStatus() @@ -102,5 +120,11 @@ namespace Grpc.Core.Internal { return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } + + protected override bool ReleaseHandle() + { + grpcsharp_batch_context_destroy(handle); + return true; + } } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 491b8414ec..ef92b44402 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -37,8 +37,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - internal delegate void CompletionCallbackDelegate(bool success, IntPtr batchContextPtr); - /// <summary> /// grpc_call from <grpc/grpc.h> /// </summary> @@ -57,49 +55,40 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_call_destroy(IntPtr call); @@ -113,64 +102,84 @@ namespace Grpc.Core.Internal return grpcsharp_channel_create_call(channel, cq, method, host, deadline); } - public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { - AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + .CheckOk(); } - public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray) { - grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray); + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + .CheckOk(); } - public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { - AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { - AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk(); } - public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { - AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) + public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback) { - AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length))); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk(); } - public void StartSendCloseFromClient(CompletionCallbackDelegate callback) + public void StartSendCloseFromClient(BatchCompletionDelegate callback) { - AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) + public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback) { - AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk(); } - public void StartReceiveMessage(CompletionCallbackDelegate callback) + public void StartReceiveMessage(BatchCompletionDelegate callback) { - AssertCallOk(grpcsharp_call_recv_message(this, callback)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_recv_message(this, ctx).CheckOk(); } - public void StartServerSide(CompletionCallbackDelegate callback) + public void StartServerSide(BatchCompletionDelegate callback) { - AssertCallOk(grpcsharp_call_start_serverside(this, callback)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_start_serverside(this, ctx).CheckOk(); } public void Cancel() { - AssertCallOk(grpcsharp_call_cancel(this)); + grpcsharp_call_cancel(this).CheckOk(); } public void CancelWithStatus(Status status) { - AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail)); + grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail).CheckOk(); } protected override bool ReleaseHandle() @@ -179,11 +188,6 @@ namespace Grpc.Core.Internal return true; } - private static void AssertCallOk(GRPCCallError callError) - { - Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); - } - private static uint GetFlags(bool buffered) { return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; diff --git a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs index c69f1a0d02..c12aec5a3a 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs @@ -45,6 +45,9 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] static extern void grpcsharp_channel_args_set_string(ChannelArgsSafeHandle args, UIntPtr index, string key, string value); + [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] + static extern void grpcsharp_channel_args_set_integer(ChannelArgsSafeHandle args, UIntPtr index, string key, int value); + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_channel_args_destroy(IntPtr args); @@ -67,6 +70,11 @@ namespace Grpc.Core.Internal grpcsharp_channel_args_set_string(this, new UIntPtr((uint)index), key, value); } + public void SetInteger(int index, string key, int value) + { + grpcsharp_channel_args_set_integer(this, new UIntPtr((uint)index), key, value); + } + protected override bool ReleaseHandle() { grpcsharp_channel_args_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs new file mode 100644 index 0000000000..3f517514a3 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs @@ -0,0 +1,60 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// grpc_event from grpc/grpc.h + /// </summary> + [StructLayout(LayoutKind.Sequential)] + internal struct CompletionQueueEvent + { + [DllImport("grpc_csharp_ext.dll")] + static extern int grpcsharp_sizeof_grpc_event(); + + public GRPCCompletionType type; + public int success; + public IntPtr tag; + + internal static int NativeSize + { + get + { + return grpcsharp_sizeof_grpc_event(); + } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs index 600d1fc87c..f64f3d4175 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -46,7 +46,10 @@ namespace Grpc.Core.Internal static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq); + static extern CompletionQueueEvent grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq); + + [DllImport("grpc_csharp_ext.dll")] + static extern CompletionQueueEvent grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_completion_queue_destroy(IntPtr cq); @@ -60,9 +63,14 @@ namespace Grpc.Core.Internal return grpcsharp_completion_queue_create(); } - public GRPCCompletionType NextWithCallback() + public CompletionQueueEvent Next() + { + return grpcsharp_completion_queue_next(this); + } + + public CompletionQueueEvent Pluck(IntPtr tag) { - return grpcsharp_completion_queue_next_with_callback(this); + return grpcsharp_completion_queue_pluck(this, tag); } public void Shutdown() diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs new file mode 100644 index 0000000000..80f006ae50 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -0,0 +1,89 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + internal delegate void OpCompletionDelegate(bool success); + + internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx); + + internal class CompletionRegistry + { + readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>(); + + public void Register(IntPtr key, OpCompletionDelegate callback) + { + DebugStats.PendingBatchCompletions.Increment(); + Preconditions.CheckState(dict.TryAdd(key, callback)); + } + + public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback) + { + OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback)); + Register(ctx.Handle, opCallback); + } + + public OpCompletionDelegate Extract(IntPtr key) + { + OpCompletionDelegate value; + Preconditions.CheckState(dict.TryRemove(key, out value)); + DebugStats.PendingBatchCompletions.Decrement(); + return value; + } + + private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback) + { + try + { + callback(success, ctx); + } + catch (Exception e) + { + Console.WriteLine("Exception occured while invoking completion delegate: " + e); + } + finally + { + if (ctx != null) + { + ctx.Dispose(); + } + } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs index 476914f751..ef9d9afe11 100644 --- a/src/csharp/Grpc.Core/Internal/DebugStats.cs +++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs @@ -41,5 +41,7 @@ namespace Grpc.Core.Internal public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); + + public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); } } diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs index 2b4f6cae0c..af11b5b9f3 100644 --- a/src/csharp/Grpc.Core/Internal/Enums.cs +++ b/src/csharp/Grpc.Core/Internal/Enums.cs @@ -33,35 +33,47 @@ using System; using System.Runtime.InteropServices; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// from grpc/grpc.h + /// grpc_call_error from grpc/grpc.h /// </summary> internal enum GRPCCallError { /* everything went ok */ - GRPC_CALL_OK = 0, + OK = 0, /* something failed, we don't know what */ - GRPC_CALL_ERROR, + Error, /* this method is not available on the server */ - GRPC_CALL_ERROR_NOT_ON_SERVER, + NotOnServer, /* this method is not available on the client */ - GRPC_CALL_ERROR_NOT_ON_CLIENT, + NotOnClient, /* this method must be called before server_accept */ - GRPC_CALL_ERROR_ALREADY_ACCEPTED, + AlreadyAccepted, /* this method must be called before invoke */ - GRPC_CALL_ERROR_ALREADY_INVOKED, + AlreadyInvoked, /* this method must be called after invoke */ - GRPC_CALL_ERROR_NOT_INVOKED, + NotInvoked, /* this call is already finished (writes_done or write_status has already been called) */ - GRPC_CALL_ERROR_ALREADY_FINISHED, + AlreadyFinished, /* there is already an outstanding read/write operation on the call */ - GRPC_CALL_ERROR_TOO_MANY_OPERATIONS, + TooManyOperations, /* the flags value was illegal for this call */ - GRPC_CALL_ERROR_INVALID_FLAGS + InvalidFlags + } + + internal static class CallErrorExtensions + { + /// <summary> + /// Checks the call API invocation's result is OK. + /// </summary> + public static void CheckOk(this GRPCCallError callError) + { + Preconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError); + } } /// <summary> @@ -70,12 +82,12 @@ namespace Grpc.Core.Internal internal enum GRPCCompletionType { /* Shutting down */ - GRPC_QUEUE_SHUTDOWN, + Shutdown, /* No event before timeout */ - GRPC_QUEUE_TIMEOUT, + Timeout, /* operation completion */ - GRPC_OP_COMPLETE + OpComplete } } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f4224668f1..89b44a4e2b 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -112,12 +112,26 @@ namespace Grpc.Core.Internal /// </summary> private void RunHandlerLoop() { - GRPCCompletionType completionType; + CompletionQueueEvent ev; do { - completionType = cq.NextWithCallback(); + ev = cq.Next(); + if (ev.type == GRPCCompletionType.OpComplete) + { + bool success = (ev.success != 0); + IntPtr tag = ev.tag; + try + { + var callback = GrpcEnvironment.CompletionRegistry.Extract(tag); + callback(success); + } + catch (Exception e) + { + Console.WriteLine("Exception occured while invoking completion delegate: " + e); + } + } } - while (completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); + while (ev.type != GRPCCompletionType.Shutdown); Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index f494d9e0ff..c0e5bae13f 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -267,8 +267,6 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); - // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. - await requestStream.ToList(); await finishedTask; } } diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index c44ee87bad..83dbb910aa 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -45,10 +45,7 @@ namespace Grpc.Core.Internal internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - - [DllImport("grpc_csharp_ext.dll")] - static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); [DllImport("grpc_csharp_ext.dll")] static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); @@ -60,19 +57,22 @@ namespace Grpc.Core.Internal static extern void grpcsharp_server_start(ServerSafeHandle server); [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server); [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_destroy(IntPtr server); private ServerSafeHandle() { } - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) + public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { return grpcsharp_server_create(cq, args); } @@ -91,15 +91,19 @@ namespace Grpc.Core.Internal { grpcsharp_server_start(this); } - - public void ShutdownAndNotify(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + + public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) { - grpcsharp_server_shutdown_and_notify_callback(this, cq, callback); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx); } - public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) { - AssertCallOk(grpcsharp_server_request_call(this, cq, callback)); + var ctx = BatchContextSafeHandle.Create(); + GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_server_request_call(this, cq, ctx).CheckOk(); } protected override bool ReleaseHandle() @@ -113,10 +117,5 @@ namespace Grpc.Core.Internal { grpcsharp_server_cancel_all_calls(this); } - - private static void AssertCallOk(GRPCCallError callError) - { - Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); - } } } diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index 94d48c2c49..775af27db9 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -51,7 +51,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern int gprsharp_sizeof_timespec(); - // TODO: revisit this. + // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 // so IntPtr seems to have the right size to work on both. public System.IntPtr tv_sec; |