aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/pb/test/client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/pb/test/client.rb')
-rwxr-xr-xsrc/ruby/pb/test/client.rb184
1 files changed, 173 insertions, 11 deletions
diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb
index b6695482a2..4c6d441dcb 100755
--- a/src/ruby/pb/test/client.rb
+++ b/src/ruby/pb/test/client.rb
@@ -52,9 +52,9 @@ require_relative '../../lib/grpc'
require 'googleauth'
require 'google/protobuf'
-require_relative 'proto/empty'
-require_relative 'proto/messages'
-require_relative 'proto/test_services'
+require_relative '../src/proto/grpc/testing/empty'
+require_relative '../src/proto/grpc/testing/messages'
+require_relative '../src/proto/grpc/testing/test_services'
AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
@@ -111,6 +111,18 @@ end
# creates a test stub that accesses host:port securely.
def create_stub(opts)
address = "#{opts.host}:#{opts.port}"
+
+ # Provide channel args that request compression by default
+ # for compression interop tests
+ if ['client_compressed_unary',
+ 'client_compressed_streaming'].include?(opts.test_case)
+ compression_options =
+ GRPC::Core::CompressionOptions.new(default_algorithm: :gzip)
+ compression_channel_args = compression_options.to_channel_arg_hash
+ else
+ compression_channel_args = {}
+ end
+
if opts.secure
creds = ssl_creds(opts.use_test_ca)
stub_opts = {
@@ -145,10 +157,15 @@ def create_stub(opts)
end
GRPC.logger.info("... connecting securely to #{address}")
+ stub_opts[:channel_args].merge!(compression_channel_args)
Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts)
else
GRPC.logger.info("... connecting insecurely to #{address}")
- Grpc::Testing::TestService::Stub.new(address, :this_channel_is_insecure)
+ Grpc::Testing::TestService::Stub.new(
+ address,
+ :this_channel_is_insecure,
+ channel_args: compression_channel_args
+ )
end
end
@@ -197,10 +214,47 @@ class PingPongPlayer
end
end
+class BlockingEnumerator
+ include Grpc::Testing
+ include Grpc::Testing::PayloadType
+
+ def initialize(req_size, sleep_time)
+ @req_size = req_size
+ @sleep_time = sleep_time
+ end
+
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ req_cls = StreamingOutputCallRequest
+ req = req_cls.new(payload: Payload.new(body: nulls(@req_size)))
+ yield req
+ # Sleep until after the deadline should have passed
+ sleep(@sleep_time)
+ end
+end
+
+# Intended to be used to wrap a call_op, and to adjust
+# the write flag of the call_op in between messages yielded to it.
+class WriteFlagSettingStreamingInputEnumerable
+ attr_accessor :call_op
+
+ def initialize(requests_and_write_flags)
+ @requests_and_write_flags = requests_and_write_flags
+ end
+
+ def each
+ @requests_and_write_flags.each do |request_and_flag|
+ @call_op.write_flag = request_and_flag[:write_flag]
+ yield request_and_flag[:request]
+ end
+ end
+end
+
# defines methods corresponding to each interop test case.
class NamedTests
include Grpc::Testing
include Grpc::Testing::PayloadType
+ include GRPC::Core::MetadataKeys
def initialize(stub, args)
@stub = stub
@@ -216,6 +270,48 @@ class NamedTests
perform_large_unary
end
+ def client_compressed_unary
+ # first request used also for the probe
+ req_size, wanted_response_size = 271_828, 314_159
+ expect_compressed = BoolValue.new(value: true)
+ payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size))
+ req = SimpleRequest.new(response_type: :COMPRESSABLE,
+ response_size: wanted_response_size,
+ payload: payload,
+ expect_compressed: expect_compressed)
+
+ # send a probe to see if CompressedResponse is supported on the server
+ send_probe_for_compressed_request_support do
+ request_uncompressed_args = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+ @stub.unary_call(req, metadata: request_uncompressed_args)
+ end
+
+ # make a call with a compressed message
+ resp = @stub.unary_call(req)
+ assert('Expected second unary call with compression to work') do
+ resp.payload.body.length == wanted_response_size
+ end
+
+ # make a call with an uncompressed message
+ stub_options = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+
+ req = SimpleRequest.new(
+ response_type: :COMPRESSABLE,
+ response_size: wanted_response_size,
+ payload: payload,
+ expect_compressed: BoolValue.new(value: false)
+ )
+
+ resp = @stub.unary_call(req, metadata: stub_options)
+ assert('Expected second unary call with compression to work') do
+ resp.payload.body.length == wanted_response_size
+ end
+ end
+
def service_account_creds
# ignore this test if the oauth options are not set
if @args.oauth_scope.nil?
@@ -290,6 +386,50 @@ class NamedTests
end
end
+ def client_compressed_streaming
+ # first request used also by the probe
+ first_request = StreamingInputCallRequest.new(
+ payload: Payload.new(type: :COMPRESSABLE, body: nulls(27_182)),
+ expect_compressed: BoolValue.new(value: true)
+ )
+
+ # send a probe to see if CompressedResponse is supported on the server
+ send_probe_for_compressed_request_support do
+ request_uncompressed_args = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+ @stub.streaming_input_call([first_request],
+ metadata: request_uncompressed_args)
+ end
+
+ second_request = StreamingInputCallRequest.new(
+ payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)),
+ expect_compressed: BoolValue.new(value: false)
+ )
+
+ # Create the requests messages and the corresponding write flags
+ # for each message
+ requests = WriteFlagSettingStreamingInputEnumerable.new([
+ { request: first_request,
+ write_flag: 0 },
+ { request: second_request,
+ write_flag: GRPC::Core::WriteFlags::NO_COMPRESS }
+ ])
+
+ # Create the call_op, pass it to the requests enumerable, and
+ # run the call
+ call_op = @stub.streaming_input_call(requests,
+ return_op: true)
+ requests.call_op = call_op
+ resp = call_op.execute
+
+ wanted_aggregate_size = 73_086
+
+ assert("#{__callee__}: aggregate payload size is incorrect") do
+ wanted_aggregate_size == resp.aggregated_payload_size
+ end
+ end
+
def server_streaming
msg_sizes = [31_415, 9, 2653, 58_979]
response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) }
@@ -315,11 +455,10 @@ class NamedTests
end
def timeout_on_sleeping_server
- msg_sizes = [[27_182, 31_415]]
- ppp = PingPongPlayer.new(msg_sizes)
- deadline = GRPC::Core::TimeConsts::from_relative_time(0.001)
- resps = @stub.full_duplex_call(ppp.each_item, deadline: deadline)
- resps.each { |r| ppp.queue.push(r) }
+ enum = BlockingEnumerator.new(27_182, 2)
+ deadline = GRPC::Core::TimeConsts::from_relative_time(1)
+ resps = @stub.full_duplex_call(enum.each_item, deadline: deadline)
+ resps.each { } # wait to receive each request (or timeout)
fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)'
rescue GRPC::BadStatus => e
assert("#{__callee__}: status was wrong") do
@@ -351,7 +490,7 @@ class NamedTests
op.execute
fail 'Should have raised GRPC:Cancelled'
rescue GRPC::Cancelled
- assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
+ assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? }
end
def cancel_after_first_response
@@ -362,7 +501,7 @@ class NamedTests
op.execute.each { |r| ppp.queue.push(r) }
fail 'Should have raised GRPC:Cancelled'
rescue GRPC::Cancelled
- assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
+ assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? }
op.wait
end
@@ -397,6 +536,29 @@ class NamedTests
end
resp
end
+
+ # Send probing message for compressed request on the server, to see
+ # if it's implemented.
+ def send_probe_for_compressed_request_support(&send_probe)
+ bad_status_occured = false
+
+ begin
+ send_probe.call
+ rescue GRPC::BadStatus => e
+ if e.code == GRPC::Core::StatusCodes::INVALID_ARGUMENT
+ bad_status_occured = true
+ else
+ fail AssertionError, "Bad status received but code is #{e.code}"
+ end
+ rescue Exception => e
+ fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
+ end
+
+ assert('CompressedRequest probe failed') do
+ bad_status_occured
+ end
+ end
+
end
# Args is used to hold the command line info.