diff options
author | Alexander Polcyn <apolcyn@google.com> | 2016-09-19 18:36:33 -0700 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2016-09-20 11:00:36 -0700 |
commit | 689e89c2e6e7a118046f5bc14aea92e3655e3dfb (patch) | |
tree | 2261b00d057a517e80817073428760827131d51e /src/ruby/pb | |
parent | cc730a413768cfa545b1ad44d2c3eb4f027936c8 (diff) |
dont create extra thread on bidi call in benchmark, and change suggested
code for ruby bidi calls
Diffstat (limited to 'src/ruby/pb')
-rwxr-xr-x | src/ruby/pb/test/server.rb | 65 |
1 files changed, 26 insertions, 39 deletions
diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 0808121661..3f1e0a1ccf 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -129,27 +129,36 @@ def nulls(l) [].pack('x' * l).force_encoding('ascii-8bit') end -# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. -class EnumeratorQueue - extend Forwardable - def_delegators :@q, :push - - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - end +# A FullDuplexEnumerator passes requests to a block and yields generated responses +class FullDuplexEnumerator + include Grpc::Testing + include Grpc::Testing::PayloadType + def initialize(requests) + @requests = requests + end def each_item return enum_for(:each_item) unless block_given? - loop do - r = @q.pop - break if r.equal?(@sentinel) - fail r if r.is_a? Exception - yield r + GRPC.logger.info('interop-server: started receiving') + begin + cls = StreamingOutputCallResponse + @requests.each do |req| + req.response_parameters.each do |params| + resp_size = params.size + GRPC.logger.info("read a req, response size is #{resp_size}") + yield cls.new(payload: Payload.new(type: req.response_type, + body: nulls(resp_size))) + end + end + GRPC.logger.info('interop-server: finished receiving') + rescue StandardError => e + GRPC.logger.info('interop-server: failed') + GRPC.logger.warn(e) + fail e end end end - + # A runnable implementation of the schema-specified testing service, with each # service method implemented as required by the interop testing spec. class TestTarget < Grpc::Testing::TestService::Service @@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service def full_duplex_call(reqs) # reqs is a lazy Enumerator of the requests sent by the client. - q = EnumeratorQueue.new(self) - cls = StreamingOutputCallResponse - Thread.new do - begin - GRPC.logger.info('interop-server: started receiving') - reqs.each do |req| - req.response_parameters.each do |params| - resp_size = params.size - GRPC.logger.info("read a req, response size is #{resp_size}") - resp = cls.new(payload: Payload.new(type: req.response_type, - body: nulls(resp_size))) - q.push(resp) - end - end - GRPC.logger.info('interop-server: finished receiving') - q.push(self) - rescue StandardError => e - GRPC.logger.info('interop-server: failed') - GRPC.logger.warn(e) - q.push(e) # share the exception with the enumerator - end - end - q.each_item + FullDuplexEnumerator.new(reqs).each_item end - + def half_duplex_call(reqs) # TODO: update with unique behaviour of the half_duplex_call if that's # ever required by any of the tests. |