aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs67
1 files changed, 60 insertions, 7 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index 76e877d4aa..a749f4a8a3 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -97,22 +97,32 @@ namespace Grpc.IntegrationTesting
return new SimpleClientRunner(channel,
config.ClientType,
config.RpcType,
- config.PayloadConfig.SimpleParams,
+ config.PayloadConfig,
config.HistogramParams);
}
}
/// <summary>
- /// Client that starts synchronous unary calls in a closed loop.
+ /// Simple protobuf client.
/// </summary>
public class SimpleClientRunner : IClientRunner
{
const double SecondsToNanos = 1e9;
+ readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
+
+ readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
+ MethodType.DuplexStreaming,
+ "grpc.testing.BenchmarkService",
+ "StreamingCall",
+ ByteArrayMarshaller,
+ ByteArrayMarshaller
+ );
+
readonly Channel channel;
readonly ClientType clientType;
readonly RpcType rpcType;
- readonly SimpleProtoParams payloadParams;
+ readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly BenchmarkService.IBenchmarkServiceClient client;
@@ -120,12 +130,12 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
- public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, SimpleProtoParams payloadParams, HistogramParams histogramParams)
+ public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
this.clientType = clientType;
this.rpcType = rpcType;
- this.payloadParams = payloadParams;
+ this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
@@ -213,8 +223,45 @@ namespace Grpc.IntegrationTesting
}
}
+ private async Task RunGenericClosedLoopStreamingAsync()
+ {
+ var request = CreateByteBufferRequest();
+ var stopwatch = new Stopwatch();
+
+ var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, StreamingCallMethod, new CallOptions());
+
+ using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
+ {
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await call.RequestStream.WriteAsync(request);
+ await call.ResponseStream.MoveNext();
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+
+ // finish the streaming call
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+ }
+
private Action GetThreadBody()
{
+ if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
+ {
+ GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
+ GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
+ return () =>
+ {
+ RunGenericClosedLoopStreamingAsync().Wait();
+ };
+ }
+
+ GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
if (clientType == ClientType.SYNC_CLIENT)
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
@@ -241,13 +288,19 @@ namespace Grpc.IntegrationTesting
private SimpleRequest CreateSimpleRequest()
{
+ GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
return new SimpleRequest
{
- Payload = CreateZerosPayload(payloadParams.ReqSize),
- ResponseSize = payloadParams.RespSize
+ Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
+ ResponseSize = payloadConfig.SimpleParams.RespSize
};
}
+ private byte[] CreateByteBufferRequest()
+ {
+ return new byte[payloadConfig.BytebufParams.ReqSize];
+ }
+
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };