aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/bidi_call.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib/grpc/generic/bidi_call.rb')
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb92
1 files changed, 32 insertions, 60 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c66deaae60..4ca3004d6f 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -30,18 +30,12 @@
require 'forwardable'
require 'grpc/grpc'
-def assert_event_type(ev, want)
- fail OutOfTime if ev.nil?
- got = ev.type
- fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
-end
-
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
- include Core::CompletionType
+ include Core::CallOps
include Core::StatusCodes
include Core::TimeConsts
@@ -63,8 +57,7 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
+ def initialize(call, q, marshal, unmarshal, deadline)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
@@ -72,7 +65,6 @@ module GRPC
@call = call
@cq = q
@deadline = deadline
- @finished_tag = finished_tag
@marshal = marshal
@readq = Queue.new
@unmarshal = unmarshal
@@ -86,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
+ @enq_th = start_write_loop(requests)
+ @loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -108,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
+ @enq_th = start_write_loop(replys, is_client: false)
+ @loop_th = start_read_loop
end
private
@@ -130,10 +118,12 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
+ logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @enq_th.join if @enq_th.alive?
end
# during bidi-streaming, read the requests to send from a separate thread
@@ -144,36 +134,23 @@ module GRPC
begin
count = 0
requests.each do |req|
+ logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
- @call.start_write(Core::ByteBuffer.new(payload), write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
- end
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_MESSAGE => payload)
end
if is_client
- @call.writes_done(write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
- end
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISHED)
- ensure
- ev.close
- end
- logger.debug('bidi-client: finished received')
+ logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi: write_loop failed')
+ logger.warn('bidi-write_loop: failed')
logger.warn(e)
+ raise e
end
end
end
@@ -187,27 +164,22 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("waiting for read #{count}")
+ logger.debug("bidi-read_loop: #{count}")
count += 1
- @call.start_read(read_tag)
- ev = @cq.pluck(read_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
-
- # handle the next event.
- if ev.result.nil?
- @readq.push(END_OF_READS)
- logger.debug('done reading!')
- break
- end
-
- # push the latest read onto the queue and continue reading
- logger.debug("received req: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- @readq.push(res)
- ensure
- ev.close
+ # TODO: ensure metadata is read if available, currently it's not
+ batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
+ RECV_MESSAGE => nil)
+ # handle the next message
+ if batch_result.message.nil?
+ @readq.push(END_OF_READS)
+ logger.debug('bidi-read-loop: done reading!')
+ break
end
+
+ # push the latest read onto the queue and continue reading
+ logger.debug("received req: #{batch_result.message}")
+ res = @unmarshal.call(batch_result.message)
+ @readq.push(res)
end
rescue StandardError => e