aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2017-11-29 19:31:42 +0100
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2017-11-29 19:31:42 +0100
commit361f8108e41497c53f174c0ad6ea0ffcc97022a9 (patch)
tree2fa08b8f0b047f627481c07ccdffd603c0edc42c /src/csharp
parent18a6837bb0bec8b62c566b4a49adc179f0450c1d (diff)
parentf836c7e941beb003289dc6e9a58a6e47f5caa5f0 (diff)
Merge branch 'master' of https://github.com/grpc/grpc into upmerge-from-v1.7
Diffstat (limited to 'src/csharp')
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs28
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs106
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs58
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs4
-rw-r--r--src/csharp/Grpc.Core/Channel.cs28
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs37
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs42
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs54
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs61
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs85
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs57
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs23
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs6
-rw-r--r--src/csharp/Grpc.Core/Server.cs2
-rwxr-xr-xsrc/csharp/Grpc.Core/Version.csproj.include2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs4
-rw-r--r--src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs78
-rw-r--r--src/csharp/Grpc.Microbenchmarks/GCStats.cs69
-rw-r--r--src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj4
-rw-r--r--src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs64
-rw-r--r--src/csharp/Grpc.Microbenchmarks/Program.cs70
-rw-r--r--src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs14
-rw-r--r--src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs3
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.bat2
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.sh4
30 files changed, 699 insertions, 254 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 9488ce29e9..e7d8939978 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -64,7 +64,7 @@ namespace Grpc.Core.Internal.Tests
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests
var moveNextTask = requestStream.MoveNext();
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
- fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
@@ -89,7 +89,7 @@ namespace Grpc.Core.Internal.Tests
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
@@ -107,10 +107,10 @@ namespace Grpc.Core.Internal.Tests
// if a read completion's success==false, the request stream will silently finish
// and we rely on C core cancelling the call.
var moveNextTask = requestStream.MoveNext();
- fakeCall.ReceivedMessageHandler(false, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);
Assert.IsFalse(moveNextTask.Result);
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@@ -120,7 +120,7 @@ namespace Grpc.Core.Internal.Tests
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
@@ -134,10 +134,10 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
- fakeCall.SendCompletionHandler(false);
+ fakeCall.SendCompletionCallback.OnSendCompletion(false);
Assert.ThrowsAsync(typeof(IOException), async () => await writeTask);
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@@ -150,13 +150,13 @@ namespace Grpc.Core.Internal.Tests
var writeTask = responseStream.WriteAsync("request1");
var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
- fakeCall.SendCompletionHandler(true);
- fakeCall.SendStatusFromServerHandler(true);
+ fakeCall.SendCompletionCallback.OnSendCompletion(true);
+ fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true);
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@@ -170,8 +170,8 @@ namespace Grpc.Core.Internal.Tests
asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
- fakeCall.SendStatusFromServerHandler(true);
- fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.SendStatusFromServerCallback.OnSendStatusFromServerCompletion(true);
+ fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index b2b49f3a48..9aab54d2d0 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -73,7 +73,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_Success()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -85,7 +85,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_NonSuccessStatusCode()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
null,
new Metadata());
@@ -97,7 +97,7 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_NullResponsePayload()
{
var resultTask = asyncCall.UnaryCallAsync("request1");
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
null,
new Metadata());
@@ -118,7 +118,7 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_NoRequest_Success()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -130,7 +130,7 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_NoRequest_NonSuccessStatusCode()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
null,
new Metadata());
@@ -145,18 +145,18 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
- fakeCall.SendCompletionHandler(true);
+ fakeCall.SendCompletionCallback.OnSendCompletion(true);
writeTask.Wait();
var writeTask2 = requestStream.WriteAsync("request2");
- fakeCall.SendCompletionHandler(true);
+ fakeCall.SendCompletionCallback.OnSendCompletion(true);
writeTask2.Wait();
var completeTask = requestStream.CompleteAsync();
- fakeCall.SendCompletionHandler(true);
+ fakeCall.SendCompletionCallback.OnSendCompletion(true);
completeTask.Wait();
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -171,12 +171,12 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
- fakeCall.SendCompletionHandler(false);
+ fakeCall.SendCompletionCallback.OnSendCompletion(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
@@ -195,12 +195,12 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
- fakeCall.SendCompletionHandler(false);
+ fakeCall.SendCompletionCallback.OnSendCompletion(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
@@ -215,13 +215,13 @@ namespace Grpc.Core.Internal.Tests
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
- fakeCall.SendCompletionHandler(false);
+ fakeCall.SendCompletionCallback.OnSendCompletion(false);
// Until the delayed write completion has been triggered,
// we still act as if there was an active write.
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
@@ -242,7 +242,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -260,7 +260,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -282,9 +282,9 @@ namespace Grpc.Core.Internal.Tests
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
- fakeCall.SendCompletionHandler(true);
+ fakeCall.SendCompletionCallback.OnSendCompletion(true);
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -298,7 +298,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
@@ -319,7 +319,7 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
- fakeCall.UnaryResponseClientHandler(true,
+ fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Cancelled),
null,
new Metadata());
@@ -342,11 +342,11 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
+ fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@@ -359,8 +359,8 @@ namespace Grpc.Core.Internal.Tests
var readTask = responseStream.MoveNext();
// try alternative order of completions
- fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
- fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@@ -372,8 +372,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
- fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
}
@@ -385,18 +385,18 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask2.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask3 = responseStream.MoveNext();
- fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
- fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
}
@@ -409,12 +409,12 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask1 = requestStream.CompleteAsync();
- fakeCall.SendCompletionHandler(true);
+ fakeCall.SendCompletionCallback.OnSendCompletion(true);
Assert.DoesNotThrowAsync(async () => await writeTask1);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@@ -427,8 +427,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -445,8 +445,8 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -461,14 +461,14 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
- fakeCall.SendCompletionHandler(false);
+ fakeCall.SendCompletionCallback.OnSendCompletion(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
@@ -486,9 +486,9 @@ namespace Grpc.Core.Internal.Tests
var writeTask = requestStream.WriteAsync("request1");
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
- fakeCall.SendCompletionHandler(false);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
+ fakeCall.SendCompletionCallback.OnSendCompletion(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
@@ -510,8 +510,8 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
}
@@ -526,13 +526,13 @@ namespace Grpc.Core.Internal.Tests
Assert.IsTrue(fakeCall.IsCancelled);
var readTask1 = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
@@ -547,13 +547,13 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
- fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
- fakeCall.ReceivedMessageHandler(true, null);
- fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
index c3a27167f9..581ac3384b 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -31,43 +31,43 @@ namespace Grpc.Core.Internal.Tests
/// </summary>
internal class FakeNativeCall : INativeCall
{
- public UnaryResponseClientHandler UnaryResponseClientHandler
+ public IUnaryResponseClientCallback UnaryResponseClientCallback
{
get;
set;
}
- public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
+ public IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback
{
get;
set;
}
- public ReceivedMessageHandler ReceivedMessageHandler
+ public IReceivedMessageCallback ReceivedMessageCallback
{
get;
set;
}
- public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
+ public IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback
{
get;
set;
}
- public SendCompletionHandler SendCompletionHandler
+ public ISendCompletionCallback SendCompletionCallback
{
get;
set;
}
- public SendCompletionHandler SendStatusFromServerHandler
+ public ISendStatusFromServerCompletionCallback SendStatusFromServerCallback
{
get;
set;
}
- public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
+ public IReceivedCloseOnServerCallback ReceivedCloseOnServerCallback
{
get;
set;
@@ -100,9 +100,9 @@ namespace Grpc.Core.Internal.Tests
return "PEER";
}
- public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
- UnaryResponseClientHandler = callback;
+ UnaryResponseClientCallback = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
@@ -110,55 +110,55 @@ namespace Grpc.Core.Internal.Tests
throw new NotImplementedException();
}
- public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
- UnaryResponseClientHandler = callback;
+ UnaryResponseClientCallback = callback;
}
- public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
- ReceivedStatusOnClientHandler = callback;
+ ReceivedStatusOnClientCallback = callback;
}
- public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
- ReceivedStatusOnClientHandler = callback;
+ ReceivedStatusOnClientCallback = callback;
}
- public void StartReceiveMessage(ReceivedMessageHandler callback)
+ public void StartReceiveMessage(IReceivedMessageCallback callback)
{
- ReceivedMessageHandler = callback;
+ ReceivedMessageCallback = callback;
}
- public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback)
{
- ReceivedResponseHeadersHandler = callback;
+ ReceivedResponseHeadersCallback = callback;
}
- public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray)
{
- SendCompletionHandler = callback;
+ SendCompletionCallback = callback;
}
- public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+ public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
- SendCompletionHandler = callback;
+ SendCompletionCallback = callback;
}
- public void StartSendCloseFromClient(SendCompletionHandler callback)
+ public void StartSendCloseFromClient(ISendCompletionCallback callback)
{
- SendCompletionHandler = callback;
+ SendCompletionCallback = callback;
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalPayload, WriteFlags writeFlags)
{
- SendStatusFromServerHandler = callback;
+ SendStatusFromServerCallback = callback;
}
- public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ public void StartServerSide(IReceivedCloseOnServerCallback callback)
{
- ReceivedCloseOnServerHandler = callback;
+ ReceivedCloseOnServerCallback = callback;
}
public void Dispose()
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 7529c44c4e..43f816bb1c 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -63,7 +63,7 @@ namespace Grpc.Core.Tests
[Ignore("Prevent running on Jenkins")]
public void NativeCallbackBenchmark()
{
- OpCompletionDelegate handler = Handler;
+ NativeCallbackTestDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
@@ -91,7 +91,7 @@ namespace Grpc.Core.Tests
10000, 10000,
() =>
{
- Native.grpcsharp_test_callback(new OpCompletionDelegate(Handler));
+ Native.grpcsharp_test_callback(new NativeCallbackTestDelegate(Handler));
});
Assert.AreNotEqual(0, counter);
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 1803920662..f9925a8a76 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -127,6 +127,20 @@ namespace Grpc.Core
}
}
+ // cached handler for watch connectivity state
+ static readonly BatchCompletionDelegate WatchConnectivityStateHandler = (success, ctx, state) =>
+ {
+ var tcs = (TaskCompletionSource<object>) state;
+ if (success)
+ {
+ tcs.SetResult(null);
+ }
+ else
+ {
+ tcs.SetCanceled();
+ }
+ };
+
/// <summary>
/// Returned tasks completes once channel state has become different from
/// given lastObservedState.
@@ -138,18 +152,8 @@ namespace Grpc.Core
"Shutdown is a terminal state. No further state changes can occur.");
var tcs = new TaskCompletionSource<object>();
var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
- var handler = new BatchCompletionDelegate((success, ctx) =>
- {
- if (success)
- {
- tcs.SetResult(null);
- }
- else
- {
- tcs.SetCanceled();
- }
- });
- handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
+ // pass "tcs" as "state" for WatchConnectivityStateHandler.
+ handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, WatchConnectivityStateHandler, tcs);
return tcs.Task;
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 09fb722c81..aa2161267a 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -27,7 +27,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Manages client side native call lifecycle.
/// </summary>
- internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
+ internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>, IUnaryResponseClientCallback, IReceivedStatusOnClientCallback, IReceivedResponseHeadersCallback
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
@@ -138,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(HandleUnaryResponse, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
}
return unaryResponseTcs.Task;
}
@@ -162,7 +162,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartClientStreaming(HandleUnaryResponse, metadataArray, details.Options.Flags);
+ call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
}
return unaryResponseTcs.Task;
@@ -188,9 +188,9 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(HandleFinished, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
}
- call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
+ call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@@ -210,9 +210,9 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartDuplexStreaming(HandleFinished, metadataArray, details.Options.Flags);
+ call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
}
- call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
+ call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@@ -256,7 +256,7 @@ namespace Grpc.Core.Internal
halfcloseRequested = true;
return TaskUtils.CompletedTask;
}
- call.StartSendCloseFromClient(HandleSendFinished);
+ call.StartSendCloseFromClient(SendCompletionCallback);
halfcloseRequested = true;
streamingWriteTcs = new TaskCompletionSource<object>();
@@ -516,5 +516,26 @@ namespace Grpc.Core.Internal
streamingResponseCallFinishedTcs.SetResult(null);
}
+
+ IUnaryResponseClientCallback UnaryResponseClientCallback => this;
+
+ void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
+ {
+ HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
+ }
+
+ IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
+
+ void IReceivedStatusOnClientCallback.OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus)
+ {
+ HandleFinished(success, receivedStatus);
+ }
+
+ IReceivedResponseHeadersCallback ReceivedResponseHeadersCallback => this;
+
+ void IReceivedResponseHeadersCallback.OnReceivedResponseHeaders(bool success, Metadata responseHeaders)
+ {
+ HandleReceivedResponseHeaders(success, responseHeaders);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index f379c85e00..3273c26b88 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -35,7 +35,7 @@ namespace Grpc.Core.Internal
/// Base for handling both client side and server side calls.
/// Manages native call lifecycle and provides convenience methods.
/// </summary>
- internal abstract class AsyncCallBase<TWrite, TRead>
+ internal abstract class AsyncCallBase<TWrite, TRead> : IReceivedMessageCallback, ISendCompletionCallback
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
@@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
return earlyResult;
}
- call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+ call.StartSendMessage(SendCompletionCallback, payload, writeFlags, !initialMetadataSent);
initialMetadataSent = true;
streamingWritesCounter++;
@@ -154,7 +154,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
- call.StartReceiveMessage(HandleReadFinished);
+ call.StartReceiveMessage(ReceivedMessageCallback);
streamingReadTcs = new TaskCompletionSource<TRead>();
return streamingReadTcs.Task;
}
@@ -342,5 +342,19 @@ namespace Grpc.Core.Internal
}
origTcs.SetResult(msg);
}
+
+ protected ISendCompletionCallback SendCompletionCallback => this;
+
+ void ISendCompletionCallback.OnSendCompletion(bool success)
+ {
+ HandleSendFinished(success);
+ }
+
+ IReceivedMessageCallback ReceivedMessageCallback => this;
+
+ void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
+ {
+ HandleReadFinished(success, receivedMessage);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 271a6ffadf..11acb27533 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -31,7 +31,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Manages server side native call lifecycle.
/// </summary>
- internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
+ internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>, IReceivedCloseOnServerCallback, ISendStatusFromServerCompletionCallback
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
@@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
- call.StartServerSide(HandleFinishedServerside);
+ call.StartServerSide(ReceiveCloseOnServerCallback);
return finishedServersideTcs.Task;
}
}
@@ -114,7 +114,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
+ call.StartSendInitialMetadata(SendCompletionCallback, metadataArray);
}
this.initialMetadataSent = true;
@@ -127,10 +127,10 @@ namespace Grpc.Core.Internal
/// Sends call result status, indicating we are done with writes.
/// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers, ResponseWithFlags? optionalWrite)
{
- byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null;
- var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags);
+ byte[] payload = optionalWrite.HasValue ? UnsafeSerialize(optionalWrite.Value.Response) : null;
+ var writeFlags = optionalWrite.HasValue ? optionalWrite.Value.WriteFlags : default(WriteFlags);
lock (myLock)
{
@@ -140,13 +140,13 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
+ call.StartSendStatusFromServer(SendStatusFromServerCompletionCallback, status, metadataArray, !initialMetadataSent,
payload, writeFlags);
}
halfcloseRequested = true;
initialMetadataSent = true;
sendStatusFromServerTcs = new TaskCompletionSource<object>();
- if (optionalWrite != null)
+ if (optionalWrite.HasValue)
{
streamingWritesCounter++;
}
@@ -227,5 +227,31 @@ namespace Grpc.Core.Internal
finishedServersideTcs.SetResult(null);
}
+
+ IReceivedCloseOnServerCallback ReceiveCloseOnServerCallback => this;
+
+ void IReceivedCloseOnServerCallback.OnReceivedCloseOnServer(bool success, bool cancelled)
+ {
+ HandleFinishedServerside(success, cancelled);
+ }
+
+ ISendStatusFromServerCompletionCallback SendStatusFromServerCompletionCallback => this;
+
+ void ISendStatusFromServerCompletionCallback.OnSendStatusFromServerCompletion(bool success)
+ {
+ HandleSendStatusFromServerFinished(success);
+ }
+
+ public struct ResponseWithFlags
+ {
+ public ResponseWithFlags(TResponse response, WriteFlags writeFlags)
+ {
+ this.Response = response;
+ this.WriteFlags = writeFlags;
+ }
+
+ public TResponse Response { get; }
+ public WriteFlags WriteFlags { get; }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index cd5e3d8911..1e6f1fba37 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -20,15 +20,25 @@ using System;
using System.Runtime.InteropServices;
using System.Text;
using Grpc.Core;
+using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
+ internal interface IOpCompletionCallback
+ {
+ void OnComplete(bool success);
+ }
+
/// <summary>
/// grpcsharp_batch_context
/// </summary>
- internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
+ internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback
{
static readonly NativeMethods Native = NativeMethods.Get();
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
+
+ CompletionCallbackData completionCallbackData;
private BatchContextSafeHandle()
{
@@ -47,19 +57,26 @@ namespace Grpc.Core.Internal
}
}
+ public void SetCompletionCallback(BatchCompletionDelegate callback, object state)
+ {
+ GrpcPreconditions.CheckState(completionCallbackData.Callback == null);
+ GrpcPreconditions.CheckNotNull(callback, nameof(callback));
+ completionCallbackData = new CompletionCallbackData(callback, state);
+ }
+
// Gets data of recv_initial_metadata completion.
public Metadata GetReceivedInitialMetadata()
{
IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_initial_metadata(this);
return MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
}
-
+
// Gets data of recv_status_on_client completion.
public ClientSideStatus GetReceivedStatusOnClient()
{
UIntPtr detailsLength;
IntPtr detailsPtr = Native.grpcsharp_batch_context_recv_status_on_client_details(this, out detailsLength);
- string details = MarshalUtils.PtrToStringUTF8(detailsPtr, (int) detailsLength.ToUInt32());
+ string details = MarshalUtils.PtrToStringUTF8(detailsPtr, (int)detailsLength.ToUInt32());
var status = new Status(Native.grpcsharp_batch_context_recv_status_on_client_status(this), details);
IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_recv_status_on_client_trailing_metadata(this);
@@ -86,11 +103,40 @@ namespace Grpc.Core.Internal
{
return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
-
+
protected override bool ReleaseHandle()
{
Native.grpcsharp_batch_context_destroy(handle);
return true;
}
+
+ void IOpCompletionCallback.OnComplete(bool success)
+ {
+ try
+ {
+ completionCallbackData.Callback(success, this, completionCallbackData.State);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured while invoking batch completion delegate.");
+ }
+ finally
+ {
+ completionCallbackData = default(CompletionCallbackData);
+ Dispose();
+ }
+ }
+
+ struct CompletionCallbackData
+ {
+ public CompletionCallbackData(BatchCompletionDelegate callback, object state)
+ {
+ this.Callback = callback;
+ this.State = state;
+ }
+
+ public BatchCompletionDelegate Callback { get; }
+ public object State { get; }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 3a7f97707b..d6a5ba586b 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -32,6 +32,23 @@ namespace Grpc.Core.Internal
public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
static readonly NativeMethods Native = NativeMethods.Get();
+ // Completion handlers are pre-allocated to avoid unneccessary delegate allocations.
+ // The "state" field is used to store the actual callback to invoke.
+ static readonly BatchCompletionDelegate CompletionHandler_IUnaryResponseClientCallback =
+ (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata());
+ static readonly BatchCompletionDelegate CompletionHandler_IReceivedStatusOnClientCallback =
+ (success, context, state) => ((IReceivedStatusOnClientCallback)state).OnReceivedStatusOnClient(success, context.GetReceivedStatusOnClient());
+ static readonly BatchCompletionDelegate CompletionHandler_IReceivedMessageCallback =
+ (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessage());
+ static readonly BatchCompletionDelegate CompletionHandler_IReceivedResponseHeadersCallback =
+ (success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata());
+ static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback =
+ (success, context, state) => ((ISendCompletionCallback)state).OnSendCompletion(success);
+ static readonly BatchCompletionDelegate CompletionHandler_ISendStatusFromServerCompletionCallback =
+ (success, context, state) => ((ISendStatusFromServerCompletionCallback)state).OnSendStatusFromServerCompletion(success);
+ static readonly BatchCompletionDelegate CompletionHandler_IReceivedCloseOnServerCallback =
+ (success, context, state) => ((IReceivedCloseOnServerCallback)state).OnReceivedCloseOnServer(success, context.GetReceivedCloseOnServerCancelled());
+
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionQueueSafeHandle completionQueue;
@@ -49,12 +66,12 @@ namespace Grpc.Core.Internal
Native.grpcsharp_call_set_credentials(this, credentials).CheckOk();
}
- public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags)
.CheckOk();
}
@@ -66,106 +83,106 @@ namespace Grpc.Core.Internal
.CheckOk();
}
- public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
- public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk();
}
}
- public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
+ public void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
- public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+ public void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk();
}
}
- public void StartSendCloseFromClient(SendCompletionHandler callback)
+ public void StartSendCloseFromClient(ISendCompletionCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ public void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail);
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
- public void StartReceiveMessage(ReceivedMessageHandler callback)
+ public void StartReceiveMessage(IReceivedMessageCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback);
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
- public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ public void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback);
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
- public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ public void StartServerSide(IReceivedCloseOnServerCallback callback)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback);
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
- public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index f826a17bad..1eeb0e3d97 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -64,10 +64,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
- public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState)
{
var ctx = BatchContextSafeHandle.Create();
- cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
index 1102c8d14f..b68655b33c 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -19,15 +19,15 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Runtime.InteropServices;
+using System.Threading;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- internal delegate void OpCompletionDelegate(bool success);
-
- internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
+ internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx, object state);
internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx);
@@ -36,8 +36,8 @@ namespace Grpc.Core.Internal
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>();
readonly GrpcEnvironment environment;
- readonly Dictionary<IntPtr, OpCompletionDelegate> dict = new Dictionary<IntPtr, OpCompletionDelegate>(new IntPtrComparer());
- readonly object myLock = new object();
+ readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer());
+ SpinLock spinLock = new SpinLock(Debugger.IsAttached);
IntPtr lastRegisteredKey; // only for testing
public CompletionRegistry(GrpcEnvironment environment)
@@ -45,38 +45,51 @@ namespace Grpc.Core.Internal
this.environment = environment;
}
- public void Register(IntPtr key, OpCompletionDelegate callback)
+ public void Register(IntPtr key, IOpCompletionCallback callback)
{
environment.DebugStats.PendingBatchCompletions.Increment();
- lock (myLock)
+
+ bool lockTaken = false;
+ try
{
+ spinLock.Enter(ref lockTaken);
+
dict.Add(key, callback);
this.lastRegisteredKey = key;
}
+ finally
+ {
+ if (lockTaken) spinLock.Exit();
+ }
}
- public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state)
{
- // TODO(jtattermusch): get rid of new delegate creation here
- OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
- Register(ctx.Handle, opCallback);
+ ctx.SetCompletionCallback(callback, state);
+ Register(ctx.Handle, ctx);
}
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
- // TODO(jtattermusch): get rid of new delegate creation here
- OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback));
- Register(ctx.Handle, opCallback);
+ ctx.CompletionCallback = callback;
+ Register(ctx.Handle, ctx);
}
- public OpCompletionDelegate Extract(IntPtr key)
+ public IOpCompletionCallback Extract(IntPtr key)
{
- OpCompletionDelegate value = null;
- lock (myLock)
+ IOpCompletionCallback value = null;
+ bool lockTaken = false;
+ try
{
+ spinLock.Enter(ref lockTaken);
+
value = dict[key];
dict.Remove(key);
}
+ finally
+ {
+ if (lockTaken) spinLock.Exit();
+ }
environment.DebugStats.PendingBatchCompletions.Decrement();
return value;
}
@@ -89,44 +102,6 @@ namespace Grpc.Core.Internal
get { return this.lastRegisteredKey; }
}
- private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
- {
- try
- {
- callback(success, ctx);
- }
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking batch completion delegate.");
- }
- finally
- {
- if (ctx != null)
- {
- ctx.Dispose();
- }
- }
- }
-
- private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
- {
- try
- {
- callback(success, ctx);
- }
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking request call completion delegate.");
- }
- finally
- {
- if (ctx != null)
- {
- ctx.Dispose();
- }
- }
- }
-
/// <summary>
/// IntPtr doesn't implement <c>IEquatable{IntPtr}</c> so we need to use custom comparer to avoid boxing.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f7f723c00b..bd0229a9dd 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -68,8 +68,8 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
- this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
- this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
+ this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, true));
+ this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, false));
}
public void Start()
@@ -225,11 +225,11 @@ namespace Grpc.Core.Internal
return list.AsReadOnly();
}
- private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
+ private void RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success)
{
try
{
- callback(success);
+ callback.OnComplete(success);
}
catch (Exception e)
{
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index f9c06583c8..5c35b2ba46 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -20,18 +20,41 @@ using Grpc.Core;
namespace Grpc.Core.Internal
{
- internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+ internal interface IUnaryResponseClientCallback
+ {
+ void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+ }
// Received status for streaming response calls.
- internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus);
+ internal interface IReceivedStatusOnClientCallback
+ {
+ void OnReceivedStatusOnClient(bool success, ClientSideStatus receivedStatus);
+ }
- internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
+ internal interface IReceivedMessageCallback
+ {
+ void OnReceivedMessage(bool success, byte[] receivedMessage);
+ }
- internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
+ internal interface IReceivedResponseHeadersCallback
+ {
+ void OnReceivedResponseHeaders(bool success, Metadata responseHeaders);
+ }
- internal delegate void SendCompletionHandler(bool success);
+ internal interface ISendCompletionCallback
+ {
+ void OnSendCompletion(bool success);
+ }
- internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
+ internal interface ISendStatusFromServerCompletionCallback
+ {
+ void OnSendStatusFromServerCompletion(bool success);
+ }
+
+ internal interface IReceivedCloseOnServerCallback
+ {
+ void OnReceivedCloseOnServer(bool success, bool cancelled);
+ }
/// <summary>
/// Abstraction of a native call object.
@@ -44,28 +67,28 @@ namespace Grpc.Core.Internal
string GetPeer();
- void StartUnary(UnaryResponseClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
+ void StartUnary(IUnaryResponseClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
void StartUnary(BatchContextSafeHandle ctx, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
- void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
+ void StartClientStreaming(IUnaryResponseClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
- void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
+ void StartServerStreaming(IReceivedStatusOnClientCallback callback, byte[] payload, WriteFlags writeFlags, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
- void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
+ void StartDuplexStreaming(IReceivedStatusOnClientCallback callback, MetadataArraySafeHandle metadataArray, CallFlags callFlags);
- void StartReceiveMessage(ReceivedMessageHandler callback);
+ void StartReceiveMessage(IReceivedMessageCallback callback);
- void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
+ void StartReceiveInitialMetadata(IReceivedResponseHeadersCallback callback);
- void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
+ void StartSendInitialMetadata(ISendCompletionCallback callback, MetadataArraySafeHandle metadataArray);
- void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ void StartSendMessage(ISendCompletionCallback callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
- void StartSendCloseFromClient(SendCompletionHandler callback);
+ void StartSendCloseFromClient(ISendCompletionCallback callback);
- void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags);
+ void StartSendStatusFromServer(ISendStatusFromServerCompletionCallback callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, WriteFlags writeFlags);
- void StartServerSide(ReceivedCloseOnServerHandler callback);
+ void StartServerSide(IReceivedCloseOnServerCallback callback);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 22faa19d9b..d517252cfe 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -29,6 +29,8 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
+ internal delegate void NativeCallbackTestDelegate(bool success);
+
/// <summary>
/// Provides access to all native methods provided by <c>NativeExtension</c>.
/// An extra level of indirection is added to P/Invoke calls to allow intelligent loading
@@ -420,7 +422,7 @@ namespace Grpc.Core.Internal
public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock);
public delegate int gprsharp_sizeof_timespec_delegate();
- public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] NativeCallbackTestDelegate callback);
public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr);
public delegate void grpcsharp_test_override_method_delegate(string methodName, string variant);
}
diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
index b7af0c102d..09f5c3e452 100644
--- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
@@ -19,15 +19,17 @@
using System;
using System.Runtime.InteropServices;
using Grpc.Core;
+using Grpc.Core.Logging;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpcsharp_request_call_context
/// </summary>
- internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid
+ internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback
{
static readonly NativeMethods Native = NativeMethods.Get();
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>();
private RequestCallContextSafeHandle()
{
@@ -46,6 +48,8 @@ namespace Grpc.Core.Internal
}
}
+ public RequestCallCompletionDelegate CompletionCallback { get; set; }
+
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew(Server server)
{
@@ -72,5 +76,22 @@ namespace Grpc.Core.Internal
Native.grpcsharp_request_call_context_destroy(handle);
return true;
}
+
+ void IOpCompletionCallback.OnComplete(bool success)
+ {
+ try
+ {
+ CompletionCallback(success, this);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured while invoking request call completion delegate.");
+ }
+ finally
+ {
+ CompletionCallback = null;
+ Dispose();
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 6019f8e793..98995a0862 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -60,7 +60,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- Tuple<TResponse,WriteFlags> responseTuple = null;
+ AsyncCallServer<TRequest,TResponse>.ResponseWithFlags? responseWithFlags = null;
var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
try
{
@@ -68,7 +68,7 @@ namespace Grpc.Core.Internal
var request = requestStream.Current;
var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
- responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
+ responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
@@ -80,7 +80,7 @@ namespace Grpc.Core.Internal
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
}
catch (Exception)
{
@@ -177,13 +177,13 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- Tuple<TResponse,WriteFlags> responseTuple = null;
+ AsyncCallServer<TRequest, TResponse>.ResponseWithFlags? responseWithFlags = null;
var context = HandlerUtils.NewContext(newRpc, responseStream, asyncCall.CancellationToken);
try
{
var response = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
- responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
+ responseWithFlags = new AsyncCallServer<TRequest, TResponse>.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
@@ -196,7 +196,7 @@ namespace Grpc.Core.Internal
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseWithFlags).ConfigureAwait(false);
}
catch (Exception)
{
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 63000e9a22..a308890cde 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -59,13 +59,15 @@ namespace Grpc.Core.Internal
{
Native.grpcsharp_server_start(this);
}
-
+
public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ // TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object,
+ // but server shutdown isn't worth optimizing right now.
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
}
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 77ad876bdf..71c7f108f3 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -387,7 +387,7 @@ namespace Grpc.Core
/// <summary>
/// Handles native callback.
/// </summary>
- private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx)
+ private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx, object state)
{
shutdownTcs.SetResult(null);
}
diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include
index 7cae4a55d4..2d9e4ba16a 100755
--- a/src/csharp/Grpc.Core/Version.csproj.include
+++ b/src/csharp/Grpc.Core/Version.csproj.include
@@ -1,7 +1,7 @@
<!-- This file is generated -->
<Project>
<PropertyGroup>
- <GrpcCsharpVersion>1.7.2</GrpcCsharpVersion>
+ <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion>
<GoogleProtobufVersion>3.3.0</GoogleProtobufVersion>
</PropertyGroup>
</Project>
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index e4c97ffda7..9b5da1c947 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -33,11 +33,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
- public const string CurrentAssemblyFileVersion = "1.7.2.0";
+ public const string CurrentAssemblyFileVersion = "1.9.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "1.7.2";
+ public const string CurrentVersion = "1.9.0-dev";
}
}
diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
new file mode 100644
index 0000000000..2d1c33e9a0
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
@@ -0,0 +1,78 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using System.Collections.Generic;
+using System.Diagnostics;
+
+namespace Grpc.Microbenchmarks
+{
+ public class CompletionRegistryBenchmark
+ {
+ GrpcEnvironment environment;
+
+ public void Init()
+ {
+ environment = GrpcEnvironment.AddRef();
+ }
+
+ public void Cleanup()
+ {
+ GrpcEnvironment.ReleaseAsync().Wait();
+ }
+
+ public void Run(int threadCount, int iterations, bool useSharedRegistry)
+ {
+ Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry));
+ CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null;
+ var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry));
+ threadedBenchmark.Run();
+ // TODO: parametrize by number of pending completions
+ }
+
+ private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry)
+ {
+ var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment);
+ var ctx = BatchContextSafeHandle.Create();
+
+ var stopwatch = Stopwatch.StartNew();
+ for (int i = 0; i < iterations; i++)
+ {
+ completionRegistry.Register(ctx.Handle, ctx);
+ var callback = completionRegistry.Extract(ctx.Handle);
+ // NOTE: we are not calling the callback to avoid disposing ctx.
+ }
+ stopwatch.Stop();
+ Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
+
+ ctx.Dispose();
+ }
+
+ private class NopCompletionCallback : IOpCompletionCallback
+ {
+ public void OnComplete(bool success)
+ {
+
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Microbenchmarks/GCStats.cs b/src/csharp/Grpc.Microbenchmarks/GCStats.cs
new file mode 100644
index 0000000000..ca7051ec4e
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/GCStats.cs
@@ -0,0 +1,69 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+
+namespace Grpc.Microbenchmarks
+{
+ internal class GCStats
+ {
+ readonly object myLock = new object();
+ GCStatsSnapshot lastSnapshot;
+
+ public GCStats()
+ {
+ lastSnapshot = new GCStatsSnapshot(GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2));
+ }
+
+ public GCStatsSnapshot GetSnapshot(bool reset = false)
+ {
+ lock (myLock)
+ {
+ var newSnapshot = new GCStatsSnapshot(GC.CollectionCount(0) - lastSnapshot.Gen0,
+ GC.CollectionCount(1) - lastSnapshot.Gen1,
+ GC.CollectionCount(2) - lastSnapshot.Gen2);
+ if (reset)
+ {
+ lastSnapshot = newSnapshot;
+ }
+ return newSnapshot;
+ }
+ }
+ }
+
+ public class GCStatsSnapshot
+ {
+ public GCStatsSnapshot(int gen0, int gen1, int gen2)
+ {
+ this.Gen0 = gen0;
+ this.Gen1 = gen1;
+ this.Gen2 = gen2;
+ }
+
+ public int Gen0 { get; }
+ public int Gen1 { get; }
+ public int Gen2 { get; }
+
+ public override string ToString()
+ {
+ return string.Format("[GCCollectionCount: gen0 {0}, gen1 {1}, gen2 {2}]", Gen0, Gen1, Gen2);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
index 108357e4eb..8a629f9748 100644
--- a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
+++ b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
@@ -15,6 +15,10 @@
<ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
</ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="CommandLineParser" Version="2.1.1-beta" />
+ </ItemGroup>
+
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
diff --git a/src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs
new file mode 100644
index 0000000000..787b5508fb
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/PInvokeByteArrayBenchmark.cs
@@ -0,0 +1,64 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using System.Collections.Generic;
+using System.Diagnostics;
+
+namespace Grpc.Microbenchmarks
+{
+ public class PInvokeByteArrayBenchmark
+ {
+ static readonly NativeMethods Native = NativeMethods.Get();
+
+ public void Init()
+ {
+ }
+
+ public void Cleanup()
+ {
+ }
+
+ public void Run(int threadCount, int iterations, int payloadSize)
+ {
+ Console.WriteLine(string.Format("PInvokeByteArrayBenchmark: threads={0}, iterations={1}, payloadSize={2}", threadCount, iterations, payloadSize));
+ var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, payloadSize));
+ threadedBenchmark.Run();
+ }
+
+ private void ThreadBody(int iterations, int payloadSize)
+ {
+ var payload = new byte[payloadSize];
+
+ var stopwatch = Stopwatch.StartNew();
+ for (int i = 0; i < iterations; i++)
+ {
+ var gcHandle = GCHandle.Alloc(payload, GCHandleType.Pinned);
+ var payloadPtr = gcHandle.AddrOfPinnedObject();
+ Native.grpcsharp_test_nop(payloadPtr);
+ gcHandle.Free();
+ }
+ stopwatch.Stop();
+ Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Microbenchmarks/Program.cs b/src/csharp/Grpc.Microbenchmarks/Program.cs
index d07d4187c4..a64c2979ab 100644
--- a/src/csharp/Grpc.Microbenchmarks/Program.cs
+++ b/src/csharp/Grpc.Microbenchmarks/Program.cs
@@ -20,14 +20,84 @@ using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
+using CommandLine;
+using CommandLine.Text;
namespace Grpc.Microbenchmarks
{
class Program
{
+ public enum MicrobenchmarkType
+ {
+ CompletionRegistry,
+ PInvokeByteArray,
+ SendMessage
+ }
+
+ private class BenchmarkOptions
+ {
+ [Option("benchmark", Required = true, HelpText = "Benchmark to run")]
+ public MicrobenchmarkType Benchmark { get; set; }
+ }
+
public static void Main(string[] args)
{
GrpcEnvironment.SetLogger(new ConsoleLogger());
+ var parserResult = Parser.Default.ParseArguments<BenchmarkOptions>(args)
+ .WithNotParsed(errors => {
+ Console.WriteLine("Supported benchmarks:");
+ foreach (var enumValue in Enum.GetValues(typeof(MicrobenchmarkType)))
+ {
+ Console.WriteLine(" " + enumValue);
+ }
+ Environment.Exit(1);
+ })
+ .WithParsed(options =>
+ {
+ switch (options.Benchmark)
+ {
+ case MicrobenchmarkType.CompletionRegistry:
+ RunCompletionRegistryBenchmark();
+ break;
+ case MicrobenchmarkType.PInvokeByteArray:
+ RunPInvokeByteArrayBenchmark();
+ break;
+ case MicrobenchmarkType.SendMessage:
+ RunSendMessageBenchmark();
+ break;
+ default:
+ throw new ArgumentException("Unsupported benchmark.");
+ }
+ });
+ }
+
+ static void RunCompletionRegistryBenchmark()
+ {
+ var benchmark = new CompletionRegistryBenchmark();
+ benchmark.Init();
+ foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
+ {
+ foreach (bool useSharedRegistry in new bool[] {false, true})
+ {
+ benchmark.Run(threadCount, 4 * 1000 * 1000, useSharedRegistry);
+ }
+ }
+ benchmark.Cleanup();
+ }
+
+ static void RunPInvokeByteArrayBenchmark()
+ {
+ var benchmark = new PInvokeByteArrayBenchmark();
+ benchmark.Init();
+ foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
+ {
+ benchmark.Run(threadCount, 4 * 1000 * 1000, 0);
+ }
+ benchmark.Cleanup();
+ }
+
+ static void RunSendMessageBenchmark()
+ {
var benchmark = new SendMessageBenchmark();
benchmark.Init();
foreach (int threadCount in new int[] {1, 1, 2, 4, 8, 12})
diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
index de67874580..9cff97eb88 100644
--- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
+++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
@@ -59,16 +59,16 @@ namespace Grpc.Microbenchmarks
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);
- var sendCompletionHandler = new SendCompletionHandler((success) => { });
+ var sendCompletionCallback = new NopSendCompletionCallback();
var payload = new byte[payloadSize];
var writeFlags = default(WriteFlags);
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
{
- call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false);
+ call.StartSendMessage(sendCompletionCallback, payload, writeFlags, false);
var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
- callback(true);
+ callback.OnComplete(true);
}
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
@@ -87,5 +87,13 @@ namespace Grpc.Microbenchmarks
}
return call;
}
+
+ private class NopSendCompletionCallback : ISendCompletionCallback
+ {
+ public void OnSendCompletion(bool success)
+ {
+ // NOP
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs
index feac8d1690..95b9aaaf3f 100644
--- a/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs
+++ b/src/csharp/Grpc.Microbenchmarks/ThreadedBenchmark.cs
@@ -46,6 +46,7 @@ namespace Grpc.Microbenchmarks
public void Run()
{
Console.WriteLine("Running threads.");
+ var gcStats = new GCStats();
var threads = new List<Thread>();
for (int i = 0; i < runners.Count; i++)
{
@@ -58,7 +59,7 @@ namespace Grpc.Microbenchmarks
{
thread.Join();
}
- Console.WriteLine("All threads finished.");
+ Console.WriteLine("All threads finished (GC Stats Delta: " + gcStats.GetSnapshot() + ")");
}
}
}
diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat
index 3be049ae1d..8f89e2846a 100755
--- a/src/csharp/build_packages_dotnetcli.bat
+++ b/src/csharp/build_packages_dotnetcli.bat
@@ -13,7 +13,7 @@
@rem limitations under the License.
@rem Current package versions
-set VERSION=1.7.2
+set VERSION=1.9.0-dev
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh
index cf565b2e8d..6a6cafe2bd 100755
--- a/src/csharp/build_packages_dotnetcli.sh
+++ b/src/csharp/build_packages_dotnetcli.sh
@@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts
dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts
dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
-nuget pack Grpc.nuspec -Version "1.7.2" -OutputDirectory ../../artifacts
-nuget pack Grpc.Tools.nuspec -Version "1.7.2" -OutputDirectory ../../artifacts
+nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts
(cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg)