diff options
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting')
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/ClientRunners.cs | 67 |
1 files changed, 60 insertions, 7 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 76e877d4aa..a749f4a8a3 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -97,22 +97,32 @@ namespace Grpc.IntegrationTesting return new SimpleClientRunner(channel, config.ClientType, config.RpcType, - config.PayloadConfig.SimpleParams, + config.PayloadConfig, config.HistogramParams); } } /// <summary> - /// Client that starts synchronous unary calls in a closed loop. + /// Simple protobuf client. /// </summary> public class SimpleClientRunner : IClientRunner { const double SecondsToNanos = 1e9; + readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b); + + readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>( + MethodType.DuplexStreaming, + "grpc.testing.BenchmarkService", + "StreamingCall", + ByteArrayMarshaller, + ByteArrayMarshaller + ); + readonly Channel channel; readonly ClientType clientType; readonly RpcType rpcType; - readonly SimpleProtoParams payloadParams; + readonly PayloadConfig payloadConfig; readonly Histogram histogram; readonly BenchmarkService.IBenchmarkServiceClient client; @@ -120,12 +130,12 @@ namespace Grpc.IntegrationTesting readonly CancellationTokenSource stoppedCts; readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, SimpleProtoParams payloadParams, HistogramParams histogramParams) + public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) { this.channel = GrpcPreconditions.CheckNotNull(channel); this.clientType = clientType; this.rpcType = rpcType; - this.payloadParams = payloadParams; + this.payloadConfig = payloadConfig; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.stoppedCts = new CancellationTokenSource(); @@ -213,8 +223,45 @@ namespace Grpc.IntegrationTesting } } + private async Task RunGenericClosedLoopStreamingAsync() + { + var request = CreateByteBufferRequest(); + var stopwatch = new Stopwatch(); + + var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, StreamingCallMethod, new CallOptions()); + + using (var call = Calls.AsyncDuplexStreamingCall(callDetails)) + { + while (!stoppedCts.Token.IsCancellationRequested) + { + stopwatch.Restart(); + await call.RequestStream.WriteAsync(request); + await call.ResponseStream.MoveNext(); + stopwatch.Stop(); + + // spec requires data point in nanoseconds. + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + + // finish the streaming call + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + } + private Action GetThreadBody() { + if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) + { + GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API"); + GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); + return () => + { + RunGenericClosedLoopStreamingAsync().Wait(); + }; + } + + GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); if (clientType == ClientType.SYNC_CLIENT) { GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); @@ -241,13 +288,19 @@ namespace Grpc.IntegrationTesting private SimpleRequest CreateSimpleRequest() { + GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); return new SimpleRequest { - Payload = CreateZerosPayload(payloadParams.ReqSize), - ResponseSize = payloadParams.RespSize + Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize), + ResponseSize = payloadConfig.SimpleParams.RespSize }; } + private byte[] CreateByteBufferRequest() + { + return new byte[payloadConfig.BytebufParams.ReqSize]; + } + private static Payload CreateZerosPayload(int size) { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; |