diff options
Diffstat (limited to 'src/csharp')
-rw-r--r-- | src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs | 28 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs | 106 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs | 58 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 37 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 20 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 22 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/INativeCall.cs | 27 | ||||
-rw-r--r-- | src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs | 12 |
9 files changed, 191 insertions, 127 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 9488ce29e9..41d7ee4075 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -64,7 +64,7 @@ namespace Grpc.Core.Internal.Tests public void CancelNotificationAfterStartDisposes() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); Assert.IsFalse(moveNextTask.Result); AssertFinished(asyncCallServer, fakeCall, finishedTask); @@ -89,7 +89,7 @@ namespace Grpc.Core.Internal.Tests var finishedTask = asyncCallServer.ServerSideCallAsync(); var requestStream = new ServerRequestStream<string, string>(asyncCallServer); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); // Check that starting a read after cancel notification has been processed is legal. var moveNextTask = requestStream.MoveNext(); @@ -107,10 +107,10 @@ namespace Grpc.Core.Internal.Tests // if a read completion's success==false, the request stream will silently finish // and we rely on C core cancelling the call. var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(false, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -120,7 +120,7 @@ namespace Grpc.Core.Internal.Tests var finishedTask = asyncCallServer.ServerSideCallAsync(); var responseStream = new ServerResponseStream<string, string>(asyncCallServer); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); // TODO(jtattermusch): should we throw a different exception type instead? Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); @@ -134,10 +134,10 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ServerResponseStream<string, string>(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); Assert.ThrowsAsync(typeof(IOException), async () => await writeTask); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -150,13 +150,13 @@ namespace Grpc.Core.Internal.Tests var writeTask = responseStream.WriteAsync("request1"); var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); - fakeCall.SendCompletionHandler(true); - fakeCall.SendStatusFromServerHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); + fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true); Assert.DoesNotThrowAsync(async () => await writeTask); Assert.DoesNotThrowAsync(async () => await writeStatusTask); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -170,8 +170,8 @@ namespace Grpc.Core.Internal.Tests asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1")); - fakeCall.SendStatusFromServerHandler(true); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index b2b49f3a48..9aab54d2d0 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -73,7 +73,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_Success() { var resultTask = asyncCall.UnaryCallAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -85,7 +85,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_NonSuccessStatusCode() { var resultTask = asyncCall.UnaryCallAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.InvalidArgument), null, new Metadata()); @@ -97,7 +97,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_NullResponsePayload() { var resultTask = asyncCall.UnaryCallAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), null, new Metadata()); @@ -118,7 +118,7 @@ namespace Grpc.Core.Internal.Tests public void ClientStreaming_NoRequest_Success() { var resultTask = asyncCall.ClientStreamingCallAsync(); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -130,7 +130,7 @@ namespace Grpc.Core.Internal.Tests public void ClientStreaming_NoRequest_NonSuccessStatusCode() { var resultTask = asyncCall.ClientStreamingCallAsync(); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.InvalidArgument), null, new Metadata()); @@ -145,18 +145,18 @@ namespace Grpc.Core.Internal.Tests var requestStream = new ClientRequestStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); writeTask.Wait(); var writeTask2 = requestStream.WriteAsync("request2"); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); writeTask2.Wait(); var completeTask = requestStream.CompleteAsync(); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); completeTask.Wait(); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -171,12 +171,12 @@ namespace Grpc.Core.Internal.Tests var requestStream = new ClientRequestStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); // The write will wait for call to finish to receive the status code. Assert.IsFalse(writeTask.IsCompleted); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), null, new Metadata()); @@ -195,12 +195,12 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), null, new Metadata()); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); @@ -215,13 +215,13 @@ namespace Grpc.Core.Internal.Tests var requestStream = new ClientRequestStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); // Until the delayed write completion has been triggered, // we still act as if there was an active write. Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2")); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), null, new Metadata()); @@ -242,7 +242,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -260,7 +260,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()), CreateResponsePayload(), new Metadata()); @@ -282,9 +282,9 @@ namespace Grpc.Core.Internal.Tests Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1")); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -298,7 +298,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -319,7 +319,7 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Cancelled), null, new Metadata()); @@ -342,11 +342,11 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedResponseHeadersHandler(true, new Metadata()); + fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata()); Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -359,8 +359,8 @@ namespace Grpc.Core.Internal.Tests var readTask = responseStream.MoveNext(); // try alternative order of completions - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); - fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -372,8 +372,8 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code. - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code. + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); } @@ -385,18 +385,18 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask1 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask1.Result); Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask2.Result); Assert.AreEqual("response1", responseStream.Current); var readTask3 = responseStream.MoveNext(); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); - fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); } @@ -409,12 +409,12 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var writeTask1 = requestStream.CompleteAsync(); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); Assert.DoesNotThrowAsync(async () => await writeTask1); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -427,8 +427,8 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -445,8 +445,8 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -461,14 +461,14 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); // The write will wait for call to finish to receive the status code. Assert.IsFalse(writeTask.IsCompleted); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode); @@ -486,9 +486,9 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied)); - fakeCall.SendCompletionHandler(false); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); + fakeCall.SendCompletionCallback.OnSendCompletion(false); var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode); @@ -510,8 +510,8 @@ namespace Grpc.Core.Internal.Tests Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled); } @@ -526,13 +526,13 @@ namespace Grpc.Core.Internal.Tests Assert.IsTrue(fakeCall.IsCancelled); var readTask1 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask1.Result); Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); } @@ -547,13 +547,13 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask1.Result); Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs index c3a27167f9..581ac3384b 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -31,43 +31,43 @@ namespace Grpc.Core.Internal.Tests /// </summary> internal class FakeNativeCall : INativeCall { - public UnaryResponseClientHandler UnaryResponseClientHandler + public IUnaryResponseClientCallback UnaryResponseClientCallback { get; set; } - public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + public IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback { get; set; } - public ReceivedMessageHandler ReceivedMessageHandler + public IReceivedMessageCallback ReceivedMessageCallback { get; set; } - public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + public IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback { get; set; } - public SendCompletionHandler SendCompletionHandler + public ISendCompletionCallback SendCompletionCallback { get; set; } - public SendCompletionHandler SendStatusFromServerHandler + public ISendStatusFromServerCompletionCallback SendStatusFromServerCallback { get; set; } - public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + public IReceivedCloseOnServerCallback ReceivedCloseOnServerCallback { get; set; @@ -100,9 +100,9 @@ namespace Grpc.Core.Internal.Tests return "PEER"; } - public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - UnaryResponseClientHandler = callback; + UnaryResponseClientCallback = callback; } public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) @@ -110,55 +110,55 @@ namespace Grpc.Core.Internal.Tests throw new NotImplementedException(); } - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - UnaryResponseClientHandler = callback; + UnaryResponseClientCallback = callback; } - public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - ReceivedStatusOnClientHandler = callback; + ReceivedStatusOnClientCallback = callback; } - public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - ReceivedStatusOnClientHandler = callback; + ReceivedStatusOnClientCallback = callback; } - public void StartReceiveMessage(ReceivedMessageHandler callback) + public void StartReceiveMessage(IReceivedMessageCallback callback) { - ReceivedMessageHandler = callback; + ReceivedMessageCallback = callback; } - public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback) { - ReceivedResponseHeadersHandler = callback; + ReceivedResponseHeadersCallback = callback; } - public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray) { - SendCompletionHandler = callback; + SendCompletionCallback = callback; } - public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { - SendCompletionHandler = callback; + SendCompletionCallback = callback; } - public void StartSendCloseFromClient(SendCompletionHandler callback) + public void StartSendCloseFromClient(ISendCompletionCallback callback) { - SendCompletionHandler = callback; + SendCompletionCallback = callback; } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags) { - SendStatusFromServerHandler = callback; + SendStatusFromServerCallback = callback; } - public void StartServerSide(ReceivedCloseOnServerHandler callback) + public void StartServerSide(IReceivedCloseOnServerCallback callback) { - ReceivedCloseOnServerHandler = callback; + ReceivedCloseOnServerCallback = callback; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 09fb722c81..aa2161267a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -27,7 +27,7 @@ namespace Grpc.Core.Internal /// <summary> /// Manages client side native call lifecycle. /// </summary> - internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> + internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); @@ -138,7 +138,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource<TResponse>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(HandleUnaryResponse, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); } return unaryResponseTcs.Task; } @@ -162,7 +162,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource<TResponse>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartClientStreaming(HandleUnaryResponse, metadataArray, details.Options.Flags); + call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); } return unaryResponseTcs.Task; @@ -188,9 +188,9 @@ namespace Grpc.Core.Internal streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartServerStreaming(HandleFinished, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); } - call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -210,9 +210,9 @@ namespace Grpc.Core.Internal streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartDuplexStreaming(HandleFinished, metadataArray, details.Options.Flags); + call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); } - call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -256,7 +256,7 @@ namespace Grpc.Core.Internal halfcloseRequested = true; return TaskUtils.CompletedTask; } - call.StartSendCloseFromClient(HandleSendFinished); + call.StartSendCloseFromClient(SendCompletionCallback); halfcloseRequested = true; streamingWriteTcs = new TaskCompletionSource<object>(); @@ -516,5 +516,26 @@ namespace Grpc.Core.Internal streamingResponseCallFinishedTcs.SetResult(null); } + + IUnaryResponseClientCallback UnaryResponseClientCallback => this; + + void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) + { + HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders); + } + + IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this; + + void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus) + { + HandleFinished(success, receivedStatus); + } + + IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this; + + void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders) + { + HandleReceivedResponseHeaders(success, responseHeaders); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index f379c85e00..3273c26b88 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -35,7 +35,7 @@ namespace Grpc.Core.Internal /// Base for handling both client side and server side calls. /// Manages native call lifecycle and provides convenience methods. /// </summary> - internal abstract class AsyncCallBase<TWrite, TRead> + internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); @@ -126,7 +126,7 @@ namespace Grpc.Core.Internal return earlyResult; } - call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); + call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent); initialMetadataSent = true; streamingWritesCounter++; @@ -154,7 +154,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); GrpcPreconditions.CheckState(!disposed); - call.StartReceiveMessage(HandleReadFinished); + call.StartReceiveMessage(ReceivedMessageCallback); streamingReadTcs = new TaskCompletionSource<TRead>(); return streamingReadTcs.Task; } @@ -342,5 +342,19 @@ namespace Grpc.Core.Internal } origTcs.SetResult(msg); } + + protected ISendCompletionCallback SendCompletionCallback => this; + + void ISendCompletionCallback.OnSendCompletion(bool success) + { + HandleSendFinished(success); + } + + IReceivedMessageCallback ReceivedMessageCallback => this; + + void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage) + { + HandleReadFinished(success, receivedMessage); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 271a6ffadf..4f44514974 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -31,7 +31,7 @@ namespace Grpc.Core.Internal /// <summary> /// Manages server side native call lifecycle. /// </summary> - internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> + internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); @@ -70,7 +70,7 @@ namespace Grpc.Core.Internal started = true; - call.StartServerSide(HandleFinishedServerside); + call.StartServerSide(ReceiveCloseOnServerCallback); return finishedServersideTcs.Task; } } @@ -114,7 +114,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartSendInitialMetadata(HandleSendFinished, metadataArray); + call.StartSendInitialMetadata(SendCompletionCallback, metadataArray); } this.initialMetadataSent = true; @@ -140,7 +140,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, + call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent, payload, writeFlags); } halfcloseRequested = true; @@ -227,5 +227,19 @@ namespace Grpc.Core.Internal finishedServersideTcs.SetResult(null); } + + IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this; + + void IReceivedCloseOnServerCallback.OnReceivedCloseOnServerHandler(bool success, bool cancelled) + { + HandleFinishedServerside(success, cancelled); + } + + ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this; + + void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success) + { + HandleSendStatusFromServerFinished(success); + } } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 589f6cbe16..7316bc2787 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -44,6 +44,8 @@ namespace Grpc.Core.Internal (success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata()); static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback = (success, context, state) => ((ISendCompletionCallback)state).OnSendCompletion(success); + static readonly BatchCompletionDelegate CompletionHandler_ISendStatusFromServerCompletionCallback = + (success, context, state) => ((ISendStatusFromServerCompletionCallback)state).OnSendStatusFromServerCompletion(success); static readonly BatchCompletionDelegate CompletionHandler_IReceivedCloseOnServerCallback = (success, context, state) => ((IReceivedCloseOnServerCallback)state).OnReceivedCloseOnServerHandler(success, context.GetReceivedCloseOnServerCancelled()); @@ -81,7 +83,7 @@ namespace Grpc.Core.Internal .CheckOk(); } - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { using (completionQueue.NewScope()) { @@ -131,14 +133,14 @@ namespace Grpc.Core.Internal } } - public void StartSendStatusFromServer(ISendCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index cedf82a037..3474ad6208 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -59,6 +59,11 @@ namespace Grpc.Core.Internal void OnSendCompletion(bool success); } + internal interface ISendStatusFromServerCompletionCallback + { + void OnSendStatusFromServerCompletion(bool success); + } + internal interface IReceivedCloseOnServerCallback { void OnReceivedCloseOnServerHandler(bool success, bool cancelled); @@ -75,28 +80,28 @@ namespace Grpc.Core.Internal string GetPeer(); - void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartReceiveMessage(ReceivedMessageHandler callback); + void StartReceiveMessage(IReceivedMessageCallback callback); - void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback); + void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback); - void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); + void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray); - void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata); - void StartSendCloseFromClient(SendCompletionHandler callback); + void StartSendCloseFromClient(ISendCompletionCallback callback); - void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags); + void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags); - void StartServerSide(ReceivedCloseOnServerHandler callback); + void StartServerSide(IReceivedCloseOnServerCallback callback); } } diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs index 6a8a29d770..9cff97eb88 100644 --- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -59,14 +59,14 @@ namespace Grpc.Microbenchmarks var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry); var call = CreateFakeCall(cq); - var sendCompletionHandler = new SendCompletionHandler((success) => { }); + var sendCompletionCallback = new NopSendCompletionCallback(); var payload = new byte[payloadSize]; var writeFlags = default(WriteFlags); var stopwatch = Stopwatch.StartNew(); for (int i = 0; i < iterations; i++) { - call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false); + call.StartSendMessage(sendCompletionCallback, payload, writeFlags, false); var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey); callback.OnComplete(true); } @@ -87,5 +87,13 @@ namespace Grpc.Microbenchmarks } return call; } + + private class NopSendCompletionCallback : ISendCompletionCallback + { + public void OnSendCompletion(bool success) + { + // NOP + } + } } } |