diff options
Diffstat (limited to 'src/ruby/pb/test/client.rb')
-rwxr-xr-x | src/ruby/pb/test/client.rb | 184 |
1 files changed, 173 insertions, 11 deletions
diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index b6695482a2..1e3ae65630 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -52,9 +52,9 @@ require_relative '../../lib/grpc' require 'googleauth' require 'google/protobuf' -require_relative 'proto/empty' -require_relative 'proto/messages' -require_relative 'proto/test_services' +require_relative '../src/proto/grpc/testing/empty_pb' +require_relative '../src/proto/grpc/testing/messages_pb' +require_relative '../src/proto/grpc/testing/test_services_pb' AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR @@ -111,6 +111,18 @@ end # creates a test stub that accesses host:port securely. def create_stub(opts) address = "#{opts.host}:#{opts.port}" + + # Provide channel args that request compression by default + # for compression interop tests + if ['client_compressed_unary', + 'client_compressed_streaming'].include?(opts.test_case) + compression_options = + GRPC::Core::CompressionOptions.new(default_algorithm: :gzip) + compression_channel_args = compression_options.to_channel_arg_hash + else + compression_channel_args = {} + end + if opts.secure creds = ssl_creds(opts.use_test_ca) stub_opts = { @@ -145,10 +157,15 @@ def create_stub(opts) end GRPC.logger.info("... connecting securely to #{address}") + stub_opts[:channel_args].merge!(compression_channel_args) Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts) else GRPC.logger.info("... connecting insecurely to #{address}") - Grpc::Testing::TestService::Stub.new(address, :this_channel_is_insecure) + Grpc::Testing::TestService::Stub.new( + address, + :this_channel_is_insecure, + channel_args: compression_channel_args + ) end end @@ -197,10 +214,47 @@ class PingPongPlayer end end +class BlockingEnumerator + include Grpc::Testing + include Grpc::Testing::PayloadType + + def initialize(req_size, sleep_time) + @req_size = req_size + @sleep_time = sleep_time + end + + def each_item + return enum_for(:each_item) unless block_given? + req_cls = StreamingOutputCallRequest + req = req_cls.new(payload: Payload.new(body: nulls(@req_size))) + yield req + # Sleep until after the deadline should have passed + sleep(@sleep_time) + end +end + +# Intended to be used to wrap a call_op, and to adjust +# the write flag of the call_op in between messages yielded to it. +class WriteFlagSettingStreamingInputEnumerable + attr_accessor :call_op + + def initialize(requests_and_write_flags) + @requests_and_write_flags = requests_and_write_flags + end + + def each + @requests_and_write_flags.each do |request_and_flag| + @call_op.write_flag = request_and_flag[:write_flag] + yield request_and_flag[:request] + end + end +end + # defines methods corresponding to each interop test case. class NamedTests include Grpc::Testing include Grpc::Testing::PayloadType + include GRPC::Core::MetadataKeys def initialize(stub, args) @stub = stub @@ -216,6 +270,48 @@ class NamedTests perform_large_unary end + def client_compressed_unary + # first request used also for the probe + req_size, wanted_response_size = 271_828, 314_159 + expect_compressed = BoolValue.new(value: true) + payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size)) + req = SimpleRequest.new(response_type: :COMPRESSABLE, + response_size: wanted_response_size, + payload: payload, + expect_compressed: expect_compressed) + + # send a probe to see if CompressedResponse is supported on the server + send_probe_for_compressed_request_support do + request_uncompressed_args = { + COMPRESSION_REQUEST_ALGORITHM => 'identity' + } + @stub.unary_call(req, metadata: request_uncompressed_args) + end + + # make a call with a compressed message + resp = @stub.unary_call(req) + assert('Expected second unary call with compression to work') do + resp.payload.body.length == wanted_response_size + end + + # make a call with an uncompressed message + stub_options = { + COMPRESSION_REQUEST_ALGORITHM => 'identity' + } + + req = SimpleRequest.new( + response_type: :COMPRESSABLE, + response_size: wanted_response_size, + payload: payload, + expect_compressed: BoolValue.new(value: false) + ) + + resp = @stub.unary_call(req, metadata: stub_options) + assert('Expected second unary call with compression to work') do + resp.payload.body.length == wanted_response_size + end + end + def service_account_creds # ignore this test if the oauth options are not set if @args.oauth_scope.nil? @@ -290,6 +386,50 @@ class NamedTests end end + def client_compressed_streaming + # first request used also by the probe + first_request = StreamingInputCallRequest.new( + payload: Payload.new(type: :COMPRESSABLE, body: nulls(27_182)), + expect_compressed: BoolValue.new(value: true) + ) + + # send a probe to see if CompressedResponse is supported on the server + send_probe_for_compressed_request_support do + request_uncompressed_args = { + COMPRESSION_REQUEST_ALGORITHM => 'identity' + } + @stub.streaming_input_call([first_request], + metadata: request_uncompressed_args) + end + + second_request = StreamingInputCallRequest.new( + payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)), + expect_compressed: BoolValue.new(value: false) + ) + + # Create the requests messages and the corresponding write flags + # for each message + requests = WriteFlagSettingStreamingInputEnumerable.new([ + { request: first_request, + write_flag: 0 }, + { request: second_request, + write_flag: GRPC::Core::WriteFlags::NO_COMPRESS } + ]) + + # Create the call_op, pass it to the requests enumerable, and + # run the call + call_op = @stub.streaming_input_call(requests, + return_op: true) + requests.call_op = call_op + resp = call_op.execute + + wanted_aggregate_size = 73_086 + + assert("#{__callee__}: aggregate payload size is incorrect") do + wanted_aggregate_size == resp.aggregated_payload_size + end + end + def server_streaming msg_sizes = [31_415, 9, 2653, 58_979] response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) } @@ -315,11 +455,10 @@ class NamedTests end def timeout_on_sleeping_server - msg_sizes = [[27_182, 31_415]] - ppp = PingPongPlayer.new(msg_sizes) - deadline = GRPC::Core::TimeConsts::from_relative_time(0.001) - resps = @stub.full_duplex_call(ppp.each_item, deadline: deadline) - resps.each { |r| ppp.queue.push(r) } + enum = BlockingEnumerator.new(27_182, 2) + deadline = GRPC::Core::TimeConsts::from_relative_time(1) + resps = @stub.full_duplex_call(enum.each_item, deadline: deadline) + resps.each { } # wait to receive each request (or timeout) fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)' rescue GRPC::BadStatus => e assert("#{__callee__}: status was wrong") do @@ -351,7 +490,7 @@ class NamedTests op.execute fail 'Should have raised GRPC:Cancelled' rescue GRPC::Cancelled - assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } + assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? } end def cancel_after_first_response @@ -362,7 +501,7 @@ class NamedTests op.execute.each { |r| ppp.queue.push(r) } fail 'Should have raised GRPC:Cancelled' rescue GRPC::Cancelled - assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } + assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? } op.wait end @@ -397,6 +536,29 @@ class NamedTests end resp end + + # Send probing message for compressed request on the server, to see + # if it's implemented. + def send_probe_for_compressed_request_support(&send_probe) + bad_status_occured = false + + begin + send_probe.call + rescue GRPC::BadStatus => e + if e.code == GRPC::Core::StatusCodes::INVALID_ARGUMENT + bad_status_occured = true + else + fail AssertionError, "Bad status received but code is #{e.code}" + end + rescue Exception => e + fail AssertionError, "Expected BadStatus. Received: #{e.inspect}" + end + + assert('CompressedRequest probe failed') do + bad_status_occured + end + end + end # Args is used to hold the command line info. |