aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-07-11 17:49:21 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-07-13 13:51:44 -0700
commitfb1e164cd81bd15b7d2aad595510e0c083fb3d5b (patch)
tree1999ac6affcdb75f7908bc84c540232f400fdf96 /src/ruby
parent72cdf6f08242251810edae2df0b3f261afc73c2e (diff)
dont wait for gc to destroy calls on ruby server
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb141
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb62
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb4
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb4
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb136
-rw-r--r--src/ruby/spec/generic/rpc_desc_spec.rb10
6 files changed, 249 insertions, 108 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index cb407d236d..96c773a995 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -40,7 +40,7 @@ end
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
- class ActiveCall
+ class ActiveCall # rubocop:disable Metrics/ClassLength
include Core::TimeConsts
include Core::CallOps
extend Forwardable
@@ -100,6 +100,11 @@ module GRPC
fail(ArgumentError, 'Already sent md') if started && metadata_to_send
@metadata_to_send = metadata_to_send || {} unless started
@send_initial_md_mutex = Mutex.new
+
+ @output_stream_done = false
+ @input_stream_done = false
+ @call_finished = false
+ @call_finished_mu = Mutex.new
end
# Sends the initial metadata that has yet to be sent.
@@ -142,11 +147,9 @@ module GRPC
Operation.new(self)
end
- # finished waits until a client call is completed.
- #
- # It blocks until the remote endpoint acknowledges by sending a status.
- def finished
+ def receive_and_check_status
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+ set_input_stream_done
attach_status_results_and_complete_call(batch_result)
end
@@ -155,8 +158,6 @@ module GRPC
@call.trailing_metadata = recv_status_batch_result.status.metadata
end
@call.status = recv_status_batch_result.status
- @call.close
- op_is_done
# The RECV_STATUS in run_batch always succeeds
# Check the status for a bad status or failed run batch
@@ -193,9 +194,19 @@ module GRPC
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(ops)
+ set_output_stream_done
+
nil
end
+ # Intended for use on server-side calls when a single request from
+ # the client is expected (i.e., unary and server-streaming RPC types).
+ def read_unary_request
+ req = remote_read
+ set_input_stream_done
+ req
+ end
+
def server_unary_response(req, trailing_metadata: {},
code: Core::StatusCodes::OK, details: 'OK')
ops = {}
@@ -211,6 +222,7 @@ module GRPC
ops[RECV_CLOSE_ON_SERVER] = nil
@call.run_batch(ops)
+ set_output_stream_done
end
# remote_read reads a response from the remote endpoint.
@@ -241,6 +253,8 @@ module GRPC
# each_remote_read passes each response to the given block or returns an
# enumerator the responses if no block is given.
+ # Used to generate the request enumerable for
+ # server-side client-streaming RPC's.
#
# == Enumerator ==
#
@@ -258,10 +272,14 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
- loop do
- resp = remote_read
- break if resp.nil? # the last response was received
- yield resp
+ begin
+ loop do
+ resp = remote_read
+ break if resp.nil? # the last response was received
+ yield resp
+ end
+ ensure
+ set_input_stream_done
end
end
@@ -287,13 +305,17 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
- loop do
- resp = remote_read
- if resp.nil? # the last response was received, but not finished yet
- finished
- break
+ begin
+ loop do
+ resp = remote_read
+ if resp.nil? # the last response was received
+ receive_and_check_status
+ break
+ end
+ yield resp
end
- yield resp
+ ensure
+ set_input_stream_done
end
end
@@ -319,7 +341,15 @@ module GRPC
end
@metadata_sent = true
end
- batch_result = @call.run_batch(ops)
+
+ begin
+ batch_result = @call.run_batch(ops)
+ # no need to check for cancellation after a CallError because this
+ # batch contains a RECV_STATUS op
+ ensure
+ set_input_stream_done
+ set_output_stream_done
+ end
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
@@ -339,10 +369,19 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
- # Metadata might have already been sent if this is an operation view
- merge_metadata_and_send_if_not_already_sent(metadata)
+ begin
+ merge_metadata_and_send_if_not_already_sent(metadata)
+ requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+ rescue GRPC::Core::CallError => e
+ receive_and_check_status # check for Cancelled
+ raise e
+ rescue => e
+ set_input_stream_done
+ raise e
+ ensure
+ set_output_stream_done
+ end
- requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
batch_result = @call.run_batch(
SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil,
@@ -350,12 +389,11 @@ module GRPC
RECV_STATUS_ON_CLIENT => nil
)
+ set_input_stream_done
+
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -384,13 +422,22 @@ module GRPC
end
@metadata_sent = true
end
- @call.run_batch(ops)
+
+ begin
+ @call.run_batch(ops)
+ rescue GRPC::Core::CallError => e
+ receive_and_check_status # checks for Cancelled
+ raise e
+ rescue => e
+ set_input_stream_done
+ raise e
+ ensure
+ set_output_stream_done
+ end
+
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -428,7 +475,10 @@ module GRPC
@unmarshal,
metadata_received: @metadata_received)
- bd.run_on_client(requests, @op_notifier, &blk)
+ bd.run_on_client(requests,
+ proc { set_input_stream_done },
+ proc { set_output_stream_done },
+ &blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -449,7 +499,7 @@ module GRPC
metadata_received: @metadata_received,
req_view: MultiReqView.new(self))
- bd.run_on_server(gen_each_reply)
+ bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
end
# Waits till an operation completes
@@ -459,7 +509,8 @@ module GRPC
@op_notifier.wait
end
- # Signals that an operation is done
+ # Signals that an operation is done.
+ # Only relevant on the client-side (this is a no-op on the server-side)
def op_is_done
return if @op_notifier.nil?
@op_notifier.notify(self)
@@ -486,6 +537,34 @@ module GRPC
private
+ # To be called once the "input stream" has been completelly
+ # read through (i.e, done reading from client or received status)
+ # note this is idempotent
+ def set_input_stream_done
+ @call_finished_mu.synchronize do
+ @input_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ # To be called once the "output stream" has been completelly
+ # sent through (i.e, done sending from client or sent status)
+ # note this is idempotent
+ def set_output_stream_done
+ @call_finished_mu.synchronize do
+ @output_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ def maybe_finish_and_close_call_locked
+ return unless @output_stream_done && @input_stream_done
+ return if @call_finished
+ @call_finished = true
+ op_is_done
+ @call.close
+ end
+
# Starts the call if not already started
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index e54cf78969..9e125cd986 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,12 +62,19 @@ module GRPC
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
- # @param op_notifier a Notifier used to signal completion
+ # @param set_input_stream_done [Proc] called back when we're done
+ # reading the input stream
+ # @param set_input_stream_done [Proc] called back when we're done
+ # sending data on the output stream
# @return an Enumerator of requests to yield
- def run_on_client(requests, op_notifier, &blk)
- @op_notifier = op_notifier
- @enq_th = Thread.new { write_loop(requests) }
- read_loop(&blk)
+ def run_on_client(requests,
+ set_input_stream_done,
+ set_output_stream_done,
+ &blk)
+ @enq_th = Thread.new do
+ write_loop(requests, set_output_stream_done: set_output_stream_done)
+ end
+ read_loop(set_input_stream_done, &blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -81,12 +88,17 @@ module GRPC
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
- def run_on_server(gen_each_reply)
+ # @param set_input_steam_done [Proc] call back to call when
+ # the reads have been completely read through.
+ def run_on_server(gen_each_reply, set_input_stream_done)
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
- replys = gen_each_reply.call(read_loop(is_client: false))
+ replys = gen_each_reply.call(
+ read_loop(set_input_stream_done, is_client: false))
elsif gen_each_reply.arity == 2
- replys = gen_each_reply.call(read_loop(is_client: false), @req_view)
+ replys = gen_each_reply.call(
+ read_loop(set_input_stream_done, is_client: false),
+ @req_view)
else
fail 'Illegal arity of reply generator'
end
@@ -99,22 +111,6 @@ module GRPC
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
- # signals that bidi operation is complete
- def notify_done
- return unless @op_notifier
- GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
- @op_notifier.notify(self)
- end
-
- # signals that a bidi operation is complete (read + write)
- def finished
- @done_mutex.synchronize do
- return unless @reads_complete && @writes_complete && !@complete
- @call.close
- @complete = true
- end
- end
-
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
@@ -127,7 +123,8 @@ module GRPC
batch_result
end
- def write_loop(requests, is_client: true)
+ # set_output_stream_done is relevant on client-side
+ def write_loop(requests, is_client: true, set_output_stream_done: nil)
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
requests.each do |req|
@@ -151,23 +148,20 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done')
- notify_done
- @writes_complete = true
- finished
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
- notify_done
- @writes_complete = true
- finished
raise e
+ ensure
+ set_output_stream_done.call if is_client
end
# Provides an enumerator that yields results of remote reads
- def read_loop(is_client: true)
+ def read_loop(set_input_stream_done, is_client: true)
return enum_for(:read_loop,
+ set_input_stream_done,
is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting')
begin
@@ -201,10 +195,10 @@ module GRPC
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
raise e
+ ensure
+ set_input_stream_done.call
end
GRPC.logger.debug('bidi-read-loop: finished')
- @reads_complete = true
- finished
# Make sure that the write loop is done done before finishing the call.
# Note that blocking is ok at this point because we've already received
# a status
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index ce0097573a..89cf8ff6a0 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -48,7 +48,7 @@ module GRPC
end
def handle_request_response(active_call, mth)
- req = active_call.remote_read
+ req = active_call.read_unary_request
resp = mth.call(req, active_call.single_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
@@ -61,7 +61,7 @@ module GRPC
end
def handle_server_streamer(active_call, mth)
- req = active_call.remote_read
+ req = active_call.read_unary_request
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata)
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 72e55ebcce..ec0c294174 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
server_call.send_status(OK, 'status code is OK')
- expect { client_call.finished }.to_not raise_error
+ expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if the server sends an early status response' do
@@ -490,7 +490,7 @@ describe GRPC::ActiveCall do
expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
- expect { client_call.finished }.to_not raise_error
+ expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 09b88c7cef..3b8f72eda1 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -45,6 +45,7 @@ describe 'ClientStub' do
@method = 'an_rpc_method'
@pass = OK
@fail = INTERNAL
+ @metadata = { k1: 'v1', k2: 'v2' }
end
after(:each) do
@@ -107,7 +108,7 @@ describe 'ClientStub' do
end
end
- describe '#request_response' do
+ describe '#request_response', request_response: true do
before(:each) do
@sent_msg, @resp = 'a_msg', 'a_reply'
end
@@ -187,13 +188,24 @@ describe 'ClientStub' do
# Kill the server thread so tests can complete
th.kill
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect do
+ get_response(stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
end
describe 'without a call operation' do
def get_response(stub, credentials: nil)
puts credentials.inspect
stub.request_response(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' },
+ metadata: @metadata,
credentials: credentials)
end
@@ -201,16 +213,19 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
+ after(:each) do
+ # make sure op.wait doesn't hang, even if there's a bad status
+ @op.wait
+ end
def get_response(stub, run_start_call_first: false, credentials: nil)
- op = stub.request_response(@method, @sent_msg, noop, noop,
- return_op: true,
- metadata: { k1: 'v1', k2: 'v2' },
- deadline: from_relative_time(2),
- credentials: credentials)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.start_call if run_start_call_first
- result = op.execute
- op.wait # make sure wait doesn't hang
+ @op = stub.request_response(@method, @sent_msg, noop, noop,
+ return_op: true,
+ metadata: @metadata,
+ deadline: from_relative_time(2),
+ credentials: credentials)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ result = @op.execute
result
end
@@ -228,13 +243,12 @@ describe 'ClientStub' do
end
end
- describe '#client_streamer' do
+ describe '#client_streamer', client_streamer: true do
before(:each) do
Thread.abort_on_exception = true
server_port = create_test_server
host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- @metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
end
@@ -278,13 +292,16 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
+ after(:each) do
+ # make sure op.wait doesn't hang, even if there's a bad status
+ @op.wait
+ end
def get_response(stub, run_start_call_first: false)
- op = stub.client_streamer(@method, @sent_msgs, noop, noop,
- return_op: true, metadata: @metadata)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.start_call if run_start_call_first
- result = op.execute
- op.wait # make sure wait doesn't hang
+ @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
+ return_op: true, metadata: @metadata)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ result = @op.execute
result
end
@@ -298,7 +315,7 @@ describe 'ClientStub' do
end
end
- describe '#server_streamer' do
+ describe '#server_streamer', server_streamer: true do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -334,12 +351,36 @@ describe 'ClientStub' do
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect do
+ get_responses(stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
+
+ it 'the call terminates when there is an unmarshalling error' do
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ th = run_server_streamer(@sent_msg, @replys, @pass)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+
+ unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
+ expect do
+ get_responses(stub, unmarshal: unmarshal).collect { |r| r }
+ end.to raise_error(ArgumentError, 'test unmarshalling error')
+ th.join
+ end
end
- describe 'without a call operation' do
- def get_responses(stub)
- e = stub.server_streamer(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' })
+ describe 'without a call operation', test2: true do
+ def get_responses(stub, unmarshal: noop)
+ e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
+ metadata: @metadata)
expect(e).to be_a(Enumerator)
e
end
@@ -351,10 +392,10 @@ describe 'ClientStub' do
after(:each) do
@op.wait # make sure wait doesn't hang
end
- def get_responses(stub, run_start_call_first: false)
- @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+ def get_responses(stub, run_start_call_first: false, unmarshal: noop)
+ @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
return_op: true,
- metadata: { k1: 'v1', k2: 'v2' })
+ metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
@@ -377,7 +418,7 @@ describe 'ClientStub' do
end
end
- describe '#bidi_streamer' do
+ describe '#bidi_streamer', bidi: true do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -386,7 +427,7 @@ describe 'ClientStub' do
end
shared_examples 'bidi streaming' do
- it 'supports sending all the requests first', bidi: true do
+ it 'supports sending all the requests first' do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
@@ -395,7 +436,7 @@ describe 'ClientStub' do
th.join
end
- it 'supports client-initiated ping pong', bidi: true do
+ it 'supports client-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
@@ -403,18 +444,39 @@ describe 'ClientStub' do
th.join
end
- it 'supports a server-initiated ping pong', bidi: true do
+ it 'supports a server-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
+
+ it 'should raise an error if the status is not ok' do
+ th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub)
+ expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ th.join
+ end
+
+ # TODO: add test for metadata-related ArgumentError in a bidi call once
+ # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+
+ it 'should send metadata to the server ok' do
+ th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
+ **@metadata)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub)
+ expect(e.collect { |r| r }).to eq(@sent_msgs)
+ th.join
+ end
end
describe 'without a call operation' do
def get_responses(stub)
- e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
+ e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+ metadata: @metadata)
expect(e).to be_a(Enumerator)
e
end
@@ -428,7 +490,8 @@ describe 'ClientStub' do
end
def get_responses(stub, run_start_call_first: false)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- return_op: true)
+ return_op: true,
+ metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
@@ -472,9 +535,14 @@ describe 'ClientStub' do
end
end
- def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
+ def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
+ **kw)
+ wanted_metadata = kw.clone
wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier)
+ wanted_metadata.each do |k, v|
+ expect(c.metadata[k.to_s]).to eq(v)
+ end
expected_inputs.each do |i|
if client_starts
expect(c.remote_read).to eq(i)
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 100e9e8487..be578c40d3 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -38,14 +38,14 @@ describe GRPC::RpcDesc do
shared_examples 'it handles errors' do
it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
metadata: {})
this_desc.run_server_method(@call, method(:bad_status))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(UNKNOWN,
arg_error_msg,
false, metadata: {})
@@ -53,7 +53,7 @@ describe GRPC::RpcDesc do
end
it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
+ expect(@call).to receive(:read_unary_request).once.and_raise(CallError)
blk = proc do
this_desc.run_server_method(@call, method(:fake_reqresp))
end
@@ -75,7 +75,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
req = Object.new
- expect(@call).to receive(:remote_read).once.and_return(req)
+ expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:output_metadata).once.and_return(fake_md)
expect(@call).to receive(:server_unary_response).once
.with(@ok_response, trailing_metadata: fake_md)
@@ -133,7 +133,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
req = Object.new
- expect(@call).to receive(:remote_read).once.and_return(req)
+ expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,