diff options
Diffstat (limited to 'src/ruby/lib/grpc/generic/bidi_call.rb')
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 64 |
1 files changed, 28 insertions, 36 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index f1b9f6b00d..489dd5162a 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -78,11 +78,9 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - @enq_th = start_write_loop(requests) + @enq_th = Thread.new { write_loop(requests) } @loop_th = start_read_loop - replies = each_queued_msg - return replies if blk.nil? - replies.each { |r| blk.call(r) } + each_queued_msg(&blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -98,9 +96,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - @enq_th = start_write_loop(replys, is_client: false) @loop_th = start_read_loop - @enq_th.join if @enq_th.alive? + write_loop(replys, is_client: false) end private @@ -126,37 +123,32 @@ module GRPC end end - # during bidi-streaming, read the requests to send from a separate thread - # read so that read_loop does not block waiting for requests to read. - def start_write_loop(requests, is_client: true) - Thread.new do # TODO: run on a thread pool - GRPC.logger.debug('bidi-write-loop: starting') - begin - write_tag = Object.new - count = 0 - requests.each do |req| - GRPC.logger.debug("bidi-write-loop: #{count}") - count += 1 - payload = @marshal.call(req) - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_MESSAGE => payload) - end - GRPC.logger.debug("bidi-write-loop: #{count} writes done") - if is_client - GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil) - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - RECV_STATUS_ON_CLIENT => nil) - batch_result.check_status - end - rescue StandardError => e - GRPC.logger.warn('bidi-write-loop: failed') - GRPC.logger.warn(e) - raise e - end - GRPC.logger.debug('bidi-write-loop: finished') + def write_loop(requests, is_client: true) + GRPC.logger.debug('bidi-write-loop: starting') + write_tag = Object.new + count = 0 + requests.each do |req| + GRPC.logger.debug("bidi-write-loop: #{count}") + count += 1 + payload = @marshal.call(req) + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_MESSAGE => payload) + end + GRPC.logger.debug("bidi-write-loop: #{count} writes done") + if is_client + GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") end + GRPC.logger.debug('bidi-write-loop: finished') + rescue StandardError => e + GRPC.logger.warn('bidi-write-loop: failed') + GRPC.logger.warn(e) + raise e end # starts the read loop |