diff options
Diffstat (limited to 'src/ruby/spec/generic')
-rw-r--r-- | src/ruby/spec/generic/active_call_spec.rb | 494 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 62 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_desc_spec.rb | 22 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 498 | ||||
-rw-r--r-- | src/ruby/spec/generic/service_spec.rb | 11 |
5 files changed, 570 insertions, 517 deletions
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 872625ccf0..ceeef2a1d8 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -31,291 +31,289 @@ require 'grpc' require 'grpc/generic/active_call' require_relative '../port_picker' -module GRPC +ActiveCall = GRPC::ActiveCall + +describe GRPC::ActiveCall do + + before(:each) do + @pass_through = Proc.new { |x| x } + @server_tag = Object.new + @server_finished_tag = Object.new + @tag = Object.new + + @client_queue = GRPC::Core::CompletionQueue.new + @server_queue = GRPC::Core::CompletionQueue.new + port = find_unused_tcp_port + host = "localhost:#{port}" + @server = GRPC::Core::Server.new(@server_queue, nil) + @server.add_http2_port(host) + @server.start + @ch = GRPC::Core::Channel.new(host, nil) + end - describe ActiveCall do + after(:each) do + @server.close + end + describe 'restricted view methods' do before(:each) do - @pass_through = Proc.new { |x| x } - @server_tag = Object.new - @server_finished_tag = Object.new - @tag = Object.new - - @client_queue = CompletionQueue.new - @server_queue = CompletionQueue.new - port = find_unused_tcp_port - host = "localhost:#{port}" - @server = GRPC::Server.new(@server_queue, nil) - @server.add_http2_port(host) - @server.start - @ch = GRPC::Channel.new(host, nil) - end - - after(:each) do - @server.close + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + @client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) end - describe 'restricted view methods' do - before(:each) do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - @client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - end - - describe '#multi_req_view' do - it 'exposes a fixed subset of the ActiveCall methods' do - want = ['cancelled', 'deadline', 'each_remote_read', 'shutdown'] - v = @client_call.multi_req_view - want.each do |w| - expect(v.methods.include?(w)) - end + describe '#multi_req_view' do + it 'exposes a fixed subset of the ActiveCall methods' do + want = ['cancelled', 'deadline', 'each_remote_read', 'shutdown'] + v = @client_call.multi_req_view + want.each do |w| + expect(v.methods.include?(w)) end end + end - describe '#single_req_view' do - it 'exposes a fixed subset of the ActiveCall methods' do - want = ['cancelled', 'deadline', 'shutdown'] - v = @client_call.single_req_view - want.each do |w| - expect(v.methods.include?(w)) - end + describe '#single_req_view' do + it 'exposes a fixed subset of the ActiveCall methods' do + want = ['cancelled', 'deadline', 'shutdown'] + v = @client_call.single_req_view + want.each do |w| + expect(v.methods.include?(w)) end end end + end - describe '#remote_send' do - it 'allows a client to send a payload to the server' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - @client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - @client_call.remote_send(msg) - - # check that server rpc new was received - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - expect(ev.type).to be(CompletionType::SERVER_RPC_NEW) - expect(ev.call).to be_a(Call) - expect(ev.tag).to be(@server_tag) - - # Accept the call, and verify that the server reads the response ok. - ev.call.accept(@client_queue, @server_tag) - server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, - @pass_through, deadline) - expect(server_call.remote_read).to eq(msg) - end - - it 'marshals the payload using the marshal func' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - marshal = Proc.new { |x| 'marshalled:' + x } - client_call = ActiveCall.new(call, @client_queue, marshal, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - client_call.remote_send(msg) - - # confirm that the message was marshalled - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - ev.call.accept(@client_queue, @server_tag) - server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, - @pass_through, deadline) - expect(server_call.remote_read).to eq('marshalled:' + msg) - end - + describe '#remote_send' do + it 'allows a client to send a payload to the server' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + @client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + @client_call.remote_send(msg) + + # check that server rpc new was received + @server.request_call(@server_tag) + ev = @server_queue.next(deadline) + expect(ev.type).to be(CompletionType::SERVER_RPC_NEW) + expect(ev.call).to be_a(Call) + expect(ev.tag).to be(@server_tag) + + # Accept the call, and verify that the server reads the response ok. + ev.call.accept(@client_queue, @server_tag) + server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, + @pass_through, deadline) + expect(server_call.remote_read).to eq(msg) end - describe '#remote_read' do - it 'reads the response sent by a server' do - call, pass_through = make_test_call, Proc.new { |x| x } - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - client_call.remote_send(msg) - server_call = expect_server_to_receive(msg) - server_call.remote_send('server_response') - expect(client_call.remote_read).to eq('server_response') - end - - it 'get a nil msg before a status when an OK status is sent' do - call, pass_through = make_test_call, Proc.new { |x| x } - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - client_call.remote_send(msg) - client_call.writes_done(false) - server_call = expect_server_to_receive(msg) - server_call.remote_send('server_response') - server_call.send_status(StatusCodes::OK, 'OK') - expect(client_call.remote_read).to eq('server_response') - res = client_call.remote_read - expect(res).to be_nil - end + it 'marshals the payload using the marshal func' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + marshal = Proc.new { |x| 'marshalled:' + x } + client_call = ActiveCall.new(call, @client_queue, marshal, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + client_call.remote_send(msg) + + # confirm that the message was marshalled + @server.request_call(@server_tag) + ev = @server_queue.next(deadline) + ev.call.accept(@client_queue, @server_tag) + server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, + @pass_through, deadline) + expect(server_call.remote_read).to eq('marshalled:' + msg) + end + end - it 'unmarshals the response using the unmarshal func' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - unmarshal = Proc.new { |x| 'unmarshalled:' + x } - client_call = ActiveCall.new(call, @client_queue, @pass_through, - unmarshal, deadline, - finished_tag: finished_tag) - - # confirm the client receives the unmarshalled message - msg = 'message is a string' - client_call.remote_send(msg) - server_call = expect_server_to_receive(msg) - server_call.remote_send('server_response') - expect(client_call.remote_read).to eq('unmarshalled:server_response') - end + describe '#remote_read' do + it 'reads the response sent by a server' do + call, pass_through = make_test_call, Proc.new { |x| x } + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg) + server_call.remote_send('server_response') + expect(client_call.remote_read).to eq('server_response') + end + it 'get a nil msg before a status when an OK status is sent' do + call, pass_through = make_test_call, Proc.new { |x| x } + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + client_call.remote_send(msg) + client_call.writes_done(false) + server_call = expect_server_to_receive(msg) + server_call.remote_send('server_response') + server_call.send_status(StatusCodes::OK, 'OK') + expect(client_call.remote_read).to eq('server_response') + res = client_call.remote_read + expect(res).to be_nil end - describe '#each_remote_read' do - it 'creates an Enumerator' do - call = make_test_call - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline) - expect(client_call.each_remote_read).to be_a(Enumerator) - end - it 'the returns an enumerator that can read n responses' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is 4a string' - reply = 'server_response' - client_call.remote_send(msg) - server_call = expect_server_to_receive(msg) - e = client_call.each_remote_read - n = 3 # arbitrary value > 1 - n.times do - server_call.remote_send(reply) - expect(e.next).to eq(reply) - end - end + it 'unmarshals the response using the unmarshal func' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + unmarshal = Proc.new { |x| 'unmarshalled:' + x } + client_call = ActiveCall.new(call, @client_queue, @pass_through, + unmarshal, deadline, + finished_tag: finished_tag) + + # confirm the client receives the unmarshalled message + msg = 'message is a string' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg) + server_call.remote_send('server_response') + expect(client_call.remote_read).to eq('unmarshalled:server_response') + end - it 'the returns an enumerator that stops after an OK Status' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - reply = 'server_response' - client_call.remote_send(msg) - client_call.writes_done(false) - server_call = expect_server_to_receive(msg) - e = client_call.each_remote_read - n = 3 # arbitrary value > 1 - n.times do - server_call.remote_send(reply) - expect(e.next).to eq(reply) - end - server_call.send_status(StatusCodes::OK, 'OK') - expect { e.next }.to raise_error(StopIteration) - end + end + describe '#each_remote_read' do + it 'creates an Enumerator' do + call = make_test_call + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline) + expect(client_call.each_remote_read).to be_a(Enumerator) end - describe '#writes_done' do - it 'finishes ok if the server sends a status response' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - client_call.remote_send(msg) - expect { client_call.writes_done(false) }.to_not raise_error - server_call = expect_server_to_receive(msg) - server_call.remote_send('server_response') - expect(client_call.remote_read).to eq('server_response') - server_call.send_status(StatusCodes::OK, 'status code is OK') - expect { server_call.finished }.to_not raise_error - expect { client_call.finished }.to_not raise_error + it 'the returns an enumerator that can read n responses' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is 4a string' + reply = 'server_response' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg) + e = client_call.each_remote_read + n = 3 # arbitrary value > 1 + n.times do + server_call.remote_send(reply) + expect(e.next).to eq(reply) end + end - it 'finishes ok if the server sends an early status response' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - client_call.remote_send(msg) - server_call = expect_server_to_receive(msg) - server_call.remote_send('server_response') - server_call.send_status(StatusCodes::OK, 'status code is OK') - expect(client_call.remote_read).to eq('server_response') - expect { client_call.writes_done(false) }.to_not raise_error - expect { server_call.finished }.to_not raise_error - expect { client_call.finished }.to_not raise_error + it 'the returns an enumerator that stops after an OK Status' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + reply = 'server_response' + client_call.remote_send(msg) + client_call.writes_done(false) + server_call = expect_server_to_receive(msg) + e = client_call.each_remote_read + n = 3 # arbitrary value > 1 + n.times do + server_call.remote_send(reply) + expect(e.next).to eq(reply) end + server_call.send_status(StatusCodes::OK, 'OK') + expect { e.next }.to raise_error(StopIteration) + end - it 'finishes ok if writes_done is true' do - call = make_test_call - finished_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) - client_call = ActiveCall.new(call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: finished_tag) - msg = 'message is a string' - client_call.remote_send(msg) - server_call = expect_server_to_receive(msg) - server_call.remote_send('server_response') - server_call.send_status(StatusCodes::OK, 'status code is OK') - expect(client_call.remote_read).to eq('server_response') - expect { client_call.writes_done(true) }.to_not raise_error - expect { server_call.finished }.to_not raise_error - end + end + describe '#writes_done' do + it 'finishes ok if the server sends a status response' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + client_call.remote_send(msg) + expect { client_call.writes_done(false) }.to_not raise_error + server_call = expect_server_to_receive(msg) + server_call.remote_send('server_response') + expect(client_call.remote_read).to eq('server_response') + server_call.send_status(StatusCodes::OK, 'status code is OK') + expect { server_call.finished }.to_not raise_error + expect { client_call.finished }.to_not raise_error end - def expect_server_to_receive(sent_text) - c = expect_server_to_be_invoked - expect(c.remote_read).to eq(sent_text) - c + it 'finishes ok if the server sends an early status response' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg) + server_call.remote_send('server_response') + server_call.send_status(StatusCodes::OK, 'status code is OK') + expect(client_call.remote_read).to eq('server_response') + expect { client_call.writes_done(false) }.to_not raise_error + expect { server_call.finished }.to_not raise_error + expect { client_call.finished }.to_not raise_error end - def expect_server_to_be_invoked() - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - ev.call.accept(@client_queue, @server_finished_tag) - ActiveCall.new(ev.call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: @server_finished_tag) + it 'finishes ok if writes_done is true' do + call = make_test_call + finished_tag = ActiveCall.client_start_invoke(call, @client_queue, + deadline) + client_call = ActiveCall.new(call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: finished_tag) + msg = 'message is a string' + client_call.remote_send(msg) + server_call = expect_server_to_receive(msg) + server_call.remote_send('server_response') + server_call.send_status(StatusCodes::OK, 'status code is OK') + expect(client_call.remote_read).to eq('server_response') + expect { client_call.writes_done(true) }.to_not raise_error + expect { server_call.finished }.to_not raise_error end - def make_test_call - @ch.create_call('dummy_method', 'dummy_host', deadline) - end + end - def deadline - Time.now + 0.25 # in 0.25 seconds; arbitrary - end + def expect_server_to_receive(sent_text) + c = expect_server_to_be_invoked + expect(c.remote_read).to eq(sent_text) + c + end + + def expect_server_to_be_invoked() + @server.request_call(@server_tag) + ev = @server_queue.next(deadline) + ev.call.accept(@client_queue, @server_finished_tag) + ActiveCall.new(ev.call, @client_queue, @pass_through, + @pass_through, deadline, + finished_tag: @server_finished_tag) + end + + def make_test_call + @ch.create_call('dummy_method', 'dummy_host', deadline) + end + def deadline + Time.now + 0.25 # in 0.25 seconds; arbitrary end end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index c8dee74563..4b01af9581 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -44,12 +44,16 @@ def wakey_thread(&blk) t end +def load_test_certs + test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(test_root, f)).read } +end -include GRPC::StatusCodes +include GRPC::Core::StatusCodes +include GRPC::Core::TimeConsts describe 'ClientStub' do - BadStatus = GRPC::BadStatus - TimeConsts = GRPC::TimeConsts before(:each) do Thread.abort_on_exception = true @@ -57,7 +61,7 @@ describe 'ClientStub' do @method = 'an_rpc_method' @pass = OK @fail = INTERNAL - @cq = GRPC::CompletionQueue.new + @cq = GRPC::Core::CompletionQueue.new end after(:each) do @@ -102,6 +106,29 @@ describe 'ClientStub' do expect(&blk).to raise_error end + it 'cannot be created with bad credentials' do + host = new_test_host + blk = Proc.new do + opts = {:a_channel_arg => 'an_arg', :creds => Object.new} + GRPC::ClientStub.new(host, @cq, **opts) + end + expect(&blk).to raise_error + end + + it 'can be created with test test credentials' do + certs = load_test_certs + host = new_test_host + blk = Proc.new do + opts = { + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com', + :a_channel_arg => 'an_arg', + :creds => GRPC::Core::Credentials.new(certs[0], nil, nil) + } + GRPC::ClientStub.new(host, @cq, **opts) + end + expect(&blk).to_not raise_error + end + end describe '#request_response' do @@ -123,7 +150,7 @@ describe 'ClientStub' do it 'should send a request when configured using an override channel' do alt_host = new_test_host th = run_request_response(alt_host, @sent_msg, @resp, @pass) - ch = GRPC::Channel.new(alt_host, nil) + ch = GRPC::Core::Channel.new(alt_host, nil) stub = GRPC::ClientStub.new('ignored-host', @cq, channel_override:ch) resp = stub.request_response(@method, @sent_msg, NOOP, NOOP) @@ -138,7 +165,7 @@ describe 'ClientStub' do blk = Proc.new do stub.request_response(@method, @sent_msg, NOOP, NOOP) end - expect(&blk).to raise_error(BadStatus) + expect(&blk).to raise_error(GRPC::BadStatus) th.join end @@ -168,7 +195,7 @@ describe 'ClientStub' do blk = Proc.new do op.execute() end - expect(&blk).to raise_error(BadStatus) + expect(&blk).to raise_error(GRPC::BadStatus) th.join end @@ -309,7 +336,7 @@ describe 'ClientStub' do describe 'without a call operation' do - it 'supports a simple scenario with all requests sent first' do + it 'supports sending all the requests first', :bidi => true do host = new_test_host th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys, @pass) @@ -320,7 +347,7 @@ describe 'ClientStub' do th.join end - it 'supports a simple scenario with a client-initiated ping pong' do + it 'supports client-initiated ping pong', :bidi => true do host = new_test_host th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true) stub = GRPC::ClientStub.new(host, @cq) @@ -336,7 +363,7 @@ describe 'ClientStub' do # servers don't know if all the client metadata has been sent until # they receive a message from the client. Without receiving all the # metadata, the server does not accept the call, so this test hangs. - xit 'supports a simple scenario with a server-initiated ping pong' do + xit 'supports a server-initiated ping pong', :bidi => true do host = new_test_host th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false) stub = GRPC::ClientStub.new(host, @cq) @@ -350,7 +377,7 @@ describe 'ClientStub' do describe 'via a call operation' do - it 'supports a simple scenario with all requests sent first' do + it 'supports sending all the requests first', :bidi => true do host = new_test_host th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys, @pass) @@ -364,7 +391,7 @@ describe 'ClientStub' do th.join end - it 'supports a simple scenario with a client-initiated ping pong' do + it 'supports client-initiated ping pong', :bidi => true do host = new_test_host th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true) stub = GRPC::ClientStub.new(host, @cq) @@ -383,7 +410,7 @@ describe 'ClientStub' do # servers don't know if all the client metadata has been sent until # they receive a message from the client. Without receiving all the # metadata, the server does not accept the call, so this test hangs. - xit 'supports a simple scenario with a server-initiated ping pong' do + xit 'supports server-initiated ping pong', :bidi => true do th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false) stub = GRPC::ClientStub.new(host, @cq) op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, @@ -454,8 +481,8 @@ describe 'ClientStub' do end def start_test_server(hostname, awake_mutex, awake_cond) - server_queue = GRPC::CompletionQueue.new - @server = GRPC::Server.new(server_queue, nil) + server_queue = GRPC::Core::CompletionQueue.new + @server = GRPC::Core::Server.new(server_queue, nil) @server.add_http2_port(hostname) @server.start @server_tag = Object.new @@ -467,12 +494,11 @@ describe 'ClientStub' do def expect_server_to_be_invoked(hostname, awake_mutex, awake_cond) server_queue = start_test_server(hostname, awake_mutex, awake_cond) test_deadline = Time.now + 10 # fail tests after 10 seconds - ev = server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE) + ev = server_queue.pluck(@server_tag, INFINITE_FUTURE) raise OutOfTime if ev.nil? finished_tag = Object.new ev.call.accept(server_queue, finished_tag) - GRPC::ActiveCall.new(ev.call, server_queue, NOOP, - NOOP, TimeConsts::INFINITE_FUTURE, + GRPC::ActiveCall.new(ev.call, server_queue, NOOP, NOOP, INFINITE_FUTURE, finished_tag: finished_tag) end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 141fb1187d..efef7e4686 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -35,8 +35,11 @@ describe GRPC::RpcDesc do RpcDesc = GRPC::RpcDesc Stream = RpcDesc::Stream - OK = GRPC::StatusCodes::OK - UNKNOWN = GRPC::StatusCodes::UNKNOWN + OK = GRPC::Core::StatusCodes::OK + INTERNAL = GRPC::Core::StatusCodes::INTERNAL + UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN + CallError = GRPC::Core::CallError + EventError = GRPC::Core::EventError before(:each) do @request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode', @@ -47,7 +50,7 @@ describe GRPC::RpcDesc do 'encode', 'decode') @bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new), Stream.new(Object.new), 'encode', 'decode') - @bs_code = GRPC::StatusCodes::INTERNAL + @bs_code = INTERNAL @no_reason = 'no reason given' @ok_response = Object.new end @@ -74,7 +77,7 @@ describe GRPC::RpcDesc do end it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(GRPC::EventError) + expect(@call).to receive(:remote_read).once.and_raise(EventError) blk = Proc.new do @request_response.run_server_method(@call, method(:fake_reqresp)) end @@ -82,7 +85,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(GRPC::CallError) + expect(@call).to receive(:remote_read).once.and_raise(CallError) blk = Proc.new do @request_response.run_server_method(@call, method(:fake_reqresp)) end @@ -118,7 +121,7 @@ describe GRPC::RpcDesc do end it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_send).once.and_raise(GRPC::EventError) + expect(@call).to receive(:remote_send).once.and_raise(EventError) blk = Proc.new do @client_streamer.run_server_method(@call, method(:fake_clstream)) end @@ -126,7 +129,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_send).once.and_raise(GRPC::CallError) + expect(@call).to receive(:remote_send).once.and_raise(CallError) blk = Proc.new do @client_streamer.run_server_method(@call, method(:fake_clstream)) end @@ -163,7 +166,7 @@ describe GRPC::RpcDesc do end it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(GRPC::EventError) + expect(@call).to receive(:remote_read).once.and_raise(EventError) blk = Proc.new do @server_streamer.run_server_method(@call, method(:fake_svstream)) end @@ -171,7 +174,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(GRPC::CallError) + expect(@call).to receive(:remote_read).once.and_raise(CallError) blk = Proc.new do @server_streamer.run_server_method(@call, method(:fake_svstream)) end @@ -377,4 +380,3 @@ describe GRPC::RpcDesc do end end - diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 4e7379bc45..fc579a6c3f 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -35,8 +35,14 @@ require 'grpc/generic/service' require 'xray/thread_dump_signal_handler' require_relative '../port_picker' +def load_test_certs + test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(test_root, f)).read } +end + class EchoMsg - def marshal + def self.marshal(o) '' end @@ -86,302 +92,324 @@ end SlowStub = SlowService.rpc_stub_class -module GRPC +describe GRPC::RpcServer do - describe RpcServer do + RpcServer = GRPC::RpcServer - before(:each) do - @method = 'an_rpc_method' - @pass = 0 - @fail = 1 - @noop = Proc.new { |x| x } - - @server_queue = CompletionQueue.new - port = find_unused_tcp_port - @host = "localhost:#{port}" - @server = GRPC::Server.new(@server_queue, nil) - @server.add_http2_port(@host) - @ch = GRPC::Channel.new(@host, nil) - end + before(:each) do + @method = 'an_rpc_method' + @pass = 0 + @fail = 1 + @noop = Proc.new { |x| x } + + @server_queue = GRPC::Core::CompletionQueue.new + port = find_unused_tcp_port + @host = "localhost:#{port}" + @server = GRPC::Core::Server.new(@server_queue, nil) + @server.add_http2_port(@host) + @ch = GRPC::Core::Channel.new(@host, nil) + end + + after(:each) do + @server.close + end + + describe '#new' do - after(:each) do - @server.close + it 'can be created with just some args' do + opts = {:a_channel_arg => 'an_arg'} + blk = Proc.new do + RpcServer.new(**opts) + end + expect(&blk).not_to raise_error end - describe '#new' do + it 'can be created with a default deadline' do + opts = {:a_channel_arg => 'an_arg', :deadline => 5} + blk = Proc.new do + RpcServer.new(**opts) + end + expect(&blk).not_to raise_error + end - it 'can be created with just some args' do - opts = {:a_channel_arg => 'an_arg'} - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + it 'can be created with a completion queue override' do + opts = { + :a_channel_arg => 'an_arg', + :completion_queue_override => @server_queue + } + blk = Proc.new do + RpcServer.new(**opts) end + expect(&blk).not_to raise_error + end - it 'can be created with a default deadline' do - opts = {:a_channel_arg => 'an_arg', :deadline => 5} - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + it 'cannot be created with a bad completion queue override' do + blk = Proc.new do + opts = { + :a_channel_arg => 'an_arg', + :completion_queue_override => Object.new + } + RpcServer.new(**opts) end + expect(&blk).to raise_error + end - it 'can be created with a completion queue override' do + it 'cannot be created with invalid ServerCredentials' do + blk = Proc.new do opts = { :a_channel_arg => 'an_arg', - :completion_queue_override => @server_queue + :creds => Object.new } - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + RpcServer.new(**opts) end + expect(&blk).to raise_error + end - it 'cannot be created with a bad completion queue override' do - blk = Proc.new do - opts = { - :a_channel_arg => 'an_arg', - :completion_queue_override => Object.new - } - RpcServer.new(**opts) - end - expect(&blk).to raise_error + it 'can be created with the creds as valid ServerCedentials' do + certs = load_test_certs + server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2]) + blk = Proc.new do + opts = { + :a_channel_arg => 'an_arg', + :creds => server_creds + } + RpcServer.new(**opts) end + expect(&blk).to_not raise_error + end - it 'can be created with a server override' do - opts = {:a_channel_arg => 'an_arg', :server_override => @server} - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + it 'can be created with a server override' do + opts = {:a_channel_arg => 'an_arg', :server_override => @server} + blk = Proc.new do + RpcServer.new(**opts) end + expect(&blk).not_to raise_error + end - it 'cannot be created with a bad server override' do - blk = Proc.new do - opts = { - :a_channel_arg => 'an_arg', - :server_override => Object.new - } - RpcServer.new(**opts) - end - expect(&blk).to raise_error + it 'cannot be created with a bad server override' do + blk = Proc.new do + opts = { + :a_channel_arg => 'an_arg', + :server_override => Object.new + } + RpcServer.new(**opts) end + expect(&blk).to raise_error + end + + end + describe '#stopped?' do + + before(:each) do + opts = {:a_channel_arg => 'an_arg', :poll_period => 1} + @srv = RpcServer.new(**opts) end - describe '#stopped?' do + it 'starts out false' do + expect(@srv.stopped?).to be(false) + end - before(:each) do - opts = {:a_channel_arg => 'an_arg', :poll_period => 1} - @srv = RpcServer.new(**opts) - end + it 'stays false after a #stop is called before #run' do + @srv.stop + expect(@srv.stopped?).to be(false) + end - it 'starts out false' do - expect(@srv.stopped?).to be(false) - end + it 'stays false after the server starts running' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + expect(@srv.stopped?).to be(false) + @srv.stop + t.join + end - it 'stays false after a #stop is called before #run' do - @srv.stop - expect(@srv.stopped?).to be(false) - end + it 'is true after a running server is stopped' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + @srv.stop + expect(@srv.stopped?).to be(true) + t.join + end - it 'stays false after the server starts running' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - expect(@srv.stopped?).to be(false) - @srv.stop - t.join - end + end - it 'is true after a running server is stopped' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - @srv.stop - expect(@srv.stopped?).to be(true) - t.join - end + describe '#running?' do + + it 'starts out false' do + opts = {:a_channel_arg => 'an_arg', :server_override => @server} + r = RpcServer.new(**opts) + expect(r.running?).to be(false) + end + it 'is false after run is called with no services registered' do + opts = { + :a_channel_arg => 'an_arg', + :poll_period => 1, + :server_override => @server + } + r = RpcServer.new(**opts) + r.run() + expect(r.running?).to be(false) end - describe '#running?' do + it 'is true after run is called with a registered service' do + opts = { + :a_channel_arg => 'an_arg', + :poll_period => 1, + :server_override => @server + } + r = RpcServer.new(**opts) + r.handle(EchoService) + t = Thread.new { r.run } + r.wait_till_running + expect(r.running?).to be(true) + r.stop + t.join + end - it 'starts out false' do - opts = {:a_channel_arg => 'an_arg', :server_override => @server} - r = RpcServer.new(**opts) - expect(r.running?).to be(false) - end + end - it 'is false after run is called with no services registered' do - opts = { - :a_channel_arg => 'an_arg', - :poll_period => 1, - :server_override => @server - } - r = RpcServer.new(**opts) - r.run() - expect(r.running?).to be(false) - end + describe '#handle' do - it 'is true after run is called with a registered service' do - opts = { - :a_channel_arg => 'an_arg', - :poll_period => 1, - :server_override => @server - } - r = RpcServer.new(**opts) - r.handle(EchoService) - t = Thread.new { r.run } - r.wait_till_running - expect(r.running?).to be(true) - r.stop - t.join - end + before(:each) do + @opts = {:a_channel_arg => 'an_arg', :poll_period => 1} + @srv = RpcServer.new(**@opts) + end + it 'raises if #run has already been called' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + expect { @srv.handle(EchoService) }.to raise_error + @srv.stop + t.join end - describe '#handle' do + it 'raises if the server has been run and stopped' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + @srv.stop + t.join + expect { @srv.handle(EchoService) }.to raise_error + end - before(:each) do - @opts = {:a_channel_arg => 'an_arg', :poll_period => 1} - @srv = RpcServer.new(**@opts) - end + it 'raises if the service does not include GenericService ' do + expect { @srv.handle(Object) }.to raise_error + end + + it 'raises if the service does not declare any rpc methods' do + expect { @srv.handle(EmptyService) }.to raise_error + end + + it 'raises if the service does not define its rpc methods' do + expect { @srv.handle(NoRpcImplementation) }.to raise_error + end + + it 'raises if a handler method is already registered' do + @srv.handle(EchoService) + expect { r.handle(EchoService) }.to raise_error + end + + end + + describe '#run' do + + before(:each) do + @client_opts = { + :channel_override => @ch + } + @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc + @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) + server_opts = { + :server_override => @server, + :completion_queue_override => @server_queue, + :poll_period => 1 + } + @srv = RpcServer.new(**server_opts) + end - it 'raises if #run has already been called' do + describe 'when running' do + + it 'should return NOT_FOUND status for requests on unknown methods' do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running - expect { @srv.handle(EchoService) }.to raise_error + req = EchoMsg.new + blk = Proc.new do + cq = GRPC::Core::CompletionQueue.new + stub = GRPC::ClientStub.new(@host, cq, **@client_opts) + stub.request_response('/unknown', req, @marshal, @unmarshal) + end + expect(&blk).to raise_error BadStatus @srv.stop t.join end - it 'raises if the server has been run and stopped' do + it 'should obtain responses for multiple sequential requests' do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running + req = EchoMsg.new + n = 5 # arbitrary + stub = EchoStub.new(@host, **@client_opts) + n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join - expect { @srv.handle(EchoService) }.to raise_error - end - - it 'raises if the service does not include GenericService ' do - expect { @srv.handle(Object) }.to raise_error - end - - it 'raises if the service does not declare any rpc methods' do - expect { @srv.handle(EmptyService) }.to raise_error - end - - it 'raises if the service does not define its rpc methods' do - expect { @srv.handle(NoRpcImplementation) }.to raise_error end - it 'raises if a handler method is already registered' do + it 'should obtain responses for multiple parallel requests' do @srv.handle(EchoService) - expect { r.handle(EchoService) }.to raise_error + t = Thread.new { @srv.run } + @srv.wait_till_running + req, q = EchoMsg.new, Queue.new + n = 5 # arbitrary + threads = [] + n.times do |x| + cq = GRPC::Core::CompletionQueue.new + threads << Thread.new do + stub = EchoStub.new(@host, **@client_opts) + q << stub.an_rpc(req) + end + end + n.times { expect(q.pop).to be_a(EchoMsg) } + @srv.stop + threads.each { |t| t.join } end - end - - describe '#run' do - - before(:each) do - @client_opts = { - :channel_override => @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { + it 'should return UNAVAILABLE status if there too many jobs' do + opts = { + :a_channel_arg => 'an_arg', :server_override => @server, :completion_queue_override => @server_queue, - :poll_period => 1 + :pool_size => 1, + :poll_period => 1, + :max_waiting_requests => 0 } - @srv = RpcServer.new(**server_opts) - end - - describe 'when running' do - - it 'should return NOT_FOUND status for requests on unknown methods' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - req = EchoMsg.new - blk = Proc.new do - cq = CompletionQueue.new - stub = ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) - end - expect(&blk).to raise_error BadStatus - @srv.stop - t.join - end - - it 'should obtain responses for multiple sequential requests' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - req = EchoMsg.new - n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) - n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) } - @srv.stop - t.join - end - - it 'should obtain responses for multiple parallel requests' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - req, q = EchoMsg.new, Queue.new - n = 5 # arbitrary - threads = [] - n.times do |x| - cq = CompletionQueue.new - threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) - q << stub.an_rpc(req) - end - end - n.times { expect(q.pop).to be_a(EchoMsg) } - @srv.stop - threads.each { |t| t.join } - end - - it 'should return UNAVAILABLE status if there too many jobs' do - opts = { - :a_channel_arg => 'an_arg', - :server_override => @server, - :completion_queue_override => @server_queue, - :pool_size => 1, - :poll_period => 1, - :max_waiting_requests => 0 - } - alt_srv = RpcServer.new(**opts) - alt_srv.handle(SlowService) - t = Thread.new { alt_srv.run } - alt_srv.wait_till_running - req = EchoMsg.new - n = 5 # arbitrary, use as many to ensure the server pool is exceeded - threads = [] - _1_failed_as_unavailable = false - n.times do |x| - threads << Thread.new do - cq = CompletionQueue.new - stub = SlowStub.new(@host, **@client_opts) - begin - stub.an_rpc(req) - rescue BadStatus => e - _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE - end + alt_srv = RpcServer.new(**opts) + alt_srv.handle(SlowService) + t = Thread.new { alt_srv.run } + alt_srv.wait_till_running + req = EchoMsg.new + n = 5 # arbitrary, use as many to ensure the server pool is exceeded + threads = [] + _1_failed_as_unavailable = false + n.times do |x| + threads << Thread.new do + cq = GRPC::Core::CompletionQueue.new + stub = SlowStub.new(@host, **@client_opts) + begin + stub.an_rpc(req) + rescue BadStatus => e + _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE end end - threads.each { |t| t.join } - alt_srv.stop - expect(_1_failed_as_unavailable).to be(true) end - + threads.each { |t| t.join } + alt_srv.stop + expect(_1_failed_as_unavailable).to be(true) end end diff --git a/src/ruby/spec/generic/service_spec.rb b/src/ruby/spec/generic/service_spec.rb index 4c76881bcf..dc921d8934 100644 --- a/src/ruby/spec/generic/service_spec.rb +++ b/src/ruby/spec/generic/service_spec.rb @@ -33,7 +33,7 @@ require 'grpc/generic/service' class GoodMsg - def marshal + def self.marshal(o) '' end @@ -43,7 +43,7 @@ class GoodMsg end class EncodeDecodeMsg - def encode + def self.encode(o) '' end @@ -53,7 +53,6 @@ class EncodeDecodeMsg end GenericService = GRPC::GenericService -RpcDesc = GRPC::RpcDesc Dsl = GenericService::Dsl @@ -95,7 +94,7 @@ describe GenericService do end expect(c.rpc_descs).to include(:AnRpc) - expect(c.rpc_descs[:AnRpc]).to be_a(RpcDesc) + expect(c.rpc_descs[:AnRpc]).to be_a(GRPC::RpcDesc) end it 'give subclasses access to #rpc_descs' do @@ -106,7 +105,7 @@ describe GenericService do c = Class.new(base) do end expect(c.rpc_descs).to include(:AnRpc) - expect(c.rpc_descs[:AnRpc]).to be_a(RpcDesc) + expect(c.rpc_descs[:AnRpc]).to be_a(GRPC::RpcDesc) end end @@ -189,7 +188,7 @@ describe GenericService do blk = Proc.new do Class.new do include GenericService - self.marshal_instance_method = :encode + self.marshal_class_method = :encode self.unmarshal_class_method = :decode rpc :AnRpc, EncodeDecodeMsg, EncodeDecodeMsg end |