aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting/StressTestClient.cs')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/StressTestClient.cs318
1 files changed, 318 insertions, 0 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
new file mode 100644
index 0000000000..8db691cb04
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
@@ -0,0 +1,318 @@
+#region Copyright notice and license
+
+// Copyright 2015-2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+using CommandLine;
+using CommandLine.Text;
+using Grpc.Core;
+using Grpc.Core.Logging;
+using Grpc.Core.Utils;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+ public class StressTestClient
+ {
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<StressTestClient>();
+ const double SecondsToNanos = 1e9;
+
+ private class ClientOptions
+ {
+ [Option("server_addresses", DefaultValue = "localhost:8080")]
+ public string ServerAddresses { get; set; }
+
+ [Option("test_cases", DefaultValue = "large_unary:100")]
+ public string TestCases { get; set; }
+
+ [Option("test_duration_secs", DefaultValue = -1)]
+ public int TestDurationSecs { get; set; }
+
+ [Option("num_channels_per_server", DefaultValue = 1)]
+ public int NumChannelsPerServer { get; set; }
+
+ [Option("num_stubs_per_channel", DefaultValue = 1)]
+ public int NumStubsPerChannel { get; set; }
+
+ [Option("metrics_port", DefaultValue = 8081)]
+ public int MetricsPort { get; set; }
+
+ [HelpOption]
+ public string GetUsage()
+ {
+ var help = new HelpText
+ {
+ Heading = "gRPC C# stress test client",
+ AddDashesToOption = true
+ };
+ help.AddPreOptionsLine("Usage:");
+ help.AddOptions(this);
+ return help;
+ }
+ }
+
+ ClientOptions options;
+ List<string> serverAddresses;
+ Dictionary<string, int> weightedTestCases;
+ WeightedRandomGenerator testCaseGenerator;
+
+ // cancellation will be emitted once test_duration_secs has elapsed.
+ CancellationTokenSource finishedTokenSource = new CancellationTokenSource();
+ Histogram histogram = new Histogram(0.01, 60 * SecondsToNanos);
+
+ private StressTestClient(ClientOptions options, List<string> serverAddresses, Dictionary<string, int> weightedTestCases)
+ {
+ this.options = options;
+ this.serverAddresses = serverAddresses;
+ this.weightedTestCases = weightedTestCases;
+ this.testCaseGenerator = new WeightedRandomGenerator(this.weightedTestCases);
+ }
+
+ public static void Run(string[] args)
+ {
+ var options = new ClientOptions();
+ if (!Parser.Default.ParseArguments(args, options))
+ {
+ Environment.Exit(1);
+ }
+
+ GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0);
+ GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0);
+
+ var serverAddresses = options.ServerAddresses.Split(',');
+ GrpcPreconditions.CheckArgument(serverAddresses.Length > 0, "You need to provide at least one server address");
+
+ var testCases = ParseWeightedTestCases(options.TestCases);
+ GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case");
+
+ var interopClient = new StressTestClient(options, serverAddresses.ToList(), testCases);
+ interopClient.Run().Wait();
+ }
+
+ async Task Run()
+ {
+ var metricsServer = new Server()
+ {
+ Services = { MetricsService.BindService(new MetricsServiceImpl(histogram)) },
+ Ports = { { "[::]", options.MetricsPort, ServerCredentials.Insecure } }
+ };
+ metricsServer.Start();
+
+ if (options.TestDurationSecs >= 0)
+ {
+ finishedTokenSource.CancelAfter(TimeSpan.FromSeconds(options.TestDurationSecs));
+ }
+
+ var tasks = new List<Task>();
+ var channels = new List<Channel>();
+ foreach (var serverAddress in serverAddresses)
+ {
+ for (int i = 0; i < options.NumChannelsPerServer; i++)
+ {
+ var channel = new Channel(serverAddress, ChannelCredentials.Insecure);
+ channels.Add(channel);
+ for (int j = 0; j < options.NumStubsPerChannel; j++)
+ {
+ var client = TestService.NewClient(channel);
+ var task = Task.Factory.StartNew(() => RunBodyAsync(client).GetAwaiter().GetResult(),
+ TaskCreationOptions.LongRunning);
+ tasks.Add(task);
+ }
+ }
+ }
+ await Task.WhenAll(tasks);
+
+ foreach (var channel in channels)
+ {
+ await channel.ShutdownAsync();
+ }
+
+ await metricsServer.ShutdownAsync();
+ }
+
+ async Task RunBodyAsync(TestService.TestServiceClient client)
+ {
+ Logger.Info("Starting stress test client thread.");
+ while (!finishedTokenSource.Token.IsCancellationRequested)
+ {
+ var testCase = testCaseGenerator.GetNext();
+
+ var stopwatch = Stopwatch.StartNew();
+
+ await RunTestCaseAsync(client, testCase);
+
+ stopwatch.Stop();
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+ Logger.Info("Stress test client thread finished.");
+ }
+
+ async Task RunTestCaseAsync(TestService.TestServiceClient client, string testCase)
+ {
+ switch (testCase)
+ {
+ case "empty_unary":
+ InteropClient.RunEmptyUnary(client);
+ break;
+ case "large_unary":
+ InteropClient.RunLargeUnary(client);
+ break;
+ case "client_streaming":
+ await InteropClient.RunClientStreamingAsync(client);
+ break;
+ case "server_streaming":
+ await InteropClient.RunServerStreamingAsync(client);
+ break;
+ case "ping_pong":
+ await InteropClient.RunPingPongAsync(client);
+ break;
+ case "empty_stream":
+ await InteropClient.RunEmptyStreamAsync(client);
+ break;
+ case "cancel_after_begin":
+ await InteropClient.RunCancelAfterBeginAsync(client);
+ break;
+ case "cancel_after_first_response":
+ await InteropClient.RunCancelAfterFirstResponseAsync(client);
+ break;
+ case "timeout_on_sleeping_server":
+ await InteropClient.RunTimeoutOnSleepingServerAsync(client);
+ break;
+ case "custom_metadata":
+ await InteropClient.RunCustomMetadataAsync(client);
+ break;
+ case "status_code_and_message":
+ await InteropClient.RunStatusCodeAndMessageAsync(client);
+ break;
+ default:
+ throw new ArgumentException("Unsupported test case " + testCase);
+ }
+ }
+
+ static Dictionary<string, int> ParseWeightedTestCases(string weightedTestCases)
+ {
+ var result = new Dictionary<string, int>();
+ foreach (var weightedTestCase in weightedTestCases.Split(','))
+ {
+ var parts = weightedTestCase.Split(new char[] {':'}, 2);
+ GrpcPreconditions.CheckArgument(parts.Length == 2, "Malformed test_cases option.");
+ result.Add(parts[0], int.Parse(parts[1]));
+ }
+ return result;
+ }
+
+ class WeightedRandomGenerator
+ {
+ readonly Random random = new Random();
+ readonly List<Tuple<int, string>> cumulativeSums;
+ readonly int weightSum;
+
+ public WeightedRandomGenerator(Dictionary<string, int> weightedItems)
+ {
+ cumulativeSums = new List<Tuple<int, string>>();
+ weightSum = 0;
+ foreach (var entry in weightedItems)
+ {
+ weightSum += entry.Value;
+ cumulativeSums.Add(Tuple.Create(weightSum, entry.Key));
+ }
+ }
+
+ public string GetNext()
+ {
+ int rand = random.Next(weightSum);
+ foreach (var entry in cumulativeSums)
+ {
+ if (rand < entry.Item1)
+ {
+ return entry.Item2;
+ }
+ }
+ throw new InvalidOperationException("GetNext() failed.");
+ }
+ }
+
+ class MetricsServiceImpl : MetricsService.MetricsServiceBase
+ {
+ const string GaugeName = "csharp_overall_qps";
+
+ readonly Histogram histogram;
+ readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
+
+ public MetricsServiceImpl(Histogram histogram)
+ {
+ this.histogram = histogram;
+ }
+
+ public override Task<GaugeResponse> GetGauge(GaugeRequest request, ServerCallContext context)
+ {
+ if (request.Name == GaugeName)
+ {
+ long qps = GetQpsAndReset();
+
+ return Task.FromResult(new GaugeResponse
+ {
+ Name = GaugeName,
+ LongValue = qps
+ });
+ }
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "Gauge does not exist"));
+ }
+
+ public override async Task GetAllGauges(EmptyMessage request, IServerStreamWriter<GaugeResponse> responseStream, ServerCallContext context)
+ {
+ long qps = GetQpsAndReset();
+
+ var response = new GaugeResponse
+ {
+ Name = GaugeName,
+ LongValue = qps
+ };
+ await responseStream.WriteAsync(response);
+ }
+
+ long GetQpsAndReset()
+ {
+ var snapshot = histogram.GetSnapshot(true);
+ var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true);
+
+ return (long) (snapshot.Count / elapsedSnapshot.Seconds);
+ }
+ }
+ }
+}