aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-21 17:51:52 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-23 11:35:49 -0800
commit50faa8f78b210a9922644f624d3c0a36b4e2a2db (patch)
treed726e42f02f400a67090c5e7e4d0354bdd9de738
parent7c15ee88a2cde0fbb7d88a0a31ae772c6895efe9 (diff)
Added support for true synchronous unary call and added some performance tests.
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs11
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj1
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs139
-rw-r--r--src/csharp/Grpc.Core/Calls.cs27
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs25
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs10
-rw-r--r--src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs68
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Client.cs11
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c25
10 files changed, 297 insertions, 21 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 7e564a2fba..39be35c219 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -101,15 +101,8 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
-
- var stopwatch = new Stopwatch();
- stopwatch.Start();
- for (int i = 0; i < 1000; i++)
- {
- Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
- }
- stopwatch.Stop();
- Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
+ BenchmarkUtil.RunBenchmark(100, 1000,
+ () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
}
server.ShutdownAsync().Wait();
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 687be3c0cb..a365320f05 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -41,6 +41,7 @@
<Compile Include="ServerTest.cs" />
<Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" />
+ <Compile Include="PInvokeTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
new file mode 100644
index 0000000000..b78609ccf0
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -0,0 +1,139 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+using System.Runtime.InteropServices;
+
+namespace Grpc.Core.Tests
+{
+ public class PInvokeTest
+ {
+ int counter;
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
+
+ [TestFixtureSetUp]
+ public void Init()
+ {
+ GrpcEnvironment.Initialize();
+ }
+
+ [TestFixtureTearDown]
+ public void Cleanup()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void CompletionQueueCreateDestroyBenchmark()
+ {
+ BenchmarkUtil.RunBenchmark(
+ 100000, 1000000,
+ () => {
+ CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
+ cq.Dispose();
+ }
+ );
+ }
+
+
+ /// <summary>
+ /// Approximate results:
+ /// (mono ~80ns)
+ /// </summary>
+ [Test]
+ public void NativeCallbackBenchmark()
+ {
+ CompletionCallbackDelegate handler = Handler;
+
+ counter = 0;
+ BenchmarkUtil.RunBenchmark(
+ 1000000, 10000000,
+ () => {
+ grpcsharp_test_callback(handler);
+ }
+ );
+ Assert.AreNotEqual(0, counter);
+ }
+
+ /// <summary>
+ /// Creating a new native-to-managed callback has significant overhead
+ /// compared to using an existing one. We need to be aware of this.
+ /// (~50us on mono)
+ /// </summary>
+ [Test]
+ public void NewNativeCallbackBenchmark()
+ {
+ counter = 0;
+ BenchmarkUtil.RunBenchmark(
+ 10000, 10000,
+ () => {
+ grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
+ }
+ );
+ Assert.AreNotEqual(0, counter);
+ }
+
+ /// <summary>
+ /// Tests overhead of a simple PInvoke call.
+ /// </summary>
+ [Test]
+ public void NopPInvokeBenchmark()
+ {
+ CompletionCallbackDelegate handler = Handler;
+
+ BenchmarkUtil.RunBenchmark(
+ 1000000, 100000000,
+ () => {
+ grpcsharp_test_nop(IntPtr.Zero);
+ }
+ );
+ }
+
+ private void Handler(GRPCOpError op, IntPtr ptr) {
+ counter ++;
+ }
+ }
+}
+
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index b67332676a..c0973234e2 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -47,19 +47,22 @@ namespace Grpc.Core
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
+ return asyncCall.UnaryCall(call.Channel, call.MethodName, req);
+
//TODO: implement this in real synchronous style.
- try {
- return AsyncUnaryCall(call, req, token).Result;
- } catch(AggregateException ae) {
- foreach (var e in ae.InnerExceptions)
- {
- if (e is RpcException)
- {
- throw e;
- }
- }
- throw;
- }
+// try {
+// return AsyncUnaryCall(call, req, token).Result;
+// } catch(AggregateException ae) {
+// foreach (var e in ae.InnerExceptions)
+// {
+// if (e is RpcException)
+// {
+// throw e;
+// }
+// }
+// throw;
+// }
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 4ad32e10e4..664d534dc2 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -62,6 +62,7 @@
<Compile Include="Internal\ClientStreamingInputObserver.cs" />
<Compile Include="Internal\ServerStreamingOutputObserver.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
+ <Compile Include="Utils\BenchmarkUtil.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 5e96092e27..7c40661cf4 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -112,6 +112,31 @@ namespace Grpc.Core.Internal
InitializeInternal(call, true);
}
+ public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
+ {
+ using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
+ {
+ // TODO: handle serialization error...
+ byte[] payload = serializer(msg);
+
+ unaryResponseTcs = new TaskCompletionSource<TRead>();
+
+ lock (myLock)
+ {
+ Initialize(channel, cq, methodName);
+ started = true;
+ halfcloseRequested = true;
+ readingDone = true;
+ }
+ call.BlockingUnary(cq, payload, unaryResponseHandler);
+
+ // task should be finished once BlockingUnary returns.
+ return unaryResponseTcs.Task.Result;
+
+ // TODO: unwrap aggregate exception...
+ }
+ }
+
public Task<TRead> UnaryCallAsync(TWrite msg)
{
lock (myLock)
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 659a383b4b..1c0bc98f06 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -63,6 +63,11 @@ namespace Grpc.Core.Internal
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
@@ -113,6 +118,11 @@ namespace Grpc.Core.Internal
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
+ public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback)
+ {
+ grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length));
+ }
+
public void StartClientStreaming(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));
diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
new file mode 100644
index 0000000000..3f0dae84cf
--- /dev/null
+++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
@@ -0,0 +1,68 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+
+namespace Grpc.Core.Utils
+{
+ public static class BenchmarkUtil
+ {
+ /// <summary>
+ /// Runs a simple benchmark preceded by warmup phase.
+ /// </summary>
+ public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action)
+ {
+ Console.WriteLine("Warmup iterations: " + warmupIterations);
+ for (int i = 0; i < warmupIterations; i++)
+ {
+ action();
+ }
+
+ Console.WriteLine("Benchmark iterations: " + benchmarkIterations);
+ var stopwatch = new Stopwatch();
+ stopwatch.Start();
+ for (int i = 0; i < benchmarkIterations; i++)
+ {
+ action();
+ }
+ stopwatch.Stop();
+ Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
+ Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds));
+ }
+ }
+}
+
diff --git a/src/csharp/Grpc.IntegrationTesting/Client.cs b/src/csharp/Grpc.IntegrationTesting/Client.cs
index bb650a112d..0c70744cea 100644
--- a/src/csharp/Grpc.IntegrationTesting/Client.cs
+++ b/src/csharp/Grpc.IntegrationTesting/Client.cs
@@ -33,7 +33,9 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Text.RegularExpressions;
+using System.Threading.Tasks;
using Google.ProtocolBuffers;
using Grpc.Core;
using Grpc.Core.Utils;
@@ -128,6 +130,9 @@ namespace Grpc.IntegrationTesting
case "empty_stream":
RunEmptyStream(client);
break;
+ case "benchmark_empty_unary":
+ RunBenchmarkEmptyUnary(client);
+ break;
default:
throw new ArgumentException("Unknown test case " + testCase);
}
@@ -267,6 +272,12 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
+ // This is not an official interop test, but it's useful.
+ private void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client)
+ {
+ BenchmarkUtil.RunBenchmark(10000, 10000,
+ () => { client.EmptyCall(Empty.DefaultInstance);});
+ }
private Payload CreateZerosPayload(int size) {
return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build();
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 5f9f22cab1..18e0431e3b 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -343,6 +343,18 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
+/* Synchronous unary call */
+GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_call_blocking_unary(grpc_call *call, grpc_completion_queue *dedicated_cq, callback_funcptr callback,
+ const char *send_buffer, size_t send_buffer_len) {
+ GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, send_buffer_len) == GRPC_CALL_OK);
+
+ /* TODO: we would like to use pluck, but we don't know the tag */
+ GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_OP_COMPLETE);
+ grpc_completion_queue_shutdown(dedicated_cq);
+ GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_QUEUE_SHUTDOWN);
+}
+
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) {
@@ -566,3 +578,16 @@ grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, ctx);
}
+
+
+/* For testing */
+GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_test_callback(callback_funcptr callback) {
+ callback(GRPC_OP_OK, NULL);
+}
+
+/* For testing */
+GPR_EXPORT void *GPR_CALLTYPE
+grpcsharp_test_nop(void *ptr) {
+ return ptr;
+}