aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2016-08-15 18:44:14 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2016-08-17 14:47:28 -0700
commit847f9ecc22587170dfd76d4fc3589443f6b0dc4b (patch)
treead9cab11c1bda24345c94de9898083e01938f7f1 /src
parent3bc78cd0b5bd784a235c01612d634b1ec5f8fb97 (diff)
fail benchmarks with errors in a child rpc thread
Diffstat (limited to 'src')
-rw-r--r--src/ruby/qps/client.rb7
-rw-r--r--src/ruby/qps/server.rb3
-rwxr-xr-xsrc/ruby/qps/worker.rb8
3 files changed, 14 insertions, 4 deletions
diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb
index 7ed648acef..8aed866da5 100644
--- a/src/ruby/qps/client.rb
+++ b/src/ruby/qps/client.rb
@@ -89,12 +89,14 @@ class BenchmarkClient
payload: gtp.new(type: gtpt::COMPRESSABLE,
body: nulls(simple_params.req_size)))
+ @child_threads = []
+
(0..config.client_channels-1).each do |chan|
gtbss = Grpc::Testing::BenchmarkService::Stub
st = config.server_targets
stub = gtbss.new(st[chan % st.length], cred, **opts)
(0..config.outstanding_rpcs_per_channel-1).each do |r|
- Thread.new {
+ @child_threads << Thread.new {
case config.load_params.load.to_s
when 'closed_loop'
waiter = nil
@@ -162,5 +164,8 @@ class BenchmarkClient
end
def shutdown
@done = true
+ @child_threads.each do |thread|
+ thread.join
+ end
end
end
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index cd98ee1fd9..f51f86d9a9 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -71,7 +71,8 @@ class BenchmarkServer
else
cred = :this_port_is_insecure
end
- @server = GRPC::RpcServer.new
+ # Make sure server can handle the large number of calls in benchmarks
+ @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100)
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now
diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb
index 12b8087ca0..61a0b723a3 100755
--- a/src/ruby/qps/worker.rb
+++ b/src/ruby/qps/worker.rb
@@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
end
end
- q.push(self)
bms.stop
+ q.push(self)
}
q.each_item
end
@@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
client.mark(req.mark.reset)))
end
end
- q.push(self)
client.shutdown
+ q.push(self)
}
q.each_item
end
@@ -118,6 +118,10 @@ def main
options['server_port'] = v
end
end.parse!
+
+ # Configure any errors with client or server child threads to surface
+ Thread.abort_on_exception = true
+
s = GRPC::RpcServer.new
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure)