aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2016-04-05 23:05:01 +0000
committerGravatar Yang Gao <yangg@google.com>2016-04-05 23:05:01 +0000
commit39afce836de1fb6e49c0eefc5ed77a4d44f1837a (patch)
treea8ae7a5a9a365d7039bb4f63113f85d20a8efb43
parentdb566d21292e089232663092f31bf27efa3d0391 (diff)
parentaa4a7f526a22f0205ff4ac927df2b92fafd38f4a (diff)
Merge branch 'master' into hpack_table
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c36
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs123
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs148
-rw-r--r--src/ruby/ext/grpc/rb_call.c6
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb35
-rw-r--r--tools/run_tests/performance/__init__.py28
-rwxr-xr-xtools/run_tests/performance/build_performance.sh15
-rw-r--r--tools/run_tests/performance/scenario_config.py153
-rwxr-xr-xtools/run_tests/run_performance_tests.py133
10 files changed, 469 insertions, 209 deletions
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index b50b4ee480..76c03c08b5 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -31,13 +31,13 @@
*
*/
-#include <grpc/support/port_platform.h>
-
+#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
+#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/client_config/lb_policy_registry.h"
@@ -263,22 +263,24 @@ static grpc_resolver *sockaddr_create(
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- r->lb_policy_name = NULL;
- if (0 != strcmp(args->uri->query, "")) {
- gpr_slice query_slice;
- gpr_slice_buffer query_parts;
-
- query_slice =
- gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing);
- gpr_slice_buffer_init(&query_parts);
- gpr_slice_split(query_slice, "=", &query_parts);
- GPR_ASSERT(query_parts.count == 2);
- if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
- r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
- }
- gpr_slice_buffer_destroy(&query_parts);
- gpr_slice_unref(query_slice);
+ r->lb_policy_name =
+ gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
+ const char *lb_enabled_qpart =
+ grpc_uri_get_query_arg(args->uri, "lb_enabled");
+ /* anything other than "0" is interpreted as true */
+ const bool lb_enabled =
+ (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
+
+ if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
+ !lb_enabled) {
+ /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
+ * out, as this is meant mostly for tests. */
+ gpr_log(GPR_ERROR,
+ "Requested 'grpclb' LB policy but resolved addresses don't "
+ "support load balancing.");
+ abort();
}
+
if (r->lb_policy_name == NULL) {
r->lb_policy_name = gpr_strdup(default_lb_policy_name);
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index 0bcacf76e5..f954ca5f34 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -61,15 +61,7 @@ namespace Grpc.IntegrationTesting
public static IClientRunner CreateStarted(ClientConfig config)
{
Logger.Debug("ClientConfig: {0}", config);
- string target = config.ServerTargets.Single();
- GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
- "Only closed loop scenario supported for C#");
- GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
- if (config.OutstandingRpcsPerChannel != 0)
- {
- Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
- }
if (config.AsyncClientThreads != 0)
{
Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
@@ -83,22 +75,40 @@ namespace Grpc.IntegrationTesting
Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
}
- var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
+ var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams);
+
+ return new ClientRunnerImpl(channels,
+ config.ClientType,
+ config.RpcType,
+ config.OutstandingRpcsPerChannel,
+ config.LoadParams,
+ config.PayloadConfig,
+ config.HistogramParams);
+ }
+
+ private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams)
+ {
+ GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1.");
+ GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified.");
+
+ var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
List<ChannelOption> channelOptions = null;
- if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
+ if (securityParams != null && securityParams.ServerHostOverride != "")
{
channelOptions = new List<ChannelOption>
{
- new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
+ new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride)
};
}
- var channel = new Channel(target, credentials, channelOptions);
- return new ClientRunnerImpl(channel,
- config.ClientType,
- config.RpcType,
- config.PayloadConfig,
- config.HistogramParams);
+ var result = new List<Channel>();
+ for (int i = 0; i < clientChannels; i++)
+ {
+ var target = serverTargets.ElementAt(i % serverTargets.Count());
+ var channel = new Channel(target, credentials, channelOptions);
+ result.Add(channel);
+ }
+ return result;
}
}
@@ -106,30 +116,35 @@ namespace Grpc.IntegrationTesting
{
const double SecondsToNanos = 1e9;
- readonly Channel channel;
+ readonly List<Channel> channels;
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
- readonly BenchmarkService.BenchmarkServiceClient client;
- readonly Task runnerTask;
- readonly CancellationTokenSource stoppedCts;
+ readonly List<Task> runnerTasks;
+ readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
- public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
+ public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
- this.channel = GrpcPreconditions.CheckNotNull(channel);
+ GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
+ this.channels = new List<Channel>(channels);
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
- this.stoppedCts = new CancellationTokenSource();
- this.client = BenchmarkService.NewClient(channel);
-
- var threadBody = GetThreadBody();
- this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
+ this.runnerTasks = new List<Task>();
+ foreach (var channel in this.channels)
+ {
+ for (int i = 0; i < outstandingRpcsPerChannel; i++)
+ {
+ var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel);
+ var threadBody = GetThreadBody(channel, timer);
+ this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning));
+ }
+ }
}
public ClientStats GetStats(bool reset)
@@ -150,12 +165,19 @@ namespace Grpc.IntegrationTesting
public async Task StopAsync()
{
stoppedCts.Cancel();
- await runnerTask;
- await channel.ShutdownAsync();
+ foreach (var runnerTask in runnerTasks)
+ {
+ await runnerTask;
+ }
+ foreach (var channel in channels)
+ {
+ await channel.ShutdownAsync();
+ }
}
- private void RunClosedLoopUnary()
+ private void RunUnary(Channel channel, IInterarrivalTimer timer)
{
+ var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
@@ -167,11 +189,14 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+
+ timer.WaitForNext();
}
}
- private async Task RunClosedLoopUnaryAsync()
+ private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer)
{
+ var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
@@ -183,11 +208,14 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+
+ await timer.WaitForNextAsync();
}
}
- private async Task RunClosedLoopStreamingAsync()
+ private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer)
{
+ var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
@@ -202,6 +230,8 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+
+ await timer.WaitForNextAsync();
}
// finish the streaming call
@@ -210,7 +240,7 @@ namespace Grpc.IntegrationTesting
}
}
- private async Task RunGenericClosedLoopStreamingAsync()
+ private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{
var request = CreateByteBufferRequest();
var stopwatch = new Stopwatch();
@@ -228,6 +258,8 @@ namespace Grpc.IntegrationTesting
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+
+ await timer.WaitForNextAsync();
}
// finish the streaming call
@@ -236,7 +268,7 @@ namespace Grpc.IntegrationTesting
}
}
- private Action GetThreadBody()
+ private Action GetThreadBody(Channel channel, IInterarrivalTimer timer)
{
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{
@@ -244,7 +276,7 @@ namespace Grpc.IntegrationTesting
GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
return () =>
{
- RunGenericClosedLoopStreamingAsync().Wait();
+ RunGenericStreamingAsync(channel, timer).Wait();
};
}
@@ -252,7 +284,7 @@ namespace Grpc.IntegrationTesting
if (clientType == ClientType.SYNC_CLIENT)
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
- return RunClosedLoopUnary;
+ return () => RunUnary(channel, timer);
}
else if (clientType == ClientType.ASYNC_CLIENT)
{
@@ -261,12 +293,12 @@ namespace Grpc.IntegrationTesting
case RpcType.UNARY:
return () =>
{
- RunClosedLoopUnaryAsync().Wait();
+ RunUnaryAsync(channel, timer).Wait();
};
case RpcType.STREAMING:
return () =>
{
- RunClosedLoopStreamingAsync().Wait();
+ RunStreamingPingPongAsync(channel, timer).Wait();
};
}
}
@@ -292,5 +324,18 @@ namespace Grpc.IntegrationTesting
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
}
+
+ private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier)
+ {
+ switch (loadParams.LoadCase)
+ {
+ case LoadParams.LoadOneofCase.ClosedLoop:
+ return new ClosedLoopInterarrivalTimer();
+ case LoadParams.LoadOneofCase.Poisson:
+ return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier);
+ default:
+ throw new ArgumentException("Unknown load type");
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 64d14b0df5..7ea80b11f0 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -115,6 +115,7 @@
<Compile Include="GenericService.cs" />
<Compile Include="GeneratedServiceBaseTest.cs" />
<Compile Include="GeneratedClientTest.cs" />
+ <Compile Include="InterarrivalTimers.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs
new file mode 100644
index 0000000000..6492d34890
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs
@@ -0,0 +1,148 @@
+#region Copyright notice and license
+
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Core.Utils;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+ public interface IInterarrivalTimer
+ {
+ void WaitForNext();
+
+ Task WaitForNextAsync();
+ }
+
+ /// <summary>
+ /// Interarrival timer that doesn't wait at all.
+ /// </summary>
+ public class ClosedLoopInterarrivalTimer : IInterarrivalTimer
+ {
+ public ClosedLoopInterarrivalTimer()
+ {
+ }
+
+ public void WaitForNext()
+ {
+ // NOP
+ }
+
+ public Task WaitForNextAsync()
+ {
+ return Task.FromResult<object>(null);
+ }
+ }
+
+ /// <summary>
+ /// Interarrival timer that generates Poisson process load.
+ /// </summary>
+ public class PoissonInterarrivalTimer : IInterarrivalTimer
+ {
+ readonly ExponentialDistribution exponentialDistribution;
+ DateTime? lastEventTime;
+
+ public PoissonInterarrivalTimer(double offeredLoad)
+ {
+ this.exponentialDistribution = new ExponentialDistribution(new Random(), offeredLoad);
+ this.lastEventTime = DateTime.UtcNow;
+ }
+
+ public void WaitForNext()
+ {
+ var waitDuration = GetNextWaitDuration();
+ int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds);
+ if (millisTimeout > 0)
+ {
+ // TODO(jtattermusch): probably only works well for a relatively low interarrival rate
+ Thread.Sleep(millisTimeout);
+ }
+ }
+
+ public async Task WaitForNextAsync()
+ {
+ var waitDuration = GetNextWaitDuration();
+ int millisTimeout = (int) Math.Round(waitDuration.TotalMilliseconds);
+ if (millisTimeout > 0)
+ {
+ // TODO(jtattermusch): probably only works well for a relatively low interarrival rate
+ await Task.Delay(millisTimeout);
+ }
+ }
+
+ private TimeSpan GetNextWaitDuration()
+ {
+ if (!lastEventTime.HasValue)
+ {
+ this.lastEventTime = DateTime.Now;
+ }
+
+ var origLastEventTime = this.lastEventTime.Value;
+ this.lastEventTime = origLastEventTime + TimeSpan.FromSeconds(exponentialDistribution.Next());
+ return this.lastEventTime.Value - origLastEventTime;
+ }
+
+ /// <summary>
+ /// Exp generator.
+ /// </summary>
+ private class ExponentialDistribution
+ {
+ readonly Random random;
+ readonly double lambda;
+ readonly double lambdaReciprocal;
+
+ public ExponentialDistribution(Random random, double lambda)
+ {
+ this.random = random;
+ this.lambda = lambda;
+ this.lambdaReciprocal = 1.0 / lambda;
+ }
+
+ public double Next()
+ {
+ double uniform = random.NextDouble();
+ // Use 1.0-uni above to avoid NaN if uni is 0
+ return lambdaReciprocal * (-Math.Log(1.0 - uniform));
+ }
+ }
+ }
+}
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index dc80d18b45..f5fdbb2ffd 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -359,7 +359,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
md_ary->metadata[md_ary->count].value_length = value_len;
md_ary->count += 1;
}
- } else {
+ } else if (TYPE(val) == T_STRING) {
value_str = RSTRING_PTR(val);
value_len = RSTRING_LEN(val);
if (!grpc_is_binary_header(key_str, key_len) &&
@@ -373,6 +373,10 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
md_ary->metadata[md_ary->count].value = value_str;
md_ary->metadata[md_ary->count].value_length = value_len;
md_ary->count += 1;
+ } else {
+ rb_raise(rb_eArgError,
+ "Header values must be of type string or array");
+ return ST_STOP;
}
return ST_CONTINUE;
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 5e13c25fcf..dd8e2e9f7a 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -193,44 +193,45 @@ describe 'ClientStub' do
describe '#client_streamer' do
shared_examples 'client streaming' do
before(:each) do
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ @stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ @options = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
end
it 'should send requests to/receive a reply from a server' do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_client_streamer(@sent_msgs, @resp, @pass)
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
- expect(get_response(stub)).to eq(@resp)
+ expect(get_response(@stub)).to eq(@resp)
th.join
end
it 'should send metadata to the server ok' do
- server_port = create_test_server
- host = "localhost:#{server_port}"
- th = run_client_streamer(@sent_msgs, @resp, @pass,
- k1: 'v1', k2: 'v2')
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
- expect(get_response(stub)).to eq(@resp)
+ th = run_client_streamer(@sent_msgs, @resp, @pass, @options)
+ expect(get_response(@stub)).to eq(@resp)
th.join
end
it 'should raise an error if the status is not ok' do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_client_streamer(@sent_msgs, @resp, @fail)
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
- blk = proc { get_response(stub) }
+ blk = proc { get_response(@stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @options.merge!(k3: 3)
+ expect do
+ get_response(@stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
end
describe 'without a call operation' do
def get_response(stub)
- stub.client_streamer(@method, @sent_msgs, noop, noop,
- k1: 'v1', k2: 'v2')
+ stub.client_streamer(@method, @sent_msgs, noop, noop, @options)
end
it_behaves_like 'client streaming'
@@ -239,7 +240,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
- return_op: true, k1: 'v1', k2: 'v2')
+ @options.merge(return_op: true))
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
end
diff --git a/tools/run_tests/performance/__init__.py b/tools/run_tests/performance/__init__.py
new file mode 100644
index 0000000000..100a624dc9
--- /dev/null
+++ b/tools/run_tests/performance/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/tools/run_tests/performance/build_performance.sh b/tools/run_tests/performance/build_performance.sh
index e5c0f86369..b33ee3a58c 100755
--- a/tools/run_tests/performance/build_performance.sh
+++ b/tools/run_tests/performance/build_performance.sh
@@ -36,16 +36,17 @@ cd $(dirname $0)/../../..
CONFIG=${CONFIG:-opt}
+# build C++ qps worker & driver always - we need at least the driver to
+# run any of the scenarios.
+# TODO(jtattermusch): not embedding OpenSSL breaks the C# build because
+# grpc_csharp_ext needs OpenSSL embedded and some intermediate files from
+# this build will be reused.
+make CONFIG=${CONFIG} EMBED_OPENSSL=true EMBED_ZLIB=true qps_worker qps_driver -j8
+
for language in $@
do
- if [ "$language" == "c++" ]
+ if [ "$language" != "c++" ]
then
- # build C++ qps worker & driver
- # TODO(jtattermusch): not embedding OpenSSL breaks the C# build because
- # grpc_csharp_ext needs OpenSSL embedded and some intermediate files from
- # this build will be reused.
- make CONFIG=${CONFIG} EMBED_OPENSSL=true EMBED_ZLIB=true qps_worker qps_driver -j8
- else
tools/run_tests/run_tests.py -l $language -c $CONFIG --build_only -j 8
fi
done
diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py
new file mode 100644
index 0000000000..f95e531fa2
--- /dev/null
+++ b/tools/run_tests/performance/scenario_config.py
@@ -0,0 +1,153 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# performance scenario configuration for various languages
+
+class CXXLanguage:
+
+ def __init__(self):
+ self.safename = 'cxx'
+
+ def worker_cmdline(self):
+ return ['bins/opt/qps_worker']
+
+ def worker_port_offset(self):
+ return 0
+
+ def scenarios(self):
+ # TODO(jtattermusch): add more scenarios
+ return {
+ # Scenario 1: generic async streaming ping-pong (contentionless latency)
+ 'cpp_async_generic_streaming_ping_pong': [
+ '--rpc_type=STREAMING',
+ '--client_type=ASYNC_CLIENT',
+ '--server_type=ASYNC_GENERIC_SERVER',
+ '--outstanding_rpcs_per_channel=1',
+ '--client_channels=1',
+ '--bbuf_req_size=0',
+ '--bbuf_resp_size=0',
+ '--async_client_threads=1',
+ '--async_server_threads=1',
+ '--secure_test=true',
+ '--num_servers=1',
+ '--num_clients=1',
+ '--server_core_limit=0',
+ '--client_core_limit=0'],
+ # Scenario 5: Sync unary ping-pong with protobufs
+ 'cpp_sync_unary_ping_pong_protobuf': [
+ '--rpc_type=UNARY',
+ '--client_type=SYNC_CLIENT',
+ '--server_type=SYNC_SERVER',
+ '--outstanding_rpcs_per_channel=1',
+ '--client_channels=1',
+ '--simple_req_size=0',
+ '--simple_resp_size=0',
+ '--secure_test=true',
+ '--num_servers=1',
+ '--num_clients=1',
+ '--server_core_limit=0',
+ '--client_core_limit=0']}
+
+ def __str__(self):
+ return 'c++'
+
+
+class CSharpLanguage:
+
+ def __init__(self):
+ self.safename = str(self)
+
+ def worker_cmdline(self):
+ return ['tools/run_tests/performance/run_worker_csharp.sh']
+
+ def worker_port_offset(self):
+ return 100
+
+ def scenarios(self):
+ # TODO(jtattermusch): add more scenarios
+ return {
+ # Scenario 1: generic async streaming ping-pong (contentionless latency)
+ 'csharp_async_generic_streaming_ping_pong': [
+ '--rpc_type=STREAMING',
+ '--client_type=ASYNC_CLIENT',
+ '--server_type=ASYNC_GENERIC_SERVER',
+ '--outstanding_rpcs_per_channel=1',
+ '--client_channels=1',
+ '--bbuf_req_size=0',
+ '--bbuf_resp_size=0',
+ '--async_client_threads=1',
+ '--async_server_threads=1',
+ '--secure_test=true',
+ '--num_servers=1',
+ '--num_clients=1',
+ '--server_core_limit=0',
+ '--client_core_limit=0']}
+
+ def __str__(self):
+ return 'csharp'
+
+
+class NodeLanguage:
+
+ def __init__(self):
+ pass
+ self.safename = str(self)
+
+ def worker_cmdline(self):
+ return ['tools/run_tests/performance/run_worker_node.sh']
+
+ def worker_port_offset(self):
+ return 200
+
+ def scenarios(self):
+ # TODO(jtattermusch): add more scenarios
+ return {
+ 'node_sync_unary_ping_pong_protobuf': [
+ '--rpc_type=UNARY',
+ '--client_type=ASYNC_CLIENT',
+ '--server_type=ASYNC_SERVER',
+ '--outstanding_rpcs_per_channel=1',
+ '--client_channels=1',
+ '--simple_req_size=0',
+ '--simple_resp_size=0',
+ '--secure_test=false',
+ '--num_servers=1',
+ '--num_clients=1',
+ '--server_core_limit=0',
+ '--client_core_limit=0']}
+
+ def __str__(self):
+ return 'node'
+
+
+LANGUAGES = {
+ 'c++' : CXXLanguage(),
+ 'csharp' : CSharpLanguage(),
+ 'node' : NodeLanguage(),
+}
diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py
index 0ab3d264a5..e1268e2ecb 100755
--- a/tools/run_tests/run_performance_tests.py
+++ b/tools/run_tests/run_performance_tests.py
@@ -40,6 +40,7 @@ import sys
import tempfile
import time
import uuid
+import performance.scenario_config as scenario_config
_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
@@ -49,130 +50,6 @@ os.chdir(_ROOT)
_REMOTE_HOST_USERNAME = 'jenkins'
-class CXXLanguage:
-
- def __init__(self):
- self.safename = 'cxx'
-
- def worker_cmdline(self):
- return ['bins/opt/qps_worker']
-
- def worker_port_offset(self):
- return 0
-
- def scenarios(self):
- # TODO(jtattermusch): add more scenarios
- return {
- # Scenario 1: generic async streaming ping-pong (contentionless latency)
- 'cpp_async_generic_streaming_ping_pong': [
- '--rpc_type=STREAMING',
- '--client_type=ASYNC_CLIENT',
- '--server_type=ASYNC_GENERIC_SERVER',
- '--outstanding_rpcs_per_channel=1',
- '--client_channels=1',
- '--bbuf_req_size=0',
- '--bbuf_resp_size=0',
- '--async_client_threads=1',
- '--async_server_threads=1',
- '--secure_test=true',
- '--num_servers=1',
- '--num_clients=1',
- '--server_core_limit=0',
- '--client_core_limit=0'],
- # Scenario 5: Sync unary ping-pong with protobufs
- 'cpp_sync_unary_ping_pong_protobuf': [
- '--rpc_type=UNARY',
- '--client_type=SYNC_CLIENT',
- '--server_type=SYNC_SERVER',
- '--outstanding_rpcs_per_channel=1',
- '--client_channels=1',
- '--simple_req_size=0',
- '--simple_resp_size=0',
- '--secure_test=true',
- '--num_servers=1',
- '--num_clients=1',
- '--server_core_limit=0',
- '--client_core_limit=0']}
-
- def __str__(self):
- return 'c++'
-
-
-class CSharpLanguage:
-
- def __init__(self):
- self.safename = str(self)
-
- def worker_cmdline(self):
- return ['tools/run_tests/performance/run_worker_csharp.sh']
-
- def worker_port_offset(self):
- return 100
-
- def scenarios(self):
- # TODO(jtattermusch): add more scenarios
- return {
- # Scenario 1: generic async streaming ping-pong (contentionless latency)
- 'csharp_async_generic_streaming_ping_pong': [
- '--rpc_type=STREAMING',
- '--client_type=ASYNC_CLIENT',
- '--server_type=ASYNC_GENERIC_SERVER',
- '--outstanding_rpcs_per_channel=1',
- '--client_channels=1',
- '--bbuf_req_size=0',
- '--bbuf_resp_size=0',
- '--async_client_threads=1',
- '--async_server_threads=1',
- '--secure_test=true',
- '--num_servers=1',
- '--num_clients=1',
- '--server_core_limit=0',
- '--client_core_limit=0']}
-
- def __str__(self):
- return 'csharp'
-
-
-class NodeLanguage:
-
- def __init__(self):
- pass
- self.safename = str(self)
-
- def worker_cmdline(self):
- return ['tools/run_tests/performance/run_worker_node.sh']
-
- def worker_port_offset(self):
- return 200
-
- def scenarios(self):
- # TODO(jtattermusch): add more scenarios
- return {
- 'node_sync_unary_ping_pong_protobuf': [
- '--rpc_type=UNARY',
- '--client_type=ASYNC_CLIENT',
- '--server_type=ASYNC_SERVER',
- '--outstanding_rpcs_per_channel=1',
- '--client_channels=1',
- '--simple_req_size=0',
- '--simple_resp_size=0',
- '--secure_test=false',
- '--num_servers=1',
- '--num_clients=1',
- '--server_core_limit=0',
- '--client_core_limit=0']}
-
- def __str__(self):
- return 'node'
-
-
-_LANGUAGES = {
- 'c++' : CXXLanguage(),
- 'csharp' : CSharpLanguage(),
- 'node' : NodeLanguage(),
-}
-
-
class QpsWorkerJob:
"""Encapsulates a qps worker server job."""
@@ -272,7 +149,7 @@ def prepare_remote_hosts(hosts):
sys.exit(1)
-def build_on_remote_hosts(hosts, languages=_LANGUAGES.keys(), build_local=False):
+def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), build_local=False):
"""Builds performance worker on remote hosts (and maybe also locally)."""
build_timeout = 15*60
build_jobs = []
@@ -366,7 +243,7 @@ def finish_qps_workers(jobs):
argp = argparse.ArgumentParser(description='Run performance tests.')
argp.add_argument('-l', '--language',
- choices=['all'] + sorted(_LANGUAGES.keys()),
+ choices=['all'] + sorted(scenario_config.LANGUAGES.keys()),
nargs='+',
default=['all'],
help='Languages to benchmark.')
@@ -380,9 +257,9 @@ argp.add_argument('--remote_worker_host',
args = argp.parse_args()
-languages = set(_LANGUAGES[l]
+languages = set(scenario_config.LANGUAGES[l]
for l in itertools.chain.from_iterable(
- _LANGUAGES.iterkeys() if x == 'all' else [x]
+ scenario_config.LANGUAGES.iterkeys() if x == 'all' else [x]
for x in args.language))
# Put together set of remote hosts where to run and build