aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-04-15 15:26:42 -0700
committerGravatar Tim Emiola <temiola@google.com>2015-04-15 17:30:13 -0700
commita10a8432f3a9b05dc3dd2ff4d3cabf9da4781cd7 (patch)
tree37f73f15c4c103d905881a25a5ab4d844067c5d1
parent1b39916bbaaef36662ce97dd9d7996e8ef6092a8 (diff)
Corrects handling of status during bidi call orchestration.
- Ensures that invalid status are raised as exceptions - Ensures that exceptions during the write thread are bubbled up to the starting thread Also - improves the debug messages during the BiDi calls
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb32
1 files changed, 17 insertions, 15 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1c1b3b0db7..b813ab5b54 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
+ @enq_th = start_write_loop(requests)
+ @loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -100,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
+ @enq_th = start_write_loop(replys, is_client: false)
+ @loop_th = start_read_loop
end
private
@@ -122,10 +118,13 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
+ logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @loop_th.join
+ @enq_th.join
end
# during bidi-streaming, read the requests to send from a separate thread
@@ -136,20 +135,23 @@ module GRPC
begin
count = 0
requests.each do |req|
+ logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil,
- RECV_STATUS_ON_CLIENT => nil)
+ logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi: write_loop failed')
+ logger.warn('bidi-write_loop: failed')
logger.warn(e)
+ raise e
end
end
end
@@ -163,7 +165,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("waiting for read #{count}")
+ logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -171,7 +173,7 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
- logger.debug('done reading!')
+ logger.debug('bidi-read-loop: done reading!')
break
end