diff options
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 1c1b3b0db7..b813ab5b54 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -78,13 +78,11 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop + @enq_th = start_write_loop(requests) + @loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join end # Begins orchestration of the Bidi stream for a server generating replies. @@ -100,10 +98,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join + @enq_th = start_write_loop(replys, is_client: false) + @loop_th = start_read_loop end private @@ -122,10 +118,13 @@ module GRPC logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop + logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @loop_th.join + @enq_th.join end # during bidi-streaming, read the requests to send from a separate thread @@ -136,20 +135,23 @@ module GRPC begin count = 0 requests.each do |req| + logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end if is_client - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) + logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status end rescue StandardError => e - logger.warn('bidi: write_loop failed') + logger.warn('bidi-write_loop: failed') logger.warn(e) + raise e end end end @@ -163,7 +165,7 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("waiting for read #{count}") + logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, @@ -171,7 +173,7 @@ module GRPC # handle the next message if batch_result.message.nil? @readq.push(END_OF_READS) - logger.debug('done reading!') + logger.debug('bidi-read-loop: done reading!') break end |