aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/pb
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2016-07-12 15:14:43 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2016-07-12 15:14:43 -0700
commit81aab14ba654969b90b57825e18356f3bb08cf48 (patch)
tree1685aae299a16a7a33830c47fd163e620389a49b /src/ruby/pb
parentcdff92f02ea04ef238b6393a843e3605060efde1 (diff)
simplified client streaming compression request enumerable
Diffstat (limited to 'src/ruby/pb')
-rwxr-xr-xsrc/ruby/pb/test/client.rb61
1 files changed, 20 insertions, 41 deletions
diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb
index 73494e6aab..33f40c2b9d 100755
--- a/src/ruby/pb/test/client.rb
+++ b/src/ruby/pb/test/client.rb
@@ -236,29 +236,18 @@ end
# Wraps a Queue to yield items to it.
# Intended to be used to wrap a call_op as well, and to adjust
# the write flag of the call_op in between messages yielded to it.
-class WriteFlagSettingEnumeratorQueue
- extend Forwardable
- def_delegators :@q, :push
+class WriteFlagSettingStreamingInputEnumerable
attr_accessor :call_op
- def initialize(sentinel)
- @q = Queue.new
- @sentinel = sentinel
- @received_notes = {}
+ def initialize(requests_and_write_flags)
+ @requests_and_write_flags = requests_and_write_flags
end
- def each_item
- return enum_for(:each_item) unless block_given?
- loop do
- request_and_write_flag = @q.pop
- break if request_and_write_flag.equal?(@sentinel)
- fail request_and_write_flag if
- request_and_write_flag.is_a? Exception
-
- @call_op.write_flag = request_and_write_flag[:write_flag] if
- request_and_write_flag[:write_flag]
-
- yield request_and_write_flag[:request]
+ def each
+ @requests_and_write_flags.each do |request_and_flag|
+ @call_op.write_flag = request_and_flag[:write_flag] if
+ request_and_flag[:write_flag]
+ yield request_and_flag[:request]
end
end
end
@@ -415,35 +404,25 @@ class NamedTests
metadata: request_uncompressed_args)
end
- # Create the deferred enumerator, start the streaming call with it, and
- # set the enumerator's call_op to the call.
- requests = WriteFlagSettingEnumeratorQueue.new(self)
- call_op = @stub.streaming_input_call(requests.each_item,
- return_op: true)
- requests.call_op = call_op
-
- request_thread = Thread.new do
- call_op.execute
- end
-
- # send a compressed request
- requests.push({ request: first_request })
-
- # send an uncompressed request
second_request = StreamingInputCallRequest.new(
payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)),
expect_compressed: BoolValue.new(value: false)
)
- requests.push(
+ # Create the requests messages and the corresponding write flags
+ # for each message
+ requests = WriteFlagSettingStreamingInputEnumerable.new([
+ { request: first_request },
{ request: second_request,
- write_flag: GRPC::Core::WriteFlags::NO_COMPRESS
- })
+ write_flag: GRPC::Core::WriteFlags::NO_COMPRESS }
+ ])
- # Close the input stream
- requests.push(self)
-
- resp = request_thread.value
+ # Create the call_op, pass it to the requests enumerable, and
+ # run the call
+ call_op = @stub.streaming_input_call(requests,
+ return_op: true)
+ requests.call_op = call_op
+ resp = call_op.execute
wanted_aggregate_size = 73_086