aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2016-03-30 09:58:46 -0700
committerGravatar vjpai <vpai@google.com>2016-03-30 09:58:46 -0700
commitad1c1cc6cf4495b2af05058335869eafedd3e9fb (patch)
tree55a29d8c6482a07b2c250f74f09acea3bb343b9d /src
parentd7f43b3aabd3d34ebd7ac2ce55fe86095fb92a8b (diff)
Add support for unary client, closed-loop or poisson
Diffstat (limited to 'src')
-rw-r--r--src/ruby/qps/client.rb81
-rw-r--r--src/ruby/qps/qps-common.rb8
-rw-r--r--src/ruby/qps/server.rb11
-rwxr-xr-xsrc/ruby/qps/worker.rb3
4 files changed, 92 insertions, 11 deletions
diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb
index 0460168ad5..aa57b06c97 100644
--- a/src/ruby/qps/client.rb
+++ b/src/ruby/qps/client.rb
@@ -55,3 +55,84 @@ class Poisson
end
end
+class BenchmarkClient
+ def initialize(config)
+ if config.security_params
+ if config.security_params.use_test_ca
+ certs = load_test_certs
+ cred = GRPC::Core::Credentials.new(certs[0])
+ else
+ p 'Unsupported to use non-test CA (TBD)'
+ exit
+ end
+ if config.security_params.server_host_override
+ p 'Unsupported to use severt host override (TBD)'
+ exit
+ end
+ else
+ cred = :this_channel_is_insecure
+ end
+ @histres = config.histogram_params.resolution
+ @histmax = config.histogram_params.max_possible
+ @start_time = Time.now
+ @histogram = Histogram.new(@histres, @histmax)
+ @done = false
+ (0..config.client_channels-1).each do |i|
+ Thread.new {
+ stub = ''
+ req = Grpc::Testing::SimpleRequest.new(response_type: Grpc::Testing::PayloadType::COMPRESSABLE,
+ response_size: config.payload_config.simple_params.resp_size,
+ payload: Grpc::Testing::Payload.new(type: Grpc::Testing::PayloadType::COMPRESSABLE,
+ body: nulls(config.payload_config.simple_params.req_size)))
+ case config.load_params.load.to_s
+ when 'closed_loop'
+ waiter = nil
+ when 'poisson'
+ waiter = Poisson.new(config.load_params.poisson.offered_load / config.client_channels)
+ end
+ stub = Grpc::Testing::BenchmarkService::Stub.new(config.server_targets[i % config.server_targets.length], cred)
+ case config.rpc_type
+ when :UNARY
+ unary_ping_ponger(req,stub,config,waiter)
+ when :STREAMING
+ streaming_ping_ponger(req,stub,config,waiter)
+ end
+ }
+ end
+ end
+ def wait_to_issue(waiter)
+ if waiter
+ delay = waiter.advance-Time.now
+ sleep delay if delay > 0
+ end
+ end
+ def unary_ping_ponger(req, stub, config,waiter)
+ while !@done
+ wait_to_issue(waiter)
+ start = Time.now
+ resp = stub.unary_call(req)
+ @histogram.add((Time.now-start)*1e9)
+ end
+ end
+ def streaming_ping_ponger(req, stub, config, waiter)
+ end
+ def mark(reset)
+ lat = Grpc::Testing::HistogramData.new(
+ bucket: @histogram.contents,
+ min_seen: @histogram.minimum,
+ max_seen: @histogram.maximum,
+ sum: @histogram.sum,
+ sum_of_squares: @histogram.sum_of_squares,
+ count: @histogram.count
+ )
+ elapsed = Time.now-@start_time
+ if reset
+ @start_time = Time.now
+ @histogram = Histogram.new(@histres, @histmax)
+ end
+ Grpc::Testing::ClientStats.new(latencies: lat, time_elapsed: elapsed)
+ end
+ def shutdown
+ @done = true
+ end
+end
diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb
index 82c24d42b9..4119d600b1 100644
--- a/src/ruby/qps/qps-common.rb
+++ b/src/ruby/qps/qps-common.rb
@@ -44,6 +44,14 @@ def nulls(l)
[].pack('x' * l).force_encoding('ascii-8bit')
end
+# load the test-only certificates
+def load_test_certs
+ this_dir = File.expand_path(File.dirname(__FILE__))
+ data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
+ files = ['ca.pem', 'server1.key', 'server1.pem']
+ files.map { |f| File.open(File.join(data_dir, f)).read }
+end
+
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue
extend Forwardable
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index 44a80988d7..25383be7af 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -62,13 +62,6 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
end
end
-def load_test_certs
- this_dir = File.expand_path(File.dirname(__FILE__))
- data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
- files = ['ca.pem', 'server1.key', 'server1.pem']
- files.map { |f| File.open(File.join(data_dir, f)).read }
-end
-
class BenchmarkServer
def initialize(config, port)
if config.security_params
@@ -87,9 +80,7 @@ class BenchmarkServer
end
def mark(reset)
s = Grpc::Testing::ServerStats.new(time_elapsed: (Time.now-@start_time).to_f)
- if reset
- @start_time = Time.now
- end
+ @start_time = Time.now if reset
s
end
def get_port
diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb
index 208211c410..744d19751d 100755
--- a/src/ruby/qps/worker.rb
+++ b/src/ruby/qps/worker.rb
@@ -73,7 +73,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
Thread.new {
client = ''
reqs.each do |req|
- case req.argtype
+ case req.argtype.to_s
when 'setup'
client = BenchmarkClient.new(req.setup)
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
@@ -82,6 +82,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
end
end
q.push(self)
+ client.shutdown
}
q.each_item
end