diff options
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting/ClientRunners.cs')
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/ClientRunners.cs | 123 |
1 files changed, 84 insertions, 39 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 5bfc89d591..f954ca5f34 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting public static IClientRunner CreateStarted(ClientConfig config) { Logger.Debug("ClientConfig: {0}", config); - string target = config.ServerTargets.Single(); - GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop, - "Only closed loop scenario supported for C#"); - GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1"); - if (config.OutstandingRpcsPerChannel != 0) - { - Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value"); - } if (config.AsyncClientThreads != 0) { Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value"); @@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value"); } - var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; + var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams); + + return new ClientRunnerImpl(channels, + config.ClientType, + config.RpcType, + config.OutstandingRpcsPerChannel, + config.LoadParams, + config.PayloadConfig, + config.HistogramParams); + } + + private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams) + { + GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1."); + GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified."); + + var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; List<ChannelOption> channelOptions = null; - if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "") + if (securityParams != null && securityParams.ServerHostOverride != "") { channelOptions = new List<ChannelOption> { - new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride) + new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride) }; } - var channel = new Channel(target, credentials, channelOptions); - return new ClientRunnerImpl(channel, - config.ClientType, - config.RpcType, - config.PayloadConfig, - config.HistogramParams); + var result = new List<Channel>(); + for (int i = 0; i < clientChannels; i++) + { + var target = serverTargets.ElementAt(i % serverTargets.Count()); + var channel = new Channel(target, credentials, channelOptions); + result.Add(channel); + } + return result; } } @@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting { const double SecondsToNanos = 1e9; - readonly Channel channel; + readonly List<Channel> channels; readonly ClientType clientType; readonly RpcType rpcType; readonly PayloadConfig payloadConfig; readonly Histogram histogram; - readonly BenchmarkService.IBenchmarkServiceClient client; - readonly Task runnerTask; - readonly CancellationTokenSource stoppedCts; + readonly List<Task> runnerTasks; + readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) + public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams) { - this.channel = GrpcPreconditions.CheckNotNull(channel); + GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel"); + this.channels = new List<Channel>(channels); this.clientType = clientType; this.rpcType = rpcType; this.payloadConfig = payloadConfig; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); - this.stoppedCts = new CancellationTokenSource(); - this.client = BenchmarkService.NewClient(channel); - - var threadBody = GetThreadBody(); - this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning); + this.runnerTasks = new List<Task>(); + foreach (var channel in this.channels) + { + for (int i = 0; i < outstandingRpcsPerChannel; i++) + { + var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel); + var threadBody = GetThreadBody(channel, timer); + this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning)); + } + } } public ClientStats GetStats(bool reset) @@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting public async Task StopAsync() { stoppedCts.Cancel(); - await runnerTask; - await channel.ShutdownAsync(); + foreach (var runnerTask in runnerTasks) + { + await runnerTask; + } + foreach (var channel in channels) + { + await channel.ShutdownAsync(); + } } - private void RunClosedLoopUnary() + private void RunUnary(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + timer.WaitForNext(); } } - private async Task RunClosedLoopUnaryAsync() + private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } } - private async Task RunClosedLoopStreamingAsync() + private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } // finish the streaming call @@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting } } - private async Task RunGenericClosedLoopStreamingAsync() + private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer) { var request = CreateByteBufferRequest(); var stopwatch = new Stopwatch(); @@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } // finish the streaming call @@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting } } - private Action GetThreadBody() + private Action GetThreadBody(Channel channel, IInterarrivalTimer timer) { if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) { @@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); return () => { - RunGenericClosedLoopStreamingAsync().Wait(); + RunGenericStreamingAsync(channel, timer).Wait(); }; } @@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting if (clientType == ClientType.SYNC_CLIENT) { GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); - return RunClosedLoopUnary; + return () => RunUnary(channel, timer); } else if (clientType == ClientType.ASYNC_CLIENT) { @@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting case RpcType.UNARY: return () => { - RunClosedLoopUnaryAsync().Wait(); + RunUnaryAsync(channel, timer).Wait(); }; case RpcType.STREAMING: return () => { - RunClosedLoopStreamingAsync().Wait(); + RunStreamingPingPongAsync(channel, timer).Wait(); }; } } @@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; } + + private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier) + { + switch (loadParams.LoadCase) + { + case LoadParams.LoadOneofCase.ClosedLoop: + return new ClosedLoopInterarrivalTimer(); + case LoadParams.LoadOneofCase.Poisson: + return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier); + default: + throw new ArgumentException("Unknown load type"); + } + } } } |