aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2016-04-22 13:37:26 -0700
committerGravatar murgatroid99 <mlumish@google.com>2016-04-22 13:37:26 -0700
commitddaa69f15d8b3bb1a6bf9aff231950406fe5e961 (patch)
tree1ec99eee6ff817f2566d5f2794dc0d9863b6e5ab /src/ruby
parente621f13ecd64d005ad3dfe84f67e81dde6c113ef (diff)
Got Ruby stress client working, with some modifications to interop tests
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/pb/grpc/testing/metrics.rb28
-rw-r--r--src/ruby/pb/grpc/testing/metrics_services.rb27
-rwxr-xr-xsrc/ruby/pb/test/client.rb28
-rwxr-xr-xsrc/ruby/pb/test/server.rb2
-rw-r--r--src/ruby/stress/metrics_server.rb83
-rwxr-xr-xsrc/ruby/stress/stress_client.rb155
6 files changed, 301 insertions, 22 deletions
diff --git a/src/ruby/pb/grpc/testing/metrics.rb b/src/ruby/pb/grpc/testing/metrics.rb
new file mode 100644
index 0000000000..3b3c8cd61b
--- /dev/null
+++ b/src/ruby/pb/grpc/testing/metrics.rb
@@ -0,0 +1,28 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: grpc/testing/metrics.proto
+
+require 'google/protobuf'
+
+Google::Protobuf::DescriptorPool.generated_pool.build do
+ add_message "grpc.testing.GaugeResponse" do
+ optional :name, :string, 1
+ oneof :value do
+ optional :long_value, :int64, 2
+ optional :double_value, :double, 3
+ optional :string_value, :string, 4
+ end
+ end
+ add_message "grpc.testing.GaugeRequest" do
+ optional :name, :string, 1
+ end
+ add_message "grpc.testing.EmptyMessage" do
+ end
+end
+
+module Grpc
+ module Testing
+ GaugeResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.GaugeResponse").msgclass
+ GaugeRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.GaugeRequest").msgclass
+ EmptyMessage = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EmptyMessage").msgclass
+ end
+end
diff --git a/src/ruby/pb/grpc/testing/metrics_services.rb b/src/ruby/pb/grpc/testing/metrics_services.rb
new file mode 100644
index 0000000000..f5778bbbb1
--- /dev/null
+++ b/src/ruby/pb/grpc/testing/metrics_services.rb
@@ -0,0 +1,27 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# Source: grpc/testing/metrics.proto for package 'grpc.testing'
+
+require 'grpc'
+require 'grpc/testing/metrics'
+
+module Grpc
+ module Testing
+ module MetricsService
+
+ # TODO: add proto service documentation here
+ class Service
+
+ include GRPC::GenericService
+
+ self.marshal_class_method = :encode
+ self.unmarshal_class_method = :decode
+ self.service_name = 'grpc.testing.MetricsService'
+
+ rpc :GetAllGauges, EmptyMessage, stream(GaugeResponse)
+ rpc :GetGauge, GaugeRequest, GaugeResponse
+ end
+
+ Stub = Service.rpc_stub_class
+ end
+ end
+end
diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb
index 695a5c4ea2..95b059a18e 100755
--- a/src/ruby/pb/test/client.rb
+++ b/src/ruby/pb/test/client.rb
@@ -38,23 +38,23 @@
# --server_port=<port> \
# --test_case=<testcase_name>
+# These lines are required for the generated files to load grpc
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
-pb_dir = File.dirname(File.dirname(this_dir))
+pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)
-$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
require 'optparse'
require 'logger'
-require 'grpc'
+require_relative '../../lib/grpc'
require 'googleauth'
require 'google/protobuf'
-require 'test/proto/empty'
-require 'test/proto/messages'
-require 'test/proto/test_services'
+require_relative 'proto/empty'
+require_relative 'proto/messages'
+require_relative 'proto/test_services'
AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
@@ -208,12 +208,10 @@ class NamedTests
def empty_unary
resp = @stub.empty_call(Empty.new)
assert('empty_unary: invalid response') { resp.is_a?(Empty) }
- p 'OK: empty_unary'
end
def large_unary
perform_large_unary
- p 'OK: large_unary'
end
def service_account_creds
@@ -230,7 +228,6 @@ class NamedTests
assert("#{__callee__}: bad oauth scope") do
@args.oauth_scope.include?(resp.oauth_scope)
end
- p "OK: #{__callee__}"
end
def jwt_token_creds
@@ -238,7 +235,6 @@ class NamedTests
wanted_email = MultiJson.load(json_key)['client_email']
resp = perform_large_unary(fill_username: true)
assert("#{__callee__}: bad username") { wanted_email == resp.username }
- p "OK: #{__callee__}"
end
def compute_engine_creds
@@ -247,7 +243,6 @@ class NamedTests
assert("#{__callee__}: bad username") do
@args.default_service_account == resp.username
end
- p "OK: #{__callee__}"
end
def oauth2_auth_token
@@ -259,7 +254,6 @@ class NamedTests
assert("#{__callee__}: bad oauth scope") do
@args.oauth_scope.include?(resp.oauth_scope)
end
- p "OK: #{__callee__}"
end
def per_rpc_creds
@@ -279,7 +273,6 @@ class NamedTests
assert("#{__callee__}: bad oauth scope") do
@args.oauth_scope.include?(resp.oauth_scope)
end
- p "OK: #{__callee__}"
end
def client_streaming
@@ -293,7 +286,6 @@ class NamedTests
assert("#{__callee__}: aggregate payload size is incorrect") do
wanted_aggregate_size == resp.aggregated_payload_size
end
- p "OK: #{__callee__}"
end
def server_streaming
@@ -311,7 +303,6 @@ class NamedTests
:COMPRESSABLE == r.payload.type
end
end
- p "OK: #{__callee__}"
end
def ping_pong
@@ -319,7 +310,6 @@ class NamedTests
ppp = PingPongPlayer.new(msg_sizes)
resps = @stub.full_duplex_call(ppp.each_item)
resps.each { |r| ppp.queue.push(r) }
- p "OK: #{__callee__}"
end
def timeout_on_sleeping_server
@@ -332,7 +322,6 @@ class NamedTests
assert("#{__callee__}: status was wrong") do
e.code == GRPC::Core::StatusCodes::DEADLINE_EXCEEDED
end
- p "OK: #{__callee__}"
end
def empty_stream
@@ -346,7 +335,6 @@ class NamedTests
assert("#{__callee__}: too many responses expected 0") do
count == 0
end
- p "OK: #{__callee__}"
end
def cancel_after_begin
@@ -361,7 +349,6 @@ class NamedTests
fail 'Should have raised GRPC:Cancelled'
rescue GRPC::Cancelled
assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
- p "OK: #{__callee__}"
end
def cancel_after_first_response
@@ -374,7 +361,6 @@ class NamedTests
rescue GRPC::Cancelled
assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
op.wait
- p "OK: #{__callee__}"
end
def all
@@ -442,7 +428,7 @@ def parse_args
opts.on('--use_tls USE_TLS', ['false', 'true'],
'require a secure connection?') do |v|
args['secure'] = v == 'true'
- end
+p end
opts.on('--use_test_ca USE_TEST_CA', ['false', 'true'],
'if secure, use the test certificate?') do |v|
args['use_test_ca'] = v == 'true'
diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb
index 851e815222..914c7cc79d 100755
--- a/src/ruby/pb/test/server.rb
+++ b/src/ruby/pb/test/server.rb
@@ -39,7 +39,7 @@
this_dir = File.expand_path(File.dirname(__FILE__))
lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib')
-pb_dir = File.dirname(File.dirname(this_dir))
+pb_dir = File.dirname(this_dir)
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
diff --git a/src/ruby/stress/metrics_server.rb b/src/ruby/stress/metrics_server.rb
new file mode 100644
index 0000000000..13638c4d21
--- /dev/null
+++ b/src/ruby/stress/metrics_server.rb
@@ -0,0 +1,83 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require_relative '../pb/grpc/testing/metrics.rb'
+require_relative '../pb/grpc/testing/metrics_services.rb'
+
+class Gauge
+ def get_name
+ raise NoMethodError.new
+ end
+
+ def get_type
+ raise NoMethodError.new
+ end
+
+ def get_value
+ raise NoMethodError.new
+ end
+end
+
+class MetricsServiceImpl < Grpc::Testing::MetricsService::Service
+ include Grpc::Testing
+ @gauges
+
+ def initialize
+ @gauges = {}
+ end
+
+ def register_gauge(gauge)
+ @gauges[gauge.get_name] = gauge
+ end
+
+ def make_gauge_response(gauge)
+ response = GaugeResponse.new(:name => gauge.get_name)
+ value = gauge.get_value
+ case gauge.get_type
+ when 'long'
+ response.long_value = value
+ when 'double'
+ response.double_value = value
+ when 'string'
+ response.string_value = value
+ end
+ response
+ end
+
+ def get_all_gauges(_empty, _call)
+ @gauges.values.map do |gauge|
+ make_gauge_response gauge
+ end
+ end
+
+ def get_gauge(gauge_req, _call)
+ gauge = @gauges[gauge_req.name]
+ make_gauge_response gauge
+ end
+end
diff --git a/src/ruby/stress/stress_client.rb b/src/ruby/stress/stress_client.rb
new file mode 100755
index 0000000000..698f9f1b87
--- /dev/null
+++ b/src/ruby/stress/stress_client.rb
@@ -0,0 +1,155 @@
+#!/usr/bin/env ruby
+
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'optparse'
+require 'thread'
+require_relative '../pb/test/client'
+require_relative './metrics_server'
+require_relative '../lib/grpc'
+
+class QpsGauge < Gauge
+ @query_count
+ @query_mutex
+ @start_time
+
+ def initialize
+ @query_count = 0
+ @query_mutex = Mutex.new
+ @start_time = Time.now
+ end
+
+ def increment_queries
+ @query_mutex.synchronize { @query_count += 1}
+ end
+
+ def get_name
+ 'qps'
+ end
+
+ def get_type
+ 'long'
+ end
+
+ def get_value
+ (@query_mutex.synchronize { @query_count / (Time.now - @start_time) }).to_i
+ end
+end
+
+def start_metrics_server(port)
+ host = "0.0.0.0:#{port}"
+ server = GRPC::RpcServer.new
+ server.add_http2_port(host, :this_port_is_insecure)
+ service = MetricsServiceImpl.new
+ server.handle(service)
+ server_thread = Thread.new { server.run_till_terminated }
+ [server, service, server_thread]
+end
+
+StressArgs = Struct.new(:server_addresses, :test_cases, :duration,
+ :channels_per_server, :concurrent_calls, :metrics_port)
+
+def start(stress_args)
+ running = true
+ threads = []
+ qps_gauge = QpsGauge.new
+ metrics_server, metrics_service, metrics_thread =
+ start_metrics_server(stress_args.metrics_port)
+ metrics_service.register_gauge(qps_gauge)
+ stress_args.server_addresses.each do |address|
+ stress_args.channels_per_server.times do
+ client_args = Args.new
+ client_args.host, client_args.port = address.split(':')
+ client_args.secure = false
+ client_args.test_case = ''
+ stub = create_stub(client_args)
+ named_tests = NamedTests.new(stub, client_args)
+ stress_args.concurrent_calls.times do
+ threads << Thread.new do
+ while running
+ named_tests.method(stress_args.test_cases.sample).call
+ qps_gauge.increment_queries
+ end
+ end
+ end
+ end
+ end
+ if stress_args.duration >= 0
+ sleep stress_args.duration
+ running = false
+ metrics_server.stop
+ p "QPS: #{qps_gauge.get_value}"
+ threads.each { |thd| thd.join; }
+ end
+ metrics_thread.join
+end
+
+def parse_stress_args
+ stress_args = StressArgs.new
+ stress_args.server_addresses = ['localhost:8080']
+ stress_args.test_cases = []
+ stress_args.duration = -1
+ stress_args.channels_per_server = 1
+ stress_args.concurrent_calls = 1
+ stress_args.metrics_port = '8081'
+ OptionParser.new do |opts|
+ opts.on('--server_addresses [LIST]', Array) do |addrs|
+ stress_args.server_addresses = addrs
+ end
+ opts.on('--test_cases cases', Array) do |cases|
+ stress_args.test_cases = (cases.map do |item|
+ split = item.split(':')
+ [split[0]] * split[1].to_i
+ end).reduce([], :+)
+ end
+ opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
+ stress_args.duration = time
+ end
+ opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
+ stress_args.channels_per_server = channels
+ end
+ opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
+ stress_args.concurrent_calls = stubs
+ end
+ opts.on('--metrics_port [port]') do |port|
+ stress_args.metrics_port = port
+ end
+ end.parse!
+ stress_args
+end
+
+def main
+ opts = parse_stress_args
+ start(opts)
+end
+
+if __FILE__ == $0
+ main
+end