aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-03-21 13:52:50 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-03-25 16:28:16 -0700
commite26e2e5ad96f9bce89ff32a6a108190feda20046 (patch)
treef980bb3e06d1436442d52832cfa9d283706de2ec /src/csharp/Grpc.IntegrationTesting
parentc848502f552563cd12b065238ae860b120f7c72e (diff)
support streaming and async client
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs114
1 files changed, 92 insertions, 22 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index 0b6a8ee626..76e877d4aa 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -64,8 +64,6 @@ namespace Grpc.IntegrationTesting
string target = config.ServerTargets.Single();
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
"Only closed loop scenario supported for C#");
- GrpcPreconditions.CheckArgument(config.ClientType == ClientType.SYNC_CLIENT,
- "Only sync client support for C#");
GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
if (config.OutstandingRpcsPerChannel != 0)
@@ -96,28 +94,24 @@ namespace Grpc.IntegrationTesting
}
var channel = new Channel(target, credentials, channelOptions);
- switch (config.RpcType)
- {
- case RpcType.UNARY:
- return new SyncUnaryClientRunner(channel,
- config.PayloadConfig.SimpleParams,
- config.HistogramParams);
-
- case RpcType.STREAMING:
- default:
- throw new ArgumentException("Unsupported RpcType.");
- }
+ return new SimpleClientRunner(channel,
+ config.ClientType,
+ config.RpcType,
+ config.PayloadConfig.SimpleParams,
+ config.HistogramParams);
}
}
/// <summary>
/// Client that starts synchronous unary calls in a closed loop.
/// </summary>
- public class SyncUnaryClientRunner : IClientRunner
+ public class SimpleClientRunner : IClientRunner
{
const double SecondsToNanos = 1e9;
readonly Channel channel;
+ readonly ClientType clientType;
+ readonly RpcType rpcType;
readonly SimpleProtoParams payloadParams;
readonly Histogram histogram;
@@ -126,14 +120,19 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
- public SyncUnaryClientRunner(Channel channel, SimpleProtoParams payloadParams, HistogramParams histogramParams)
+ public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, SimpleProtoParams payloadParams, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
+ this.clientType = clientType;
+ this.rpcType = rpcType;
+ this.payloadParams = payloadParams;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
this.client = BenchmarkService.NewClient(channel);
- this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
+
+ var threadBody = GetThreadBody();
+ this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
}
public ClientStats GetStats(bool reset)
@@ -158,13 +157,9 @@ namespace Grpc.IntegrationTesting
await channel.ShutdownAsync();
}
- private void Run()
+ private void RunClosedLoopUnary()
{
- var request = new SimpleRequest
- {
- Payload = CreateZerosPayload(payloadParams.ReqSize),
- ResponseSize = payloadParams.RespSize
- };
+ var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
@@ -178,6 +173,81 @@ namespace Grpc.IntegrationTesting
}
}
+ private async Task RunClosedLoopUnaryAsync()
+ {
+ var request = CreateSimpleRequest();
+ var stopwatch = new Stopwatch();
+
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await client.UnaryCallAsync(request);
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+ }
+
+ private async Task RunClosedLoopStreamingAsync()
+ {
+ var request = CreateSimpleRequest();
+ var stopwatch = new Stopwatch();
+
+ using (var call = client.StreamingCall())
+ {
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await call.RequestStream.WriteAsync(request);
+ await call.ResponseStream.MoveNext();
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+
+ // finish the streaming call
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+ }
+
+ private Action GetThreadBody()
+ {
+ if (clientType == ClientType.SYNC_CLIENT)
+ {
+ GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
+ return RunClosedLoopUnary;
+ }
+ else if (clientType == ClientType.ASYNC_CLIENT)
+ {
+ switch (rpcType)
+ {
+ case RpcType.UNARY:
+ return () =>
+ {
+ RunClosedLoopUnaryAsync().Wait();
+ };
+ case RpcType.STREAMING:
+ return () =>
+ {
+ RunClosedLoopStreamingAsync().Wait();
+ };
+ }
+ }
+ throw new ArgumentException("Unsupported configuration.");
+ }
+
+ private SimpleRequest CreateSimpleRequest()
+ {
+ return new SimpleRequest
+ {
+ Payload = CreateZerosPayload(payloadParams.ReqSize),
+ ResponseSize = payloadParams.RespSize
+ };
+ }
+
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };