From 81aab14ba654969b90b57825e18356f3bb08cf48 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 12 Jul 2016 15:14:43 -0700 Subject: simplified client streaming compression request enumerable --- src/ruby/pb/test/client.rb | 61 +++++++++++++++------------------------------- 1 file changed, 20 insertions(+), 41 deletions(-) (limited to 'src/ruby/pb') diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index 73494e6aab..33f40c2b9d 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -236,29 +236,18 @@ end # Wraps a Queue to yield items to it. # Intended to be used to wrap a call_op as well, and to adjust # the write flag of the call_op in between messages yielded to it. -class WriteFlagSettingEnumeratorQueue - extend Forwardable - def_delegators :@q, :push +class WriteFlagSettingStreamingInputEnumerable attr_accessor :call_op - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - @received_notes = {} + def initialize(requests_and_write_flags) + @requests_and_write_flags = requests_and_write_flags end - def each_item - return enum_for(:each_item) unless block_given? - loop do - request_and_write_flag = @q.pop - break if request_and_write_flag.equal?(@sentinel) - fail request_and_write_flag if - request_and_write_flag.is_a? Exception - - @call_op.write_flag = request_and_write_flag[:write_flag] if - request_and_write_flag[:write_flag] - - yield request_and_write_flag[:request] + def each + @requests_and_write_flags.each do |request_and_flag| + @call_op.write_flag = request_and_flag[:write_flag] if + request_and_flag[:write_flag] + yield request_and_flag[:request] end end end @@ -415,35 +404,25 @@ class NamedTests metadata: request_uncompressed_args) end - # Create the deferred enumerator, start the streaming call with it, and - # set the enumerator's call_op to the call. - requests = WriteFlagSettingEnumeratorQueue.new(self) - call_op = @stub.streaming_input_call(requests.each_item, - return_op: true) - requests.call_op = call_op - - request_thread = Thread.new do - call_op.execute - end - - # send a compressed request - requests.push({ request: first_request }) - - # send an uncompressed request second_request = StreamingInputCallRequest.new( payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)), expect_compressed: BoolValue.new(value: false) ) - requests.push( + # Create the requests messages and the corresponding write flags + # for each message + requests = WriteFlagSettingStreamingInputEnumerable.new([ + { request: first_request }, { request: second_request, - write_flag: GRPC::Core::WriteFlags::NO_COMPRESS - }) + write_flag: GRPC::Core::WriteFlags::NO_COMPRESS } + ]) - # Close the input stream - requests.push(self) - - resp = request_thread.value + # Create the call_op, pass it to the requests enumerable, and + # run the call + call_op = @stub.streaming_input_call(requests, + return_op: true) + requests.call_op = call_op + resp = call_op.execute wanted_aggregate_size = 73_086 -- cgit v1.2.3