diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-04-24 00:12:30 +0200 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-04-24 00:38:19 +0200 |
commit | 23be2803662c19f73a66c82c9d5dbd62b537515f (patch) | |
tree | 9d8afeb18c21cddcbfb9bb014f08f1474021db98 /src/ruby/spec/generic | |
parent | b7c2035e83a9b3e346f1fd37f9ad55c2070fb02e (diff) | |
parent | 3afd92ff511f52db3ecf892d9af65053323c89cb (diff) |
Merge branch 'master' of github.com:grpc/grpc into the-purge-2
Conflicts:
src/cpp/client/channel.cc
vsprojects/vs2010/grpc++.vcxproj
vsprojects/vs2013/grpc++.vcxproj.filters
Diffstat (limited to 'src/ruby/spec/generic')
-rw-r--r-- | src/ruby/spec/generic/active_call_spec.rb | 144 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 137 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_desc_spec.rb | 130 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_pool_spec.rb | 5 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 187 |
5 files changed, 343 insertions, 260 deletions
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 96e07cacb4..575871afb1 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -34,12 +34,11 @@ include GRPC::Core::StatusCodes describe GRPC::ActiveCall do ActiveCall = GRPC::ActiveCall Call = GRPC::Core::Call - CompletionType = GRPC::Core::CompletionType + CallOps = GRPC::Core::CallOps before(:each) do @pass_through = proc { |x| x } @server_tag = Object.new - @server_done_tag = Object.new @tag = Object.new @client_queue = GRPC::Core::CompletionQueue.new @@ -48,7 +47,7 @@ describe GRPC::ActiveCall do @server = GRPC::Core::Server.new(@server_queue, nil) server_port = @server.add_http2_port(host) @server.start - @ch = GRPC::Core::Channel.new("localhost:#{server_port}", nil) + @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil) end after(:each) do @@ -58,12 +57,10 @@ describe GRPC::ActiveCall do describe 'restricted view methods' do before(:each) do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) end describe '#multi_req_view' do @@ -90,48 +87,45 @@ describe GRPC::ActiveCall do describe '#remote_send' do it 'allows a client to send a payload to the server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' @client_call.remote_send(msg) # check that server rpc new was received - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - expect(ev.type).to be(CompletionType::SERVER_RPC_NEW) - expect(ev.call).to be_a(Call) - expect(ev.tag).to be(@server_tag) + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + recvd_call = recvd_rpc.call # Accept the call, and verify that the server reads the response ok. - ev.call.server_accept(@client_queue, @server_tag) - ev.call.server_end_initial_metadata - server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, + server_ops = { + CallOps::SEND_INITIAL_METADATA => {} + } + recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through, @pass_through, deadline) expect(server_call.remote_read).to eq(msg) end it 'marshals the payload using the marshal func' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + ActiveCall.client_invoke(call, @client_queue, deadline) marshal = proc { |x| 'marshalled:' + x } client_call = ActiveCall.new(call, @client_queue, marshal, - @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) # confirm that the message was marshalled - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - ev.call.server_accept(@client_queue, @server_tag) - ev.call.server_end_initial_metadata - server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + recvd_call = recvd_rpc.call + server_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through, @pass_through, deadline) expect(server_call.remote_read).to eq('marshalled:' + msg) end @@ -142,23 +136,22 @@ describe GRPC::ActiveCall do call = make_test_call ActiveCall.client_invoke(call, @client_queue, deadline, k1: 'v1', k2: 'v2') - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - expect(ev).to_not be_nil - expect(ev.result.metadata['k1']).to eq('v1') - expect(ev.result.metadata['k2']).to eq('v2') + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + recvd_call = recvd_rpc.call + expect(recvd_call).to_not be_nil + expect(recvd_rpc.metadata).to_not be_nil + expect(recvd_rpc.metadata['k1']).to eq('v1') + expect(recvd_rpc.metadata['k2']).to eq('v2') end end describe '#remote_read' do it 'reads the response sent by a server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -168,12 +161,10 @@ describe GRPC::ActiveCall do it 'saves no metadata when the server adds no metadata' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -185,12 +176,10 @@ describe GRPC::ActiveCall do it 'saves metadata add by the server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2') @@ -203,12 +192,10 @@ describe GRPC::ActiveCall do it 'get a nil msg before a status when an OK status is sent' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) client_call.writes_done(false) @@ -222,13 +209,11 @@ describe GRPC::ActiveCall do it 'unmarshals the response using the unmarshal func' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) unmarshal = proc { |x| 'unmarshalled:' + x } client_call = ActiveCall.new(call, @client_queue, @pass_through, unmarshal, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) # confirm the client receives the unmarshalled message msg = 'message is a string' @@ -249,13 +234,11 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that can read n responses' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) - msg = 'message is 4a string' + metadata_tag: md_tag) + msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -269,12 +252,10 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that stops after an OK Status' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - read_metadata_tag: meta_tag, - finished_tag: done_tag) + metadata_tag: md_tag) msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) @@ -294,12 +275,10 @@ describe GRPC::ActiveCall do describe '#writes_done' do it 'finishes ok if the server sends a status response' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) expect { client_call.writes_done(false) }.to_not raise_error @@ -312,12 +291,10 @@ describe GRPC::ActiveCall do it 'finishes ok if the server sends an early status response' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - read_metadata_tag: meta_tag, - finished_tag: done_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -330,12 +307,10 @@ describe GRPC::ActiveCall do it 'finishes ok if writes_done is true' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - read_metadata_tag: meta_tag, - finished_tag: done_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -353,21 +328,20 @@ describe GRPC::ActiveCall do end def expect_server_to_be_invoked(**kw) - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - ev.call.add_metadata(kw) - ev.call.server_accept(@client_queue, @server_done_tag) - ev.call.server_end_initial_metadata - ActiveCall.new(ev.call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: @server_done_tag) + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + recvd_call = recvd_rpc.call + recvd_call.run_batch(@server_queue, @server_tag, deadline, + CallOps::SEND_INITIAL_METADATA => kw) + ActiveCall.new(recvd_call, @server_queue, @pass_through, + @pass_through, deadline) end def make_test_call - @ch.create_call('dummy_method', 'dummy_host', deadline) + @ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline) end def deadline - Time.now + 1 # in 1 second; arbitrary + Time.now + 2 # in 2 seconds; arbitrary end end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 0c98fc40d9..88c6b44c22 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -28,17 +28,42 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' -require 'xray/thread_dump_signal_handler' -NOOP = proc { |x| x } -FAKE_HOST = 'localhost:0' +# Notifier is useful high-level synchronization primitive. +class Notifier + attr_reader :payload, :notified + alias_method :notified?, :notified + + def initialize + @mutex = Mutex.new + @cvar = ConditionVariable.new + @notified = false + @payload = nil + end + + def wait + @mutex.synchronize do + @cvar.wait(@mutex) until notified? + end + end + + def notify(payload) + @mutex.synchronize do + return Error.new('already notified') if notified? + @payload = payload + @notified = true + @cvar.signal + return nil + end + end +end def wakey_thread(&blk) - awake_mutex, awake_cond = Mutex.new, ConditionVariable.new + n = Notifier.new t = Thread.new do - blk.call(awake_mutex, awake_cond) + blk.call(n) end - awake_mutex.synchronize { awake_cond.wait(awake_mutex) } + n.wait t end @@ -50,8 +75,11 @@ end include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts +include GRPC::Core::CallOps describe 'ClientStub' do + let(:noop) { proc { |x| x } } + before(:each) do Thread.abort_on_exception = true @server = nil @@ -66,61 +94,56 @@ describe 'ClientStub' do end describe '#new' do + let(:fake_host) { 'localhost:0' } it 'can be created from a host and args' do - host = FAKE_HOST opts = { a_channel_arg: 'an_arg' } blk = proc do - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).not_to raise_error end it 'can be created with a default deadline' do - host = FAKE_HOST opts = { a_channel_arg: 'an_arg', deadline: 5 } blk = proc do - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).not_to raise_error end it 'can be created with an channel override' do - host = FAKE_HOST opts = { a_channel_arg: 'an_arg', channel_override: @ch } blk = proc do - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).not_to raise_error end it 'cannot be created with a bad channel override' do - host = FAKE_HOST blk = proc do opts = { a_channel_arg: 'an_arg', channel_override: Object.new } - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).to raise_error end it 'cannot be created with bad credentials' do - host = FAKE_HOST blk = proc do opts = { a_channel_arg: 'an_arg', creds: Object.new } - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).to raise_error end it 'can be created with test test credentials' do certs = load_test_certs - host = FAKE_HOST blk = proc do opts = { GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr', a_channel_arg: 'an_arg', creds: GRPC::Core::Credentials.new(certs[0], nil, nil) } - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).to_not raise_error end @@ -187,7 +210,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_response(stub) - stub.request_response(@method, @sent_msg, NOOP, NOOP, + stub.request_response(@method, @sent_msg, noop, noop, k1: 'v1', k2: 'v2') end @@ -196,7 +219,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_response(stub) - op = stub.request_response(@method, @sent_msg, NOOP, NOOP, + op = stub.request_response(@method, @sent_msg, noop, noop, return_op: true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) op.execute @@ -259,7 +282,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_response(stub) - stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, + stub.client_streamer(@method, @sent_msgs, noop, noop, k1: 'v1', k2: 'v2') end @@ -268,7 +291,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_response(stub) - op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, + op = stub.client_streamer(@method, @sent_msgs, noop, noop, return_op: true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) op.execute @@ -333,7 +356,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, + e = stub.server_streamer(@method, @sent_msg, noop, noop, k1: 'v1', k2: 'v2') expect(e).to be_a(Enumerator) e @@ -344,7 +367,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_responses(stub) - op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, + op = stub.server_streamer(@method, @sent_msg, noop, noop, return_op: true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) e = op.execute @@ -361,34 +384,30 @@ describe 'ClientStub' do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } + server_port = create_test_server + @host = "localhost:#{server_port}" end it 'supports sending all the requests first', bidi: true do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) - stub = GRPC::ClientStub.new(host, @cq) + stub = GRPC::ClientStub.new(@host, @cq) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@replys) th.join end it 'supports client-initiated ping pong', bidi: true do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) - stub = GRPC::ClientStub.new(host, @cq) + stub = GRPC::ClientStub.new(@host, @cq) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end it 'supports a server-initiated ping pong', bidi: true do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) - stub = GRPC::ClientStub.new(host, @cq) + stub = GRPC::ClientStub.new(@host, @cq) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join @@ -397,7 +416,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) expect(e).to be_a(Enumerator) e end @@ -407,7 +426,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_responses(stub) - op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, + op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, return_op: true) expect(op).to be_a(GRPC::ActiveCall::Operation) e = op.execute @@ -421,8 +440,8 @@ describe 'ClientStub' do def run_server_streamer(expected_input, replys, status, **kw) wanted_metadata = kw.clone - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end @@ -434,8 +453,8 @@ describe 'ClientStub' do def run_bidi_streamer_handle_inputs_first(expected_inputs, replys, status) - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true) @@ -443,8 +462,8 @@ describe 'ClientStub' do end def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) @@ -460,8 +479,8 @@ describe 'ClientStub' do def run_client_streamer(expected_inputs, resp, status, **kw) wanted_metadata = kw.clone - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) @@ -473,8 +492,8 @@ describe 'ClientStub' do def run_request_response(expected_input, resp, status, **kw) wanted_metadata = kw.clone - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) @@ -490,24 +509,16 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0') end - def start_test_server(awake_mutex, awake_cond) + def expect_server_to_be_invoked(notifier) @server.start - @server_tag = Object.new - @server.request_call(@server_tag) - awake_mutex.synchronize { awake_cond.signal } - end - - def expect_server_to_be_invoked(awake_mutex, awake_cond) - start_test_server(awake_mutex, awake_cond) - ev = @server_queue.pluck(@server_tag, INFINITE_FUTURE) - fail OutOfTime if ev.nil? - server_call = ev.call - server_call.metadata = ev.result.metadata - finished_tag = Object.new - server_call.server_accept(@server_queue, finished_tag) - server_call.server_end_initial_metadata - GRPC::ActiveCall.new(server_call, @server_queue, NOOP, NOOP, - INFINITE_FUTURE, - finished_tag: finished_tag) + notifier.notify(nil) + server_tag = Object.new + recvd_rpc = @server.request_call(@server_queue, server_tag, + INFINITE_FUTURE) + recvd_call = recvd_rpc.call + recvd_call.metadata = recvd_rpc.metadata + recvd_call.run_batch(@server_queue, server_tag, Time.now + 2, + SEND_INITIAL_METADATA => nil) + GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE) end end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 39d1e83748..083632a080 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -37,7 +37,6 @@ describe GRPC::RpcDesc do INTERNAL = GRPC::Core::StatusCodes::INTERNAL UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN CallError = GRPC::Core::CallError - EventError = GRPC::Core::EventError before(:each) do @request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode', @@ -53,49 +52,49 @@ describe GRPC::RpcDesc do @ok_response = Object.new end + shared_examples 'it handles errors' do + it 'sends the specified status if BadStatus is raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) + this_desc.run_server_method(@call, method(:bad_status)) + end + + it 'sends status UNKNOWN if other StandardErrors are raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, + false, {}) + this_desc.run_server_method(@call, method(:other_error)) + end + + it 'absorbs CallError with no further action' do + expect(@call).to receive(:remote_read).once.and_raise(CallError) + blk = proc do + this_desc.run_server_method(@call, method(:fake_reqresp)) + end + expect(&blk).to_not raise_error + end + end + describe '#run_server_method' do + let(:fake_md) { { k1: 'v1', k2: 'v2' } } describe 'for request responses' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) - end - - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') - @request_response.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason) - @request_response.run_server_method(@call, method(:other_error)) - end - - it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(EventError) - blk = proc do - @request_response.run_server_method(@call, method(:fake_reqresp)) - end - expect(&blk).to_not raise_error end - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @request_response.run_server_method(@call, method(:fake_reqresp)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once - @request_response.run_server_method(@call, method(:fake_reqresp)) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) + this_desc.run_server_method(@call, method(:fake_reqresp)) end end @@ -103,27 +102,20 @@ describe GRPC::RpcDesc do before(:each) do @call = double('active_call') allow(@call).to receive(:multi_req_view).and_return(@call) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @client_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason) + expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, + false, {}) @client_streamer.run_server_method(@call, method(:other_error_alt)) end - it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_send).once.and_raise(EventError) - blk = proc do - @client_streamer.run_server_method(@call, method(:fake_clstream)) - end - expect(&blk).to_not raise_error - end - it 'absorbs CallError with no further action' do expect(@call).to receive(:remote_send).once.and_raise(CallError) blk = proc do @@ -134,53 +126,29 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @client_streamer.run_server_method(@call, method(:fake_clstream)) end end describe 'for server streaming' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) - end - - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') - @server_streamer.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason) - @server_streamer.run_server_method(@call, method(:other_error)) - end - - it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(EventError) - blk = proc do - @server_streamer.run_server_method(@call, method(:fake_svstream)) - end - expect(&blk).to_not raise_error end - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @server_streamer.run_server_method(@call, method(:fake_svstream)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @server_streamer.run_server_method(@call, method(:fake_svstream)) end end @@ -191,26 +159,28 @@ describe GRPC::RpcDesc do enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th') allow(enq_th).to receive(:join) allow(rwl_th).to receive(:join) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do e = GRPC::BadStatus.new(@bs_code, 'NOK') expect(@call).to receive(:run_server_bidi).and_raise(e) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @bidi_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:run_server_bidi).and_raise(StandardError) - expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason) + expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason, + false, {}) @bidi_streamer.run_server_method(@call, method(:other_error_alt)) end it 'closes the stream if there no errors' do expect(@call).to receive(:run_server_bidi) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @bidi_streamer.run_server_method(@call, method(:fake_bidistream)) end end diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index 8383dc1533..aae3a7d7cb 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -28,11 +28,10 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' -require 'xray/thread_dump_signal_handler' -Pool = GRPC::RpcServer::Pool +describe GRPC::Pool do + Pool = GRPC::Pool -describe Pool do describe '#new' do it 'raises if a non-positive size is used' do expect { Pool.new(0) }.to raise_error diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 34e5cdcd04..2cd21a15e3 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -28,7 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' -require 'xray/thread_dump_signal_handler' def load_test_certs test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') @@ -58,18 +57,20 @@ class NoRpcImplementation rpc :an_rpc, EchoMsg, EchoMsg end -# A test service with an implementation. +# A test service with an echo implementation. class EchoService include GRPC::GenericService rpc :an_rpc, EchoMsg, EchoMsg attr_reader :received_md - def initialize(_default_var = 'ignored') + def initialize(**kw) + @trailing_metadata = kw @received_md = [] end def an_rpc(req, call) logger.info('echo service received a request') + call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req end @@ -77,6 +78,25 @@ end EchoStub = EchoService.rpc_stub_class +# A test service with an implementation that fails with BadStatus +class FailingService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :details, :code, :md + + def initialize(_default_var = 'ignored') + @details = 'app error' + @code = 101 + @md = { failed_method: 'an_rpc' } + end + + def an_rpc(_req, _call) + fail GRPC::BadStatus.new(@code, @details, **@md) + end +end + +FailingStub = FailingService.rpc_stub_class + # A slow test service. class SlowService include GRPC::GenericService @@ -301,21 +321,20 @@ describe GRPC::RpcServer do end describe '#run' do - before(:each) do - @client_opts = { - channel_override: @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { - server_override: @server, - completion_queue_override: @server_queue, - poll_period: 1 - } - @srv = RpcServer.new(**server_opts) - end + let(:client_opts) { { channel_override: @ch } } + let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } + let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } + + context 'with no connect_metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end - describe 'when running' do it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -323,8 +342,8 @@ describe GRPC::RpcServer do req = EchoMsg.new blk = proc do cq = GRPC::Core::CompletionQueue.new - stub = GRPC::ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) + stub = GRPC::ClientStub.new(@host, cq, **client_opts) + stub.request_response('/unknown', req, marshal, unmarshal) end expect(&blk).to raise_error GRPC::BadStatus @srv.stop @@ -337,7 +356,7 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join @@ -349,7 +368,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -363,8 +382,8 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) - deadline = service.delay + 0.5 # wait for long enough + stub = SlowStub.new(@host, **client_opts) + deadline = service.delay + 1.0 # wait for long enough expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -378,7 +397,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = 0.1 # too short for SlowService to respond blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus @@ -388,19 +407,37 @@ describe GRPC::RpcServer do t.join end + it 'should handle cancellation correctly', server: true do + service = SlowService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = SlowStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + Thread.new do # cancel the call + sleep 0.1 + op.cancel + end + expect { op.execute }.to raise_error GRPC::Cancelled + @srv.stop + t.join + end + it 'should receive updated metadata', server: true do service = EchoService.new @srv.handle(service) t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - @client_opts[:update_metadata] = proc do |md| + client_opts[:update_metadata] = proc do |md| md[:k1] = 'updated-v1' md end - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) - wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }] + wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', + 'jwt_aud_uri' => "https://#{@host}/EchoService" }] expect(service.received_md).to eq(wanted_md) @srv.stop t.join @@ -415,7 +452,7 @@ describe GRPC::RpcServer do threads = [] n.times do threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) q << stub.an_rpc(req) end end @@ -443,7 +480,7 @@ describe GRPC::RpcServer do one_failed_as_unavailable = false n.times do threads << Thread.new do - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) begin stub.an_rpc(req) rescue GRPC::BadStatus => e @@ -456,5 +493,97 @@ describe GRPC::RpcServer do expect(one_failed_as_unavailable).to be(true) end end + + context 'with connect metadata' do + let(:test_md_proc) do + proc do |mth, md| + res = md.clone + res['method'] = mth + res['connect_k1'] = 'connect_v1' + res + end + end + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1, + connect_md_proc: test_md_proc + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send connect metadata to the client', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + wanted_md = { + 'k1' => 'v1', + 'k2' => 'v2', + 'method' => '/EchoService/an_rpc', + 'connect_k1' => 'connect_v1' + } + expect(op.metadata).to eq(wanted_md) + @srv.stop + t.join + end + end + + context 'with trailing metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end + + it 'should be added to BadStatus when requests fail', server: true do + service = FailingService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = FailingStub.new(@host, **client_opts) + blk = proc { stub.an_rpc(req) } + + # confirm it raise the expected error + expect(&blk).to raise_error GRPC::BadStatus + + # call again and confirm exception contained the trailing metadata. + begin + blk.call + rescue GRPC::BadStatus => e + expect(e.code).to eq(service.code) + expect(e.details).to eq(service.details) + expect(e.metadata).to eq(service.md) + end + @srv.stop + t.join + end + + it 'should be received by the client', server: true do + wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } + service = EchoService.new(k1: 'out_v1', k2: 'out_v2') + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + expect(op.metadata).to eq(wanted_trailers) + @srv.stop + t.join + end + end end end |