diff options
author | vjpai <vpai@google.com> | 2016-03-30 09:58:46 -0700 |
---|---|---|
committer | vjpai <vpai@google.com> | 2016-03-30 09:58:46 -0700 |
commit | ad1c1cc6cf4495b2af05058335869eafedd3e9fb (patch) | |
tree | 55a29d8c6482a07b2c250f74f09acea3bb343b9d /src | |
parent | d7f43b3aabd3d34ebd7ac2ce55fe86095fb92a8b (diff) |
Add support for unary client, closed-loop or poisson
Diffstat (limited to 'src')
-rw-r--r-- | src/ruby/qps/client.rb | 81 | ||||
-rw-r--r-- | src/ruby/qps/qps-common.rb | 8 | ||||
-rw-r--r-- | src/ruby/qps/server.rb | 11 | ||||
-rwxr-xr-x | src/ruby/qps/worker.rb | 3 |
4 files changed, 92 insertions, 11 deletions
diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 0460168ad5..aa57b06c97 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -55,3 +55,84 @@ class Poisson end end +class BenchmarkClient + def initialize(config) + if config.security_params + if config.security_params.use_test_ca + certs = load_test_certs + cred = GRPC::Core::Credentials.new(certs[0]) + else + p 'Unsupported to use non-test CA (TBD)' + exit + end + if config.security_params.server_host_override + p 'Unsupported to use severt host override (TBD)' + exit + end + else + cred = :this_channel_is_insecure + end + @histres = config.histogram_params.resolution + @histmax = config.histogram_params.max_possible + @start_time = Time.now + @histogram = Histogram.new(@histres, @histmax) + @done = false + (0..config.client_channels-1).each do |i| + Thread.new { + stub = '' + req = Grpc::Testing::SimpleRequest.new(response_type: Grpc::Testing::PayloadType::COMPRESSABLE, + response_size: config.payload_config.simple_params.resp_size, + payload: Grpc::Testing::Payload.new(type: Grpc::Testing::PayloadType::COMPRESSABLE, + body: nulls(config.payload_config.simple_params.req_size))) + case config.load_params.load.to_s + when 'closed_loop' + waiter = nil + when 'poisson' + waiter = Poisson.new(config.load_params.poisson.offered_load / config.client_channels) + end + stub = Grpc::Testing::BenchmarkService::Stub.new(config.server_targets[i % config.server_targets.length], cred) + case config.rpc_type + when :UNARY + unary_ping_ponger(req,stub,config,waiter) + when :STREAMING + streaming_ping_ponger(req,stub,config,waiter) + end + } + end + end + def wait_to_issue(waiter) + if waiter + delay = waiter.advance-Time.now + sleep delay if delay > 0 + end + end + def unary_ping_ponger(req, stub, config,waiter) + while !@done + wait_to_issue(waiter) + start = Time.now + resp = stub.unary_call(req) + @histogram.add((Time.now-start)*1e9) + end + end + def streaming_ping_ponger(req, stub, config, waiter) + end + def mark(reset) + lat = Grpc::Testing::HistogramData.new( + bucket: @histogram.contents, + min_seen: @histogram.minimum, + max_seen: @histogram.maximum, + sum: @histogram.sum, + sum_of_squares: @histogram.sum_of_squares, + count: @histogram.count + ) + elapsed = Time.now-@start_time + if reset + @start_time = Time.now + @histogram = Histogram.new(@histres, @histmax) + end + Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed) + end + def shutdown + @done = true + end +end diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb index 82c24d42b9..4119d600b1 100644 --- a/src/ruby/qps/qps-common.rb +++ b/src/ruby/qps/qps-common.rb @@ -44,6 +44,14 @@ def nulls(l) [].pack('x' * l).force_encoding('ascii-8bit') end +# load the test-only certificates +def load_test_certs + this_dir = File.expand_path(File.dirname(__FILE__)) + data_dir = File.join(File.dirname(this_dir), 'spec/testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(data_dir, f)).read } +end + # A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. class EnumeratorQueue extend Forwardable diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index 44a80988d7..25383be7af 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -62,13 +62,6 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service end end -def load_test_certs - this_dir = File.expand_path(File.dirname(__FILE__)) - data_dir = File.join(File.dirname(this_dir), 'spec/testdata') - files = ['ca.pem', 'server1.key', 'server1.pem'] - files.map { |f| File.open(File.join(data_dir, f)).read } -end - class BenchmarkServer def initialize(config, port) if config.security_params @@ -87,9 +80,7 @@ class BenchmarkServer end def mark(reset) s = Grpc::Testing::ServerStats.new(time_elapsed: (Time.now-@start_time).to_f) - if reset - @start_time = Time.now - end + @start_time = Time.now if reset s end def get_port diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 208211c410..744d19751d 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -73,7 +73,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service Thread.new { client = '' reqs.each do |req| - case req.argtype + case req.argtype.to_s when 'setup' client = BenchmarkClient.new(req.setup) q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false))) @@ -82,6 +82,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service end end q.push(self) + client.shutdown } q.each_item end |