aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/qps
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2016-03-29 17:21:28 -0700
committerGravatar vjpai <vpai@google.com>2016-03-29 17:21:28 -0700
commit45be26ef0d5c466a30273e3702103d6b0cf3b4ab (patch)
tree6dc0ebd04195bf1f999af005588e87356dd97429 /src/ruby/qps
parent691387afc0a481abc4f2ab6a65d9b8328830479b (diff)
Working ruby server implementation
Diffstat (limited to 'src/ruby/qps')
-rw-r--r--src/ruby/qps/server.rb35
-rwxr-xr-xsrc/ruby/qps/worker.rb32
2 files changed, 52 insertions, 15 deletions
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index edc06806aa..44a80988d7 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -40,6 +40,7 @@ require 'grpc'
require 'qps-common'
require 'src/proto/grpc/testing/messages'
require 'src/proto/grpc/testing/services_services'
+require 'src/proto/grpc/testing/stats'
class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
def unary_call(req, _call)
@@ -61,8 +62,38 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
end
end
-def benchmark_server(config)
-
+def load_test_certs
+ this_dir = File.expand_path(File.dirname(__FILE__))
+ data_dir = File.join(File.dirname(this_dir), 'spec/testdata')
+ files = ['ca.pem', 'server1.key', 'server1.pem']
+ files.map { |f| File.open(File.join(data_dir, f)).read }
end
+class BenchmarkServer
+ def initialize(config, port)
+ if config.security_params
+ certs = load_test_certs
+ cred = GRPC::Core::Credentials.new(certs[0])
+ else
+ cred = :this_port_is_insecure
+ end
+ @server = GRPC::RpcServer.new
+ @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
+ @server.handle(BenchmarkServiceImpl.new)
+ @start_time = Time.now
+ Thread.new {
+ @server.run
+ }
+ end
+ def mark(reset)
+ s = Grpc::Testing::ServerStats.new(time_elapsed: (Time.now-@start_time).to_f)
+ if reset
+ @start_time = Time.now
+ end
+ s
+ end
+ def get_port
+ @port
+ end
+end
diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb
index ca60f51925..1ebf129a13 100755
--- a/src/ruby/qps/worker.rb
+++ b/src/ruby/qps/worker.rb
@@ -54,15 +54,16 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
q = EnumeratorQueue.new(self)
Thread.new {
reqs.each do |req|
- case req.argtype
- when "setup"
- server = BenchmarkServer.new(req.setup)
- q.push(Grpc::Testing::ServerStatus.new(stats: server.mark(false), port: server.get_port))
- when "mark"
- q.push(Grpc::Testing::ServerStatus.new(stats: server.mark(req.mark.reset), cores: cpu_cores))
+ case req.argtype.to_s
+ when 'setup'
+ @bms = BenchmarkServer.new(req.setup, @server_port)
+ q.push(Grpc::Testing::ServerStatus.new(stats: @bms.mark(false), port: @bms.get_port))
+ when 'mark'
+ q.push(Grpc::Testing::ServerStatus.new(stats: @bms.mark(req.mark.reset), cores: cpu_cores))
end
end
q.push(self)
+ @bms.stop
}
q.each_item
end
@@ -71,10 +72,10 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
Thread.new {
reqs.each do |req|
case req.argtype
- when "setup"
- server = BenchmarkClient.new(req.setup)
+ when 'setup'
+ client = BenchmarkClient.new(req.setup)
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(false)))
- when "mark"
+ when 'mark'
q.push(Grpc::Testing::ClientStatus.new(stats: client.mark(req.mark.reset)))
end
end
@@ -92,25 +93,30 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
}
Grpc::Testing::Void.new
end
- def initialize(s)
+ def initialize(s, sp)
@server = s
+ @server_port = sp
end
end
def main
options = {
- 'driver_port' => 0
+ 'driver_port' => 0,
+ 'server_port' => 0
}
OptionParser.new do |opts|
- opts.banner = 'Usage: [--driver_port <port>]'
+ opts.banner = 'Usage: [--driver_port <port>] [--server_port <port>]'
opts.on('--driver_port PORT', '<port>') do |v|
options['driver_port'] = v
end
+ opts.on('--server_port PORT', '<port>') do |v|
+ options['server_port'] = v
+ end
end.parse!
s = GRPC::RpcServer.new
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure)
- s.handle(WorkerServiceImpl.new(s))
+ s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i))
s.run
end