aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-06-09 17:47:56 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-06-09 17:47:56 -0700
commit9bfedd67e06c4bd4fa1855aabaf169461a3d6a54 (patch)
treee48bf5a1c3539393508c02b2b6e41052b4532411 /src/csharp/Grpc.Core/Internal
parent97f1454fc5ccdfc57e2f1296a52f1f83037cfd78 (diff)
parenta561ea66aeb460c6f33cfabff396d5ef2b748791 (diff)
Merge remote-tracking branch 'upstream/master' into you-complete-me-csharp
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs43
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs (renamed from src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs)44
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs104
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs60
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs88
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/Enums.cs40
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs27
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs2
13 files changed, 337 insertions, 150 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 9bb918d53d..d350f45da6 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -47,9 +47,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
- readonly CompletionCallbackDelegate unaryResponseHandler;
- readonly CompletionCallbackDelegate finishedHandler;
-
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,8 +57,6 @@ namespace Grpc.Core.Internal
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
- this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
- this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
@@ -96,7 +91,21 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
+ using (var ctx = BatchContextSafeHandle.Create())
+ {
+ call.StartUnary(payload, ctx, metadataArray);
+ var ev = cq.Pluck(ctx.Handle);
+
+ bool success = (ev.success != 0);
+ try
+ {
+ HandleUnaryResponse(success, ctx);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
}
try
@@ -129,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartUnary(payload, unaryResponseHandler, metadataArray);
+ call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
}
@@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartClientStreaming(unaryResponseHandler, metadataArray);
+ call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
@@ -175,7 +184,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartServerStreaming(payload, finishedHandler, metadataArray);
+ call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
}
}
@@ -194,7 +203,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartDuplexStreaming(finishedHandler, metadataArray);
+ call.StartDuplexStreaming(HandleFinished, metadataArray);
}
}
}
@@ -229,7 +238,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendCloseFromClient(halfclosedHandler);
+ call.StartSendCloseFromClient(HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@@ -274,7 +283,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
lock (myLock)
{
@@ -307,7 +316,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
- private void HandleFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var status = ctx.GetReceivedStatus();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index b4f4edb17a..64713c8c52 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -51,13 +51,8 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- protected readonly CompletionCallbackDelegate sendFinishedHandler;
- protected readonly CompletionCallbackDelegate readFinishedHandler;
- protected readonly CompletionCallbackDelegate halfclosedHandler;
-
protected readonly object myLock = new object();
- protected GCHandle gchandle;
protected CallSafeHandle call;
protected bool disposed;
@@ -77,10 +72,6 @@ namespace Grpc.Core.Internal
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
-
- this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
- this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
- this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
}
/// <summary>
@@ -121,9 +112,6 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- // Make sure this object and the delegated held by it will not be garbage collected
- // before we release this handle.
- gchandle = GCHandle.Alloc(this);
this.call = call;
}
}
@@ -141,7 +129,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, sendFinishedHandler);
+ call.StartSendMessage(payload, HandleSendFinished);
sendCompletionDelegate = completionDelegate;
}
}
@@ -157,7 +145,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
- call.StartReceiveMessage(readFinishedHandler);
+ call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
}
}
@@ -197,7 +185,6 @@ namespace Grpc.Core.Internal
{
call.Dispose();
}
- gchandle.Free();
disposed = true;
}
@@ -282,29 +269,9 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
- /// prevent propagating exceptions accross managed/unmanaged boundary.
- /// </summary>
- protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
- {
- return new CompletionCallbackDelegate((success, batchContextPtr) =>
- {
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- handler(success, ctx);
- }
- catch (Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
- });
- }
-
- /// <summary>
/// Handles send completion.
/// </summary>
- private void HandleSendFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -328,7 +295,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles halfclose completion.
/// </summary>
- private void HandleHalfclosed(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -353,7 +320,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles streaming read completion.
/// </summary>
- private void HandleReadFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
{
var payload = ctx.GetReceivedMessage();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 1f0335e4e6..db1b86937f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -47,12 +47,10 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
- readonly CompletionCallbackDelegate finishedServersideHandler;
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
{
- this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
}
public void Initialize(CallSafeHandle call)
@@ -72,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
- call.StartServerSide(finishedServersideHandler);
+ call.StartServerSide(HandleFinishedServerside);
return finishedServersideTcs.Task;
}
}
@@ -107,7 +105,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendStatusFromServer(status, halfclosedHandler);
+ call.StartSendStatusFromServer(status, HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
}
@@ -121,7 +119,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles the server side close completion.
/// </summary>
- private void HandleFinishedServerside(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
{
bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index b562abaa7a..861cbbe4c6 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -41,32 +41,50 @@ namespace Grpc.Core.Internal
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
- internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
+ internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
+ static extern BatchContextSafeHandle grpcsharp_batch_context_create();
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
+ static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
+ static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
+ static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
+ static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char*
- public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_batch_context_destroy(IntPtr ctx);
+
+ private BatchContextSafeHandle()
+ {
+ }
+
+ public static BatchContextSafeHandle Create()
{
- SetHandle(handle);
+ return grpcsharp_batch_context_create();
+ }
+
+ public IntPtr Handle
+ {
+ get
+ {
+ return handle;
+ }
}
public Status GetReceivedStatus()
@@ -102,5 +120,11 @@ namespace Grpc.Core.Internal
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
+
+ protected override bool ReleaseHandle()
+ {
+ grpcsharp_batch_context_destroy(handle);
+ return true;
+ }
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 491b8414ec..0651498f0e 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -37,8 +37,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- internal delegate void CompletionCallbackDelegate(bool success, IntPtr batchContextPtr);
-
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
@@ -57,49 +55,40 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
+ static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
@@ -113,64 +102,84 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
- public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ .CheckOk();
}
- public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
{
- grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ .CheckOk();
}
- public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
- public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
+ public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
- public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
+ public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
+ public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
- public void StartReceiveMessage(CompletionCallbackDelegate callback)
+ public void StartReceiveMessage(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_recv_message(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_recv_message(this, ctx).CheckOk();
}
- public void StartServerSide(CompletionCallbackDelegate callback)
+ public void StartServerSide(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_serverside(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void Cancel()
{
- AssertCallOk(grpcsharp_call_cancel(this));
+ grpcsharp_call_cancel(this).CheckOk();
}
public void CancelWithStatus(Status status)
{
- AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
+ grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail).CheckOk();
}
protected override bool ReleaseHandle()
@@ -179,14 +188,11 @@ namespace Grpc.Core.Internal
return true;
}
- private static void AssertCallOk(GRPCCallError callError)
- {
- Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
-
private static uint GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
+
+
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
new file mode 100644
index 0000000000..3f517514a3
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
@@ -0,0 +1,60 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// grpc_event from grpc/grpc.h
+ /// </summary>
+ [StructLayout(LayoutKind.Sequential)]
+ internal struct CompletionQueueEvent
+ {
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_sizeof_grpc_event();
+
+ public GRPCCompletionType type;
+ public int success;
+ public IntPtr tag;
+
+ internal static int NativeSize
+ {
+ get
+ {
+ return grpcsharp_sizeof_grpc_event();
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 600d1fc87c..f64f3d4175 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -46,7 +46,10 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq);
+ static extern CompletionQueueEvent grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CompletionQueueEvent grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_destroy(IntPtr cq);
@@ -60,9 +63,14 @@ namespace Grpc.Core.Internal
return grpcsharp_completion_queue_create();
}
- public GRPCCompletionType NextWithCallback()
+ public CompletionQueueEvent Next()
+ {
+ return grpcsharp_completion_queue_next(this);
+ }
+
+ public CompletionQueueEvent Pluck(IntPtr tag)
{
- return grpcsharp_completion_queue_next_with_callback(this);
+ return grpcsharp_completion_queue_pluck(this, tag);
}
public void Shutdown()
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
new file mode 100644
index 0000000000..118aa13c5a
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -0,0 +1,88 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ internal delegate void OpCompletionDelegate(bool success);
+ internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
+
+ internal class CompletionRegistry
+ {
+ readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+
+ public void Register(IntPtr key, OpCompletionDelegate callback)
+ {
+ DebugStats.PendingBatchCompletions.Increment();
+ Preconditions.CheckState(dict.TryAdd(key, callback));
+ }
+
+ public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ {
+ OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
+ Register(ctx.Handle, opCallback);
+ }
+
+ public OpCompletionDelegate Extract(IntPtr key)
+ {
+ OpCompletionDelegate value;
+ Preconditions.CheckState(dict.TryRemove(key, out value));
+ DebugStats.PendingBatchCompletions.Decrement();
+ return value;
+ }
+
+ private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ {
+ try
+ {
+ callback(success, ctx);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ finally
+ {
+ if (ctx != null)
+ {
+ ctx.Dispose();
+ }
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index 476914f751..ef9d9afe11 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -41,5 +41,7 @@ namespace Grpc.Core.Internal
public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
+
+ public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs
index 2b4f6cae0c..af11b5b9f3 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/Enums.cs
@@ -33,35 +33,47 @@
using System;
using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// from grpc/grpc.h
+ /// grpc_call_error from grpc/grpc.h
/// </summary>
internal enum GRPCCallError
{
/* everything went ok */
- GRPC_CALL_OK = 0,
+ OK = 0,
/* something failed, we don't know what */
- GRPC_CALL_ERROR,
+ Error,
/* this method is not available on the server */
- GRPC_CALL_ERROR_NOT_ON_SERVER,
+ NotOnServer,
/* this method is not available on the client */
- GRPC_CALL_ERROR_NOT_ON_CLIENT,
+ NotOnClient,
/* this method must be called before server_accept */
- GRPC_CALL_ERROR_ALREADY_ACCEPTED,
+ AlreadyAccepted,
/* this method must be called before invoke */
- GRPC_CALL_ERROR_ALREADY_INVOKED,
+ AlreadyInvoked,
/* this method must be called after invoke */
- GRPC_CALL_ERROR_NOT_INVOKED,
+ NotInvoked,
/* this call is already finished
(writes_done or write_status has already been called) */
- GRPC_CALL_ERROR_ALREADY_FINISHED,
+ AlreadyFinished,
/* there is already an outstanding read/write operation on the call */
- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS,
+ TooManyOperations,
/* the flags value was illegal for this call */
- GRPC_CALL_ERROR_INVALID_FLAGS
+ InvalidFlags
+ }
+
+ internal static class CallErrorExtensions
+ {
+ /// <summary>
+ /// Checks the call API invocation's result is OK.
+ /// </summary>
+ public static void CheckOk(this GRPCCallError callError)
+ {
+ Preconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
+ }
}
/// <summary>
@@ -70,12 +82,12 @@ namespace Grpc.Core.Internal
internal enum GRPCCompletionType
{
/* Shutting down */
- GRPC_QUEUE_SHUTDOWN,
+ Shutdown,
/* No event before timeout */
- GRPC_QUEUE_TIMEOUT,
+ Timeout,
/* operation completion */
- GRPC_OP_COMPLETE
+ OpComplete
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f4224668f1..89b44a4e2b 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -112,12 +112,26 @@ namespace Grpc.Core.Internal
/// </summary>
private void RunHandlerLoop()
{
- GRPCCompletionType completionType;
+ CompletionQueueEvent ev;
do
{
- completionType = cq.NextWithCallback();
+ ev = cq.Next();
+ if (ev.type == GRPCCompletionType.OpComplete)
+ {
+ bool success = (ev.success != 0);
+ IntPtr tag = ev.tag;
+ try
+ {
+ var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
+ callback(success);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
}
- while (completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
+ while (ev.type != GRPCCompletionType.Shutdown);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index c44ee87bad..eda9afcb87 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -45,9 +45,6 @@ namespace Grpc.Core.Internal
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
-
- [DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
[DllImport("grpc_csharp_ext.dll")]
@@ -60,12 +57,15 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
private ServerSafeHandle()
@@ -91,15 +91,19 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_start(this);
}
-
- public void ShutdownAndNotify(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+
+ public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
- grpcsharp_server_shutdown_and_notify_callback(this, cq, callback);
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx);
}
- public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+ public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_request_call(this, cq, ctx).CheckOk();
}
protected override bool ReleaseHandle()
@@ -113,10 +117,5 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_cancel_all_calls(this);
}
-
- private static void AssertCallOk(GRPCCallError callError)
- {
- Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index 94d48c2c49..775af27db9 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -51,7 +51,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
- // TODO: revisit this.
+
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;