aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
blob: 9d41d34414aa55285926d4f39ecb92a093813255 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#region Copyright notice and license

// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;

namespace Grpc.IntegrationTesting
{
    /// <summary>
    /// Helper methods to start client runners for performance testing.
    /// </summary>
    public class ClientRunners
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();

        // Profilers to use for clients. GetNextProfiler removes one entry per call.
        static readonly BlockingCollection<BasicProfiler> profilers = new BlockingCollection<BasicProfiler>();

        /// <summary>
        /// Registers a profiler that can later be handed out by <see cref="GetNextProfiler"/>.
        /// </summary>
        /// <param name="profiler">The profiler to register; must not be null.</param>
        internal static void AddProfiler(BasicProfiler profiler)
        {
            GrpcPreconditions.CheckNotNull(profiler);
            profilers.Add(profiler);
        }

        /// <summary>
        /// Creates a started client runner.
        /// </summary>
        /// <param name="config">
        /// Client configuration. <c>AsyncClientThreads</c>, <c>CoreLimit</c> and <c>CoreList</c>
        /// are not supported for C#; a warning is logged and the values are ignored.
        /// </param>
        /// <returns>A runner built from channels created according to <paramref name="config"/>.</returns>
        public static IClientRunner CreateStarted(ClientConfig config)
        {
            GrpcPreconditions.CheckNotNull(config, "config");
            Logger.Debug("ClientConfig: {0}", config);

            if (config.AsyncClientThreads != 0)
            {
                Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
            }
            if (config.CoreLimit != 0)
            {
                Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
            }
            if (config.CoreList.Count > 0)
            {
                Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
            }

            var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams, config.ChannelArgs);

