diff options
author | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2016-09-29 15:20:55 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-29 15:20:55 -0700 |
commit | 2587f20d08238abbbba35e7abcad41efa15cd020 (patch) | |
tree | 91da19911e83e70b4515e21d6f5c654fa5096c1a /src/ruby | |
parent | d24b822d73c8608a4838c8887d7ecddf39c324a0 (diff) | |
parent | beeeddd571b667278075799f566a79735eaa8137 (diff) |
Merge pull request #8238 from nicolasnoble/master-upmerge-from-deep-under
Upmerge from 1.0.x
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 3 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 1 | ||||
-rwxr-xr-x | src/ruby/pb/test/server.rb | 65 | ||||
-rw-r--r-- | src/ruby/qps/client.rb | 7 | ||||
-rw-r--r-- | src/ruby/qps/qps-common.rb | 16 | ||||
-rw-r--r-- | src/ruby/qps/server.rb | 14 | ||||
-rwxr-xr-x | src/ruby/qps/worker.rb | 8 |
7 files changed, 57 insertions, 57 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 23688dc924..dfc2644c46 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -240,11 +240,8 @@ module GRPC @call.metadata = batch_result.metadata @metadata_received = true end - GRPC.logger.debug("received req: #{batch_result}") unless batch_result.nil? || batch_result.message.nil? - GRPC.logger.debug("received req.to_s: #{batch_result.message}") res = @unmarshal.call(batch_result.message) - GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}") return res end GRPC.logger.debug('found nil; the final response has been sent') diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index da0f6503db..7dbcb7d479 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -304,7 +304,6 @@ module GRPC # allow the metadata to be accessed from the call an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers - GRPC.logger.debug("call md is #{an_rpc.metadata}") connect_md = nil unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 0808121661..3f1e0a1ccf 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -129,27 +129,36 @@ def nulls(l) [].pack('x' * l).force_encoding('ascii-8bit') end -# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. -class EnumeratorQueue - extend Forwardable - def_delegators :@q, :push - - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - end +# A FullDuplexEnumerator passes requests to a block and yields generated responses +class FullDuplexEnumerator + include Grpc::Testing + include Grpc::Testing::PayloadType + def initialize(requests) + @requests = requests + end def each_item return enum_for(:each_item) unless block_given? - loop do - r = @q.pop - break if r.equal?(@sentinel) - fail r if r.is_a? Exception - yield r + GRPC.logger.info('interop-server: started receiving') + begin + cls = StreamingOutputCallResponse + @requests.each do |req| + req.response_parameters.each do |params| + resp_size = params.size + GRPC.logger.info("read a req, response size is #{resp_size}") + yield cls.new(payload: Payload.new(type: req.response_type, + body: nulls(resp_size))) + end + end + GRPC.logger.info('interop-server: finished receiving') + rescue StandardError => e + GRPC.logger.info('interop-server: failed') + GRPC.logger.warn(e) + fail e end end end - + # A runnable implementation of the schema-specified testing service, with each # service method implemented as required by the interop testing spec. class TestTarget < Grpc::Testing::TestService::Service @@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service def full_duplex_call(reqs) # reqs is a lazy Enumerator of the requests sent by the client. - q = EnumeratorQueue.new(self) - cls = StreamingOutputCallResponse - Thread.new do - begin - GRPC.logger.info('interop-server: started receiving') - reqs.each do |req| - req.response_parameters.each do |params| - resp_size = params.size - GRPC.logger.info("read a req, response size is #{resp_size}") - resp = cls.new(payload: Payload.new(type: req.response_type, - body: nulls(resp_size))) - q.push(resp) - end - end - GRPC.logger.info('interop-server: finished receiving') - q.push(self) - rescue StandardError => e - GRPC.logger.info('interop-server: failed') - GRPC.logger.warn(e) - q.push(e) # share the exception with the enumerator - end - end - q.each_item + FullDuplexEnumerator.new(reqs).each_item end - + def half_duplex_call(reqs) # TODO: update with unique behaviour of the half_duplex_call if that's # ever required by any of the tests. diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 7ed648acef..8aed866da5 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -89,12 +89,14 @@ class BenchmarkClient payload: gtp.new(type: gtpt::COMPRESSABLE, body: nulls(simple_params.req_size))) + @child_threads = [] + (0..config.client_channels-1).each do |chan| gtbss = Grpc::Testing::BenchmarkService::Stub st = config.server_targets stub = gtbss.new(st[chan % st.length], cred, **opts) (0..config.outstanding_rpcs_per_channel-1).each do |r| - Thread.new { + @child_threads << Thread.new { case config.load_params.load.to_s when 'closed_loop' waiter = nil @@ -162,5 +164,8 @@ class BenchmarkClient end def shutdown @done = true + @child_threads.each do |thread| + thread.join + end end end diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb index 4119d600b1..4714ccfdb7 100644 --- a/src/ruby/qps/qps-common.rb +++ b/src/ruby/qps/qps-common.rb @@ -52,6 +52,7 @@ def load_test_certs files.map { |f| File.open(File.join(data_dir, f)).read } end + # A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. class EnumeratorQueue extend Forwardable @@ -73,4 +74,19 @@ class EnumeratorQueue end end +# A PingPongEnumerator reads requests and responds one-by-one when enumerated +# via #each_item +class PingPongEnumerator + def initialize(reqs) + @reqs = reqs + end + def each_item + return enum_for(:each_item) unless block_given? + sr = Grpc::Testing::SimpleResponse + pl = Grpc::Testing::Payload + @reqs.each do |req| + yield sr.new(payload: pl.new(body: nulls(req.response_size))) + end + end +end diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index cd98ee1fd9..d0c2073dd1 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service sr.new(payload: pl.new(body: nulls(req.response_size))) end def streaming_call(reqs) - q = EnumeratorQueue.new(self) - Thread.new { - sr = Grpc::Testing::SimpleResponse - pl = Grpc::Testing::Payload - reqs.each do |req| - q.push(sr.new(payload: pl.new(body: nulls(req.response_size)))) - end - q.push(self) - } - q.each_item + PingPongEnumerator.new(reqs).each_item end end @@ -71,7 +62,8 @@ class BenchmarkServer else cred = :this_port_is_insecure end - @server = GRPC::RpcServer.new + # Make sure server can handle the large number of calls in benchmarks + @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100) @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred) @server.handle(BenchmarkServiceImpl.new) @start_time = Time.now diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 12b8087ca0..61a0b723a3 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores)) end end - q.push(self) bms.stop + q.push(self) } q.each_item end @@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service client.mark(req.mark.reset))) end end - q.push(self) client.shutdown + q.push(self) } q.each_item end @@ -118,6 +118,10 @@ def main options['server_port'] = v end end.parse! + + # Configure any errors with client or server child threads to surface + Thread.abort_on_exception = true + s = GRPC::RpcServer.new s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) |