diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 141 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 62 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 4 | ||||
-rw-r--r-- | src/ruby/spec/generic/active_call_spec.rb | 4 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 136 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_desc_spec.rb | 10 |
6 files changed, 249 insertions, 108 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index cb407d236d..96c773a995 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -40,7 +40,7 @@ end module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call - class ActiveCall + class ActiveCall # rubocop:disable Metrics/ClassLength include Core::TimeConsts include Core::CallOps extend Forwardable @@ -100,6 +100,11 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started @send_initial_md_mutex = Mutex.new + + @output_stream_done = false + @input_stream_done = false + @call_finished = false + @call_finished_mu = Mutex.new end # Sends the initial metadata that has yet to be sent. @@ -142,11 +147,9 @@ module GRPC Operation.new(self) end - # finished waits until a client call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a status. - def finished + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + set_input_stream_done attach_status_results_and_complete_call(batch_result) end @@ -155,8 +158,6 @@ module GRPC @call.trailing_metadata = recv_status_batch_result.status.metadata end @call.status = recv_status_batch_result.status - @call.close - op_is_done # The RECV_STATUS in run_batch always succeeds # Check the status for a bad status or failed run batch @@ -193,9 +194,19 @@ module GRPC } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(ops) + set_output_stream_done + nil end + # Intended for use on server-side calls when a single request from + # the client is expected (i.e., unary and server-streaming RPC types). + def read_unary_request + req = remote_read + set_input_stream_done + req + end + def server_unary_response(req, trailing_metadata: {}, code: Core::StatusCodes::OK, details: 'OK') ops = {} @@ -211,6 +222,7 @@ module GRPC ops[RECV_CLOSE_ON_SERVER] = nil @call.run_batch(ops) + set_output_stream_done end # remote_read reads a response from the remote endpoint. @@ -241,6 +253,8 @@ module GRPC # each_remote_read passes each response to the given block or returns an # enumerator the responses if no block is given. + # Used to generate the request enumerable for + # server-side client-streaming RPC's. # # == Enumerator == # @@ -258,10 +272,14 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read return enum_for(:each_remote_read) unless block_given? - loop do - resp = remote_read - break if resp.nil? # the last response was received - yield resp + begin + loop do + resp = remote_read + break if resp.nil? # the last response was received + yield resp + end + ensure + set_input_stream_done end end @@ -287,13 +305,17 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - if resp.nil? # the last response was received, but not finished yet - finished - break + begin + loop do + resp = remote_read + if resp.nil? # the last response was received + receive_and_check_status + break + end + yield resp end - yield resp + ensure + set_input_stream_done end end @@ -319,7 +341,15 @@ module GRPC end @metadata_sent = true end - batch_result = @call.run_batch(ops) + + begin + batch_result = @call.run_batch(ops) + # no need to check for cancellation after a CallError because this + # batch contains a RECV_STATUS op + ensure + set_input_stream_done + set_output_stream_done + end @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) @@ -339,10 +369,19 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - # Metadata might have already been sent if this is an operation view - merge_metadata_and_send_if_not_already_sent(metadata) + begin + merge_metadata_and_send_if_not_already_sent(metadata) + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + rescue GRPC::Core::CallError => e + receive_and_check_status # check for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end - requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } batch_result = @call.run_batch( SEND_CLOSE_FROM_CLIENT => nil, RECV_INITIAL_METADATA => nil, @@ -350,12 +389,11 @@ module GRPC RECV_STATUS_ON_CLIENT => nil ) + set_input_stream_done + @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) get_message_from_batch_result(batch_result) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -384,13 +422,22 @@ module GRPC end @metadata_sent = true end - @call.run_batch(ops) + + begin + @call.run_batch(ops) + rescue GRPC::Core::CallError => e + receive_and_check_status # checks for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end + replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -428,7 +475,10 @@ module GRPC @unmarshal, metadata_received: @metadata_received) - bd.run_on_client(requests, @op_notifier, &blk) + bd.run_on_client(requests, + proc { set_input_stream_done }, + proc { set_output_stream_done }, + &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -449,7 +499,7 @@ module GRPC metadata_received: @metadata_received, req_view: MultiReqView.new(self)) - bd.run_on_server(gen_each_reply) + bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) end # Waits till an operation completes @@ -459,7 +509,8 @@ module GRPC @op_notifier.wait end - # Signals that an operation is done + # Signals that an operation is done. + # Only relevant on the client-side (this is a no-op on the server-side) def op_is_done return if @op_notifier.nil? @op_notifier.notify(self) @@ -486,6 +537,34 @@ module GRPC private + # To be called once the "input stream" has been completelly + # read through (i.e, done reading from client or received status) + # note this is idempotent + def set_input_stream_done + @call_finished_mu.synchronize do + @input_stream_done = true + maybe_finish_and_close_call_locked + end + end + + # To be called once the "output stream" has been completelly + # sent through (i.e, done sending from client or sent status) + # note this is idempotent + def set_output_stream_done + @call_finished_mu.synchronize do + @output_stream_done = true + maybe_finish_and_close_call_locked + end + end + + def maybe_finish_and_close_call_locked + return unless @output_stream_done && @input_stream_done + return if @call_finished + @call_finished = true + op_is_done + @call.close + end + # Starts the call if not already started # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index e54cf78969..9e125cd986 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -62,12 +62,19 @@ module GRPC # block that can be invoked with each response. # # @param requests the Enumerable of requests to send - # @param op_notifier a Notifier used to signal completion + # @param set_input_stream_done [Proc] called back when we're done + # reading the input stream + # @param set_input_stream_done [Proc] called back when we're done + # sending data on the output stream # @return an Enumerator of requests to yield - def run_on_client(requests, op_notifier, &blk) - @op_notifier = op_notifier - @enq_th = Thread.new { write_loop(requests) } - read_loop(&blk) + def run_on_client(requests, + set_input_stream_done, + set_output_stream_done, + &blk) + @enq_th = Thread.new do + write_loop(requests, set_output_stream_done: set_output_stream_done) + end + read_loop(set_input_stream_done, &blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -81,12 +88,17 @@ module GRPC # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. - def run_on_server(gen_each_reply) + # @param set_input_steam_done [Proc] call back to call when + # the reads have been completely read through. + def run_on_server(gen_each_reply, set_input_stream_done) # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call(read_loop(is_client: false)) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false)) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call(read_loop(is_client: false), @req_view) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false), + @req_view) else fail 'Illegal arity of reply generator' end @@ -99,22 +111,6 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes - # signals that bidi operation is complete - def notify_done - return unless @op_notifier - GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") - @op_notifier.notify(self) - end - - # signals that a bidi operation is complete (read + write) - def finished - @done_mutex.synchronize do - return unless @reads_complete && @writes_complete && !@complete - @call.close - @complete = true - end - end - # performs a read using @call.run_batch, ensures metadata is set up def read_using_run_batch ops = { RECV_MESSAGE => nil } @@ -127,7 +123,8 @@ module GRPC batch_result end - def write_loop(requests, is_client: true) + # set_output_stream_done is relevant on client-side + def write_loop(requests, is_client: true, set_output_stream_done: nil) GRPC.logger.debug('bidi-write-loop: starting') count = 0 requests.each do |req| @@ -151,23 +148,20 @@ module GRPC GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') - notify_done - @writes_complete = true - finished end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - notify_done - @writes_complete = true - finished raise e + ensure + set_output_stream_done.call if is_client end # Provides an enumerator that yields results of remote reads - def read_loop(is_client: true) + def read_loop(set_input_stream_done, is_client: true) return enum_for(:read_loop, + set_input_stream_done, is_client: is_client) unless block_given? GRPC.logger.debug('bidi-read-loop: starting') begin @@ -201,10 +195,10 @@ module GRPC GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) raise e + ensure + set_input_stream_done.call end GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished # Make sure that the write loop is done done before finishing the call. # Note that blocking is ok at this point because we've already received # a status diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index ce0097573a..89cf8ff6a0 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -48,7 +48,7 @@ module GRPC end def handle_request_response(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request resp = mth.call(req, active_call.single_req_view) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata) @@ -61,7 +61,7 @@ module GRPC end def handle_server_streamer(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request replys = mth.call(req, active_call.single_req_view) replys.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 72e55ebcce..ec0c294174 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -473,7 +473,7 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') server_call.send_status(OK, 'status code is OK') - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if the server sends an early status response' do @@ -490,7 +490,7 @@ describe GRPC::ActiveCall do expect do call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 09b88c7cef..3b8f72eda1 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -45,6 +45,7 @@ describe 'ClientStub' do @method = 'an_rpc_method' @pass = OK @fail = INTERNAL + @metadata = { k1: 'v1', k2: 'v2' } end after(:each) do @@ -107,7 +108,7 @@ describe 'ClientStub' do end end - describe '#request_response' do + describe '#request_response', request_response: true do before(:each) do @sent_msg, @resp = 'a_msg', 'a_reply' end @@ -187,13 +188,24 @@ describe 'ClientStub' do # Kill the server thread so tests can complete th.kill end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_response(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end end describe 'without a call operation' do def get_response(stub, credentials: nil) puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }, + metadata: @metadata, credentials: credentials) end @@ -201,16 +213,19 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false, credentials: nil) - op = stub.request_response(@method, @sent_msg, noop, noop, - return_op: true, - metadata: { k1: 'v1', k2: 'v2' }, - deadline: from_relative_time(2), - credentials: credentials) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.request_response(@method, @sent_msg, noop, noop, + return_op: true, + metadata: @metadata, + deadline: from_relative_time(2), + credentials: credentials) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end @@ -228,13 +243,12 @@ describe 'ClientStub' do end end - describe '#client_streamer' do + describe '#client_streamer', client_streamer: true do before(:each) do Thread.abort_on_exception = true server_port = create_test_server host = "localhost:#{server_port}" @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - @metadata = { k1: 'v1', k2: 'v2' } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @resp = 'a_reply' end @@ -278,13 +292,16 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false) - op = stub.client_streamer(@method, @sent_msgs, noop, noop, - return_op: true, metadata: @metadata) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.client_streamer(@method, @sent_msgs, noop, noop, + return_op: true, metadata: @metadata) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end @@ -298,7 +315,7 @@ describe 'ClientStub' do end end - describe '#server_streamer' do + describe '#server_streamer', server_streamer: true do before(:each) do @sent_msg = 'a_msg' @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -334,12 +351,36 @@ describe 'ClientStub' do expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_responses(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end + + it 'the call terminates when there is an unmarshalling error' do + server_port = create_test_server + host = "localhost:#{server_port}" + th = run_server_streamer(@sent_msg, @replys, @pass) + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + + unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') } + expect do + get_responses(stub, unmarshal: unmarshal).collect { |r| r } + end.to raise_error(ArgumentError, 'test unmarshalling error') + th.join + end end - describe 'without a call operation' do - def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }) + describe 'without a call operation', test2: true do + def get_responses(stub, unmarshal: noop) + e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -351,10 +392,10 @@ describe 'ClientStub' do after(:each) do @op.wait # make sure wait doesn't hang end - def get_responses(stub, run_start_call_first: false) - @op = stub.server_streamer(@method, @sent_msg, noop, noop, + def get_responses(stub, run_start_call_first: false, unmarshal: noop) + @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal, return_op: true, - metadata: { k1: 'v1', k2: 'v2' }) + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -377,7 +418,7 @@ describe 'ClientStub' do end end - describe '#bidi_streamer' do + describe '#bidi_streamer', bidi: true do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -386,7 +427,7 @@ describe 'ClientStub' do end shared_examples 'bidi streaming' do - it 'supports sending all the requests first', bidi: true do + it 'supports sending all the requests first' do th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) @@ -395,7 +436,7 @@ describe 'ClientStub' do th.join end - it 'supports client-initiated ping pong', bidi: true do + it 'supports client-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) @@ -403,18 +444,39 @@ describe 'ClientStub' do th.join end - it 'supports a server-initiated ping pong', bidi: true do + it 'supports a server-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'should raise an error if the status is not ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + th.join + end + + # TODO: add test for metadata-related ArgumentError in a bidi call once + # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed + + it 'should send metadata to the server ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, + **@metadata) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end end describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -428,7 +490,8 @@ describe 'ClientStub' do end def get_responses(stub, run_start_call_first: false) @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, - return_op: true) + return_op: true, + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -472,9 +535,14 @@ describe 'ClientStub' do end end - def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) + def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, + **kw) + wanted_metadata = kw.clone wakey_thread do |notifier| c = expect_server_to_be_invoked(notifier) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 100e9e8487..be578c40d3 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -38,14 +38,14 @@ describe GRPC::RpcDesc do shared_examples 'it handles errors' do it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, metadata: {}) this_desc.run_server_method(@call, method(:bad_status)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg, false, metadata: {}) @@ -53,7 +53,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) + expect(@call).to receive(:read_unary_request).once.and_raise(CallError) blk = proc do this_desc.run_server_method(@call, method(:fake_reqresp)) end @@ -75,7 +75,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:output_metadata).once.and_return(fake_md) expect(@call).to receive(:server_unary_response).once .with(@ok_response, trailing_metadata: fake_md) @@ -133,7 +133,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:send_status).once.with(OK, 'OK', true, |