diff options
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting')
3 files changed, 235 insertions, 39 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 0bcacf76e5..f954ca5f34 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting public static IClientRunner CreateStarted(ClientConfig config) { Logger.Debug("ClientConfig: {0}", config); - string target = config.ServerTargets.Single(); - GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop, - "Only closed loop scenario supported for C#"); - GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1"); - if (config.OutstandingRpcsPerChannel != 0) - { - Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value"); - } if (config.AsyncClientThreads != 0) { Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value"); @@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value"); } - var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; + var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams); + + return new ClientRunnerImpl(channels, + config.ClientType, + config.RpcType, + config.OutstandingRpcsPerChannel, + config.LoadParams, + config.PayloadConfig, + config.HistogramParams); + } + + private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams) + { + GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1."); + GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified."); + + var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure; List<ChannelOption> channelOptions = null; - if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "") + if (securityParams != null && securityParams.ServerHostOverride != "") { channelOptions = new List<ChannelOption> { - new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride) + new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride) }; } - var channel = new Channel(target, credentials, channelOptions); - return new ClientRunnerImpl(channel, - config.ClientType, - config.RpcType, - config.PayloadConfig, - config.HistogramParams); + var result = new List<Channel>(); + for (int i = 0; i < clientChannels; i++) + { + var target = serverTargets.ElementAt(i % serverTargets.Count()); + var channel = new Channel(target, credentials, channelOptions); + result.Add(channel); + } + return result; } } @@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting { const double SecondsToNanos = 1e9; - readonly Channel channel; + readonly List<Channel> channels; readonly ClientType clientType; readonly RpcType rpcType; readonly PayloadConfig payloadConfig; readonly Histogram histogram; - readonly BenchmarkService.BenchmarkServiceClient client; - readonly Task runnerTask; - readonly CancellationTokenSource stoppedCts; + readonly List<Task> runnerTasks; + readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); - public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams) + public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams) { - this.channel = GrpcPreconditions.CheckNotNull(channel); + GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel"); + this.channels = new List<Channel>(channels); this.clientType = clientType; this.rpcType = rpcType; this.payloadConfig = payloadConfig; this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); - this.stoppedCts = new CancellationTokenSource(); - this.client = BenchmarkService.NewClient(channel); - - var threadBody = GetThreadBody(); - this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning); + this.runnerTasks = new List<Task>(); + foreach (var channel in this.channels) + { + for (int i = 0; i < outstandingRpcsPerChannel; i++) + { + var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel); + var threadBody = GetThreadBody(channel, timer); + this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning)); + } + } } public ClientStats GetStats(bool reset) @@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting public async Task StopAsync() { stoppedCts.Cancel(); - await runnerTask; - await channel.ShutdownAsync(); + foreach (var runnerTask in runnerTasks) + { + await runnerTask; + } + foreach (var channel in channels) + { + await channel.ShutdownAsync(); + } } - private void RunClosedLoopUnary() + private void RunUnary(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + timer.WaitForNext(); } } - private async Task RunClosedLoopUnaryAsync() + private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } } - private async Task RunClosedLoopStreamingAsync() + private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer) { + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); @@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } // finish the streaming call @@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting } } - private async Task RunGenericClosedLoopStreamingAsync() + private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer) { var request = CreateByteBufferRequest(); var stopwatch = new Stopwatch(); @@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting // spec requires data point in nanoseconds. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + + await timer.WaitForNextAsync(); } // finish the streaming call @@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting } } - private Action GetThreadBody() + private Action GetThreadBody(Channel channel, IInterarrivalTimer timer) { if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) { @@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); return () => { - RunGenericClosedLoopStreamingAsync().Wait(); + RunGenericStreamingAsync(channel, timer).Wait(); }; } @@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting if (clientType == ClientType.SYNC_CLIENT) { GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); - return RunClosedLoopUnary; + return () => RunUnary(channel, timer); } else if (clientType == ClientType.ASYNC_CLIENT) { @@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting case RpcType.UNARY: return () => { - RunClosedLoopUnaryAsync().Wait(); + RunUnaryAsync(channel, timer).Wait(); }; case RpcType.STREAMING: return () => { - RunClosedLoopStreamingAsync().Wait(); + RunStreamingPingPongAsync(channel, timer).Wait(); }; } } @@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; } + + private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier) + { + switch (loadParams.LoadCase) + { + case LoadParams.LoadOneofCase.ClosedLoop: + return new ClosedLoopInterarrivalTimer(); + case LoadParams.LoadOneofCase.Poisson: + return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier); + default: + throw new ArgumentException("Unknown load type"); + } + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 64d14b0df5..7ea80b11f0 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -115,6 +115,7 @@ <Compile Include="GenericService.cs" /> <Compile Include="GeneratedServiceBaseTest.cs" /> <Compile Include="GeneratedClientTest.cs" /> + <Compile Include="InterarrivalTimers.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs new file mode 100644 index 0000000000..e8d7cbe8bb --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs @@ -0,0 +1,150 @@ +#region Copyright notice and license + +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Utils; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + public interface IInterarrivalTimer + { + void WaitForNext(); + + Task WaitForNextAsync(); + } + + /// <summary> + /// Interarrival timer that doesn't wait at all. + /// </summary> + public class ClosedLoopInterarrivalTimer : IInterarrivalTimer + { + public ClosedLoopInterarrivalTimer() + { + } + + public void WaitForNext() + { + // NOP + } + + public Task WaitForNextAsync() + { + return Task.FromResult<object>(null); + } + } + + /// <summary> + /// Interarrival timer that generates Poisson process load. + /// </summary> + public class PoissonInterarrivalTimer : IInterarrivalTimer + { + const double NanosToSeconds = 1e-9; + + readonly ExponentialDistribution exponentialDistribution; + DateTime? lastEventTime; + + public PoissonInterarrivalTimer(double offeredLoad) + { + this.exponentialDistribution = new ExponentialDistribution(new Random(), offeredLoad); + this.lastEventTime = DateTime.UtcNow; + } + + public void WaitForNext() + { + var waitDuration = GetNextWaitDuration(); + int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds); + if (millisTimeout > 0) + { + // TODO(jtattermusch): probably only works well for a relatively low interarrival rate + Thread.Sleep(millisTimeout); + } + } + + public async Task WaitForNextAsync() + { + var waitDuration = GetNextWaitDuration(); + int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds); + if (millisTimeout > 0) + { + // TODO(jtattermusch): probably only works well for a relatively low interarrival rate + await Task.Delay(millisTimeout); + } + } + + private TimeSpan GetNextWaitDuration() + { + if (!lastEventTime.HasValue) + { + this.lastEventTime = DateTime.Now; + } + + var origLastEventTime = this.lastEventTime.Value; + this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next() * NanosToSeconds); + return this.lastEventTime.Value - origLastEventTime; + } + + /// <summary> + /// Exp generator. + /// </summary> + private class ExponentialDistribution + { + readonly Random random; + readonly double lambda; + readonly double lambdaReciprocal; + + public ExponentialDistribution(Random random, double lambda) + { + this.random = random; + this.lambda = lambda; + this.lambdaReciprocal = 1.0 / lambda; + } + + public double Next() + { + double uniform = random.NextDouble(); + // Use 1.0-uni above to avoid NaN if uni is 0 + return lambdaReciprocal * (-Math.Log(1.0 - uniform)); + } + } + } +} |