aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-03-29 08:29:32 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-03-29 08:29:32 -0700
commitd2eb23974d62e7d6f2ed6486ba559e36f16b8081 (patch)
tree71d96eab212c4653b14d1e5114c72cee422db181 /src
parentc91504ed57c93d30f654d9f3a0fee62c1236fad8 (diff)
parent42140525abdcd0186996d7b64bde89aaf10488c7 (diff)
Merge pull request #5904 from jtattermusch/csharp_perf_intergrate
C# performance worker improvements
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs11
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs195
-rw-r--r--src/csharp/Grpc.IntegrationTesting/GenericService.cs71
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs12
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServerRunners.cs65
6 files changed, 306 insertions, 49 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
index 47a15224f1..1edeedae2f 100644
--- a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
+++ b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
@@ -1,6 +1,6 @@
#region Copyright notice and license
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -46,16 +46,13 @@ namespace Grpc.Testing
/// </summary>
public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService
{
- private readonly int responseSize;
-
- public BenchmarkServiceImpl(int responseSize)
+ public BenchmarkServiceImpl()
{
- this.responseSize = responseSize;
}
public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
{
- var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
+ var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
return Task.FromResult(response);
}
@@ -63,7 +60,7 @@ namespace Grpc.Testing
{
await requestStream.ForEachAsync(async request =>
{
- var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
+ var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
await responseStream.WriteAsync(response);
});
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index c4016012cb..e6dc2321c4 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
+using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@@ -50,42 +51,65 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start client runners for performance testing.
/// </summary>
- public static class ClientRunners
+ public class ClientRunners
{
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
+
/// <summary>
/// Creates a started client runner.
/// </summary>
public static IClientRunner CreateStarted(ClientConfig config)
{
+ Logger.Debug("ClientConfig: {0}", config);
string target = config.ServerTargets.Single();
- GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop);
+ GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
+ "Only closed loop scenario supported for C#");
+ GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
- var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
- var channel = new Channel(target, credentials);
+ if (config.OutstandingRpcsPerChannel != 0)
+ {
+ Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
+ }
+ if (config.AsyncClientThreads != 0)
+ {
+ Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreLimit != 0)
+ {
+ Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreList.Count > 0)
+ {
+ Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
+ }
- switch (config.RpcType)
+ var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
+ List<ChannelOption> channelOptions = null;
+ if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
{
- case RpcType.UNARY:
- return new SyncUnaryClientRunner(channel,
- config.PayloadConfig.SimpleParams.ReqSize,
- config.HistogramParams);
-
- case RpcType.STREAMING:
- default:
- throw new ArgumentException("Unsupported RpcType.");
+ channelOptions = new List<ChannelOption>
+ {
+ new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
+ };
}
+ var channel = new Channel(target, credentials, channelOptions);
+
+ return new ClientRunnerImpl(channel,
+ config.ClientType,
+ config.RpcType,
+ config.PayloadConfig,
+ config.HistogramParams);
}
}
- /// <summary>
- /// Client that starts synchronous unary calls in a closed loop.
- /// </summary>
- public class SyncUnaryClientRunner : IClientRunner
+ public class ClientRunnerImpl : IClientRunner
{
const double SecondsToNanos = 1e9;
readonly Channel channel;
- readonly int payloadSize;
+ readonly ClientType clientType;
+ readonly RpcType rpcType;
+ readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly BenchmarkService.IBenchmarkServiceClient client;
@@ -93,15 +117,19 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
- public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams)
+ public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
- this.payloadSize = payloadSize;
+ this.clientType = clientType;
+ this.rpcType = rpcType;
+ this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
this.client = BenchmarkService.NewClient(channel);
- this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
+
+ var threadBody = GetThreadBody();
+ this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
}
public ClientStats GetStats(bool reset)
@@ -126,12 +154,9 @@ namespace Grpc.IntegrationTesting
await channel.ShutdownAsync();
}
- private void Run()
+ private void RunClosedLoopUnary()
{
- var request = new SimpleRequest
- {
- Payload = CreateZerosPayload(payloadSize)
- };
+ var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
@@ -145,6 +170,124 @@ namespace Grpc.IntegrationTesting
}
}
+ private async Task RunClosedLoopUnaryAsync()
+ {
+ var request = CreateSimpleRequest();
+ var stopwatch = new Stopwatch();
+
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await client.UnaryCallAsync(request);
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+ }
+
+ private async Task RunClosedLoopStreamingAsync()
+ {
+ var request = CreateSimpleRequest();
+ var stopwatch = new Stopwatch();
+
+ using (var call = client.StreamingCall())
+ {
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await call.RequestStream.WriteAsync(request);
+ await call.ResponseStream.MoveNext();
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+
+ // finish the streaming call
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+ }
+
+ private async Task RunGenericClosedLoopStreamingAsync()
+ {
+ var request = CreateByteBufferRequest();
+ var stopwatch = new Stopwatch();
+
+ var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
+
+ using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
+ {
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await call.RequestStream.WriteAsync(request);
+ await call.ResponseStream.MoveNext();
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+
+ // finish the streaming call
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+ }
+
+ private Action GetThreadBody()
+ {
+ if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
+ {
+ GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
+ GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
+ return () =>
+ {
+ RunGenericClosedLoopStreamingAsync().Wait();
+ };
+ }
+
+ GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
+ if (clientType == ClientType.SYNC_CLIENT)
+ {
+ GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
+ return RunClosedLoopUnary;
+ }
+ else if (clientType == ClientType.ASYNC_CLIENT)
+ {
+ switch (rpcType)
+ {
+ case RpcType.UNARY:
+ return () =>
+ {
+ RunClosedLoopUnaryAsync().Wait();
+ };
+ case RpcType.STREAMING:
+ return () =>
+ {
+ RunClosedLoopStreamingAsync().Wait();
+ };
+ }
+ }
+ throw new ArgumentException("Unsupported configuration.");
+ }
+
+ private SimpleRequest CreateSimpleRequest()
+ {
+ GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
+ return new SimpleRequest
+ {
+ Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
+ ResponseSize = payloadConfig.SimpleParams.RespSize
+ };
+ }
+
+ private byte[] CreateByteBufferRequest()
+ {
+ return new byte[payloadConfig.BytebufParams.ReqSize];
+ }
+
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
new file mode 100644
index 0000000000..c6128264ac
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
@@ -0,0 +1,71 @@
+#region Copyright notice and license
+
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+ /// <summary>
+ /// Utility methods for defining and calling a service that doesn't use protobufs
+ /// for serialization/deserialization.
+ /// </summary>
+ public static class GenericService
+ {
+ readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
+
+ public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
+ MethodType.DuplexStreaming,
+ "grpc.testing.BenchmarkService",
+ "StreamingCall",
+ ByteArrayMarshaller,
+ ByteArrayMarshaller
+ );
+
+ public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
+ {
+ return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
+ .AddMethod(StreamingCallMethod, handler).Build();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 372991374e..4c049944ea 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -120,6 +120,7 @@
<Compile Include="WorkerServiceImpl.cs" />
<Compile Include="QpsWorker.cs" />
<Compile Include="WallClockStopwatch.cs" />
+ <Compile Include="GenericService.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
index 06d5ee93d8..a8cf75bd81 100644
--- a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
@@ -55,14 +55,7 @@ namespace Grpc.IntegrationTesting
{
var serverConfig = new ServerConfig
{
- ServerType = ServerType.ASYNC_SERVER,
- PayloadConfig = new PayloadConfig
- {
- SimpleParams = new SimpleProtoParams
- {
- RespSize = 100
- }
- }
+ ServerType = ServerType.ASYNC_SERVER
};
serverRunner = ServerRunners.CreateStarted(serverConfig);
}
@@ -88,7 +81,8 @@ namespace Grpc.IntegrationTesting
{
SimpleParams = new SimpleProtoParams
{
- ReqSize = 100
+ ReqSize = 100,
+ RespSize = 100
}
},
HistogramParams = new HistogramParams
diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
index 4a73645e6c..c326378cfa 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
@@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
+using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@@ -50,27 +51,78 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start server runners for performance testing.
/// </summary>
- public static class ServerRunners
+ public class ServerRunners
{
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerRunners>();
+
/// <summary>
/// Creates a started server runner.
/// </summary>
public static IServerRunner CreateStarted(ServerConfig config)
{
- GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER);
+ Logger.Debug("ServerConfig: {0}", config);
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure;
- // TODO: qps_driver needs to setup payload properly...
- int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0;
+ if (config.AsyncServerThreads != 0)
+ {
+ Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreLimit != 0)
+ {
+ Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreList.Count > 0)
+ {
+ Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value");
+ }
+
+ ServerServiceDefinition service = null;
+ if (config.ServerType == ServerType.ASYNC_SERVER)
+ {
+ GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
+ "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
+ service = BenchmarkService.BindService(new BenchmarkServiceImpl());
+ }
+ else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER)
+ {
+ var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize);
+ service = GenericService.BindHandler(genericService.StreamingCall);
+ }
+ else
+ {
+ throw new ArgumentException("Unsupported ServerType");
+ }
+
var server = new Server
{
- Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) },
+ Services = { service },
Ports = { new ServerPort("[::]", config.Port, credentials) }
};
server.Start();
return new ServerRunnerImpl(server);
}
+
+ private class GenericServiceImpl
+ {
+ readonly byte[] response;
+
+ public GenericServiceImpl(int responseSize)
+ {
+ this.response = new byte[responseSize];
+ }
+
+ /// <summary>
+ /// Generic streaming call handler.
+ /// </summary>
+ public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context)
+ {
+ await requestStream.ForEachAsync(async request =>
+ {
+ await responseStream.WriteAsync(response);
+ });
+ }
+ }
}
/// <summary>
@@ -119,6 +171,5 @@ namespace Grpc.IntegrationTesting
{
return server.ShutdownAsync();
}
- }
-
+ }
}