diff options
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/README.md | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 33 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.h | 3 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call_credentials.c | 7 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel_credentials.c | 6 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 172 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 62 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 4 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 1 | ||||
-rw-r--r-- | src/ruby/spec/client_auth_spec.rb | 137 | ||||
-rw-r--r-- | src/ruby/spec/client_server_spec.rb | 34 | ||||
-rw-r--r-- | src/ruby/spec/generic/active_call_spec.rb | 4 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 404 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_desc_spec.rb | 10 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 145 | ||||
-rw-r--r-- | src/ruby/spec/testdata/client.key | 16 | ||||
-rw-r--r-- | src/ruby/spec/testdata/client.pem | 14 |
17 files changed, 890 insertions, 164 deletions
diff --git a/src/ruby/README.md b/src/ruby/README.md index 266d6b2c16..5c7dae654a 100644 --- a/src/ruby/README.md +++ b/src/ruby/README.md @@ -68,5 +68,5 @@ Directory structure is the layout for [ruby extensions][] [ruby extensions]:http://guides.rubygems.org/gems-with-extensions/ [rubydoc]: http://www.rubydoc.info/gems/grpc -[grpc.io]: http://www.grpc.io/docs/quickstart/ruby.html +[grpc.io]: https://grpc.io/docs/quickstart/ruby.html [Debian jessie-backports]:http://backports.debian.org/Instructions/ diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 67425a68e6..b99954883f 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -24,6 +24,8 @@ #include <grpc/grpc.h> #include <grpc/impl/codegen/compression_types.h> #include <grpc/support/alloc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "rb_byte_buffer.h" #include "rb_call_credentials.h" @@ -368,7 +370,7 @@ static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) { to fill grpc_metadata_array. it's capacity should have been computed via a prior call to - grpc_rb_md_ary_fill_hash_cb + grpc_rb_md_ary_capacity_hash_cb */ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { grpc_metadata_array *md_ary = NULL; @@ -376,7 +378,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { long i; grpc_slice key_slice; grpc_slice value_slice; - char *tmp_str; + char *tmp_str = NULL; if (TYPE(key) == T_SYMBOL) { key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key))); @@ -386,6 +388,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { } else { rb_raise(rb_eTypeError, "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter"); + return ST_STOP; } if (!grpc_header_key_is_legal(key_slice)) { @@ -413,6 +416,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { tmp_str); return ST_STOP; } + GPR_ASSERT(md_ary->count < md_ary->capacity); md_ary->metadata[md_ary->count].key = key_slice; md_ary->metadata[md_ary->count].value = value_slice; md_ary->count += 1; @@ -428,6 +432,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { tmp_str); return ST_STOP; } + GPR_ASSERT(md_ary->count < md_ary->capacity); md_ary->metadata[md_ary->count].key = key_slice; md_ary->metadata[md_ary->count].value = value_slice; md_ary->count += 1; @@ -435,7 +440,6 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { rb_raise(rb_eArgError, "Header values must be of type string or array"); return ST_STOP; } - return ST_CONTINUE; } @@ -458,6 +462,7 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, } else { md_ary->capacity += 1; } + return ST_CONTINUE; } @@ -480,7 +485,7 @@ void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { md_ary_obj = TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary); rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj); - md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata)); + md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata)); rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj); } @@ -611,13 +616,25 @@ static void grpc_run_batch_stack_init(run_batch_stack *st, st->write_flag = write_flag; } +void grpc_rb_metadata_array_destroy_including_entries( + grpc_metadata_array *array) { + size_t i; + if (array->metadata) { + for (i = 0; i < array->count; i++) { + grpc_slice_unref(array->metadata[i].key); + grpc_slice_unref(array->metadata[i].value); + } + } + grpc_metadata_array_destroy(array); +} + /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly * cleaned up */ static void grpc_run_batch_stack_cleanup(run_batch_stack *st) { size_t i = 0; - grpc_metadata_array_destroy(&st->send_metadata); - grpc_metadata_array_destroy(&st->send_trailing_metadata); + grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata); + grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata); grpc_metadata_array_destroy(&st->recv_metadata); grpc_metadata_array_destroy(&st->recv_trailing_metadata); @@ -658,8 +675,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { st->ops[st->op_num].flags = 0; switch (NUM2INT(this_op)) { case GRPC_OP_SEND_INITIAL_METADATA: - /* N.B. later there is no need to explicitly delete the metadata keys - * and values, they are references to data in ruby objects. */ grpc_rb_md_ary_convert(this_value, &st->send_metadata); st->ops[st->op_num].data.send_initial_metadata.count = st->send_metadata.count; @@ -675,8 +690,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { case GRPC_OP_SEND_CLOSE_FROM_CLIENT: break; case GRPC_OP_SEND_STATUS_FROM_SERVER: - /* N.B. later there is no need to explicitly delete the metadata keys - * and values, they are references to data in ruby objects. */ grpc_rb_op_update_status_from_server( &st->ops[st->op_num], &st->send_trailing_metadata, &st->send_status_details, this_value); diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index be791cf972..bfe8035e0f 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -40,6 +40,9 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary); */ void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary); +void grpc_rb_metadata_array_destroy_including_entries( + grpc_metadata_array *md_ary); + /* grpc_rb_eCallError is the ruby class of the exception thrown during call operations. */ extern VALUE grpc_rb_eCallError; diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 95e01a2c19..049a869bdc 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -108,7 +108,7 @@ static void grpc_rb_call_credentials_callback_with_gil(void *param) { error_details = StringValueCStr(details); params->callback(params->user_data, md_ary.metadata, md_ary.count, status, error_details); - grpc_metadata_array_destroy(&md_ary); + grpc_rb_metadata_array_destroy_including_entries(&md_ary); gpr_free(params); } @@ -239,6 +239,7 @@ static VALUE grpc_rb_call_credentials_compose(int argc, VALUE *argv, VALUE self) { grpc_call_credentials *creds; grpc_call_credentials *other; + grpc_call_credentials *prev = NULL; VALUE mark; if (argc == 0) { return self; @@ -249,6 +250,10 @@ static VALUE grpc_rb_call_credentials_compose(int argc, VALUE *argv, rb_ary_push(mark, argv[i]); other = grpc_rb_get_wrapped_call_credentials(argv[i]); creds = grpc_composite_call_credentials_create(creds, other, NULL); + if (prev != NULL) { + grpc_call_credentials_release(prev); + } + prev = creds; } return grpc_rb_wrap_call_credentials(creds, mark); } diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c index 07935a13dc..83601ca694 100644 --- a/src/ruby/ext/grpc/rb_channel_credentials.c +++ b/src/ruby/ext/grpc/rb_channel_credentials.c @@ -184,6 +184,7 @@ static VALUE grpc_rb_channel_credentials_compose(int argc, VALUE *argv, VALUE self) { grpc_channel_credentials *creds; grpc_call_credentials *other; + grpc_channel_credentials *prev = NULL; VALUE mark; if (argc == 0) { return self; @@ -195,6 +196,11 @@ static VALUE grpc_rb_channel_credentials_compose(int argc, VALUE *argv, rb_ary_push(mark, argv[i]); other = grpc_rb_get_wrapped_call_credentials(argv[i]); creds = grpc_composite_channel_credentials_create(creds, other, NULL); + if (prev != NULL) { + grpc_channel_credentials_release(prev); + } + prev = creds; + if (creds == NULL) { rb_raise(rb_eRuntimeError, "Failed to compose channel and call credentials"); diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index cb407d236d..87b29c26ea 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -40,13 +40,13 @@ end module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call - class ActiveCall + class ActiveCall # rubocop:disable Metrics/ClassLength include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader :deadline, :metadata_sent, :metadata_to_send + attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata + :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -100,6 +100,18 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started @send_initial_md_mutex = Mutex.new + + @output_stream_done = false + @input_stream_done = false + @call_finished = false + @call_finished_mu = Mutex.new + + @client_call_executed = false + @client_call_executed_mu = Mutex.new + + # set the peer now so that the accessor can still function + # after the server closes the call + @peer = call.peer end # Sends the initial metadata that has yet to be sent. @@ -142,11 +154,9 @@ module GRPC Operation.new(self) end - # finished waits until a client call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a status. - def finished + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + set_input_stream_done attach_status_results_and_complete_call(batch_result) end @@ -155,8 +165,6 @@ module GRPC @call.trailing_metadata = recv_status_batch_result.status.metadata end @call.status = recv_status_batch_result.status - @call.close - op_is_done # The RECV_STATUS in run_batch always succeeds # Check the status for a bad status or failed run batch @@ -193,9 +201,19 @@ module GRPC } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(ops) + set_output_stream_done + nil end + # Intended for use on server-side calls when a single request from + # the client is expected (i.e., unary and server-streaming RPC types). + def read_unary_request + req = remote_read + set_input_stream_done + req + end + def server_unary_response(req, trailing_metadata: {}, code: Core::StatusCodes::OK, details: 'OK') ops = {} @@ -211,6 +229,7 @@ module GRPC ops[RECV_CLOSE_ON_SERVER] = nil @call.run_batch(ops) + set_output_stream_done end # remote_read reads a response from the remote endpoint. @@ -241,6 +260,8 @@ module GRPC # each_remote_read passes each response to the given block or returns an # enumerator the responses if no block is given. + # Used to generate the request enumerable for + # server-side client-streaming RPC's. # # == Enumerator == # @@ -258,10 +279,14 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read return enum_for(:each_remote_read) unless block_given? - loop do - resp = remote_read - break if resp.nil? # the last response was received - yield resp + begin + loop do + resp = remote_read + break if resp.nil? # the last response was received + yield resp + end + ensure + set_input_stream_done end end @@ -287,13 +312,17 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - if resp.nil? # the last response was received, but not finished yet - finished - break + begin + loop do + resp = remote_read + if resp.nil? # the last response was received + receive_and_check_status + break + end + yield resp end - yield resp + ensure + set_input_stream_done end end @@ -305,6 +334,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil, @@ -319,7 +349,15 @@ module GRPC end @metadata_sent = true end - batch_result = @call.run_batch(ops) + + begin + batch_result = @call.run_batch(ops) + # no need to check for cancellation after a CallError because this + # batch contains a RECV_STATUS op + ensure + set_input_stream_done + set_output_stream_done + end @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) @@ -339,10 +377,20 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - # Metadata might have already been sent if this is an operation view - merge_metadata_and_send_if_not_already_sent(metadata) + raise_error_if_already_executed + begin + merge_metadata_and_send_if_not_already_sent(metadata) + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + rescue GRPC::Core::CallError => e + receive_and_check_status # check for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end - requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } batch_result = @call.run_batch( SEND_CLOSE_FROM_CLIENT => nil, RECV_INITIAL_METADATA => nil, @@ -350,12 +398,11 @@ module GRPC RECV_STATUS_ON_CLIENT => nil ) + set_input_stream_done + @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) get_message_from_batch_result(batch_result) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -373,6 +420,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil @@ -384,13 +432,22 @@ module GRPC end @metadata_sent = true end - @call.run_batch(ops) + + begin + @call.run_batch(ops) + rescue GRPC::Core::CallError => e + receive_and_check_status # checks for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end + replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -421,6 +478,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) + raise_error_if_already_executed # Metadata might have already been sent if this is an operation view merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @@ -428,7 +486,10 @@ module GRPC @unmarshal, metadata_received: @metadata_received) - bd.run_on_client(requests, @op_notifier, &blk) + bd.run_on_client(requests, + proc { set_input_stream_done }, + proc { set_output_stream_done }, + &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -449,7 +510,7 @@ module GRPC metadata_received: @metadata_received, req_view: MultiReqView.new(self)) - bd.run_on_server(gen_each_reply) + bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) end # Waits till an operation completes @@ -459,7 +520,8 @@ module GRPC @op_notifier.wait end - # Signals that an operation is done + # Signals that an operation is done. + # Only relevant on the client-side (this is a no-op on the server-side) def op_is_done return if @op_notifier.nil? @op_notifier.notify(self) @@ -484,8 +546,40 @@ module GRPC end end + def attach_peer_cert(peer_cert) + @peer_cert = peer_cert + end + private + # To be called once the "input stream" has been completelly + # read through (i.e, done reading from client or received status) + # note this is idempotent + def set_input_stream_done + @call_finished_mu.synchronize do + @input_stream_done = true + maybe_finish_and_close_call_locked + end + end + + # To be called once the "output stream" has been completelly + # sent through (i.e, done sending from client or sent status) + # note this is idempotent + def set_output_stream_done + @call_finished_mu.synchronize do + @output_stream_done = true + maybe_finish_and_close_call_locked + end + end + + def maybe_finish_and_close_call_locked + return unless @output_stream_done && @input_stream_done + return if @call_finished + @call_finished = true + op_is_done + @call.close + end + # Starts the call if not already started # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent @@ -493,6 +587,15 @@ module GRPC merge_metadata_to_send(metadata) && send_initial_metadata end + def raise_error_if_already_executed + @client_call_executed_mu.synchronize do + if @client_call_executed + fail GRPC::Core::CallError, 'attempting to re-run a call' + end + @client_call_executed = true + end + end + def self.view_class(*visible_methods) Class.new do extend ::Forwardable @@ -516,8 +619,9 @@ module GRPC # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. - MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg, + MultiReqView = view_class(:cancelled?, :deadline, :each_remote_read, :metadata, :output_metadata, + :peer, :peer_cert, :send_initial_metadata, :metadata_to_send, :merge_metadata_to_send, diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index e54cf78969..9e125cd986 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -62,12 +62,19 @@ module GRPC # block that can be invoked with each response. # # @param requests the Enumerable of requests to send - # @param op_notifier a Notifier used to signal completion + # @param set_input_stream_done [Proc] called back when we're done + # reading the input stream + # @param set_input_stream_done [Proc] called back when we're done + # sending data on the output stream # @return an Enumerator of requests to yield - def run_on_client(requests, op_notifier, &blk) - @op_notifier = op_notifier - @enq_th = Thread.new { write_loop(requests) } - read_loop(&blk) + def run_on_client(requests, + set_input_stream_done, + set_output_stream_done, + &blk) + @enq_th = Thread.new do + write_loop(requests, set_output_stream_done: set_output_stream_done) + end + read_loop(set_input_stream_done, &blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -81,12 +88,17 @@ module GRPC # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. - def run_on_server(gen_each_reply) + # @param set_input_steam_done [Proc] call back to call when + # the reads have been completely read through. + def run_on_server(gen_each_reply, set_input_stream_done) # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call(read_loop(is_client: false)) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false)) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call(read_loop(is_client: false), @req_view) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false), + @req_view) else fail 'Illegal arity of reply generator' end @@ -99,22 +111,6 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes - # signals that bidi operation is complete - def notify_done - return unless @op_notifier - GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") - @op_notifier.notify(self) - end - - # signals that a bidi operation is complete (read + write) - def finished - @done_mutex.synchronize do - return unless @reads_complete && @writes_complete && !@complete - @call.close - @complete = true - end - end - # performs a read using @call.run_batch, ensures metadata is set up def read_using_run_batch ops = { RECV_MESSAGE => nil } @@ -127,7 +123,8 @@ module GRPC batch_result end - def write_loop(requests, is_client: true) + # set_output_stream_done is relevant on client-side + def write_loop(requests, is_client: true, set_output_stream_done: nil) GRPC.logger.debug('bidi-write-loop: starting') count = 0 requests.each do |req| @@ -151,23 +148,20 @@ module GRPC GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') - notify_done - @writes_complete = true - finished end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - notify_done - @writes_complete = true - finished raise e + ensure + set_output_stream_done.call if is_client end # Provides an enumerator that yields results of remote reads - def read_loop(is_client: true) + def read_loop(set_input_stream_done, is_client: true) return enum_for(:read_loop, + set_input_stream_done, is_client: is_client) unless block_given? GRPC.logger.debug('bidi-read-loop: starting') begin @@ -201,10 +195,10 @@ module GRPC GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) raise e + ensure + set_input_stream_done.call end GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished # Make sure that the write loop is done done before finishing the call. # Note that blocking is ok at this point because we've already received # a status diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index ce0097573a..89cf8ff6a0 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -48,7 +48,7 @@ module GRPC end def handle_request_response(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request resp = mth.call(req, active_call.single_req_view) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata) @@ -61,7 +61,7 @@ module GRPC end def handle_server_streamer(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request replys = mth.call(req, active_call.single_req_view) replys.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2cc0ce91..33b3cea1fc 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,6 +418,7 @@ module GRPC metadata_received: true, started: false, metadata_to_send: connect_md) + c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb new file mode 100644 index 0000000000..79c9192aa5 --- /dev/null +++ b/src/ruby/spec/client_auth_spec.rb @@ -0,0 +1,137 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'grpc' + +def create_channel_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + files = ['ca.pem', 'client.key', 'client.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2]) +end + +def client_cert + test_root = File.join(File.dirname(__FILE__), 'testdata') + cert = File.open(File.join(test_root, 'client.pem')).read + fail unless cert.is_a?(String) + cert +end + +def create_server_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + p "test root: #{test_root}" + files = ['ca.pem', 'server1.key', 'server1.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ServerCredentials.new( + creds[0], + [{ private_key: creds[1], cert_chain: creds[2] }], + true) # force client auth +end + +# A test message +class EchoMsg + def self.marshal(_o) + '' + end + + def self.unmarshal(_o) + EchoMsg.new + end +end + +# a test service that checks the cert of its peer +class SslTestService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + + def check_peer_cert(call) + error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}" + fail(error_msg) unless call.peer_cert == client_cert + end + + def an_rpc(req, call) + check_peer_cert(call) + req + end + + def a_client_streaming_rpc(call) + check_peer_cert(call) + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + check_peer_cert(call) + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + check_peer_cert(call) + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +SslTestServiceStub = SslTestService.rpc_stub_class + +describe 'client-server auth' do + RpcServer = GRPC::RpcServer + + before(:all) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + port = @srv.add_http2_port('0.0.0.0:0', create_server_creds) + @srv.handle(SslTestService) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + + client_opts = { + channel_args: { + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' + } + } + @stub = SslTestServiceStub.new("localhost:#{port}", + create_channel_creds, + **client_opts) + end + + after(:all) do + expect(@srv.stopped?).to be(false) + @srv.stop + @srv_thd.join + end + + it 'client-server auth with unary RPCs' do + @stub.an_rpc(EchoMsg.new) + end + + it 'client-server auth with client streaming RPCs' do + @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new]) + end + + it 'client-server auth with server streaming RPCs' do + responses = @stub.a_server_streaming_rpc(EchoMsg.new) + responses.each { |r| p r } + end + + it 'client-server auth with bidi RPCs' do + responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new]) + responses.each { |r| p r } + end +end diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index af3752c4ac..b48b4179ce 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -448,11 +448,18 @@ describe 'the secure http client/server' do it_behaves_like 'GRPC metadata delivery works OK' do end - it 'modifies metadata with CallCredentials' do - auth_proc = proc { { 'k1' => 'updated-v1' } } + def credentials_update_test(creds_update_md) + auth_proc = proc { creds_update_md } call_creds = GRPC::Core::CallCredentials.new(auth_proc) - md = { 'k2' => 'v2' } - expected_md = { 'k1' => 'updated-v1', 'k2' => 'v2' } + + initial_md_key = 'k2' + initial_md_val = 'v2' + initial_md = {} + initial_md[initial_md_key] = initial_md_val + expected_md = creds_update_md.clone + fail 'bad test param' unless expected_md[initial_md_key].nil? + expected_md[initial_md_key] = initial_md_val + recvd_rpc = nil rcv_thread = Thread.new do recvd_rpc = @server.request_call @@ -461,7 +468,7 @@ describe 'the secure http client/server' do call = new_client_call call.set_credentials! call_creds client_ops = { - CallOps::SEND_INITIAL_METADATA => md + CallOps::SEND_INITIAL_METADATA => initial_md } batch_result = call.run_batch(client_ops) expect(batch_result.send_metadata).to be true @@ -473,4 +480,21 @@ describe 'the secure http client/server' do replace_symbols = Hash[expected_md.each_pair.collect { |x, y| [x.to_s, y] }] expect(recvd_md).to eq(recvd_md.merge(replace_symbols)) end + + it 'modifies metadata with CallCredentials' do + credentials_update_test('k1' => 'updated-v1') + end + + it 'modifies large metadata with CallCredentials' do + val_array = %w( + '00000000000000000000000000000000000000000000000000000000000000', + '11111111111111111111111111111111111111111111111111111111111111', + ) + md = { + k3: val_array, + k4: '0000000000000000000000000000000000000000000000000000000000', + keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v1' + } + credentials_update_test(md) + end end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 72e55ebcce..ec0c294174 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -473,7 +473,7 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') server_call.send_status(OK, 'status code is OK') - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if the server sends an early status response' do @@ -490,7 +490,7 @@ describe GRPC::ActiveCall do expect do call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 09b88c7cef..42cff4041b 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts include GRPC::Core::CallOps +# check that methods on a finished/closed call t crash +def check_op_view_of_finished_client_call(op_view, + expected_metadata, + expected_trailing_metadata) + # use read_response_stream to try to iterate through + # possible response stream + fail('need something to attempt reads') unless block_given? + expect do + resp = op_view.execute + yield resp + end.to raise_error(GRPC::Core::CallError) + + expect { op_view.start_call }.to raise_error(RuntimeError) + + sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + + expect do + op_view.wait + op_view.cancel + op_view.write_flag = 1 + end.to_not raise_error +end + +def sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expected_status = Struct::Status.new + expected_status.code = 0 + expected_status.details = 'OK' + expected_status.metadata = expected_trailing_metadata + + expect(op_view.status).to eq(expected_status) + expect(op_view.metadata).to eq(expected_metadata) + expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) + + expect(op_view.cancelled?).to be(false) + expect(op_view.write_flag).to be(nil) + + # The deadline attribute of a call can be either + # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. + # TODO: fix so that the accessor always returns the same type. + expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || + op_view.deadline.is_a?(Time)).to be(true) +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -45,6 +92,7 @@ describe 'ClientStub' do @method = 'an_rpc_method' @pass = OK @fail = INTERNAL + @metadata = { k1: 'v1', k2: 'v2' } end after(:each) do @@ -107,7 +155,7 @@ describe 'ClientStub' do end end - describe '#request_response' do + describe '#request_response', request_response: true do before(:each) do @sent_msg, @resp = 'a_msg', 'a_reply' end @@ -122,16 +170,42 @@ describe 'ClientStub' do th.join end - it 'should send metadata to the server ok' do + def metadata_test(md) server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + expected_metadata: md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + @metadata = md expect(get_response(stub)).to eq(@resp) th.join end + it 'should send metadata to the server ok' do + metadata_test(k1: 'v1', k2: 'v2') + end + + # these tests mostly try to exercise when md might be allocated + # instead of inlined + it 'should send metadata with multiple large md to the server ok' do + val_array = %w( + '00000000000000000000000000000000000000000000000000000000000000', + '11111111111111111111111111111111111111111111111111111111111111', + '22222222222222222222222222222222222222222222222222222222222222', + ) + md = { + k1: val_array, + k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', + k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', + k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc', + keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5', + 'k66666666666666666666666666666666666666666666666666666' => 'v6', + 'k77777777777777777777777777777777777777777777777777777' => 'v7', + 'k88888888888888888888888888888888888888888888888888888' => 'v8' + } + metadata_test(md) + end + it 'should send a request when configured using an override channel' do server_port = create_test_server alt_host = "localhost:#{server_port}" @@ -187,13 +261,24 @@ describe 'ClientStub' do # Kill the server thread so tests can complete th.kill end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_response(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end end describe 'without a call operation' do def get_response(stub, credentials: nil) puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }, + metadata: @metadata, credentials: credentials) end @@ -201,40 +286,62 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false, credentials: nil) - op = stub.request_response(@method, @sent_msg, noop, noop, - return_op: true, - metadata: { k1: 'v1', k2: 'v2' }, - deadline: from_relative_time(2), - credentials: credentials) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.request_response(@method, @sent_msg, noop, noop, + return_op: true, + metadata: @metadata, + deadline: from_relative_time(2), + credentials: credentials) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end it_behaves_like 'request response' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_request_response( + @sent_msg, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + expect( + get_response(stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end end end - describe '#client_streamer' do + describe '#client_streamer', client_streamer: true do before(:each) do Thread.abort_on_exception = true server_port = create_test_server host = "localhost:#{server_port}" @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - @metadata = { k1: 'v1', k2: 'v2' } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @resp = 'a_reply' end @@ -247,7 +354,8 @@ describe 'ClientStub' do end it 'should send metadata to the server ok' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + th = run_client_streamer(@sent_msgs, @resp, @pass, + expected_metadata: @metadata) expect(get_response(@stub)).to eq(@resp) th.join end @@ -278,27 +386,50 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false) - op = stub.client_streamer(@method, @sent_msgs, noop, noop, - return_op: true, metadata: @metadata) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.client_streamer(@method, @sent_msgs, noop, noop, + return_op: true, metadata: @metadata) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end it_behaves_like 'client streaming' - it 'sends metadata to the server ok when running start_call first' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) - expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) + def run_op_view_metadata_test(run_start_call_first) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_client_streamer( + @sent_msgs, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) + expect( + get_response(@stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } + end end end - describe '#server_streamer' do + describe '#server_streamer', server_streamer: true do before(:each) do @sent_msg = 'a_msg' @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -328,18 +459,63 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_responses(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end + + def run_server_streamer_against_client_with_unmarshal_error( + expected_input, replys) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) + expect(c.remote_read).to eq(expected_input) + begin + replys.each { |r| c.remote_send(r) } + rescue GRPC::Core::CallError + # An attempt to write to the client might fail. This is ok + # because the client call is expected to fail when + # unmarshalling the first response, and to cancel the call, + # and there is a race as for when the server-side call will + # start to fail. + p 'remote_send failed (allowed because call expected to cancel)' + ensure + c.send_status(OK, 'OK', true) + end + end + end + + it 'the call terminates when there is an unmarshalling error' do + server_port = create_test_server + host = "localhost:#{server_port}" + th = run_server_streamer_against_client_with_unmarshal_error( + @sent_msg, @replys) + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + + unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') } + expect do + get_responses(stub, unmarshal: unmarshal).collect { |r| r } + end.to raise_error(ArgumentError, 'test unmarshalling error') + th.join + end end describe 'without a call operation' do - def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }) + def get_responses(stub, unmarshal: noop) + e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -351,10 +527,10 @@ describe 'ClientStub' do after(:each) do @op.wait # make sure wait doesn't hang end - def get_responses(stub, run_start_call_first: false) - @op = stub.server_streamer(@method, @sent_msg, noop, noop, + def get_responses(stub, run_start_call_first: false, unmarshal: noop) + @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal, return_op: true, - metadata: { k1: 'v1', k2: 'v2' }) + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -364,20 +540,41 @@ describe 'ClientStub' do it_behaves_like 'server streaming' - it 'should send metadata to the server ok when start_call is run first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_server_streamer( + @sent_msg, @replys, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) - expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + e = get_responses(stub, run_start_call_first: run_start_call_first) + expect(e.collect { |r| r }).to eq(@replys) th.join end + + it 'should send metadata to the server ok when start_call is run first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end end end - describe '#bidi_streamer' do + describe '#bidi_streamer', bidi: true do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -386,7 +583,7 @@ describe 'ClientStub' do end shared_examples 'bidi streaming' do - it 'supports sending all the requests first', bidi: true do + it 'supports sending all the requests first' do th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) @@ -395,7 +592,7 @@ describe 'ClientStub' do th.join end - it 'supports client-initiated ping pong', bidi: true do + it 'supports client-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) @@ -403,18 +600,39 @@ describe 'ClientStub' do th.join end - it 'supports a server-initiated ping pong', bidi: true do + it 'supports a server-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'should raise an error if the status is not ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + th.join + end + + # TODO: add test for metadata-related ArgumentError in a bidi call once + # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed + + it 'should send metadata to the server ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, + expected_metadata: @metadata) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end end describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -428,7 +646,8 @@ describe 'ClientStub' do end def get_responses(stub, run_start_call_first: false) @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, - return_op: true) + return_op: true, + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -438,27 +657,53 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' - it 'can run start_call before executing the call' do - th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, - @pass) + def run_op_view_metadata_test(run_start_call_first) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_bidi_streamer_echo_ping_pong( + @sent_msgs, @pass, true, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) - expect(e.collect { |r| r }).to eq(@replys) + e = get_responses(stub, run_start_call_first: run_start_call_first) + expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'can run start_call before executing the call' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end + + it 'doesnt crash when op_view used after call has finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| + responses.each { |r| p r } + end + end end end - def run_server_streamer(expected_input, replys, status, **kw) - wanted_metadata = kw.clone + def run_server_streamer(expected_input, replys, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -472,9 +717,17 @@ describe 'ClientStub' do end end - def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) + def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) @@ -484,33 +737,44 @@ describe 'ClientStub' do expect(c.remote_read).to eq(i) end end - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_client_streamer(expected_inputs, resp, status, **kw) - wanted_metadata = kw.clone + def run_client_streamer(expected_inputs, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_request_response(expected_input, resp, status, **kw) - wanted_metadata = kw.clone + def run_request_response(expected_input, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -528,13 +792,13 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end - def expect_server_to_be_invoked(notifier) + def expect_server_to_be_invoked(notifier, metadata_to_send: nil) @server.start notifier.notify(nil) recvd_rpc = @server.request_call recvd_call = recvd_rpc.call recvd_call.metadata = recvd_rpc.metadata - recvd_call.run_batch(SEND_INITIAL_METADATA => nil) + recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true) end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 100e9e8487..be578c40d3 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -38,14 +38,14 @@ describe GRPC::RpcDesc do shared_examples 'it handles errors' do it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, metadata: {}) this_desc.run_server_method(@call, method(:bad_status)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg, false, metadata: {}) @@ -53,7 +53,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) + expect(@call).to receive(:read_unary_request).once.and_raise(CallError) blk = proc do this_desc.run_server_method(@call, method(:fake_reqresp)) end @@ -75,7 +75,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:output_metadata).once.and_return(fake_md) expect(@call).to receive(:server_unary_response).once .with(@ok_response, trailing_metadata: fake_md) @@ -133,7 +133,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:send_status).once.with(OK, 'OK', true, diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 9633a828a2..e0646f4599 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,47 @@ end SlowStub = SlowService.rpc_stub_class +# a test service that hangs onto call objects +# and uses them after the server-side call has been +# finished +class CheckCallAfterFinishedService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + attr_reader :server_side_call + + def an_rpc(req, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + req + end + + def a_client_streaming_rpc(call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + # iterate through requests so call can complete + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + fail 'shouldnt reuse service' unless @server_side_call.nil? + @server_side_call = call + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class + describe GRPC::RpcServer do RpcServer = GRPC::RpcServer StatusCodes = GRPC::Core::StatusCodes @@ -505,5 +546,109 @@ describe GRPC::RpcServer do t.join end end + + context 'when call objects are used after calls have completed' do + before(:each) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @alt_host = "0.0.0.0:#{alt_port}" + + @service = CheckCallAfterFinishedService.new + @srv.handle(@service) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + end + + # check that the server-side call is still in a usable state even + # after it has finished + def check_single_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect(call.peer).to be_a(String) + expect(call.peer_cert).to be(nil) + end + + def check_multi_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect do + call.each_remote_read.each { |r| p r } + end.to raise_error(GRPC::Core::CallError) + end + + def common_check_of_finished_server_call(call) + expect do + call.merge_metadata_to_send({}) + end.to raise_error(RuntimeError) + + expect do + call.send_initial_metadata + end.to_not raise_error + + expect(call.cancelled?).to be(false) + expect(call.metadata).to be_a(Hash) + expect(call.metadata['user-agent']).to be_a(String) + + expect(call.metadata_sent).to be(true) + expect(call.output_metadata).to eq({}) + expect(call.metadata_to_send).to eq({}) + expect(call.deadline.is_a?(Time)).to be(true) + end + + it 'should not crash when call used after an unary call is finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.an_rpc(req) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after client streaming finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.a_client_streaming_rpc(requests) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after server streaming finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_server_streaming_rpc(req) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after a bidi call is finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_bidi_rpc(requests) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + end end end diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key new file mode 100644 index 0000000000..f48d0735d9 --- /dev/null +++ b/src/ruby/spec/testdata/client.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM +s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM +JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT +NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS +k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH +0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS +W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI +w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5 +0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5 +/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/ +U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP +1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd +9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI +JiqOszq9GWESErAatg== +-----END PRIVATE KEY----- diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem new file mode 100644 index 0000000000..e332091019 --- /dev/null +++ b/src/ruby/spec/testdata/client.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV +BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0 +ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw +MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB +nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX +b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz +W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw +R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/ +T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk +tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C +OO+svdkmqH0KZo320ZUqdl2ooQ== +-----END CERTIFICATE----- |