aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/pb/test/client.rb
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2016-07-12 12:30:30 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2016-07-12 13:47:27 -0700
commitcdff92f02ea04ef238b6393a843e3605060efde1 (patch)
treeccee4942a90bf252a7573255f91b7fdaa4f0b1ad /src/ruby/pb/test/client.rb
parent7c55ab090a952b90b79324fd163feb2bc99a0724 (diff)
added ruby client compression interop tests
Diffstat (limited to 'src/ruby/pb/test/client.rb')
-rwxr-xr-xsrc/ruby/pb/test/client.rb174
1 files changed, 170 insertions, 4 deletions
diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb
index 066a7bb90f..73494e6aab 100755
--- a/src/ruby/pb/test/client.rb
+++ b/src/ruby/pb/test/client.rb
@@ -52,9 +52,9 @@ require_relative '../../lib/grpc'
require 'googleauth'
require 'google/protobuf'
-require_relative 'proto/empty'
-require_relative 'proto/messages'
-require_relative 'proto/test_services'
+require_relative '../src/proto/grpc/testing/empty'
+require_relative '../src/proto/grpc/testing/messages'
+require_relative '../src/proto/grpc/testing/test_services'
AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
@@ -111,6 +111,18 @@ end
# creates a test stub that accesses host:port securely.
def create_stub(opts)
address = "#{opts.host}:#{opts.port}"
+
+ # Provide channel args that request compression by default
+ # for compression interop tests
+ if ['client_compressed_unary',
+ 'client_compressed_streaming'].include?(opts.test_case)
+ compression_options =
+ GRPC::Core::CompressionOptions.new(default_algorithm: :gzip)
+ compression_channel_args = compression_options.to_channel_arg_hash
+ else
+ compression_channel_args = {}
+ end
+
if opts.secure
creds = ssl_creds(opts.use_test_ca)
stub_opts = {
@@ -145,10 +157,15 @@ def create_stub(opts)
end
GRPC.logger.info("... connecting securely to #{address}")
+ stub_opts[:channel_args].merge!(compression_channel_args)
Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts)
else
GRPC.logger.info("... connecting insecurely to #{address}")
- Grpc::Testing::TestService::Stub.new(address, :this_channel_is_insecure)
+ Grpc::Testing::TestService::Stub.new(
+ address,
+ :this_channel_is_insecure,
+ channel_args: compression_channel_args
+ )
end
end
@@ -216,10 +233,41 @@ class BlockingEnumerator
end
end
+# Wraps a Queue to yield items to it.
+# Intended to be used to wrap a call_op as well, and to adjust
+# the write flag of the call_op in between messages yielded to it.
+class WriteFlagSettingEnumeratorQueue
+ extend Forwardable
+ def_delegators :@q, :push
+ attr_accessor :call_op
+
+ def initialize(sentinel)
+ @q = Queue.new
+ @sentinel = sentinel
+ @received_notes = {}
+ end
+
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ loop do
+ request_and_write_flag = @q.pop
+ break if request_and_write_flag.equal?(@sentinel)
+ fail request_and_write_flag if
+ request_and_write_flag.is_a? Exception
+
+ @call_op.write_flag = request_and_write_flag[:write_flag] if
+ request_and_write_flag[:write_flag]
+
+ yield request_and_write_flag[:request]
+ end
+ end
+end
+
# defines methods corresponding to each interop test case.
class NamedTests
include Grpc::Testing
include Grpc::Testing::PayloadType
+ include GRPC::Core::MetadataKeys
def initialize(stub, args)
@stub = stub
@@ -235,6 +283,48 @@ class NamedTests
perform_large_unary
end
+ def client_compressed_unary
+ # first request used also for the probe
+ req_size, wanted_response_size = 271_828, 314_159
+ expect_compressed = BoolValue.new(value: true)
+ payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size))
+ req = SimpleRequest.new(response_type: :COMPRESSABLE,
+ response_size: wanted_response_size,
+ payload: payload,
+ expect_compressed: expect_compressed)
+
+ # send a probe to see if CompressedResponse is supported on the server
+ send_probe_for_compressed_request_support do
+ request_uncompressed_args = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+ @stub.unary_call(req, metadata: request_uncompressed_args)
+ end
+
+ # make a call with a compressed message
+ resp = @stub.unary_call(req)
+ assert('Expected second unary call with compression to work') do
+ resp.payload.body.length == wanted_response_size
+ end
+
+ # make a call with an uncompressed message
+ stub_options = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+
+ req = SimpleRequest.new(
+ response_type: :COMPRESSABLE,
+ response_size: wanted_response_size,
+ payload: payload,
+ expect_compressed: BoolValue.new(value: false)
+ )
+
+ resp = @stub.unary_call(req, metadata: stub_options)
+ assert('Expected second unary call with compression to work') do
+ resp.payload.body.length == wanted_response_size
+ end
+ end
+
def service_account_creds
# ignore this test if the oauth options are not set
if @args.oauth_scope.nil?
@@ -309,6 +399,59 @@ class NamedTests
end
end
+ def client_compressed_streaming
+ # first request used also by the probe
+ first_request = StreamingInputCallRequest.new(
+ payload: Payload.new(type: :COMPRESSABLE, body: nulls(27_182)),
+ expect_compressed: BoolValue.new(value: true)
+ )
+
+ # send a probe to see if CompressedResponse is supported on the server
+ send_probe_for_compressed_request_support do
+ request_uncompressed_args = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+ @stub.streaming_input_call([first_request],
+ metadata: request_uncompressed_args)
+ end
+
+ # Create the deferred enumerator, start the streaming call with it, and
+ # set the enumerator's call_op to the call.
+ requests = WriteFlagSettingEnumeratorQueue.new(self)
+ call_op = @stub.streaming_input_call(requests.each_item,
+ return_op: true)
+ requests.call_op = call_op
+
+ request_thread = Thread.new do
+ call_op.execute
+ end
+
+ # send a compressed request
+ requests.push({ request: first_request })
+
+ # send an uncompressed request
+ second_request = StreamingInputCallRequest.new(
+ payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)),
+ expect_compressed: BoolValue.new(value: false)
+ )
+
+ requests.push(
+ { request: second_request,
+ write_flag: GRPC::Core::WriteFlags::NO_COMPRESS
+ })
+
+ # Close the input stream
+ requests.push(self)
+
+ resp = request_thread.value
+
+ wanted_aggregate_size = 73_086
+
+ assert("#{__callee__}: aggregate payload size is incorrect") do
+ wanted_aggregate_size == resp.aggregated_payload_size
+ end
+ end
+
def server_streaming
msg_sizes = [31_415, 9, 2653, 58_979]
response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) }
@@ -415,6 +558,29 @@ class NamedTests
end
resp
end
+
+ # Send probing message for compressed request on the server, to see
+ # if it's implemented.
+ def send_probe_for_compressed_request_support(&send_probe)
+ bad_status_occured = false
+
+ begin
+ send_probe.call
+ rescue GRPC::BadStatus => e
+ if e.code == GRPC::Core::StatusCodes::INVALID_ARGUMENT
+ bad_status_occured = true
+ else
+ fail AssertionError, "Bad status received but code is #{e.code}"
+ end
+ rescue Exception => e
+ fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
+ end
+
+ assert('CompressedRequest probe failed') do
+ bad_status_occured
+ end
+ end
+
end
# Args is used to hold the command line info.