aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/spec/generic
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-04-24 00:12:30 +0200
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-04-24 00:38:19 +0200
commit23be2803662c19f73a66c82c9d5dbd62b537515f (patch)
tree9d8afeb18c21cddcbfb9bb014f08f1474021db98 /src/ruby/spec/generic
parentb7c2035e83a9b3e346f1fd37f9ad55c2070fb02e (diff)
parent3afd92ff511f52db3ecf892d9af65053323c89cb (diff)
Merge branch 'master' of github.com:grpc/grpc into the-purge-2
Conflicts: src/cpp/client/channel.cc vsprojects/vs2010/grpc++.vcxproj vsprojects/vs2013/grpc++.vcxproj.filters
Diffstat (limited to 'src/ruby/spec/generic')
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb144
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb137
-rw-r--r--src/ruby/spec/generic/rpc_desc_spec.rb130
-rw-r--r--src/ruby/spec/generic/rpc_server_pool_spec.rb5
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb187
5 files changed, 343 insertions, 260 deletions
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 96e07cacb4..575871afb1 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -34,12 +34,11 @@ include GRPC::Core::StatusCodes
describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
- CompletionType = GRPC::Core::CompletionType
+ CallOps = GRPC::Core::CallOps
before(:each) do
@pass_through = proc { |x| x }
@server_tag = Object.new
- @server_done_tag = Object.new
@tag = Object.new
@client_queue = GRPC::Core::CompletionQueue.new
@@ -48,7 +47,7 @@ describe GRPC::ActiveCall do
@server = GRPC::Core::Server.new(@server_queue, nil)
server_port = @server.add_http2_port(host)
@server.start
- @ch = GRPC::Core::Channel.new("localhost:#{server_port}", nil)
+ @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil)
end
after(:each) do
@@ -58,12 +57,10 @@ describe GRPC::ActiveCall do
describe 'restricted view methods' do
before(:each) do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
end
describe '#multi_req_view' do
@@ -90,48 +87,45 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
@client_call.remote_send(msg)
# check that server rpc new was received
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- expect(ev.type).to be(CompletionType::SERVER_RPC_NEW)
- expect(ev.call).to be_a(Call)
- expect(ev.tag).to be(@server_tag)
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ recvd_call = recvd_rpc.call
# Accept the call, and verify that the server reads the response ok.
- ev.call.server_accept(@client_queue, @server_tag)
- ev.call.server_end_initial_metadata
- server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => {}
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq(msg)
end
it 'marshals the payload using the marshal func' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ ActiveCall.client_invoke(call, @client_queue, deadline)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
- @pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
# confirm that the message was marshalled
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- ev.call.server_accept(@client_queue, @server_tag)
- ev.call.server_end_initial_metadata
- server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
@@ -142,23 +136,22 @@ describe GRPC::ActiveCall do
call = make_test_call
ActiveCall.client_invoke(call, @client_queue, deadline,
k1: 'v1', k2: 'v2')
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- expect(ev).to_not be_nil
- expect(ev.result.metadata['k1']).to eq('v1')
- expect(ev.result.metadata['k2']).to eq('v2')
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ expect(recvd_call).to_not be_nil
+ expect(recvd_rpc.metadata).to_not be_nil
+ expect(recvd_rpc.metadata['k1']).to eq('v1')
+ expect(recvd_rpc.metadata['k2']).to eq('v2')
end
end
describe '#remote_read' do
it 'reads the response sent by a server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -168,12 +161,10 @@ describe GRPC::ActiveCall do
it 'saves no metadata when the server adds no metadata' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -185,12 +176,10 @@ describe GRPC::ActiveCall do
it 'saves metadata add by the server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
@@ -203,12 +192,10 @@ describe GRPC::ActiveCall do
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
client_call.writes_done(false)
@@ -222,13 +209,11 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
# confirm the client receives the unmarshalled message
msg = 'message is a string'
@@ -249,13 +234,11 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that can read n responses' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
- msg = 'message is 4a string'
+ metadata_tag: md_tag)
+ msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -269,12 +252,10 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- read_metadata_tag: meta_tag,
- finished_tag: done_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
@@ -294,12 +275,10 @@ describe GRPC::ActiveCall do
describe '#writes_done' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
expect { client_call.writes_done(false) }.to_not raise_error
@@ -312,12 +291,10 @@ describe GRPC::ActiveCall do
it 'finishes ok if the server sends an early status response' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- read_metadata_tag: meta_tag,
- finished_tag: done_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -330,12 +307,10 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- read_metadata_tag: meta_tag,
- finished_tag: done_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -353,21 +328,20 @@ describe GRPC::ActiveCall do
end
def expect_server_to_be_invoked(**kw)
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- ev.call.add_metadata(kw)
- ev.call.server_accept(@client_queue, @server_done_tag)
- ev.call.server_end_initial_metadata
- ActiveCall.new(ev.call, @client_queue, @pass_through,
- @pass_through, deadline,
- finished_tag: @server_done_tag)
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ recvd_call = recvd_rpc.call
+ recvd_call.run_batch(@server_queue, @server_tag, deadline,
+ CallOps::SEND_INITIAL_METADATA => kw)
+ ActiveCall.new(recvd_call, @server_queue, @pass_through,
+ @pass_through, deadline)
end
def make_test_call
- @ch.create_call('dummy_method', 'dummy_host', deadline)
+ @ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline)
end
def deadline
- Time.now + 1 # in 1 second; arbitrary
+ Time.now + 2 # in 2 seconds; arbitrary
end
end
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 0c98fc40d9..88c6b44c22 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -28,17 +28,42 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
-require 'xray/thread_dump_signal_handler'
-NOOP = proc { |x| x }
-FAKE_HOST = 'localhost:0'
+# Notifier is useful high-level synchronization primitive.
+class Notifier
+ attr_reader :payload, :notified
+ alias_method :notified?, :notified
+
+ def initialize
+ @mutex = Mutex.new
+ @cvar = ConditionVariable.new
+ @notified = false
+ @payload = nil
+ end
+
+ def wait
+ @mutex.synchronize do
+ @cvar.wait(@mutex) until notified?
+ end
+ end
+
+ def notify(payload)
+ @mutex.synchronize do
+ return Error.new('already notified') if notified?
+ @payload = payload
+ @notified = true
+ @cvar.signal
+ return nil
+ end
+ end
+end
def wakey_thread(&blk)
- awake_mutex, awake_cond = Mutex.new, ConditionVariable.new
+ n = Notifier.new
t = Thread.new do
- blk.call(awake_mutex, awake_cond)
+ blk.call(n)
end
- awake_mutex.synchronize { awake_cond.wait(awake_mutex) }
+ n.wait
t
end
@@ -50,8 +75,11 @@ end
include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
+include GRPC::Core::CallOps
describe 'ClientStub' do
+ let(:noop) { proc { |x| x } }
+
before(:each) do
Thread.abort_on_exception = true
@server = nil
@@ -66,61 +94,56 @@ describe 'ClientStub' do
end
describe '#new' do
+ let(:fake_host) { 'localhost:0' }
it 'can be created from a host and args' do
- host = FAKE_HOST
opts = { a_channel_arg: 'an_arg' }
blk = proc do
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with a default deadline' do
- host = FAKE_HOST
opts = { a_channel_arg: 'an_arg', deadline: 5 }
blk = proc do
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with an channel override' do
- host = FAKE_HOST
opts = { a_channel_arg: 'an_arg', channel_override: @ch }
blk = proc do
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'cannot be created with a bad channel override' do
- host = FAKE_HOST
blk = proc do
opts = { a_channel_arg: 'an_arg', channel_override: Object.new }
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to raise_error
end
it 'cannot be created with bad credentials' do
- host = FAKE_HOST
blk = proc do
opts = { a_channel_arg: 'an_arg', creds: Object.new }
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to raise_error
end
it 'can be created with test test credentials' do
certs = load_test_certs
- host = FAKE_HOST
blk = proc do
opts = {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
a_channel_arg: 'an_arg',
creds: GRPC::Core::Credentials.new(certs[0], nil, nil)
}
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to_not raise_error
end
@@ -187,7 +210,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
- stub.request_response(@method, @sent_msg, NOOP, NOOP,
+ stub.request_response(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
end
@@ -196,7 +219,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
- op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
+ op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
@@ -259,7 +282,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
- stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
+ stub.client_streamer(@method, @sent_msgs, noop, noop,
k1: 'v1', k2: 'v2')
end
@@ -268,7 +291,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
- op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
+ op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
@@ -333,7 +356,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
- e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
+ e = stub.server_streamer(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
expect(e).to be_a(Enumerator)
e
@@ -344,7 +367,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
- op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
+ op = stub.server_streamer(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
@@ -361,34 +384,30 @@ describe 'ClientStub' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+ server_port = create_test_server
+ @host = "localhost:#{server_port}"
end
it 'supports sending all the requests first', bidi: true do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
- stub = GRPC::ClientStub.new(host, @cq)
+ stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
it 'supports client-initiated ping pong', bidi: true do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
- stub = GRPC::ClientStub.new(host, @cq)
+ stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
it 'supports a server-initiated ping pong', bidi: true do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
- stub = GRPC::ClientStub.new(host, @cq)
+ stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
@@ -397,7 +416,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
- e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
+ e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
expect(e).to be_a(Enumerator)
e
end
@@ -407,7 +426,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
- op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
+ op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
return_op: true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
@@ -421,8 +440,8 @@ describe 'ClientStub' do
def run_server_streamer(expected_input, replys, status, **kw)
wanted_metadata = kw.clone
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
@@ -434,8 +453,8 @@ describe 'ClientStub' do
def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
status)
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
@@ -443,8 +462,8 @@ describe 'ClientStub' do
end
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expected_inputs.each do |i|
if client_starts
expect(c.remote_read).to eq(i)
@@ -460,8 +479,8 @@ describe 'ClientStub' do
def run_client_streamer(expected_inputs, resp, status, **kw)
wanted_metadata = kw.clone
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
@@ -473,8 +492,8 @@ describe 'ClientStub' do
def run_request_response(expected_input, resp, status, **kw)
wanted_metadata = kw.clone
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
@@ -490,24 +509,16 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0')
end
- def start_test_server(awake_mutex, awake_cond)
+ def expect_server_to_be_invoked(notifier)
@server.start
- @server_tag = Object.new
- @server.request_call(@server_tag)
- awake_mutex.synchronize { awake_cond.signal }
- end
-
- def expect_server_to_be_invoked(awake_mutex, awake_cond)
- start_test_server(awake_mutex, awake_cond)
- ev = @server_queue.pluck(@server_tag, INFINITE_FUTURE)
- fail OutOfTime if ev.nil?
- server_call = ev.call
- server_call.metadata = ev.result.metadata
- finished_tag = Object.new
- server_call.server_accept(@server_queue, finished_tag)
- server_call.server_end_initial_metadata
- GRPC::ActiveCall.new(server_call, @server_queue, NOOP, NOOP,
- INFINITE_FUTURE,
- finished_tag: finished_tag)
+ notifier.notify(nil)
+ server_tag = Object.new
+ recvd_rpc = @server.request_call(@server_queue, server_tag,
+ INFINITE_FUTURE)
+ recvd_call = recvd_rpc.call
+ recvd_call.metadata = recvd_rpc.metadata
+ recvd_call.run_batch(@server_queue, server_tag, Time.now + 2,
+ SEND_INITIAL_METADATA => nil)
+ GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE)
end
end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 39d1e83748..083632a080 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -37,7 +37,6 @@ describe GRPC::RpcDesc do
INTERNAL = GRPC::Core::StatusCodes::INTERNAL
UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN
CallError = GRPC::Core::CallError
- EventError = GRPC::Core::EventError
before(:each) do
@request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode',
@@ -53,49 +52,49 @@ describe GRPC::RpcDesc do
@ok_response = Object.new
end
+ shared_examples 'it handles errors' do
+ it 'sends the specified status if BadStatus is raised' do
+ expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
+ {})
+ this_desc.run_server_method(@call, method(:bad_status))
+ end
+
+ it 'sends status UNKNOWN if other StandardErrors are raised' do
+ expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+ false, {})
+ this_desc.run_server_method(@call, method(:other_error))
+ end
+
+ it 'absorbs CallError with no further action' do
+ expect(@call).to receive(:remote_read).once.and_raise(CallError)
+ blk = proc do
+ this_desc.run_server_method(@call, method(:fake_reqresp))
+ end
+ expect(&blk).to_not raise_error
+ end
+ end
+
describe '#run_server_method' do
+ let(:fake_md) { { k1: 'v1', k2: 'v2' } }
describe 'for request responses' do
+ let(:this_desc) { @request_response }
before(:each) do
@call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call)
- allow(@call).to receive(:gc)
- end
-
- it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
- @request_response.run_server_method(@call, method(:bad_status))
- end
-
- it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
- @request_response.run_server_method(@call, method(:other_error))
- end
-
- it 'absorbs EventError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(EventError)
- blk = proc do
- @request_response.run_server_method(@call, method(:fake_reqresp))
- end
- expect(&blk).to_not raise_error
end
- it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
- blk = proc do
- @request_response.run_server_method(@call, method(:fake_reqresp))
- end
- expect(&blk).to_not raise_error
- end
+ it_behaves_like 'it handles errors'
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
- @request_response.run_server_method(@call, method(:fake_reqresp))
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
+ this_desc.run_server_method(@call, method(:fake_reqresp))
end
end
@@ -103,27 +102,20 @@ describe GRPC::RpcDesc do
before(:each) do
@call = double('active_call')
allow(@call).to receive(:multi_req_view).and_return(@call)
- allow(@call).to receive(:gc)
end
it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+ expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
+ {})
@client_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
+ expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+ false, {})
@client_streamer.run_server_method(@call, method(:other_error_alt))
end
- it 'absorbs EventError with no further action' do
- expect(@call).to receive(:remote_send).once.and_raise(EventError)
- blk = proc do
- @client_streamer.run_server_method(@call, method(:fake_clstream))
- end
- expect(&blk).to_not raise_error
- end
-
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(CallError)
blk = proc do
@@ -134,53 +126,29 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
describe 'for server streaming' do
+ let(:this_desc) { @request_response }
before(:each) do
@call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call)
- allow(@call).to receive(:gc)
- end
-
- it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
- @server_streamer.run_server_method(@call, method(:bad_status))
- end
-
- it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
- @server_streamer.run_server_method(@call, method(:other_error))
- end
-
- it 'absorbs EventError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(EventError)
- blk = proc do
- @server_streamer.run_server_method(@call, method(:fake_svstream))
- end
- expect(&blk).to_not raise_error
end
- it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
- blk = proc do
- @server_streamer.run_server_method(@call, method(:fake_svstream))
- end
- expect(&blk).to_not raise_error
- end
+ it_behaves_like 'it handles errors'
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
end
@@ -191,26 +159,28 @@ describe GRPC::RpcDesc do
enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th')
allow(enq_th).to receive(:join)
allow(rwl_th).to receive(:join)
- allow(@call).to receive(:gc)
end
it 'sends the specified status if BadStatus is raised' do
e = GRPC::BadStatus.new(@bs_code, 'NOK')
expect(@call).to receive(:run_server_bidi).and_raise(e)
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+ expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
+ {})
@bidi_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:run_server_bidi).and_raise(StandardError)
- expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason)
+ expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
+ false, {})
@bidi_streamer.run_server_method(@call, method(:other_error_alt))
end
it 'closes the stream if there no errors' do
expect(@call).to receive(:run_server_bidi)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
@bidi_streamer.run_server_method(@call, method(:fake_bidistream))
end
end
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
index 8383dc1533..aae3a7d7cb 100644
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb
@@ -28,11 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
-require 'xray/thread_dump_signal_handler'
-Pool = GRPC::RpcServer::Pool
+describe GRPC::Pool do
+ Pool = GRPC::Pool
-describe Pool do
describe '#new' do
it 'raises if a non-positive size is used' do
expect { Pool.new(0) }.to raise_error
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 34e5cdcd04..2cd21a15e3 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -28,7 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
-require 'xray/thread_dump_signal_handler'
def load_test_certs
test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
@@ -58,18 +57,20 @@ class NoRpcImplementation
rpc :an_rpc, EchoMsg, EchoMsg
end
-# A test service with an implementation.
+# A test service with an echo implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
attr_reader :received_md
- def initialize(_default_var = 'ignored')
+ def initialize(**kw)
+ @trailing_metadata = kw
@received_md = []
end
def an_rpc(req, call)
logger.info('echo service received a request')
+ call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
end
@@ -77,6 +78,25 @@ end
EchoStub = EchoService.rpc_stub_class
+# A test service with an implementation that fails with BadStatus
+class FailingService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ attr_reader :details, :code, :md
+
+ def initialize(_default_var = 'ignored')
+ @details = 'app error'
+ @code = 101
+ @md = { failed_method: 'an_rpc' }
+ end
+
+ def an_rpc(_req, _call)
+ fail GRPC::BadStatus.new(@code, @details, **@md)
+ end
+end
+
+FailingStub = FailingService.rpc_stub_class
+
# A slow test service.
class SlowService
include GRPC::GenericService
@@ -301,21 +321,20 @@ describe GRPC::RpcServer do
end
describe '#run' do
- before(:each) do
- @client_opts = {
- channel_override: @ch
- }
- @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc
- @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output)
- server_opts = {
- server_override: @server,
- completion_queue_override: @server_queue,
- poll_period: 1
- }
- @srv = RpcServer.new(**server_opts)
- end
+ let(:client_opts) { { channel_override: @ch } }
+ let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
+ let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
+
+ context 'with no connect_metadata' do
+ before(:each) do
+ server_opts = {
+ server_override: @server,
+ completion_queue_override: @server_queue,
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
- describe 'when running' do
it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -323,8 +342,8 @@ describe GRPC::RpcServer do
req = EchoMsg.new
blk = proc do
cq = GRPC::Core::CompletionQueue.new
- stub = GRPC::ClientStub.new(@host, cq, **@client_opts)
- stub.request_response('/unknown', req, @marshal, @unmarshal)
+ stub = GRPC::ClientStub.new(@host, cq, **client_opts)
+ stub.request_response('/unknown', req, marshal, unmarshal)
end
expect(&blk).to raise_error GRPC::BadStatus
@srv.stop
@@ -337,7 +356,7 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
n = 5 # arbitrary
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
@srv.stop
t.join
@@ -349,7 +368,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md)
@@ -363,8 +382,8 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = SlowStub.new(@host, **@client_opts)
- deadline = service.delay + 0.5 # wait for long enough
+ stub = SlowStub.new(@host, **client_opts)
+ deadline = service.delay + 1.0 # wait for long enough
expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md)
@@ -378,7 +397,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = SlowStub.new(@host, **@client_opts)
+ stub = SlowStub.new(@host, **client_opts)
deadline = 0.1 # too short for SlowService to respond
blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') }
expect(&blk).to raise_error GRPC::BadStatus
@@ -388,19 +407,37 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should handle cancellation correctly', server: true do
+ service = SlowService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = SlowStub.new(@host, **client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ Thread.new do # cancel the call
+ sleep 0.1
+ op.cancel
+ end
+ expect { op.execute }.to raise_error GRPC::Cancelled
+ @srv.stop
+ t.join
+ end
+
it 'should receive updated metadata', server: true do
service = EchoService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- @client_opts[:update_metadata] = proc do |md|
+ client_opts[:update_metadata] = proc do |md|
md[:k1] = 'updated-v1'
md
end
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
- wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }]
+ wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2',
+ 'jwt_aud_uri' => "https://#{@host}/EchoService" }]
expect(service.received_md).to eq(wanted_md)
@srv.stop
t.join
@@ -415,7 +452,7 @@ describe GRPC::RpcServer do
threads = []
n.times do
threads << Thread.new do
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
q << stub.an_rpc(req)
end
end
@@ -443,7 +480,7 @@ describe GRPC::RpcServer do
one_failed_as_unavailable = false
n.times do
threads << Thread.new do
- stub = SlowStub.new(@host, **@client_opts)
+ stub = SlowStub.new(@host, **client_opts)
begin
stub.an_rpc(req)
rescue GRPC::BadStatus => e
@@ -456,5 +493,97 @@ describe GRPC::RpcServer do
expect(one_failed_as_unavailable).to be(true)
end
end
+
+ context 'with connect metadata' do
+ let(:test_md_proc) do
+ proc do |mth, md|
+ res = md.clone
+ res['method'] = mth
+ res['connect_k1'] = 'connect_v1'
+ res
+ end
+ end
+ before(:each) do
+ server_opts = {
+ server_override: @server,
+ completion_queue_override: @server_queue,
+ poll_period: 1,
+ connect_md_proc: test_md_proc
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
+
+ it 'should send connect metadata to the client', server: true do
+ service = EchoService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = EchoStub.new(@host, **client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ expect(op.metadata).to be nil
+ expect(op.execute).to be_a(EchoMsg)
+ wanted_md = {
+ 'k1' => 'v1',
+ 'k2' => 'v2',
+ 'method' => '/EchoService/an_rpc',
+ 'connect_k1' => 'connect_v1'
+ }
+ expect(op.metadata).to eq(wanted_md)
+ @srv.stop
+ t.join
+ end
+ end
+
+ context 'with trailing metadata' do
+ before(:each) do
+ server_opts = {
+ server_override: @server,
+ completion_queue_override: @server_queue,
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
+
+ it 'should be added to BadStatus when requests fail', server: true do
+ service = FailingService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = FailingStub.new(@host, **client_opts)
+ blk = proc { stub.an_rpc(req) }
+
+ # confirm it raise the expected error
+ expect(&blk).to raise_error GRPC::BadStatus
+
+ # call again and confirm exception contained the trailing metadata.
+ begin
+ blk.call
+ rescue GRPC::BadStatus => e
+ expect(e.code).to eq(service.code)
+ expect(e.details).to eq(service.details)
+ expect(e.metadata).to eq(service.md)
+ end
+ @srv.stop
+ t.join
+ end
+
+ it 'should be received by the client', server: true do
+ wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
+ service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = EchoStub.new(@host, **client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ expect(op.metadata).to be nil
+ expect(op.execute).to be_a(EchoMsg)
+ expect(op.metadata).to eq(wanted_trailers)
+ @srv.stop
+ t.join
+ end
+ end
end
end