aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2016-08-15 11:38:17 -0700
committerGravatar GitHub <noreply@github.com>2016-08-15 11:38:17 -0700
commite4947befe26c8f33ab05dace34c2988ac394de59 (patch)
treeb889581a4d6e1a54fab636182479fd44f3113480
parent37f6ffa125e6994be0fc7fec4a0918697cb1cf8d (diff)
parentdc3d561f4a470490d75a3018683f4a67705f1250 (diff)
Merge pull request #7669 from apolcyn/reduce_bidi_threads_ga
remove dedicated thread for ruby bidi read loop
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb95
1 files changed, 36 insertions, 59 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c2ac3c4daf..75ddff0bfd 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -61,7 +61,6 @@ module GRPC
@call = call
@marshal = marshal
@op_notifier = nil # signals completion on clients
- @readq = Queue.new
@unmarshal = unmarshal
@metadata_received = metadata_received
@reads_complete = false
@@ -81,8 +80,7 @@ module GRPC
def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) }
- @loop_th = start_read_loop
- each_queued_msg(&blk)
+ read_loop(&blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -97,8 +95,7 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
- replys = gen_each_reply.call(each_queued_msg)
- @loop_th = start_read_loop(is_client: false)
+ replys = gen_each_reply.call(read_loop(is_client: false))
write_loop(replys, is_client: false)
end
@@ -135,24 +132,6 @@ module GRPC
batch_result
end
- # each_queued_msg yields each message on this instances readq
- #
- # - messages are added to the readq by #read_loop
- # - iteration ends when the instance itself is added
- def each_queued_msg
- return enum_for(:each_queued_msg) unless block_given?
- count = 0
- loop do
- GRPC.logger.debug("each_queued_msg: waiting##{count}")
- count += 1
- req = @readq.pop
- GRPC.logger.debug("each_queued_msg: req = #{req}")
- fail req if req.is_a? StandardError
- break if req.equal?(END_OF_READS)
- yield req
- end
- end
-
def write_loop(requests, is_client: true)
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
@@ -190,47 +169,45 @@ module GRPC
raise e
end
- # starts the read loop
- def start_read_loop(is_client: true)
- Thread.new do
- GRPC.logger.debug('bidi-read-loop: starting')
- begin
- count = 0
- # queue the initial read before beginning the loop
- loop do
- GRPC.logger.debug("bidi-read-loop: #{count}")
- count += 1
- batch_result = read_using_run_batch
-
- # handle the next message
- if batch_result.message.nil?
- GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
-
- if is_client
- batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
- @call.status = batch_result.status
- batch_result.check_status
- GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
- end
-
- @readq.push(END_OF_READS)
- GRPC.logger.debug('bidi-read-loop: done reading!')
- break
+ # Provides an enumerator that yields results of remote reads
+ def read_loop(is_client: true)
+ return enum_for(:read_loop,
+ is_client: is_client) unless block_given?
+ GRPC.logger.debug('bidi-read-loop: starting')
+ begin
+ count = 0
+ # queue the initial read before beginning the loop
+ loop do
+ GRPC.logger.debug("bidi-read-loop: #{count}")
+ count += 1
+ batch_result = read_using_run_batch
+
+ # handle the next message
+ if batch_result.message.nil?
+ GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
+
+ if is_client
+ batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+ @call.status = batch_result.status
+ batch_result.check_status
+ GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end
- # push the latest read onto the queue and continue reading
- res = @unmarshal.call(batch_result.message)
- @readq.push(res)
+ GRPC.logger.debug('bidi-read-loop: done reading!')
+ break
end
- rescue StandardError => e
- GRPC.logger.warn('bidi: read-loop failed')
- GRPC.logger.warn(e)
- @readq.push(e) # let each_queued_msg terminate with this error
+
+ res = @unmarshal.call(batch_result.message)
+ yield res
end
- GRPC.logger.debug('bidi-read-loop: finished')
- @reads_complete = true
- finished
+ rescue StandardError => e
+ GRPC.logger.warn('bidi: read-loop failed')
+ GRPC.logger.warn(e)
+ raise e
end
+ GRPC.logger.debug('bidi-read-loop: finished')
+ @reads_complete = true
+ finished
end
end
end