diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2015-02-21 17:51:52 -0800 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2015-02-23 11:35:49 -0800 |
commit | 50faa8f78b210a9922644f624d3c0a36b4e2a2db (patch) | |
tree | d726e42f02f400a67090c5e7e4d0354bdd9de738 /src/csharp/Grpc.Core | |
parent | 7c15ee88a2cde0fbb7d88a0a31ae772c6895efe9 (diff) |
Added support for true synchronous unary call and added some performance tests.
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r-- | src/csharp/Grpc.Core/Calls.cs | 27 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.csproj | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 25 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 10 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs | 68 |
5 files changed, 119 insertions, 12 deletions
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index b67332676a..c0973234e2 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -47,19 +47,22 @@ namespace Grpc.Core { public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) { + var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); + return asyncCall.UnaryCall(call.Channel, call.MethodName, req); + //TODO: implement this in real synchronous style. - try { - return AsyncUnaryCall(call, req, token).Result; - } catch(AggregateException ae) { - foreach (var e in ae.InnerExceptions) - { - if (e is RpcException) - { - throw e; - } - } - throw; - } +// try { +// return AsyncUnaryCall(call, req, token).Result; +// } catch(AggregateException ae) { +// foreach (var e in ae.InnerExceptions) +// { +// if (e is RpcException) +// { +// throw e; +// } +// } +// throw; +// } } public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 4ad32e10e4..664d534dc2 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -62,6 +62,7 @@ <Compile Include="Internal\ClientStreamingInputObserver.cs" /> <Compile Include="Internal\ServerStreamingOutputObserver.cs" /> <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" /> + <Compile Include="Utils\BenchmarkUtil.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 5e96092e27..7c40661cf4 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -112,6 +112,31 @@ namespace Grpc.Core.Internal InitializeInternal(call, true); } + public TRead UnaryCall(Channel channel, String methodName, TWrite msg) + { + using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) + { + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + + lock (myLock) + { + Initialize(channel, cq, methodName); + started = true; + halfcloseRequested = true; + readingDone = true; + } + call.BlockingUnary(cq, payload, unaryResponseHandler); + + // task should be finished once BlockingUnary returns. + return unaryResponseTcs.Task.Result; + + // TODO: unwrap aggregate exception... + } + } + public Task<TRead> UnaryCallAsync(TWrite msg) { lock (myLock) diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 659a383b4b..1c0bc98f06 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -63,6 +63,11 @@ namespace Grpc.Core.Internal byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); @@ -113,6 +118,11 @@ namespace Grpc.Core.Internal AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); } + public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback) + { + grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length)); + } + public void StartClientStreaming(CompletionCallbackDelegate callback) { AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs new file mode 100644 index 0000000000..3f0dae84cf --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs @@ -0,0 +1,68 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Grpc.Core.Utils +{ + public static class BenchmarkUtil + { + /// <summary> + /// Runs a simple benchmark preceded by warmup phase. + /// </summary> + public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action) + { + Console.WriteLine("Warmup iterations: " + warmupIterations); + for (int i = 0; i < warmupIterations; i++) + { + action(); + } + + Console.WriteLine("Benchmark iterations: " + benchmarkIterations); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (int i = 0; i < benchmarkIterations; i++) + { + action(); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds)); + } + } +} + |