aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-21 17:51:52 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-23 11:35:49 -0800
commit50faa8f78b210a9922644f624d3c0a36b4e2a2db (patch)
treed726e42f02f400a67090c5e7e4d0354bdd9de738 /src/csharp/Grpc.Core
parent7c15ee88a2cde0fbb7d88a0a31ae772c6895efe9 (diff)
Added support for true synchronous unary call and added some performance tests.
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/Calls.cs27
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs25
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs10
-rw-r--r--src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs68
5 files changed, 119 insertions, 12 deletions
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index b67332676a..c0973234e2 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -47,19 +47,22 @@ namespace Grpc.Core
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
+ return asyncCall.UnaryCall(call.Channel, call.MethodName, req);
+
//TODO: implement this in real synchronous style.
- try {
- return AsyncUnaryCall(call, req, token).Result;
- } catch(AggregateException ae) {
- foreach (var e in ae.InnerExceptions)
- {
- if (e is RpcException)
- {
- throw e;
- }
- }
- throw;
- }
+// try {
+// return AsyncUnaryCall(call, req, token).Result;
+// } catch(AggregateException ae) {
+// foreach (var e in ae.InnerExceptions)
+// {
+// if (e is RpcException)
+// {
+// throw e;
+// }
+// }
+// throw;
+// }
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 4ad32e10e4..664d534dc2 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -62,6 +62,7 @@
<Compile Include="Internal\ClientStreamingInputObserver.cs" />
<Compile Include="Internal\ServerStreamingOutputObserver.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
+ <Compile Include="Utils\BenchmarkUtil.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 5e96092e27..7c40661cf4 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -112,6 +112,31 @@ namespace Grpc.Core.Internal
InitializeInternal(call, true);
}
+ public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
+ {
+ using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
+ {
+ // TODO: handle serialization error...
+ byte[] payload = serializer(msg);
+
+ unaryResponseTcs = new TaskCompletionSource<TRead>();
+
+ lock (myLock)
+ {
+ Initialize(channel, cq, methodName);
+ started = true;
+ halfcloseRequested = true;
+ readingDone = true;
+ }
+ call.BlockingUnary(cq, payload, unaryResponseHandler);
+
+ // task should be finished once BlockingUnary returns.
+ return unaryResponseTcs.Task.Result;
+
+ // TODO: unwrap aggregate exception...
+ }
+ }
+
public Task<TRead> UnaryCallAsync(TWrite msg)
{
lock (myLock)
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 659a383b4b..1c0bc98f06 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -63,6 +63,11 @@ namespace Grpc.Core.Internal
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
+ [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
+ byte[] send_buffer, UIntPtr send_buffer_len);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
@@ -113,6 +118,11 @@ namespace Grpc.Core.Internal
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
+ public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback)
+ {
+ grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length));
+ }
+
public void StartClientStreaming(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));
diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
new file mode 100644
index 0000000000..3f0dae84cf
--- /dev/null
+++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
@@ -0,0 +1,68 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+
+namespace Grpc.Core.Utils
+{
+ public static class BenchmarkUtil
+ {
+ /// <summary>
+ /// Runs a simple benchmark preceded by warmup phase.
+ /// </summary>
+ public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action)
+ {
+ Console.WriteLine("Warmup iterations: " + warmupIterations);
+ for (int i = 0; i < warmupIterations; i++)
+ {
+ action();
+ }
+
+ Console.WriteLine("Benchmark iterations: " + benchmarkIterations);
+ var stopwatch = new Stopwatch();
+ stopwatch.Start();
+ for (int i = 0; i < benchmarkIterations; i++)
+ {
+ action();
+ }
+ stopwatch.Stop();
+ Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
+ Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds));
+ }
+ }
+}
+