diff options
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 95 |
1 files changed, 36 insertions, 59 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index c2ac3c4daf..75ddff0bfd 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -61,7 +61,6 @@ module GRPC @call = call @marshal = marshal @op_notifier = nil # signals completion on clients - @readq = Queue.new @unmarshal = unmarshal @metadata_received = metadata_received @reads_complete = false @@ -81,8 +80,7 @@ module GRPC def run_on_client(requests, op_notifier, &blk) @op_notifier = op_notifier @enq_th = Thread.new { write_loop(requests) } - @loop_th = start_read_loop - each_queued_msg(&blk) + read_loop(&blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -97,8 +95,7 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) - replys = gen_each_reply.call(each_queued_msg) - @loop_th = start_read_loop(is_client: false) + replys = gen_each_reply.call(read_loop(is_client: false)) write_loop(replys, is_client: false) end @@ -135,24 +132,6 @@ module GRPC batch_result end - # each_queued_msg yields each message on this instances readq - # - # - messages are added to the readq by #read_loop - # - iteration ends when the instance itself is added - def each_queued_msg - return enum_for(:each_queued_msg) unless block_given? - count = 0 - loop do - GRPC.logger.debug("each_queued_msg: waiting##{count}") - count += 1 - req = @readq.pop - GRPC.logger.debug("each_queued_msg: req = #{req}") - fail req if req.is_a? StandardError - break if req.equal?(END_OF_READS) - yield req - end - end - def write_loop(requests, is_client: true) GRPC.logger.debug('bidi-write-loop: starting') count = 0 @@ -190,47 +169,45 @@ module GRPC raise e end - # starts the read loop - def start_read_loop(is_client: true) - Thread.new do - GRPC.logger.debug('bidi-read-loop: starting') - begin - count = 0 - # queue the initial read before beginning the loop - loop do - GRPC.logger.debug("bidi-read-loop: #{count}") - count += 1 - batch_result = read_using_run_batch - - # handle the next message - if batch_result.message.nil? - GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") - - if is_client - batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) - @call.status = batch_result.status - batch_result.check_status - GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") - end - - @readq.push(END_OF_READS) - GRPC.logger.debug('bidi-read-loop: done reading!') - break + # Provides an enumerator that yields results of remote reads + def read_loop(is_client: true) + return enum_for(:read_loop, + is_client: is_client) unless block_given? + GRPC.logger.debug('bidi-read-loop: starting') + begin + count = 0 + # queue the initial read before beginning the loop + loop do + GRPC.logger.debug("bidi-read-loop: #{count}") + count += 1 + batch_result = read_using_run_batch + + # handle the next message + if batch_result.message.nil? + GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") + + if is_client + batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") end - # push the latest read onto the queue and continue reading - res = @unmarshal.call(batch_result.message) - @readq.push(res) + GRPC.logger.debug('bidi-read-loop: done reading!') + break end - rescue StandardError => e - GRPC.logger.warn('bidi: read-loop failed') - GRPC.logger.warn(e) - @readq.push(e) # let each_queued_msg terminate with this error + + res = @unmarshal.call(batch_result.message) + yield res end - GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished + rescue StandardError => e + GRPC.logger.warn('bidi: read-loop failed') + GRPC.logger.warn(e) + raise e end + GRPC.logger.debug('bidi-read-loop: finished') + @reads_complete = true + finished end end end |