aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/bidi_call.rb
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-06-02 11:28:03 -0700
committerGravatar Tim Emiola <temiola@google.com>2015-06-02 11:28:03 -0700
commitd62d345b836f20ca0bc1c4fbe57072fee0f8375d (patch)
tree445d96c9731523071d79fb5b2fdc92ae1016c739 /src/ruby/lib/grpc/generic/bidi_call.rb
parent0bdfe8b147b8101080e95565d9472e402f4d98d4 (diff)
Corrects the cancel_after_first_response test
- stops attempting to verify by catching the CancelledError - instead the test examines the operation state after cancellation, which is equally valid and more stable. Also - simplifies bidi on the server
Diffstat (limited to 'src/ruby/lib/grpc/generic/bidi_call.rb')
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb64
1 files changed, 28 insertions, 36 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index f1b9f6b00d..489dd5162a 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,11 +78,9 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- @enq_th = start_write_loop(requests)
+ @enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop
- replies = each_queued_msg
- return replies if blk.nil?
- replies.each { |r| blk.call(r) }
+ each_queued_msg(&blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -98,9 +96,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- @enq_th = start_write_loop(replys, is_client: false)
@loop_th = start_read_loop
- @enq_th.join if @enq_th.alive?
+ write_loop(replys, is_client: false)
end
private
@@ -126,37 +123,32 @@ module GRPC
end
end
- # during bidi-streaming, read the requests to send from a separate thread
- # read so that read_loop does not block waiting for requests to read.
- def start_write_loop(requests, is_client: true)
- Thread.new do # TODO: run on a thread pool
- GRPC.logger.debug('bidi-write-loop: starting')
- begin
- write_tag = Object.new
- count = 0
- requests.each do |req|
- GRPC.logger.debug("bidi-write-loop: #{count}")
- count += 1
- payload = @marshal.call(req)
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_MESSAGE => payload)
- end
- GRPC.logger.debug("bidi-write-loop: #{count} writes done")
- if is_client
- GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil)
- batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- RECV_STATUS_ON_CLIENT => nil)
- batch_result.check_status
- end
- rescue StandardError => e
- GRPC.logger.warn('bidi-write-loop: failed')
- GRPC.logger.warn(e)
- raise e
- end
- GRPC.logger.debug('bidi-write-loop: finished')
+ def write_loop(requests, is_client: true)
+ GRPC.logger.debug('bidi-write-loop: starting')
+ write_tag = Object.new
+ count = 0
+ requests.each do |req|
+ GRPC.logger.debug("bidi-write-loop: #{count}")
+ count += 1
+ payload = @marshal.call(req)
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_MESSAGE => payload)
+ end
+ GRPC.logger.debug("bidi-write-loop: #{count} writes done")
+ if is_client
+ GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ @call.status = batch_result.status
+ batch_result.check_status
+ GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
end
+ GRPC.logger.debug('bidi-write-loop: finished')
+ rescue StandardError => e
+ GRPC.logger.warn('bidi-write-loop: failed')
+ GRPC.logger.warn(e)
+ raise e
end
# starts the read loop