aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb3
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb1
-rwxr-xr-xsrc/ruby/pb/test/server.rb65
-rw-r--r--src/ruby/qps/client.rb7
-rw-r--r--src/ruby/qps/qps-common.rb16
-rw-r--r--src/ruby/qps/server.rb14
-rwxr-xr-xsrc/ruby/qps/worker.rb8
7 files changed, 57 insertions, 57 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 23688dc924..dfc2644c46 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -240,11 +240,8 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_received = true
end
- GRPC.logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
- GRPC.logger.debug("received req.to_s: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
- GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
GRPC.logger.debug('found nil; the final response has been sent')
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index da0f6503db..7dbcb7d479 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -304,7 +304,6 @@ module GRPC
# allow the metadata to be accessed from the call
an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
- GRPC.logger.debug("call md is #{an_rpc.metadata}")
connect_md = nil
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb
index 0808121661..3f1e0a1ccf 100755
--- a/src/ruby/pb/test/server.rb
+++ b/src/ruby/pb/test/server.rb
@@ -129,27 +129,36 @@ def nulls(l)
[].pack('x' * l).force_encoding('ascii-8bit')
end
-# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
-class EnumeratorQueue
- extend Forwardable
- def_delegators :@q, :push
-
- def initialize(sentinel)
- @q = Queue.new
- @sentinel = sentinel
- end
+# A FullDuplexEnumerator passes requests to a block and yields generated responses
+class FullDuplexEnumerator
+ include Grpc::Testing
+ include Grpc::Testing::PayloadType
+ def initialize(requests)
+ @requests = requests
+ end
def each_item
return enum_for(:each_item) unless block_given?
- loop do
- r = @q.pop
- break if r.equal?(@sentinel)
- fail r if r.is_a? Exception
- yield r
+ GRPC.logger.info('interop-server: started receiving')
+ begin
+ cls = StreamingOutputCallResponse
+ @requests.each do |req|
+ req.response_parameters.each do |params|
+ resp_size = params.size
+ GRPC.logger.info("read a req, response size is #{resp_size}")
+ yield cls.new(payload: Payload.new(type: req.response_type,
+ body: nulls(resp_size)))
+ end
+ end
+ GRPC.logger.info('interop-server: finished receiving')
+ rescue StandardError => e
+ GRPC.logger.info('interop-server: failed')
+ GRPC.logger.warn(e)
+ fail e
end
end
end
-
+
# A runnable implementation of the schema-specified testing service, with each
# service method implemented as required by the interop testing spec.
class TestTarget < Grpc::Testing::TestService::Service
@@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service
def full_duplex_call(reqs)
# reqs is a lazy Enumerator of the requests sent by the client.
- q = EnumeratorQueue.new(self)
- cls = StreamingOutputCallResponse
- Thread.new do
- begin
- GRPC.logger.info('interop-server: started receiving')
- reqs.each do |req|
- req.response_parameters.each do |params|
- resp_size = params.size
- GRPC.logger.info("read a req, response size is #{resp_size}")
- resp = cls.new(payload: Payload.new(type: req.response_type,
- body: nulls(resp_size)))
- q.push(resp)
- end
- end
- GRPC.logger.info('interop-server: finished receiving')
- q.push(self)
- rescue StandardError => e
- GRPC.logger.info('interop-server: failed')
- GRPC.logger.warn(e)
- q.push(e) # share the exception with the enumerator
- end
- end
- q.each_item
+ FullDuplexEnumerator.new(reqs).each_item
end
-
+
def half_duplex_call(reqs)
# TODO: update with unique behaviour of the half_duplex_call if that's
# ever required by any of the tests.
diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb
index 7ed648acef..8aed866da5 100644
--- a/src/ruby/qps/client.rb
+++ b/src/ruby/qps/client.rb
@@ -89,12 +89,14 @@ class BenchmarkClient
payload: gtp.new(type: gtpt::COMPRESSABLE,
body: nulls(simple_params.req_size)))
+ @child_threads = []
+
(0..config.client_channels-1).each do |chan|
gtbss = Grpc::Testing::BenchmarkService::Stub
st = config.server_targets
stub = gtbss.new(st[chan % st.length], cred, **opts)
(0..config.outstanding_rpcs_per_channel-1).each do |r|
- Thread.new {
+ @child_threads << Thread.new {
case config.load_params.load.to_s
when 'closed_loop'
waiter = nil
@@ -162,5 +164,8 @@ class BenchmarkClient
end
def shutdown
@done = true
+ @child_threads.each do |thread|
+ thread.join
+ end
end
end
diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb
index 4119d600b1..4714ccfdb7 100644
--- a/src/ruby/qps/qps-common.rb
+++ b/src/ruby/qps/qps-common.rb
@@ -52,6 +52,7 @@ def load_test_certs
files.map { |f| File.open(File.join(data_dir, f)).read }
end
+
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue
extend Forwardable
@@ -73,4 +74,19 @@ class EnumeratorQueue
end
end
+# A PingPongEnumerator reads requests and responds one-by-one when enumerated
+# via #each_item
+class PingPongEnumerator
+ def initialize(reqs)
+ @reqs = reqs
+ end
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ sr = Grpc::Testing::SimpleResponse
+ pl = Grpc::Testing::Payload
+ @reqs.each do |req|
+ yield sr.new(payload: pl.new(body: nulls(req.response_size)))
+ end
+ end
+end
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index cd98ee1fd9..d0c2073dd1 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
sr.new(payload: pl.new(body: nulls(req.response_size)))
end
def streaming_call(reqs)
- q = EnumeratorQueue.new(self)
- Thread.new {
- sr = Grpc::Testing::SimpleResponse
- pl = Grpc::Testing::Payload
- reqs.each do |req|
- q.push(sr.new(payload: pl.new(body: nulls(req.response_size))))
- end
- q.push(self)
- }
- q.each_item
+ PingPongEnumerator.new(reqs).each_item
end
end
@@ -71,7 +62,8 @@ class BenchmarkServer
else
cred = :this_port_is_insecure
end
- @server = GRPC::RpcServer.new
+ # Make sure server can handle the large number of calls in benchmarks
+ @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100)
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now
diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb
index 12b8087ca0..61a0b723a3 100755
--- a/src/ruby/qps/worker.rb
+++ b/src/ruby/qps/worker.rb
@@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores))
end
end
- q.push(self)
bms.stop
+ q.push(self)
}
q.each_item
end
@@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service
client.mark(req.mark.reset)))
end
end
- q.push(self)
client.shutdown
+ q.push(self)
}
q.each_item
end
@@ -118,6 +118,10 @@ def main
options['server_port'] = v
end
end.parse!
+
+ # Configure any errors with client or server child threads to surface
+ Thread.abort_on_exception = true
+
s = GRPC::RpcServer.new
s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s,
:this_port_is_insecure)