From 3d6644aefd73f5030ad46154917c4a2732544e82 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 21 Mar 2016 09:46:15 -0700 Subject: improve C# qps worker --- src/csharp/Grpc.IntegrationTesting/ServerRunners.cs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'src/csharp/Grpc.IntegrationTesting/ServerRunners.cs') diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index 4a73645e6c..9f210081e7 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -41,6 +41,7 @@ using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using Grpc.Core; +using Grpc.Core.Logging; using Grpc.Core.Utils; using NUnit.Framework; using Grpc.Testing; @@ -50,16 +51,32 @@ namespace Grpc.IntegrationTesting /// /// Helper methods to start server runners for performance testing. /// - public static class ServerRunners + public class ServerRunners { + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); + /// /// Creates a started server runner. /// public static IServerRunner CreateStarted(ServerConfig config) { - GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER); + Logger.Debug("ServerConfig: {0}", config); + GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER, "Only ASYNC_SERVER supported for C# QpsWorker"); var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure; + if (config.AsyncServerThreads != 0) + { + Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value"); + } + if (config.CoreLimit != 0) + { + Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value"); + } + if (config.CoreList.Count > 0) + { + Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value"); + } + // TODO: qps_driver needs to setup payload properly... int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0; var server = new Server -- cgit v1.2.3 From c848502f552563cd12b065238ae860b120f7c72e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 21 Mar 2016 12:27:18 -0700 Subject: honor request.ReponseSize for benchmark service --- src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs | 9 +++------ src/csharp/Grpc.IntegrationTesting/ClientRunners.cs | 12 +++++++----- src/csharp/Grpc.IntegrationTesting/ServerRunners.cs | 6 +++--- 3 files changed, 13 insertions(+), 14 deletions(-) (limited to 'src/csharp/Grpc.IntegrationTesting/ServerRunners.cs') diff --git a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs index 47a15224f1..7e7bc713a0 100644 --- a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs @@ -46,16 +46,13 @@ namespace Grpc.Testing /// public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService { - private readonly int responseSize; - - public BenchmarkServiceImpl(int responseSize) + public BenchmarkServiceImpl() { - this.responseSize = responseSize; } public Task UnaryCall(SimpleRequest request, ServerCallContext context) { - var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) }; + var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) }; return Task.FromResult(response); } @@ -63,7 +60,7 @@ namespace Grpc.Testing { await requestStream.ForEachAsync(async request => { - var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) }; + var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) }; await responseStream.WriteAsync(response); }); } diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 3223d0d080..0b6a8ee626 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -64,6 +64,8 @@ namespace Grpc.IntegrationTesting string target = config.ServerTargets.Single(); GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop, "Only closed loop scenario supported for C#"); + GrpcPreconditions.CheckArgument(config.ClientType == ClientType.SYNC_CLIENT, + "Only sync client support for C#"); GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1"); if (config.OutstandingRpcsPerChannel != 0) @@ -98,7 +100,7 @@ namespace Grpc.IntegrationTesting { case RpcType.UNARY: return new SyncUnaryClientRunner(channel, - config.PayloadConfig.SimpleParams.ReqSize, + config.PayloadConfig.SimpleParams, config.HistogramParams); case RpcType.STREAMING: @@ -116,7 +118,7 @@ namespace Grpc.IntegrationTesting const double SecondsToNanos = 1e9; readonly Channel channel; - readonly int payloadSize; + readonly SimpleProtoParams payloadParams; readonly Histogram histogram; readonly BenchmarkService.IBenchmarkServiceClient client; @@ -124,10 +126,9 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams) + public SyncUnaryClientRunner(Channel channel, SimpleProtoParams payloadParams, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); - this.payloadSize = payloadSize; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.stoppedCts = new CancellationTokenSource(); @@ -161,7 +162,8 @@ namespace Grpc.IntegrationTesting { var request = new SimpleRequest { - Payload = CreateZerosPayload(payloadSize) + Payload = CreateZerosPayload(payloadParams.ReqSize), + ResponseSize = payloadParams.RespSize }; var stopwatch = new Stopwatch(); diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index 9f210081e7..516436ac5a 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -77,11 +77,11 @@ namespace Grpc.IntegrationTesting Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value"); } - // TODO: qps_driver needs to setup payload properly... - int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0; + GrpcPreconditions.CheckArgument(config.PayloadConfig == null, + "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); var server = new Server { - Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) }, + Services = { BenchmarkService.BindService(new BenchmarkServiceImpl()) }, Ports = { new ServerPort("[::]", config.Port, credentials) } }; -- cgit v1.2.3 From 253769e92d5ff1883c1623fd0ee130ae4ce4b380 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 21 Mar 2016 16:25:59 -0700 Subject: add ASYNC_GENERIC_SERVER support for C# --- .../Grpc.IntegrationTesting/ClientRunners.cs | 21 ++----- .../Grpc.IntegrationTesting/GenericService.cs | 71 ++++++++++++++++++++++ .../Grpc.IntegrationTesting.csproj | 1 + .../Grpc.IntegrationTesting/ServerRunners.cs | 46 ++++++++++++-- 4 files changed, 116 insertions(+), 23 deletions(-) create mode 100644 src/csharp/Grpc.IntegrationTesting/GenericService.cs (limited to 'src/csharp/Grpc.IntegrationTesting/ServerRunners.cs') diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index a749f4a8a3..e6dc2321c4 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -94,7 +94,7 @@ namespace Grpc.IntegrationTesting } var channel = new Channel(target, credentials, channelOptions); - return new SimpleClientRunner(channel, + return new ClientRunnerImpl(channel, config.ClientType, config.RpcType, config.PayloadConfig, @@ -102,23 +102,10 @@ namespace Grpc.IntegrationTesting } } - /// - /// Simple protobuf client. - /// - public class SimpleClientRunner : IClientRunner + public class ClientRunnerImpl : IClientRunner { const double SecondsToNanos = 1e9; - readonly static Marshaller ByteArrayMarshaller = new Marshaller((b) => b, (b) => b); - - readonly static Method StreamingCallMethod = new Method( - MethodType.DuplexStreaming, - "grpc.testing.BenchmarkService", - "StreamingCall", - ByteArrayMarshaller, - ByteArrayMarshaller - ); - readonly Channel channel; readonly ClientType clientType; readonly RpcType rpcType; @@ -130,7 +117,7 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) + public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); this.clientType = clientType; @@ -228,7 +215,7 @@ namespace Grpc.IntegrationTesting var request = CreateByteBufferRequest(); var stopwatch = new Stopwatch(); - var callDetails = new CallInvocationDetails(channel, StreamingCallMethod, new CallOptions()); + var callDetails = new CallInvocationDetails(channel, GenericService.StreamingCallMethod, new CallOptions()); using (var call = Calls.AsyncDuplexStreamingCall(callDetails)) { diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs new file mode 100644 index 0000000000..c6128264ac --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs @@ -0,0 +1,71 @@ +#region Copyright notice and license + +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Utils; +using NUnit.Framework; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + /// + /// Utility methods for defining and calling a service that doesn't use protobufs + /// for serialization/deserialization. + /// + public static class GenericService + { + readonly static Marshaller ByteArrayMarshaller = new Marshaller((b) => b, (b) => b); + + public readonly static Method StreamingCallMethod = new Method( + MethodType.DuplexStreaming, + "grpc.testing.BenchmarkService", + "StreamingCall", + ByteArrayMarshaller, + ByteArrayMarshaller + ); + + public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod handler) + { + return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName) + .AddMethod(StreamingCallMethod, handler).Build(); + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 372991374e..4c049944ea 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -120,6 +120,7 @@ + diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index 516436ac5a..c326378cfa 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -61,7 +61,6 @@ namespace Grpc.IntegrationTesting public static IServerRunner CreateStarted(ServerConfig config) { Logger.Debug("ServerConfig: {0}", config); - GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER, "Only ASYNC_SERVER supported for C# QpsWorker"); var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure; if (config.AsyncServerThreads != 0) @@ -77,17 +76,53 @@ namespace Grpc.IntegrationTesting Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value"); } - GrpcPreconditions.CheckArgument(config.PayloadConfig == null, - "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); + ServerServiceDefinition service = null; + if (config.ServerType == ServerType.ASYNC_SERVER) + { + GrpcPreconditions.CheckArgument(config.PayloadConfig == null, + "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); + service = BenchmarkService.BindService(new BenchmarkServiceImpl()); + } + else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER) + { + var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize); + service = GenericService.BindHandler(genericService.StreamingCall); + } + else + { + throw new ArgumentException("Unsupported ServerType"); + } + var server = new Server { - Services = { BenchmarkService.BindService(new BenchmarkServiceImpl()) }, + Services = { service }, Ports = { new ServerPort("[::]", config.Port, credentials) } }; server.Start(); return new ServerRunnerImpl(server); } + + private class GenericServiceImpl + { + readonly byte[] response; + + public GenericServiceImpl(int responseSize) + { + this.response = new byte[responseSize]; + } + + /// + /// Generic streaming call handler. + /// + public async Task StreamingCall(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + await requestStream.ForEachAsync(async request => + { + await responseStream.WriteAsync(response); + }); + } + } } /// @@ -136,6 +171,5 @@ namespace Grpc.IntegrationTesting { return server.ShutdownAsync(); } - } - + } } -- cgit v1.2.3