diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/CallSafeHandle.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 105 |
1 files changed, 70 insertions, 35 deletions
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index ad2e2919b7..69dbdfea5e 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -47,6 +47,7 @@ namespace Grpc.Core.Internal const uint GRPC_WRITE_BUFFER_HINT = 1; CompletionRegistry completionRegistry; + CompletionQueueSafeHandle completionQueue; [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); @@ -112,9 +113,10 @@ namespace Grpc.Core.Internal { } - public void SetCompletionRegistry(CompletionRegistry completionRegistry) + public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue) { this.completionRegistry = completionRegistry; + this.completionQueue = completionQueue; } public void SetCredentials(CallCredentialsSafeHandle credentials) @@ -124,10 +126,13 @@ namespace Grpc.Core.Internal public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) - .CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) + .CheckOk(); + } } public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) @@ -141,72 +146,102 @@ namespace Grpc.Core.Internal public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); - grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); + } } public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); - grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); + } } public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); - grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); + } } public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); + } } public void StartSendCloseFromClient(SendCompletionHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); + } } public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + } } public void StartReceiveMessage(ReceivedMessageHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); - grpcsharp_call_recv_message(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); + grpcsharp_call_recv_message(this, ctx).CheckOk(); + } } public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); - grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + } } public void StartServerSide(ReceivedCloseOnServerHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); - grpcsharp_call_start_serverside(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); + grpcsharp_call_start_serverside(this, ctx).CheckOk(); + } } public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + } } public void Cancel() |