diff options
Diffstat (limited to 'src/ruby/pb')
-rw-r--r-- | src/ruby/pb/src/proto/grpc/testing/messages.rb | 18 | ||||
-rwxr-xr-x | src/ruby/pb/test/client.rb | 184 | ||||
-rw-r--r-- | src/ruby/pb/test/proto/empty.rb | 15 | ||||
-rw-r--r-- | src/ruby/pb/test/proto/messages.rb | 80 | ||||
-rw-r--r-- | src/ruby/pb/test/proto/test.rb | 14 | ||||
-rw-r--r-- | src/ruby/pb/test/proto/test_services.rb | 64 | ||||
-rwxr-xr-x | src/ruby/pb/test/server.rb | 18 |
7 files changed, 191 insertions, 202 deletions
diff --git a/src/ruby/pb/src/proto/grpc/testing/messages.rb b/src/ruby/pb/src/proto/grpc/testing/messages.rb index 2bdfe0eade..e27ccd0dc0 100644 --- a/src/ruby/pb/src/proto/grpc/testing/messages.rb +++ b/src/ruby/pb/src/proto/grpc/testing/messages.rb @@ -4,6 +4,9 @@ require 'google/protobuf' Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "grpc.testing.BoolValue" do + optional :value, :bool, 1 + end add_message "grpc.testing.Payload" do optional :type, :enum, 1, "grpc.testing.PayloadType" optional :body, :bytes, 2 @@ -18,8 +21,9 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :payload, :message, 3, "grpc.testing.Payload" optional :fill_username, :bool, 4 optional :fill_oauth_scope, :bool, 5 - optional :response_compression, :enum, 6, "grpc.testing.CompressionType" + optional :response_compressed, :message, 6, "grpc.testing.BoolValue" optional :response_status, :message, 7, "grpc.testing.EchoStatus" + optional :expect_compressed, :message, 8, "grpc.testing.BoolValue" end add_message "grpc.testing.SimpleResponse" do optional :payload, :message, 1, "grpc.testing.Payload" @@ -28,6 +32,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do end add_message "grpc.testing.StreamingInputCallRequest" do optional :payload, :message, 1, "grpc.testing.Payload" + optional :expect_compressed, :message, 2, "grpc.testing.BoolValue" end add_message "grpc.testing.StreamingInputCallResponse" do optional :aggregated_payload_size, :int32, 1 @@ -35,12 +40,12 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_message "grpc.testing.ResponseParameters" do optional :size, :int32, 1 optional :interval_us, :int32, 2 + optional :compressed, :message, 3, "grpc.testing.BoolValue" end add_message "grpc.testing.StreamingOutputCallRequest" do optional :response_type, :enum, 1, "grpc.testing.PayloadType" repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters" optional :payload, :message, 3, "grpc.testing.Payload" - optional :response_compression, :enum, 6, "grpc.testing.CompressionType" optional :response_status, :message, 7, "grpc.testing.EchoStatus" end add_message "grpc.testing.StreamingOutputCallResponse" do @@ -55,18 +60,12 @@ Google::Protobuf::DescriptorPool.generated_pool.build do end add_enum "grpc.testing.PayloadType" do value :COMPRESSABLE, 0 - value :UNCOMPRESSABLE, 1 - value :RANDOM, 2 - end - add_enum "grpc.testing.CompressionType" do - value :NONE, 0 - value :GZIP, 1 - value :DEFLATE, 2 end end module Grpc module Testing + BoolValue = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.BoolValue").msgclass Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass @@ -79,6 +78,5 @@ module Grpc ReconnectParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectParams").msgclass ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule - CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule end end diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index b6695482a2..4c6d441dcb 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -52,9 +52,9 @@ require_relative '../../lib/grpc' require 'googleauth' require 'google/protobuf' -require_relative 'proto/empty' -require_relative 'proto/messages' -require_relative 'proto/test_services' +require_relative '../src/proto/grpc/testing/empty' +require_relative '../src/proto/grpc/testing/messages' +require_relative '../src/proto/grpc/testing/test_services' AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR @@ -111,6 +111,18 @@ end # creates a test stub that accesses host:port securely. def create_stub(opts) address = "#{opts.host}:#{opts.port}" + + # Provide channel args that request compression by default + # for compression interop tests + if ['client_compressed_unary', + 'client_compressed_streaming'].include?(opts.test_case) + compression_options = + GRPC::Core::CompressionOptions.new(default_algorithm: :gzip) + compression_channel_args = compression_options.to_channel_arg_hash + else + compression_channel_args = {} + end + if opts.secure creds = ssl_creds(opts.use_test_ca) stub_opts = { @@ -145,10 +157,15 @@ def create_stub(opts) end GRPC.logger.info("... connecting securely to #{address}") + stub_opts[:channel_args].merge!(compression_channel_args) Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts) else GRPC.logger.info("... connecting insecurely to #{address}") - Grpc::Testing::TestService::Stub.new(address, :this_channel_is_insecure) + Grpc::Testing::TestService::Stub.new( + address, + :this_channel_is_insecure, + channel_args: compression_channel_args + ) end end @@ -197,10 +214,47 @@ class PingPongPlayer end end +class BlockingEnumerator + include Grpc::Testing + include Grpc::Testing::PayloadType + + def initialize(req_size, sleep_time) + @req_size = req_size + @sleep_time = sleep_time + end + + def each_item + return enum_for(:each_item) unless block_given? + req_cls = StreamingOutputCallRequest + req = req_cls.new(payload: Payload.new(body: nulls(@req_size))) + yield req + # Sleep until after the deadline should have passed + sleep(@sleep_time) + end +end + +# Intended to be used to wrap a call_op, and to adjust +# the write flag of the call_op in between messages yielded to it. +class WriteFlagSettingStreamingInputEnumerable + attr_accessor :call_op + + def initialize(requests_and_write_flags) + @requests_and_write_flags = requests_and_write_flags + end + + def each + @requests_and_write_flags.each do |request_and_flag| + @call_op.write_flag = request_and_flag[:write_flag] + yield request_and_flag[:request] + end + end +end + # defines methods corresponding to each interop test case. class NamedTests include Grpc::Testing include Grpc::Testing::PayloadType + include GRPC::Core::MetadataKeys def initialize(stub, args) @stub = stub @@ -216,6 +270,48 @@ class NamedTests perform_large_unary end + def client_compressed_unary + # first request used also for the probe + req_size, wanted_response_size = 271_828, 314_159 + expect_compressed = BoolValue.new(value: true) + payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size)) + req = SimpleRequest.new(response_type: :COMPRESSABLE, + response_size: wanted_response_size, + payload: payload, + expect_compressed: expect_compressed) + + # send a probe to see if CompressedResponse is supported on the server + send_probe_for_compressed_request_support do + request_uncompressed_args = { + COMPRESSION_REQUEST_ALGORITHM => 'identity' + } + @stub.unary_call(req, metadata: request_uncompressed_args) + end + + # make a call with a compressed message + resp = @stub.unary_call(req) + assert('Expected second unary call with compression to work') do + resp.payload.body.length == wanted_response_size + end + + # make a call with an uncompressed message + stub_options = { + COMPRESSION_REQUEST_ALGORITHM => 'identity' + } + + req = SimpleRequest.new( + response_type: :COMPRESSABLE, + response_size: wanted_response_size, + payload: payload, + expect_compressed: BoolValue.new(value: false) + ) + + resp = @stub.unary_call(req, metadata: stub_options) + assert('Expected second unary call with compression to work') do + resp.payload.body.length == wanted_response_size + end + end + def service_account_creds # ignore this test if the oauth options are not set if @args.oauth_scope.nil? @@ -290,6 +386,50 @@ class NamedTests end end + def client_compressed_streaming + # first request used also by the probe + first_request = StreamingInputCallRequest.new( + payload: Payload.new(type: :COMPRESSABLE, body: nulls(27_182)), + expect_compressed: BoolValue.new(value: true) + ) + + # send a probe to see if CompressedResponse is supported on the server + send_probe_for_compressed_request_support do + request_uncompressed_args = { + COMPRESSION_REQUEST_ALGORITHM => 'identity' + } + @stub.streaming_input_call([first_request], + metadata: request_uncompressed_args) + end + + second_request = StreamingInputCallRequest.new( + payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)), + expect_compressed: BoolValue.new(value: false) + ) + + # Create the requests messages and the corresponding write flags + # for each message + requests = WriteFlagSettingStreamingInputEnumerable.new([ + { request: first_request, + write_flag: 0 }, + { request: second_request, + write_flag: GRPC::Core::WriteFlags::NO_COMPRESS } + ]) + + # Create the call_op, pass it to the requests enumerable, and + # run the call + call_op = @stub.streaming_input_call(requests, + return_op: true) + requests.call_op = call_op + resp = call_op.execute + + wanted_aggregate_size = 73_086 + + assert("#{__callee__}: aggregate payload size is incorrect") do + wanted_aggregate_size == resp.aggregated_payload_size + end + end + def server_streaming msg_sizes = [31_415, 9, 2653, 58_979] response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) } @@ -315,11 +455,10 @@ class NamedTests end def timeout_on_sleeping_server - msg_sizes = [[27_182, 31_415]] - ppp = PingPongPlayer.new(msg_sizes) - deadline = GRPC::Core::TimeConsts::from_relative_time(0.001) - resps = @stub.full_duplex_call(ppp.each_item, deadline: deadline) - resps.each { |r| ppp.queue.push(r) } + enum = BlockingEnumerator.new(27_182, 2) + deadline = GRPC::Core::TimeConsts::from_relative_time(1) + resps = @stub.full_duplex_call(enum.each_item, deadline: deadline) + resps.each { } # wait to receive each request (or timeout) fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)' rescue GRPC::BadStatus => e assert("#{__callee__}: status was wrong") do @@ -351,7 +490,7 @@ class NamedTests op.execute fail 'Should have raised GRPC:Cancelled' rescue GRPC::Cancelled - assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } + assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? } end def cancel_after_first_response @@ -362,7 +501,7 @@ class NamedTests op.execute.each { |r| ppp.queue.push(r) } fail 'Should have raised GRPC:Cancelled' rescue GRPC::Cancelled - assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } + assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? } op.wait end @@ -397,6 +536,29 @@ class NamedTests end resp end + + # Send probing message for compressed request on the server, to see + # if it's implemented. + def send_probe_for_compressed_request_support(&send_probe) + bad_status_occured = false + + begin + send_probe.call + rescue GRPC::BadStatus => e + if e.code == GRPC::Core::StatusCodes::INVALID_ARGUMENT + bad_status_occured = true + else + fail AssertionError, "Bad status received but code is #{e.code}" + end + rescue Exception => e + fail AssertionError, "Expected BadStatus. Received: #{e.inspect}" + end + + assert('CompressedRequest probe failed') do + bad_status_occured + end + end + end # Args is used to hold the command line info. diff --git a/src/ruby/pb/test/proto/empty.rb b/src/ruby/pb/test/proto/empty.rb deleted file mode 100644 index 559adcc85e..0000000000 --- a/src/ruby/pb/test/proto/empty.rb +++ /dev/null @@ -1,15 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/proto/empty.proto - -require 'google/protobuf' - -Google::Protobuf::DescriptorPool.generated_pool.build do - add_message "grpc.testing.Empty" do - end -end - -module Grpc - module Testing - Empty = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Empty").msgclass - end -end diff --git a/src/ruby/pb/test/proto/messages.rb b/src/ruby/pb/test/proto/messages.rb deleted file mode 100644 index 5222c9824a..0000000000 --- a/src/ruby/pb/test/proto/messages.rb +++ /dev/null @@ -1,80 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/proto/messages.proto - -require 'google/protobuf' - -Google::Protobuf::DescriptorPool.generated_pool.build do - add_message "grpc.testing.Payload" do - optional :type, :enum, 1, "grpc.testing.PayloadType" - optional :body, :bytes, 2 - end - add_message "grpc.testing.EchoStatus" do - optional :code, :int32, 1 - optional :message, :string, 2 - end - add_message "grpc.testing.SimpleRequest" do - optional :response_type, :enum, 1, "grpc.testing.PayloadType" - optional :response_size, :int32, 2 - optional :payload, :message, 3, "grpc.testing.Payload" - optional :fill_username, :bool, 4 - optional :fill_oauth_scope, :bool, 5 - optional :response_compression, :enum, 6, "grpc.testing.CompressionType" - optional :response_status, :message, 7, "grpc.testing.EchoStatus" - end - add_message "grpc.testing.SimpleResponse" do - optional :payload, :message, 1, "grpc.testing.Payload" - optional :username, :string, 2 - optional :oauth_scope, :string, 3 - end - add_message "grpc.testing.StreamingInputCallRequest" do - optional :payload, :message, 1, "grpc.testing.Payload" - end - add_message "grpc.testing.StreamingInputCallResponse" do - optional :aggregated_payload_size, :int32, 1 - end - add_message "grpc.testing.ResponseParameters" do - optional :size, :int32, 1 - optional :interval_us, :int32, 2 - end - add_message "grpc.testing.StreamingOutputCallRequest" do - optional :response_type, :enum, 1, "grpc.testing.PayloadType" - repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters" - optional :payload, :message, 3, "grpc.testing.Payload" - optional :response_compression, :enum, 6, "grpc.testing.CompressionType" - optional :response_status, :message, 7, "grpc.testing.EchoStatus" - end - add_message "grpc.testing.StreamingOutputCallResponse" do - optional :payload, :message, 1, "grpc.testing.Payload" - end - add_message "grpc.testing.ReconnectInfo" do - optional :passed, :bool, 1 - repeated :backoff_ms, :int32, 2 - end - add_enum "grpc.testing.PayloadType" do - value :COMPRESSABLE, 0 - value :UNCOMPRESSABLE, 1 - value :RANDOM, 2 - end - add_enum "grpc.testing.CompressionType" do - value :NONE, 0 - value :GZIP, 1 - value :DEFLATE, 2 - end -end - -module Grpc - module Testing - Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass - EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass - SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass - SimpleResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleResponse").msgclass - StreamingInputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallRequest").msgclass - StreamingInputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallResponse").msgclass - ResponseParameters = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ResponseParameters").msgclass - StreamingOutputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallRequest").msgclass - StreamingOutputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallResponse").msgclass - ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass - PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule - CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule - end -end diff --git a/src/ruby/pb/test/proto/test.rb b/src/ruby/pb/test/proto/test.rb deleted file mode 100644 index 100eb6505c..0000000000 --- a/src/ruby/pb/test/proto/test.rb +++ /dev/null @@ -1,14 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/proto/test.proto - -require 'google/protobuf' - -require 'test/proto/empty' -require 'test/proto/messages' -Google::Protobuf::DescriptorPool.generated_pool.build do -end - -module Grpc - module Testing - end -end diff --git a/src/ruby/pb/test/proto/test_services.rb b/src/ruby/pb/test/proto/test_services.rb deleted file mode 100644 index 9df9cc5860..0000000000 --- a/src/ruby/pb/test/proto/test_services.rb +++ /dev/null @@ -1,64 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: test/proto/test.proto for package 'grpc.testing' - -require 'grpc' -require 'test/proto/test' - -module Grpc - module Testing - module TestService - - # TODO: add proto service documentation here - class Service - - include GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'grpc.testing.TestService' - - rpc :EmptyCall, Empty, Empty - rpc :UnaryCall, SimpleRequest, SimpleResponse - rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse) - rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse - rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) - rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) - end - - Stub = Service.rpc_stub_class - end - module UnimplementedService - - # TODO: add proto service documentation here - class Service - - include GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'grpc.testing.UnimplementedService' - - rpc :UnimplementedCall, Empty, Empty - end - - Stub = Service.rpc_stub_class - end - module ReconnectService - - # TODO: add proto service documentation here - class Service - - include GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'grpc.testing.ReconnectService' - - rpc :Start, Empty, Empty - rpc :Stop, Empty, ReconnectInfo - end - - Stub = Service.rpc_stub_class - end - end -end diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 914c7cc79d..11ee3d465d 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -50,9 +50,9 @@ require 'optparse' require 'grpc' -require 'test/proto/empty' -require 'test/proto/messages' -require 'test/proto/test_services' +require_relative '../src/proto/grpc/testing/empty' +require_relative '../src/proto/grpc/testing/messages' +require_relative '../src/proto/grpc/testing/test_services' # DebugIsTruncated extends the default Logger to truncate debug messages class DebugIsTruncated < Logger @@ -188,11 +188,13 @@ class TestTarget < Grpc::Testing::TestService::Service begin GRPC.logger.info('interop-server: started receiving') reqs.each do |req| - resp_size = req.response_parameters[0].size - GRPC.logger.info("read a req, response size is #{resp_size}") - resp = cls.new(payload: Payload.new(type: req.response_type, - body: nulls(resp_size))) - q.push(resp) + req.response_parameters.each do |params| + resp_size = params.size + GRPC.logger.info("read a req, response size is #{resp_size}") + resp = cls.new(payload: Payload.new(type: req.response_type, + body: nulls(resp_size))) + q.push(resp) + end end GRPC.logger.info('interop-server: finished receiving') q.push(self) |