            return new ClientRunnerImpl(channels,
                config.ClientType,
                config.RpcType,
                config.OutstandingRpcsPerChannel,
                config.LoadParams,
                config.PayloadConfig,
                config.HistogramParams,
                GetNextProfiler);
        }

        /// <summary>
        /// Creates <paramref name="clientChannels"/> channels, assigning server targets round-robin.
        /// </summary>
        /// <param name="clientChannels">Number of channels to create; must be at least 1.</param>
        /// <param name="serverTargets">Targets to connect to; must contain at least one entry.</param>
        /// <param name="securityParams">
        /// When non-null, SSL test credentials are used (and the server host override is applied
        /// if one is set); when null, the channels are insecure.
        /// </param>
        /// <param name="channelArguments">Additional channel arguments converted to channel options.</param>
        /// <returns>The list of created channels.</returns>
        private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams, IEnumerable<ChannelArg> channelArguments)
        {
            GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1.");
            GrpcPreconditions.CheckNotNull(serverTargets, "serverTargets");

            // Materialize the targets once: the parameter is a general IEnumerable, so calling
            // Count()/ElementAt() for every channel could re-enumerate the sequence each time.
            var targets = serverTargets.ToList();
            GrpcPreconditions.CheckArgument(targets.Count > 0, "at least one serverTarget needs to be specified.");

            var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
            var channelOptions = new List<ChannelOption>();
            if (securityParams != null && !string.IsNullOrEmpty(securityParams.ServerHostOverride))
            {
                channelOptions.Add(new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride));
            }
            foreach (var channelArgument in channelArguments)
            {
                channelOptions.Add(channelArgument.ToChannelOption());
            }

            var result = new List<Channel>(clientChannels);
            for (int i = 0; i < clientChannels; i++)
            {
                // Round-robin over the available targets.
                var target = targets[i % targets.Count];
                result.Add(new Channel(target, credentials, channelOptions));
            }
            return result;
        }

        /// <summary>
        /// Takes the next registered profiler without blocking.
        /// </summary>
        /// <returns>A profiler, or null when none is currently available.</returns>
        private static BasicProfiler GetNextProfiler()
        {
            BasicProfiler result = null;
            profilers.TryTake(out result);
            return result;
        }
    }

    /// <summary>
    /// Client runner that starts its benchmark tasks in the constructor (one task per
    /// outstanding RPC on each channel) and records call latencies into per-thread histograms.
    /// </summary>
    internal class ClientRunnerImpl : IClientRunner
    {
        const double SecondsToNanos = 1e9;

        readonly List<Channel> channels;
        readonly ClientType clientType;
        readonly RpcType rpcType;
        readonly PayloadConfig payloadConfig;
        readonly Lazy<byte[]> cachedByteBufferRequest;
        readonly ThreadLocal<Histogram> threadLocalHistogram;

        readonly List<Task> runnerTasks;
        readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
        readonly TimeStats timeStats = new TimeStats();
        readonly AtomicCounter statsResetCount = new AtomicCounter();

        /// <summary>
        /// Creates the runner and immediately starts <paramref name="outstandingRpcsPerChannel"/>
        /// runner tasks for every channel.
        /// </summary>
        /// <param name="channels">Channels to issue calls on (copied into an internal list).</param>
        /// <param name="clientType">Sync or async client API to use.</param>
        /// <param name="rpcType">Unary or streaming calls.</param>
        /// <param name="outstandingRpcsPerChannel">Number of runner tasks per channel; must be positive.</param>
        /// <param name="loadParams">Selects the interarrival timer (closed loop or Poisson).</param>
        /// <param name="payloadConfig">Selects the request payload (simple or byte buffer).</param>
        /// <param name="histogramParams">Resolution and maximum value for the latency histograms; must not be null.</param>
        /// <param name="profilerFactory">Called once per runner task; may return null for "no profiler".</param>
        public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams, Func<BasicProfiler> profilerFactory)
        {
            GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
            GrpcPreconditions.CheckNotNull(histogramParams, "histogramParams");
            this.channels = new List<Channel>(channels);
            this.clientType = clientType;
            this.rpcType = rpcType;
            this.payloadConfig = payloadConfig;
            this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
            // trackAllValues is enabled so that GetStats can iterate over every thread's histogram.
            this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);

            // Each runner gets an equal share of the total load.
            double loadMultiplier = 1.0 / this.channels.Count / outstandingRpcsPerChannel;

            this.runnerTasks = new List<Task>();
            foreach (var channel in this.channels)
            {
                for (int i = 0; i < outstandingRpcsPerChannel; i++)
                {
                    var timer = CreateTimer(loadParams, loadMultiplier);
                    var optionalProfiler = profilerFactory();
                    this.runnerTasks.Add(RunClientAsync(channel, timer, optionalProfiler));
                }
            }
        }

        /// <summary>
        /// Collects a snapshot of the latency histograms and time statistics.
        /// </summary>
        /// <param name="reset">
        /// Passed through to the histogram and time snapshots; when true the reset counter is
        /// also incremented.
        /// </param>
        /// <returns>Client stats with merged latencies and elapsed/user/system times in seconds.</returns>
        public ClientStats GetStats(bool reset)
        {
            var histogramData = new HistogramData();
            foreach (var hist in threadLocalHistogram.Values)
            {
                hist.GetSnapshot(histogramData, reset);
            }

            var timeSnapshot = timeStats.GetSnapshot(reset);

            if (reset)
            {
                statsResetCount.Increment();
            }

            GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (histogram reset count:{3}, seconds since reset: {4})",
                GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, timeSnapshot.WallClockTime.TotalSeconds);

            return new ClientStats
            {
                Latencies = histogramData,
                TimeElapsed = timeSnapshot.WallClockTime.TotalSeconds,
                TimeUser = timeSnapshot.UserProcessorTime.TotalSeconds,
                TimeSystem = timeSnapshot.PrivilegedProcessorTime.TotalSeconds
            };
        }

        /// <summary>
        /// Signals all runner tasks to stop, waits for them to finish and shuts down the channels.
        /// The channels are shut down even when a runner task has failed; the runner failure is
        /// rethrown afterwards.
        /// </summary>
        public async Task StopAsync()
        {
            stoppedCts.Cancel();

            // Wait for every runner to complete without throwing at this point, so that a
            // faulted runner cannot prevent the channels from being shut down.
            var allRunners = Task.WhenAll(runnerTasks);
            await allRunners.ContinueWith(_ => { }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

            foreach (var channel in channels)
            {
                await channel.ShutdownAsync();
            }

            // Surface a runner failure (if any) to the caller now that cleanup is done.
            await allRunners;
        }

        /// <summary>
        /// Issues blocking unary calls until the runner is stopped. If a profiler is given, it is
        /// installed for the current thread and reset once after the first stats reset.
        /// </summary>
        private void RunUnary(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler)
        {
            if (optionalProfiler != null)
            {
                Profilers.SetForCurrentThread(optionalProfiler);
            }

            bool profilerReset = false;

            var client = new BenchmarkService.BenchmarkServiceClient(channel);
            var request = CreateSimpleRequest();
            var stopwatch = new Stopwatch();

            while (!stoppedCts.Token.IsCancellationRequested)
            {
                // after the first stats reset, also reset the profiler.
                if (optionalProfiler != null && !profilerReset && statsResetCount.Count > 0)
                {
                    optionalProfiler.Reset();
                    profilerReset = true;
                }

                stopwatch.Restart();
                client.UnaryCall(request);
                stopwatch.Stop();

                RecordLatency(stopwatch);

                timer.WaitForNext();
            }
        }

        /// <summary>
        /// Issues asynchronous unary calls until the runner is stopped.
        /// </summary>
        private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer)
        {
            var client = new BenchmarkService.BenchmarkServiceClient(channel);
            var request = CreateSimpleRequest();
            var stopwatch = new Stopwatch();

            while (!stoppedCts.Token.IsCancellationRequested)
            {
                stopwatch.Restart();
                await client.UnaryCallAsync(request);
                stopwatch.Stop();

                RecordLatency(stopwatch);

                await timer.WaitForNextAsync();
            }
        }

        /// <summary>
        /// Runs a ping-pong exchange (one write, one read) on a single streaming call until the
        /// runner is stopped, then completes the call.
        /// </summary>
        private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer)
        {
            var client = new BenchmarkService.BenchmarkServiceClient(channel);
            var request = CreateSimpleRequest();
            var stopwatch = new Stopwatch();

            using (var call = client.StreamingCall())
            {
                while (!stoppedCts.Token.IsCancellationRequested)
                {
                    stopwatch.Restart();
                    await call.RequestStream.WriteAsync(request);
                    await call.ResponseStream.MoveNext();
                    stopwatch.Stop();

                    RecordLatency(stopwatch);

                    await timer.WaitForNextAsync();
                }

                // finish the streaming call
                await call.RequestStream.CompleteAsync();
                Assert.IsFalse(await call.ResponseStream.MoveNext());
            }
        }

        /// <summary>
        /// Same ping-pong exchange as <see cref="RunStreamingPingPongAsync"/>, but using raw
        /// byte-array messages on the generic streaming method.
        /// </summary>
        private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
        {
            var request = cachedByteBufferRequest.Value;
            var stopwatch = new Stopwatch();

            var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());

            using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
            {
                while (!stoppedCts.Token.IsCancellationRequested)
                {
                    stopwatch.Restart();
                    await call.RequestStream.WriteAsync(request);
                    await call.ResponseStream.MoveNext();
                    stopwatch.Stop();

                    RecordLatency(stopwatch);

                    await timer.WaitForNextAsync();
                }

                // finish the streaming call
                await call.RequestStream.CompleteAsync();
                Assert.IsFalse(await call.ResponseStream.MoveNext());
            }
        }

        /// <summary>
        /// Picks and starts the runner matching the configured payload, client type and RPC type.
        /// </summary>
        /// <returns>A task that completes when the started runner finishes.</returns>
        /// <exception cref="ArgumentException">The combination of settings is not supported.</exception>
        private Task RunClientAsync(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler)
        {
            if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
            {
                GrpcPreconditions.CheckArgument(clientType == ClientType.AsyncClient, "Generic client only supports async API");
                GrpcPreconditions.CheckArgument(rpcType == RpcType.Streaming, "Generic client only supports streaming calls");
                return RunGenericStreamingAsync(channel, timer);
            }

            GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
            if (clientType == ClientType.SyncClient)
            {
                GrpcPreconditions.CheckArgument(rpcType == RpcType.Unary, "Sync client can only be used for Unary calls in C#");
                // create a dedicated thread for the synchronous client
                return Task.Factory.StartNew(() => RunUnary(channel, timer, optionalProfiler), TaskCreationOptions.LongRunning);
            }
            else if (clientType == ClientType.AsyncClient)
            {
                switch (rpcType)
                {
                    case RpcType.Unary:
                        return RunUnaryAsync(channel, timer);
                    case RpcType.Streaming:
                        return RunStreamingPingPongAsync(channel, timer);
                }
            }
            throw new ArgumentException("Unsupported configuration.");
        }

        /// <summary>
        /// Adds the stopwatch's elapsed time to the current thread's histogram.
        /// </summary>
        private void RecordLatency(Stopwatch stopwatch)
        {
            // spec requires data point in nanoseconds.
            threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
        }

        /// <summary>
        /// Builds a request with a zero-filled payload of the configured request size that asks
        /// for the configured response size.
        /// </summary>
        private SimpleRequest CreateSimpleRequest()
        {
            GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
            return new SimpleRequest
            {
                Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
                ResponseSize = payloadConfig.SimpleParams.RespSize
            };
        }

        /// <summary>
        /// Creates a payload whose body consists of <paramref name="size"/> zero bytes.
        /// </summary>
        private static Payload CreateZerosPayload(int size)
        {
            return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
        }

        /// <summary>
        /// Creates the interarrival timer for one runner task.
        /// </summary>
        /// <param name="loadParams">Load configuration selecting closed-loop or Poisson load.</param>
        /// <param name="loadMultiplier">Factor applied to the Poisson offered load for this runner.</param>
        /// <exception cref="ArgumentException">The load type is not recognized.</exception>
        private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier)
        {
            switch (loadParams.LoadCase)
            {
                case LoadParams.LoadOneofCase.ClosedLoop:
                    return new ClosedLoopInterarrivalTimer();
                case LoadParams.LoadOneofCase.Poisson:
                    return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier);
                default:
                    throw new ArgumentException("Unknown load type");
            }
        }
    }
}