diff options
author | 2017-11-29 19:31:42 +0100 | |
---|---|---|
committer | 2017-11-29 19:31:42 +0100 | |
commit | 361f8108e41497c53f174c0ad6ea0ffcc97022a9 (patch) | |
tree | 2fa08b8f0b047f627481c07ccdffd603c0edc42c /src/csharp | |
parent | 18a6837bb0bec8b62c566b4a49adc179f0450c1d (diff) | |
parent | f836c7e941beb003289dc6e9a58a6e47f5caa5f0 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into upmerge-from-v1.7
Diffstat (limited to 'src/csharp')
30 files changed, 699 insertions, 254 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 9488ce29e9..e7d8939978 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -64,7 +64,7 @@ namespace Grpc.Core.Internal.Tests public void CancelNotificationAfterStartDisposes() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); Assert.IsFalse(moveNextTask.Result); AssertFinished(asyncCallServer, fakeCall, finishedTask); @@ -89,7 +89,7 @@ namespace Grpc.Core.Internal.Tests var finishedTask = asyncCallServer.ServerSideCallAsync(); var requestStream = new ServerRequestStream<string, string>(asyncCallServer); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); // Check that starting a read after cancel notification has been processed is legal. var moveNextTask = requestStream.MoveNext(); @@ -107,10 +107,10 @@ namespace Grpc.Core.Internal.Tests // if a read completion's success==false, the request stream will silently finish // and we rely on C core cancelling the call. var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(false, null); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -120,7 +120,7 @@ namespace Grpc.Core.Internal.Tests var finishedTask = asyncCallServer.ServerSideCallAsync(); var responseStream = new ServerResponseStream<string, string>(asyncCallServer); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); // TODO(jtattermusch): should we throw a different exception type instead? Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); @@ -134,10 +134,10 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ServerResponseStream<string, string>(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); Assert.ThrowsAsync(typeof(IOException), async () => await writeTask); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -150,13 +150,13 @@ namespace Grpc.Core.Internal.Tests var writeTask = responseStream.WriteAsync("request1"); var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); - fakeCall.SendCompletionHandler(true); - fakeCall.SendStatusFromServerHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); + fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true); Assert.DoesNotThrowAsync(async () => await writeTask); Assert.DoesNotThrowAsync(async () => await writeStatusTask); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -170,8 +170,8 @@ namespace Grpc.Core.Internal.Tests asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1")); - fakeCall.SendStatusFromServerHandler(true); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true); + fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index b2b49f3a48..9aab54d2d0 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -73,7 +73,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_Success() { var resultTask = asyncCall.UnaryCallAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -85,7 +85,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_NonSuccessStatusCode() { var resultTask = asyncCall.UnaryCallAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.InvalidArgument), null, new Metadata()); @@ -97,7 +97,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_NullResponsePayload() { var resultTask = asyncCall.UnaryCallAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), null, new Metadata()); @@ -118,7 +118,7 @@ namespace Grpc.Core.Internal.Tests public void ClientStreaming_NoRequest_Success() { var resultTask = asyncCall.ClientStreamingCallAsync(); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -130,7 +130,7 @@ namespace Grpc.Core.Internal.Tests public void ClientStreaming_NoRequest_NonSuccessStatusCode() { var resultTask = asyncCall.ClientStreamingCallAsync(); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.InvalidArgument), null, new Metadata()); @@ -145,18 +145,18 @@ namespace Grpc.Core.Internal.Tests var requestStream = new ClientRequestStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); writeTask.Wait(); var writeTask2 = requestStream.WriteAsync("request2"); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); writeTask2.Wait(); var completeTask = requestStream.CompleteAsync(); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); completeTask.Wait(); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -171,12 +171,12 @@ namespace Grpc.Core.Internal.Tests var requestStream = new ClientRequestStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); // The write will wait for call to finish to receive the status code. Assert.IsFalse(writeTask.IsCompleted); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), null, new Metadata()); @@ -195,12 +195,12 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), null, new Metadata()); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); @@ -215,13 +215,13 @@ namespace Grpc.Core.Internal.Tests var requestStream = new ClientRequestStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); // Until the delayed write completion has been triggered, // we still act as if there was an active write. Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2")); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Internal), null, new Metadata()); @@ -242,7 +242,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -260,7 +260,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()), CreateResponsePayload(), new Metadata()); @@ -282,9 +282,9 @@ namespace Grpc.Core.Internal.Tests Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1")); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -298,7 +298,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), CreateResponsePayload(), new Metadata()); @@ -319,7 +319,7 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); - fakeCall.UnaryResponseClientHandler(true, + fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true, CreateClientSideStatus(StatusCode.Cancelled), null, new Metadata()); @@ -342,11 +342,11 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedResponseHeadersHandler(true, new Metadata()); + fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata()); Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -359,8 +359,8 @@ namespace Grpc.Core.Internal.Tests var readTask = responseStream.MoveNext(); // try alternative order of completions - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); - fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -372,8 +372,8 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code. - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code. + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); } @@ -385,18 +385,18 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask1 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask1.Result); Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask2.Result); Assert.AreEqual("response1", responseStream.Current); var readTask3 = responseStream.MoveNext(); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); - fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); } @@ -409,12 +409,12 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var writeTask1 = requestStream.CompleteAsync(); - fakeCall.SendCompletionHandler(true); + fakeCall.SendCompletionCallback.OnSendCompletion(true); Assert.DoesNotThrowAsync(async () => await writeTask1); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); } @@ -427,8 +427,8 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -445,8 +445,8 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); @@ -461,14 +461,14 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream<string, string>(asyncCall); var writeTask = requestStream.WriteAsync("request1"); - fakeCall.SendCompletionHandler(false); + fakeCall.SendCompletionCallback.OnSendCompletion(false); // The write will wait for call to finish to receive the status code. Assert.IsFalse(writeTask.IsCompleted); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode); @@ -486,9 +486,9 @@ namespace Grpc.Core.Internal.Tests var writeTask = requestStream.WriteAsync("request1"); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied)); - fakeCall.SendCompletionHandler(false); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied)); + fakeCall.SendCompletionCallback.OnSendCompletion(false); var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode); @@ -510,8 +510,8 @@ namespace Grpc.Core.Internal.Tests Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled); } @@ -526,13 +526,13 @@ namespace Grpc.Core.Internal.Tests Assert.IsTrue(fakeCall.IsCancelled); var readTask1 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask1.Result); Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); } @@ -547,13 +547,13 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload()); Assert.IsTrue(readTask1.Result); Assert.AreEqual("response1", responseStream.Current); var readTask2 = responseStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null); + fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled)); AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs index c3a27167f9..581ac3384b 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -31,43 +31,43 @@ namespace Grpc.Core.Internal.Tests /// </summary> internal class FakeNativeCall : INativeCall { - public UnaryResponseClientHandler UnaryResponseClientHandler + public IUnaryResponseClientCallback UnaryResponseClientCallback { get; set; } - public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + public IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback { get; set; } - public ReceivedMessageHandler ReceivedMessageHandler + public IReceivedMessageCallback ReceivedMessageCallback { get; set; } - public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + public IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback { get; set; } - public SendCompletionHandler SendCompletionHandler + public ISendCompletionCallback SendCompletionCallback { get; set; } - public SendCompletionHandler SendStatusFromServerHandler + public ISendStatusFromServerCompletionCallback SendStatusFromServerCallback { get; set; } - public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + public IReceivedCloseOnServerCallback ReceivedCloseOnServerCallback { get; set; @@ -100,9 +100,9 @@ namespace Grpc.Core.Internal.Tests return "PEER"; } - public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - UnaryResponseClientHandler = callback; + UnaryResponseClientCallback = callback; } public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) @@ -110,55 +110,55 @@ namespace Grpc.Core.Internal.Tests throw new NotImplementedException(); } - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - UnaryResponseClientHandler = callback; + UnaryResponseClientCallback = callback; } - public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - ReceivedStatusOnClientHandler = callback; + ReceivedStatusOnClientCallback = callback; } - public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { - ReceivedStatusOnClientHandler = callback; + ReceivedStatusOnClientCallback = callback; } - public void StartReceiveMessage(ReceivedMessageHandler callback) + public void StartReceiveMessage(IReceivedMessageCallback callback) { - ReceivedMessageHandler = callback; + ReceivedMessageCallback = callback; } - public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback) { - ReceivedResponseHeadersHandler = callback; + ReceivedResponseHeadersCallback = callback; } - public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray) { - SendCompletionHandler = callback; + SendCompletionCallback = callback; } - public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { - SendCompletionHandler = callback; + SendCompletionCallback = callback; } - public void StartSendCloseFromClient(SendCompletionHandler callback) + public void StartSendCloseFromClient(ISendCompletionCallback callback) { - SendCompletionHandler = callback; + SendCompletionCallback = callback; } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags) { - SendStatusFromServerHandler = callback; + SendStatusFromServerCallback = callback; } - public void StartServerSide(ReceivedCloseOnServerHandler callback) + public void StartServerSide(IReceivedCloseOnServerCallback callback) { - ReceivedCloseOnServerHandler = callback; + ReceivedCloseOnServerCallback = callback; } public void Dispose() diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs index 7529c44c4e..43f816bb1c 100644 --- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -63,7 +63,7 @@ namespace Grpc.Core.Tests [Ignore("Prevent running on Jenkins")] public void NativeCallbackBenchmark() { - OpCompletionDelegate handler = Handler; + NativeCallbackTestDelegate handler = Handler; counter = 0; BenchmarkUtil.RunBenchmark( @@ -91,7 +91,7 @@ namespace Grpc.Core.Tests 10000, 10000, () => { - Native.grpcsharp_test_callback(new OpCompletionDelegate(Handler)); + Native.grpcsharp_test_callback(new NativeCallbackTestDelegate(Handler)); }); Assert.AreNotEqual(0, counter); } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 1803920662..f9925a8a76 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -127,6 +127,20 @@ namespace Grpc.Core } } + // cached handler for watch connectivity state + static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) => + { + var tcs = (TaskCompletionSource<object>) state; + if (success) + { + tcs.SetResult(null); + } + else + { + tcs.SetCanceled(); + } + }; + /// <summary> /// Returned tasks completes once channel state has become different from /// given lastObservedState. @@ -138,18 +152,8 @@ namespace Grpc.Core "Shutdown is a terminal state. No further state changes can occur."); var tcs = new TaskCompletionSource<object>(); var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; - var handler = new BatchCompletionDelegate((success, ctx) => - { - if (success) - { - tcs.SetResult(null); - } - else - { - tcs.SetCanceled(); - } - }); - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler); + // pass "tcs" as "state" for WatchConnectivityStateHandler. + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs); return tcs.Task; } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 09fb722c81..aa2161267a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -27,7 +27,7 @@ namespace Grpc.Core.Internal /// <summary> /// Manages client side native call lifecycle. /// </summary> - internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> + internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); @@ -138,7 +138,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource<TResponse>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(HandleUnaryResponse, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); } return unaryResponseTcs.Task; } @@ -162,7 +162,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource<TResponse>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartClientStreaming(HandleUnaryResponse, metadataArray, details.Options.Flags); + call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); } return unaryResponseTcs.Task; @@ -188,9 +188,9 @@ namespace Grpc.Core.Internal streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartServerStreaming(HandleFinished, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); } - call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -210,9 +210,9 @@ namespace Grpc.Core.Internal streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartDuplexStreaming(HandleFinished, metadataArray, details.Options.Flags); + call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); } - call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -256,7 +256,7 @@ namespace Grpc.Core.Internal halfcloseRequested = true; return TaskUtils.CompletedTask; } - call.StartSendCloseFromClient(HandleSendFinished); + call.StartSendCloseFromClient(SendCompletionCallback); halfcloseRequested = true; streamingWriteTcs = new TaskCompletionSource<object>(); @@ -516,5 +516,26 @@ namespace Grpc.Core.Internal streamingResponseCallFinishedTcs.SetResult(null); } + + IUnaryResponseClientCallback UnaryResponseClientCallback => this; + + void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) + { + HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders); + } + + IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this; + + void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus) + { + HandleFinished(success, receivedStatus); + } + + IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this; + + void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders) + { + HandleReceivedResponseHeaders(success, responseHeaders); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index f379c85e00..3273c26b88 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -35,7 +35,7 @@ namespace Grpc.Core.Internal /// Base for handling both client side and server side calls. /// Manages native call lifecycle and provides convenience methods. /// </summary> - internal abstract class AsyncCallBase<TWrite, TRead> + internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); @@ -126,7 +126,7 @@ namespace Grpc.Core.Internal return earlyResult; } - call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); + call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent); initialMetadataSent = true; streamingWritesCounter++; @@ -154,7 +154,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); GrpcPreconditions.CheckState(!disposed); - call.StartReceiveMessage(HandleReadFinished); + call.StartReceiveMessage(ReceivedMessageCallback); streamingReadTcs = new TaskCompletionSource<TRead>(); return streamingReadTcs.Task; } @@ -342,5 +342,19 @@ namespace Grpc.Core.Internal } origTcs.SetResult(msg); } + + protected ISendCompletionCallback SendCompletionCallback => this; + + void ISendCompletionCallback.OnSendCompletion(bool success) + { + HandleSendFinished(success); + } + + IReceivedMessageCallback ReceivedMessageCallback => this; + + void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage) + { + HandleReadFinished(success, receivedMessage); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 271a6ffadf..11acb27533 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -31,7 +31,7 @@ namespace Grpc.Core.Internal /// <summary> /// Manages server side native call lifecycle. /// </summary> - internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> + internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); @@ -70,7 +70,7 @@ namespace Grpc.Core.Internal started = true; - call.StartServerSide(HandleFinishedServerside); + call.StartServerSide(ReceiveCloseOnServerCallback); return finishedServersideTcs.Task; } } @@ -114,7 +114,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartSendInitialMetadata(HandleSendFinished, metadataArray); + call.StartSendInitialMetadata(SendCompletionCallback, metadataArray); } this.initialMetadataSent = true; @@ -127,10 +127,10 @@ namespace Grpc.Core.Internal /// Sends call result status, indicating we are done with writes. /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// </summary> - public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite) + public Task SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite) { - byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null; - var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags); + byte[] payload = optionalWrite.HasValue ? UnsafeSerialize(optionalWrite.Value.Response) : null; + var writeFlags = optionalWrite.HasValue ? optionalWrite.Value.WriteFlags : default(WriteFlags); lock (myLock) { @@ -140,13 +140,13 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, + call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent, payload, writeFlags); } halfcloseRequested = true; initialMetadataSent = true; sendStatusFromServerTcs = new TaskCompletionSource<object>(); - if (optionalWrite != null) + if (optionalWrite.HasValue) { streamingWritesCounter++; } @@ -227,5 +227,31 @@ namespace Grpc.Core.Internal finishedServersideTcs.SetResult(null); } + + IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this; + + void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled) + { + HandleFinishedServerside(success, cancelled); + } + + ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this; + + void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success) + { + HandleSendStatusFromServerFinished(success); + } + + public struct ResponseWithFlags + { + public ResponseWithFlags(TResponse response, WriteFlags writeFlags) + { + this.Response = response; + this.WriteFlags = writeFlags; + } + + public TResponse Response { get; } + public WriteFlags WriteFlags { get; } + } } } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index cd5e3d8911..1e6f1fba37 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -20,15 +20,25 @@ using System; using System.Runtime.InteropServices; using System.Text; using Grpc.Core; +using Grpc.Core.Logging; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { + internal interface IOpCompletionCallback + { + void OnComplete(bool success); + } + /// <summary> /// grpcsharp_batch_context /// </summary> - internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid + internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback { static readonly NativeMethods Native = NativeMethods.Get(); + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>(); + + CompletionCallbackData completionCallbackData; private BatchContextSafeHandle() { @@ -47,19 +57,26 @@ namespace Grpc.Core.Internal } } + public void SetCompletionCallback(BatchCompletionDelegate callback, object state) + { + GrpcPreconditions.CheckState(completionCallbackData.Callback == null); + GrpcPreconditions.CheckNotNull(callback, nameof(callback)); + completionCallbackData = new CompletionCallbackData(callback, state); + } + // Gets data of recv_initial_metadata completion. public Metadata GetReceivedInitialMetadata() { IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_initial_metadata(this); return MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); } - + // Gets data of recv_status_on_client completion. public ClientSideStatus GetReceivedStatusOnClient() { UIntPtr detailsLength; IntPtr detailsPtr = Native.grpcsharp_batch_context_recv_status_on_client_details(this, out detailsLength); - string details = MarshalUtils.PtrToStringUTF8(detailsPtr, (int) detailsLength.ToUInt32()); + string details = MarshalUtils.PtrToStringUTF8(detailsPtr, (int)detailsLength.ToUInt32()); var status = new Status(Native.grpcsharp_batch_context_recv_status_on_client_status(this), details); IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_status_on_client_trailing_metadata(this); @@ -86,11 +103,40 @@ namespace Grpc.Core.Internal { return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } - + protected override bool ReleaseHandle() { Native.grpcsharp_batch_context_destroy(handle); return true; } + + void IOpCompletionCallback.OnComplete(bool success) + { + try + { + completionCallbackData.Callback(success, this, completionCallbackData.State); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking batch completion delegate."); + } + finally + { + completionCallbackData = default(CompletionCallbackData); + Dispose(); + } + } + + struct CompletionCallbackData + { + public CompletionCallbackData(BatchCompletionDelegate callback, object state) + { + this.Callback = callback; + this.State = state; + } + + public BatchCompletionDelegate Callback { get; } + public object State { get; } + } } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3a7f97707b..d6a5ba586b 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -32,6 +32,23 @@ namespace Grpc.Core.Internal public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); static readonly NativeMethods Native = NativeMethods.Get(); + // Completion handlers are pre-allocated to avoid unneccessary delegate allocations. + // The "state" field is used to store the actual callback to invoke. + static readonly BatchCompletionDelegate CompletionHandler_IUnaryResponseClientCallback = + (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()); + static readonly BatchCompletionDelegate CompletionHandler_IReceivedStatusOnClientCallback = + (success, context, state) => ((IReceivedStatusOnClientCallback)state).OnReceivedStatusOnClient(success, context.GetReceivedStatusOnClient()); + static readonly BatchCompletionDelegate CompletionHandler_IReceivedMessageCallback = + (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessage()); + static readonly BatchCompletionDelegate CompletionHandler_IReceivedResponseHeadersCallback = + (success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata()); + static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback = + (success, context, state) => ((ISendCompletionCallback)state).OnSendCompletion(success); + static readonly BatchCompletionDelegate CompletionHandler_ISendStatusFromServerCompletionCallback = + (success, context, state) => ((ISendStatusFromServerCompletionCallback)state).OnSendStatusFromServerCompletion(success); + static readonly BatchCompletionDelegate CompletionHandler_IReceivedCloseOnServerCallback = + (success, context, state) => ((IReceivedCloseOnServerCallback)state).OnReceivedCloseOnServer(success, context.GetReceivedCloseOnServerCancelled()); + const uint GRPC_WRITE_BUFFER_HINT = 1; CompletionQueueSafeHandle completionQueue; @@ -49,12 +66,12 @@ namespace Grpc.Core.Internal Native.grpcsharp_call_set_credentials(this, credentials).CheckOk(); } - public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags) .CheckOk(); } @@ -66,106 +83,106 @@ namespace Grpc.Core.Internal .CheckOk(); } - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } - public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk(); } } - public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) + public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } - public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk(); } } - public void StartSendCloseFromClient(SendCompletionHandler callback) + public void StartSendCloseFromClient(ISendCompletionCallback callback) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } } - public void StartReceiveMessage(ReceivedMessageHandler callback) + public void StartReceiveMessage(IReceivedMessageCallback callback) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } - public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } - public void StartServerSide(ReceivedCloseOnServerHandler callback) + public void StartServerSide(IReceivedCloseOnServerCallback callback) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } - public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index f826a17bad..1eeb0e3d97 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -64,10 +64,10 @@ namespace Grpc.Core.Internal return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0); } - public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) + public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState) { var ctx = BatchContextSafeHandle.Create(); - cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index 1102c8d14f..b68655b33c 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -19,15 +19,15 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Runtime.InteropServices; +using System.Threading; using Grpc.Core.Logging; using Grpc.Core.Utils; namespace Grpc.Core.Internal { - internal delegate void OpCompletionDelegate(bool success); - - internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx); + internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx, object state); internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx); @@ -36,8 +36,8 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>(); readonly GrpcEnvironment environment; - readonly Dictionary<IntPtr, OpCompletionDelegate> dict = new Dictionary<IntPtr, OpCompletionDelegate>(new IntPtrComparer()); - readonly object myLock = new object(); + readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer()); + SpinLock spinLock = new SpinLock(Debugger.IsAttached); IntPtr lastRegisteredKey; // only for testing public CompletionRegistry(GrpcEnvironment environment) @@ -45,38 +45,51 @@ namespace Grpc.Core.Internal this.environment = environment; } - public void Register(IntPtr key, OpCompletionDelegate callback) + public void Register(IntPtr key, IOpCompletionCallback callback) { environment.DebugStats.PendingBatchCompletions.Increment(); - lock (myLock) + + bool lockTaken = false; + try { + spinLock.Enter(ref lockTaken); + dict.Add(key, callback); this.lastRegisteredKey = key; } + finally + { + if (lockTaken) spinLock.Exit(); + } } - public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback) + public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state) { - // TODO(jtattermusch): get rid of new delegate creation here - OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback)); - Register(ctx.Handle, opCallback); + ctx.SetCompletionCallback(callback, state); + Register(ctx.Handle, ctx); } public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) { - // TODO(jtattermusch): get rid of new delegate creation here - OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback)); - Register(ctx.Handle, opCallback); + ctx.CompletionCallback = callback; + Register(ctx.Handle, ctx); } - public OpCompletionDelegate Extract(IntPtr key) + public IOpCompletionCallback Extract(IntPtr key) { - OpCompletionDelegate value = null; - lock (myLock) + IOpCompletionCallback value = null; + bool lockTaken = false; + try { + spinLock.Enter(ref lockTaken); + value = dict[key]; dict.Remove(key); } + finally + { + if (lockTaken) spinLock.Exit(); + } environment.DebugStats.PendingBatchCompletions.Decrement(); return value; } @@ -89,44 +102,6 @@ namespace Grpc.Core.Internal get { return this.lastRegisteredKey; } } - private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback) - { - try - { - callback(success, ctx); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking batch completion delegate."); - } - finally - { - if (ctx != null) - { - ctx.Dispose(); - } - } - } - - private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) - { - try - { - callback(success, ctx); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking request call completion delegate."); - } - finally - { - if (ctx != null) - { - ctx.Dispose(); - } - } - } - /// <summary> /// IntPtr doesn't implement <c>IEquatable{IntPtr}</c> so we need to use custom comparer to avoid boxing. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f7f723c00b..bd0229a9dd 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); - this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); - this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); + this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, true)); + this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, false)); } public void Start() @@ -225,11 +225,11 @@ namespace Grpc.Core.Internal return list.AsReadOnly(); } - private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + private void RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success) { try { - callback(success); + callback.OnComplete(success); } catch (Exception e) { diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index f9c06583c8..5c35b2ba46 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -20,18 +20,41 @@ using Grpc.Core; namespace Grpc.Core.Internal { - internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + internal interface IUnaryResponseClientCallback + { + void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + } // Received status for streaming response calls. - internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus); + internal interface IReceivedStatusOnClientCallback + { + void OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus); + } - internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); + internal interface IReceivedMessageCallback + { + void OnReceivedMessage(bool success, byte[] receivedMessage); + } - internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders); + internal interface IReceivedResponseHeadersCallback + { + void OnReceivedResponseHeaders(bool success, Metadata responseHeaders); + } - internal delegate void SendCompletionHandler(bool success); + internal interface ISendCompletionCallback + { + void OnSendCompletion(bool success); + } - internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled); + internal interface ISendStatusFromServerCompletionCallback + { + void OnSendStatusFromServerCompletion(bool success); + } + + internal interface IReceivedCloseOnServerCallback + { + void OnReceivedCloseOnServer(bool success, bool cancelled); + } /// <summary> /// Abstraction of a native call object. @@ -44,28 +67,28 @@ namespace Grpc.Core.Internal string GetPeer(); - void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); + void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags); - void StartReceiveMessage(ReceivedMessageHandler callback); + void StartReceiveMessage(IReceivedMessageCallback callback); - void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback); + void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback); - void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); + void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray); - void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata); - void StartSendCloseFromClient(SendCompletionHandler callback); + void StartSendCloseFromClient(ISendCompletionCallback callback); - void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags); + void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags); - void StartServerSide(ReceivedCloseOnServerHandler callback); + void StartServerSide(IReceivedCloseOnServerCallback callback); } } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 22faa19d9b..d517252cfe 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -29,6 +29,8 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { + internal delegate void NativeCallbackTestDelegate(bool success); + /// <summary> /// Provides access to all native methods provided by <c>NativeExtension</c>. /// An extra level of indirection is added to P/Invoke calls to allow intelligent loading @@ -420,7 +422,7 @@ namespace Grpc.Core.Internal public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock); public delegate int gprsharp_sizeof_timespec_delegate(); - public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); + public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] NativeCallbackTestDelegate callback); public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr); public delegate void grpcsharp_test_override_method_delegate(string methodName, string variant); } diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs index b7af0c102d..09f5c3e452 100644 --- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs @@ -19,15 +19,17 @@ using System; using System.Runtime.InteropServices; using Grpc.Core; +using Grpc.Core.Logging; namespace Grpc.Core.Internal { /// <summary> /// grpcsharp_request_call_context /// </summary> - internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid + internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback { static readonly NativeMethods Native = NativeMethods.Get(); + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>(); private RequestCallContextSafeHandle() { @@ -46,6 +48,8 @@ namespace Grpc.Core.Internal } } + public RequestCallCompletionDelegate CompletionCallback { get; set; } + // Gets data of server_rpc_new completion. public ServerRpcNew GetServerRpcNew(Server server) { @@ -72,5 +76,22 @@ namespace Grpc.Core.Internal Native.grpcsharp_request_call_context_destroy(handle); return true; } + + void IOpCompletionCallback.OnComplete(bool success) + { + try + { + CompletionCallback(success, this); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking request call completion delegate."); + } + finally + { + CompletionCallback = null; + Dispose(); + } + } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 6019f8e793..98995a0862 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -60,7 +60,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - Tuple<TResponse,WriteFlags> responseTuple = null; + AsyncCallServer<TRequest,TResponse>.ResponseWithFlags? responseWithFlags = null; var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken); try { @@ -68,7 +68,7 @@ namespace Grpc.Core.Internal var request = requestStream.Current; var response = await handler(request, context).ConfigureAwait(false); status = context.Status; - responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); + responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { @@ -80,7 +80,7 @@ namespace Grpc.Core.Internal } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false); } catch (Exception) { @@ -177,13 +177,13 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - Tuple<TResponse,WriteFlags> responseTuple = null; + AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null; var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken); try { var response = await handler(requestStream, context).ConfigureAwait(false); status = context.Status; - responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); + responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { @@ -196,7 +196,7 @@ namespace Grpc.Core.Internal try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false); } catch (Exception) { diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 63000e9a22..a308890cde 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -59,13 +59,15 @@ namespace Grpc.Core.Internal { Native.grpcsharp_server_start(this); } - + public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + // TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object, + // but server shutdown isn't worth optimizing right now. + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null); Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 77ad876bdf..71c7f108f3 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -387,7 +387,7 @@ namespace Grpc.Core /// <summary> /// Handles native callback. /// </summary> - private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx) + private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state) { shutdownTcs.SetResult(null); } diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index 7cae4a55d4..2d9e4ba16a 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -1,7 +1,7 @@ <!-- This file is generated --> <Project> <PropertyGroup> - <GrpcCsharpVersion>1.7.2</GrpcCsharpVersion> + <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion> <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index e4c97ffda7..9b5da1c947 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -33,11 +33,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "1.7.2.0"; + public const string CurrentAssemblyFileVersion = "1.9.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "1.7.2"; + public const string CurrentVersion = "1.9.0-dev"; } } diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs new file mode 100644 index 0000000000..2d1c33e9a0 --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs @@ -0,0 +1,78 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using Grpc.Core; +using Grpc.Core.Internal; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Grpc.Microbenchmarks +{ + public class CompletionRegistryBenchmark + { + GrpcEnvironment environment; + + public void Init() + { + environment = GrpcEnvironment.AddRef(); + } + + public void Cleanup() + { + GrpcEnvironment.ReleaseAsync().Wait(); + } + + public void Run(int threadCount, int iterations, bool useSharedRegistry) + { + Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry)); + CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null; + var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry)); + threadedBenchmark.Run(); + // TODO: parametrize by number of pending completions + } + + private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry) + { + var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment); + var ctx = BatchContextSafeHandle.Create(); + + var stopwatch = Stopwatch.StartNew(); + for (int i = 0; i < iterations; i++) + { + completionRegistry.Register(ctx.Handle, ctx); + var callback = completionRegistry.Extract(ctx.Handle); + // NOTE: we are not calling the callback to avoid disposing ctx. + } + stopwatch.Stop(); + Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); + + ctx.Dispose(); + } + + private class NopCompletionCallback : IOpCompletionCallback + { + public void OnComplete(bool success) + { + + } + } + } +} diff --git a/src/csharp/Grpc.Microbenchmarks/GCStats.cs b/src/csharp/Grpc.Microbenchmarks/GCStats.cs new file mode 100644 index 0000000000..ca7051ec4e --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/GCStats.cs @@ -0,0 +1,69 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; + +namespace Grpc.Microbenchmarks +{ + internal class GCStats + { + readonly object myLock = new object(); + GCStatsSnapshot lastSnapshot; + + public GCStats() + { + lastSnapshot = new GCStatsSnapshot(GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2)); + } + + public GCStatsSnapshot GetSnapshot(bool reset = false) + { + lock (myLock) + { + var newSnapshot = new GCStatsSnapshot(GC.CollectionCount(0) - lastSnapshot.Gen0, + GC.CollectionCount(1) - lastSnapshot.Gen1, + GC.CollectionCount(2) - lastSnapshot.Gen2); + if (reset) + { + lastSnapshot = newSnapshot; + } + return newSnapshot; + } + } + } + + public class GCStatsSnapshot + { + public GCStatsSnapshot(int gen0, int gen1, int gen2) + { + this.Gen0 = gen0; + this.Gen1 = gen1; + this.Gen2 = gen2; + } + + public int Gen0 { get; } + public int Gen1 { get; } + public int Gen2 { get; } + + public override string ToString() + { + return string.Format("[GCCollectionCount: gen0 {0}, gen1 {1}, gen2 {2}]", Gen0, Gen1, Gen2); + } + } +} diff --git a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj index 108357e4eb..8a629f9748 100644 --- a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj +++ b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj @@ -15,6 +15,10 @@ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" /> </ItemGroup> + <ItemGroup> + <PackageReference Include="CommandLineParser" Version="2.1.1-beta" /> + </ItemGroup> + <ItemGroup Condition=" '$(TargetFramework)' == 'net45' "> <Reference Include="System" /> <Reference Include="Microsoft.CSharp" /> diff --git a/src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs new file mode 100644 index 0000000000..787b5508fb --- /dev/null +++ b/src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs @@ -0,0 +1,64 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using Grpc.Core; +using Grpc.Core.Internal; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Grpc.Microbenchmarks +{ + public class PInvokeByteArrayBenchmark + { + static readonly NativeMethods Native = NativeMethods.Get(); + + public void Init() + { + } + + public void Cleanup() + { + } + + public void Run(int threadCount, int iterations, int payloadSize) + { + Console.WriteLine(string.Format("PInvokeByteArrayBenchmark: threads={0}, iterations={1}, payloadSize={2}", threadCount, iterations, payloadSize)); + var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, payloadSize)); + threadedBenchmark.Run(); + } + + private void ThreadBody(int iterations, int payloadSize) + { + var payload = new byte[payloadSize]; + + var stopwatch = Stopwatch.StartNew(); + for (int i = 0; i < iterations; i++) + { + var gcHandle = GCHandle.Alloc(payload, GCHandleType.Pinned); + var payloadPtr = gcHandle.AddrOfPinnedObject(); + Native.grpcsharp_test_nop(payloadPtr); + gcHandle.Free(); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); + } + } +} diff --git a/src/csharp/Grpc.Microbenchmarks/Program.cs b/src/csharp/Grpc.Microbenchmarks/Program.cs index d07d4187c4..a64c2979ab 100644 --- a/src/csharp/Grpc.Microbenchmarks/Program.cs +++ b/src/csharp/Grpc.Microbenchmarks/Program.cs @@ -20,14 +20,84 @@ using System; using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Logging; +using CommandLine; +using CommandLine.Text; namespace Grpc.Microbenchmarks { class Program { + public enum MicrobenchmarkType + { + CompletionRegistry, + PInvokeByteArray, + SendMessage + } + + private class BenchmarkOptions + { + [Option("benchmark", Required = true, HelpText = "Benchmark to run")] + public MicrobenchmarkType Benchmark { get; set; } + } + public static void Main(string[] args) { GrpcEnvironment.SetLogger(new ConsoleLogger()); + var parserResult = Parser.Default.ParseArguments<BenchmarkOptions>(args) + .WithNotParsed(errors => { + Console.WriteLine("Supported benchmarks:"); + foreach (var enumValue in Enum.GetValues(typeof(MicrobenchmarkType))) + { + Console.WriteLine(" " + enumValue); + } + Environment.Exit(1); + }) + .WithParsed(options => + { + switch (options.Benchmark) + { + case MicrobenchmarkType.CompletionRegistry: + RunCompletionRegistryBenchmark(); + break; + case MicrobenchmarkType.PInvokeByteArray: + RunPInvokeByteArrayBenchmark(); + break; + case MicrobenchmarkType.SendMessage: + RunSendMessageBenchmark(); + break; + default: + throw new ArgumentException("Unsupported benchmark."); + } + }); + } + + static void RunCompletionRegistryBenchmark() + { + var benchmark = new CompletionRegistryBenchmark(); + benchmark.Init(); + foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12}) + { + foreach (bool useSharedRegistry in new bool[] {false, true}) + { + benchmark.Run(threadCount, 4 * 1000 * 1000, useSharedRegistry); + } + } + benchmark.Cleanup(); + } + + static void RunPInvokeByteArrayBenchmark() + { + var benchmark = new PInvokeByteArrayBenchmark(); + benchmark.Init(); + foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12}) + { + benchmark.Run(threadCount, 4 * 1000 * 1000, 0); + } + benchmark.Cleanup(); + } + + static void RunSendMessageBenchmark() + { var benchmark = new SendMessageBenchmark(); benchmark.Init(); foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12}) diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs index de67874580..9cff97eb88 100644 --- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -59,16 +59,16 @@ namespace Grpc.Microbenchmarks var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry); var call = CreateFakeCall(cq); - var sendCompletionHandler = new SendCompletionHandler((success) => { }); + var sendCompletionCallback = new NopSendCompletionCallback(); var payload = new byte[payloadSize]; var writeFlags = default(WriteFlags); var stopwatch = Stopwatch.StartNew(); for (int i = 0; i < iterations; i++) { - call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false); + call.StartSendMessage(sendCompletionCallback, payload, writeFlags, false); var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey); - callback(true); + callback.OnComplete(true); } stopwatch.Stop(); Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); @@ -87,5 +87,13 @@ namespace Grpc.Microbenchmarks } return call; } + + private class NopSendCompletionCallback : ISendCompletionCallback + { + public void OnSendCompletion(bool success) + { + // NOP + } + } } } diff --git a/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs index feac8d1690..95b9aaaf3f 100644 --- a/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs @@ -46,6 +46,7 @@ namespace Grpc.Microbenchmarks public void Run() { Console.WriteLine("Running threads."); + var gcStats = new GCStats(); var threads = new List<Thread>(); for (int i = 0; i < runners.Count; i++) { @@ -58,7 +59,7 @@ namespace Grpc.Microbenchmarks { thread.Join(); } - Console.WriteLine("All threads finished."); + Console.WriteLine("All threads finished (GC Stats Delta: " + gcStats.GetSnapshot() + ")"); } } } diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index 3be049ae1d..8f89e2846a 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -13,7 +13,7 @@ @rem limitations under the License. @rem Current package versions -set VERSION=1.7.2 +set VERSION=1.9.0-dev @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index cf565b2e8d..6a6cafe2bd 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts -nuget pack Grpc.nuspec -Version "1.7.2" -OutputDirectory ../../artifacts -nuget pack Grpc.Tools.nuspec -Version "1.7.2" -OutputDirectory ../../artifacts +nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts (cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg) |