diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2016-03-21 13:52:50 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2016-03-25 16:28:16 -0700 |
commit | e26e2e5ad96f9bce89ff32a6a108190feda20046 (patch) | |
tree | f980bb3e06d1436442d52832cfa9d283706de2ec /src/csharp/Grpc.IntegrationTesting | |
parent | c848502f552563cd12b065238ae860b120f7c72e (diff) |
support streaming and async client
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting')
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/ClientRunners.cs | 114 |
1 files changed, 92 insertions, 22 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 0b6a8ee626..76e877d4aa 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -64,8 +64,6 @@ namespace Grpc.IntegrationTesting string target = config.ServerTargets.Single(); GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop, "Only closed loop scenario supported for C#"); - GrpcPreconditions.CheckArgument(config.ClientType == ClientType.SYNC_CLIENT, - "Only sync client support for C#"); GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1"); if (config.OutstandingRpcsPerChannel != 0) @@ -96,28 +94,24 @@ namespace Grpc.IntegrationTesting } var channel = new Channel(target, credentials, channelOptions); - switch (config.RpcType) - { - case RpcType.UNARY: - return new SyncUnaryClientRunner(channel, - config.PayloadConfig.SimpleParams, - config.HistogramParams); - - case RpcType.STREAMING: - default: - throw new ArgumentException("Unsupported RpcType."); - } + return new SimpleClientRunner(channel, + config.ClientType, + config.RpcType, + config.PayloadConfig.SimpleParams, + config.HistogramParams); } } /// <summary> /// Client that starts synchronous unary calls in a closed loop. /// </summary> - public class SyncUnaryClientRunner : IClientRunner + public class SimpleClientRunner : IClientRunner { const double SecondsToNanos = 1e9; readonly Channel channel; + readonly ClientType clientType; + readonly RpcType rpcType; readonly SimpleProtoParams payloadParams; readonly Histogram histogram; @@ -126,14 +120,19 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SyncUnaryClientRunner(Channel channel, SimpleProtoParams payloadParams, HistogramParams histogramParams) + public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, SimpleProtoParams payloadParams, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); + this.clientType = clientType; + this.rpcType = rpcType; + this.payloadParams = payloadParams; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.stoppedCts = new CancellationTokenSource(); this.client = BenchmarkService.NewClient(channel); - this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning); + + var threadBody = GetThreadBody(); + this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning); } public ClientStats GetStats(bool reset) @@ -158,13 +157,9 @@ namespace Grpc.IntegrationTesting await channel.ShutdownAsync(); } - private void Run() + private void RunClosedLoopUnary() { - var request = new SimpleRequest - { - Payload = CreateZerosPayload(payloadParams.ReqSize), - ResponseSize = payloadParams.RespSize - }; + var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); while (!stoppedCts.Token.IsCancellationRequested) @@ -178,6 +173,81 @@ namespace Grpc.IntegrationTesting } } + private async Task RunClosedLoopUnaryAsync() + { + var request = CreateSimpleRequest(); + var stopwatch = new Stopwatch(); + + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await client.UnaryCallAsync(request); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + } + + private async Task RunClosedLoopStreamingAsync() + { + var request = CreateSimpleRequest(); + var stopwatch = new Stopwatch(); + + using (var call = client.StreamingCall()) + { + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await call.RequestStream.WriteAsync(request); + await call.ResponseStream.MoveNext(); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + + // finish the streaming call + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + } + + private Action GetThreadBody() + { + if (clientType == ClientType.SYNC_CLIENT) + { + GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); + return RunClosedLoopUnary; + } + else if (clientType == ClientType.ASYNC_CLIENT) + { + switch (rpcType) + { + case RpcType.UNARY: + return () => + { + RunClosedLoopUnaryAsync().Wait(); + }; + case RpcType.STREAMING: + return () => + { + RunClosedLoopStreamingAsync().Wait(); + }; + } + } + throw new ArgumentException("Unsupported configuration."); + } + + private SimpleRequest CreateSimpleRequest() + { + return new SimpleRequest + { + Payload = CreateZerosPayload(payloadParams.ReqSize), + ResponseSize = payloadParams.RespSize + }; + } + private static Payload CreateZerosPayload(int size) { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; |