diff options
Diffstat (limited to 'src/ruby/spec/generic/rpc_server_spec.rb')
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 498 |
1 files changed, 263 insertions, 235 deletions
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 4e7379bc45..fc579a6c3f 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -35,8 +35,14 @@ require 'grpc/generic/service' require 'xray/thread_dump_signal_handler' require_relative '../port_picker' +def load_test_certs + test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(test_root, f)).read } +end + class EchoMsg - def marshal + def self.marshal(o) '' end @@ -86,302 +92,324 @@ end SlowStub = SlowService.rpc_stub_class -module GRPC +describe GRPC::RpcServer do - describe RpcServer do + RpcServer = GRPC::RpcServer - before(:each) do - @method = 'an_rpc_method' - @pass = 0 - @fail = 1 - @noop = Proc.new { |x| x } - - @server_queue = CompletionQueue.new - port = find_unused_tcp_port - @host = "localhost:#{port}" - @server = GRPC::Server.new(@server_queue, nil) - @server.add_http2_port(@host) - @ch = GRPC::Channel.new(@host, nil) - end + before(:each) do + @method = 'an_rpc_method' + @pass = 0 + @fail = 1 + @noop = Proc.new { |x| x } + + @server_queue = GRPC::Core::CompletionQueue.new + port = find_unused_tcp_port + @host = "localhost:#{port}" + @server = GRPC::Core::Server.new(@server_queue, nil) + @server.add_http2_port(@host) + @ch = GRPC::Core::Channel.new(@host, nil) + end + + after(:each) do + @server.close + end + + describe '#new' do - after(:each) do - @server.close + it 'can be created with just some args' do + opts = {:a_channel_arg => 'an_arg'} + blk = Proc.new do + RpcServer.new(**opts) + end + expect(&blk).not_to raise_error end - describe '#new' do + it 'can be created with a default deadline' do + opts = {:a_channel_arg => 'an_arg', :deadline => 5} + blk = Proc.new do + RpcServer.new(**opts) + end + expect(&blk).not_to raise_error + end - it 'can be created with just some args' do - opts = {:a_channel_arg => 'an_arg'} - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + it 'can be created with a completion queue override' do + opts = { + :a_channel_arg => 'an_arg', + :completion_queue_override => @server_queue + } + blk = Proc.new do + RpcServer.new(**opts) end + expect(&blk).not_to raise_error + end - it 'can be created with a default deadline' do - opts = {:a_channel_arg => 'an_arg', :deadline => 5} - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + it 'cannot be created with a bad completion queue override' do + blk = Proc.new do + opts = { + :a_channel_arg => 'an_arg', + :completion_queue_override => Object.new + } + RpcServer.new(**opts) end + expect(&blk).to raise_error + end - it 'can be created with a completion queue override' do + it 'cannot be created with invalid ServerCredentials' do + blk = Proc.new do opts = { :a_channel_arg => 'an_arg', - :completion_queue_override => @server_queue + :creds => Object.new } - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + RpcServer.new(**opts) end + expect(&blk).to raise_error + end - it 'cannot be created with a bad completion queue override' do - blk = Proc.new do - opts = { - :a_channel_arg => 'an_arg', - :completion_queue_override => Object.new - } - RpcServer.new(**opts) - end - expect(&blk).to raise_error + it 'can be created with the creds as valid ServerCedentials' do + certs = load_test_certs + server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2]) + blk = Proc.new do + opts = { + :a_channel_arg => 'an_arg', + :creds => server_creds + } + RpcServer.new(**opts) end + expect(&blk).to_not raise_error + end - it 'can be created with a server override' do - opts = {:a_channel_arg => 'an_arg', :server_override => @server} - blk = Proc.new do - RpcServer.new(**opts) - end - expect(&blk).not_to raise_error + it 'can be created with a server override' do + opts = {:a_channel_arg => 'an_arg', :server_override => @server} + blk = Proc.new do + RpcServer.new(**opts) end + expect(&blk).not_to raise_error + end - it 'cannot be created with a bad server override' do - blk = Proc.new do - opts = { - :a_channel_arg => 'an_arg', - :server_override => Object.new - } - RpcServer.new(**opts) - end - expect(&blk).to raise_error + it 'cannot be created with a bad server override' do + blk = Proc.new do + opts = { + :a_channel_arg => 'an_arg', + :server_override => Object.new + } + RpcServer.new(**opts) end + expect(&blk).to raise_error + end + + end + describe '#stopped?' do + + before(:each) do + opts = {:a_channel_arg => 'an_arg', :poll_period => 1} + @srv = RpcServer.new(**opts) end - describe '#stopped?' do + it 'starts out false' do + expect(@srv.stopped?).to be(false) + end - before(:each) do - opts = {:a_channel_arg => 'an_arg', :poll_period => 1} - @srv = RpcServer.new(**opts) - end + it 'stays false after a #stop is called before #run' do + @srv.stop + expect(@srv.stopped?).to be(false) + end - it 'starts out false' do - expect(@srv.stopped?).to be(false) - end + it 'stays false after the server starts running' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + expect(@srv.stopped?).to be(false) + @srv.stop + t.join + end - it 'stays false after a #stop is called before #run' do - @srv.stop - expect(@srv.stopped?).to be(false) - end + it 'is true after a running server is stopped' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + @srv.stop + expect(@srv.stopped?).to be(true) + t.join + end - it 'stays false after the server starts running' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - expect(@srv.stopped?).to be(false) - @srv.stop - t.join - end + end - it 'is true after a running server is stopped' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - @srv.stop - expect(@srv.stopped?).to be(true) - t.join - end + describe '#running?' do + + it 'starts out false' do + opts = {:a_channel_arg => 'an_arg', :server_override => @server} + r = RpcServer.new(**opts) + expect(r.running?).to be(false) + end + it 'is false after run is called with no services registered' do + opts = { + :a_channel_arg => 'an_arg', + :poll_period => 1, + :server_override => @server + } + r = RpcServer.new(**opts) + r.run() + expect(r.running?).to be(false) end - describe '#running?' do + it 'is true after run is called with a registered service' do + opts = { + :a_channel_arg => 'an_arg', + :poll_period => 1, + :server_override => @server + } + r = RpcServer.new(**opts) + r.handle(EchoService) + t = Thread.new { r.run } + r.wait_till_running + expect(r.running?).to be(true) + r.stop + t.join + end - it 'starts out false' do - opts = {:a_channel_arg => 'an_arg', :server_override => @server} - r = RpcServer.new(**opts) - expect(r.running?).to be(false) - end + end - it 'is false after run is called with no services registered' do - opts = { - :a_channel_arg => 'an_arg', - :poll_period => 1, - :server_override => @server - } - r = RpcServer.new(**opts) - r.run() - expect(r.running?).to be(false) - end + describe '#handle' do - it 'is true after run is called with a registered service' do - opts = { - :a_channel_arg => 'an_arg', - :poll_period => 1, - :server_override => @server - } - r = RpcServer.new(**opts) - r.handle(EchoService) - t = Thread.new { r.run } - r.wait_till_running - expect(r.running?).to be(true) - r.stop - t.join - end + before(:each) do + @opts = {:a_channel_arg => 'an_arg', :poll_period => 1} + @srv = RpcServer.new(**@opts) + end + it 'raises if #run has already been called' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + expect { @srv.handle(EchoService) }.to raise_error + @srv.stop + t.join end - describe '#handle' do + it 'raises if the server has been run and stopped' do + @srv.handle(EchoService) + t = Thread.new { @srv.run } + @srv.wait_till_running + @srv.stop + t.join + expect { @srv.handle(EchoService) }.to raise_error + end - before(:each) do - @opts = {:a_channel_arg => 'an_arg', :poll_period => 1} - @srv = RpcServer.new(**@opts) - end + it 'raises if the service does not include GenericService ' do + expect { @srv.handle(Object) }.to raise_error + end + + it 'raises if the service does not declare any rpc methods' do + expect { @srv.handle(EmptyService) }.to raise_error + end + + it 'raises if the service does not define its rpc methods' do + expect { @srv.handle(NoRpcImplementation) }.to raise_error + end + + it 'raises if a handler method is already registered' do + @srv.handle(EchoService) + expect { r.handle(EchoService) }.to raise_error + end + + end + + describe '#run' do + + before(:each) do + @client_opts = { + :channel_override => @ch + } + @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc + @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) + server_opts = { + :server_override => @server, + :completion_queue_override => @server_queue, + :poll_period => 1 + } + @srv = RpcServer.new(**server_opts) + end - it 'raises if #run has already been called' do + describe 'when running' do + + it 'should return NOT_FOUND status for requests on unknown methods' do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running - expect { @srv.handle(EchoService) }.to raise_error + req = EchoMsg.new + blk = Proc.new do + cq = GRPC::Core::CompletionQueue.new + stub = GRPC::ClientStub.new(@host, cq, **@client_opts) + stub.request_response('/unknown', req, @marshal, @unmarshal) + end + expect(&blk).to raise_error BadStatus @srv.stop t.join end - it 'raises if the server has been run and stopped' do + it 'should obtain responses for multiple sequential requests' do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running + req = EchoMsg.new + n = 5 # arbitrary + stub = EchoStub.new(@host, **@client_opts) + n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join - expect { @srv.handle(EchoService) }.to raise_error - end - - it 'raises if the service does not include GenericService ' do - expect { @srv.handle(Object) }.to raise_error - end - - it 'raises if the service does not declare any rpc methods' do - expect { @srv.handle(EmptyService) }.to raise_error - end - - it 'raises if the service does not define its rpc methods' do - expect { @srv.handle(NoRpcImplementation) }.to raise_error end - it 'raises if a handler method is already registered' do + it 'should obtain responses for multiple parallel requests' do @srv.handle(EchoService) - expect { r.handle(EchoService) }.to raise_error + t = Thread.new { @srv.run } + @srv.wait_till_running + req, q = EchoMsg.new, Queue.new + n = 5 # arbitrary + threads = [] + n.times do |x| + cq = GRPC::Core::CompletionQueue.new + threads << Thread.new do + stub = EchoStub.new(@host, **@client_opts) + q << stub.an_rpc(req) + end + end + n.times { expect(q.pop).to be_a(EchoMsg) } + @srv.stop + threads.each { |t| t.join } end - end - - describe '#run' do - - before(:each) do - @client_opts = { - :channel_override => @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { + it 'should return UNAVAILABLE status if there too many jobs' do + opts = { + :a_channel_arg => 'an_arg', :server_override => @server, :completion_queue_override => @server_queue, - :poll_period => 1 + :pool_size => 1, + :poll_period => 1, + :max_waiting_requests => 0 } - @srv = RpcServer.new(**server_opts) - end - - describe 'when running' do - - it 'should return NOT_FOUND status for requests on unknown methods' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - req = EchoMsg.new - blk = Proc.new do - cq = CompletionQueue.new - stub = ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) - end - expect(&blk).to raise_error BadStatus - @srv.stop - t.join - end - - it 'should obtain responses for multiple sequential requests' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - req = EchoMsg.new - n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) - n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) } - @srv.stop - t.join - end - - it 'should obtain responses for multiple parallel requests' do - @srv.handle(EchoService) - t = Thread.new { @srv.run } - @srv.wait_till_running - req, q = EchoMsg.new, Queue.new - n = 5 # arbitrary - threads = [] - n.times do |x| - cq = CompletionQueue.new - threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) - q << stub.an_rpc(req) - end - end - n.times { expect(q.pop).to be_a(EchoMsg) } - @srv.stop - threads.each { |t| t.join } - end - - it 'should return UNAVAILABLE status if there too many jobs' do - opts = { - :a_channel_arg => 'an_arg', - :server_override => @server, - :completion_queue_override => @server_queue, - :pool_size => 1, - :poll_period => 1, - :max_waiting_requests => 0 - } - alt_srv = RpcServer.new(**opts) - alt_srv.handle(SlowService) - t = Thread.new { alt_srv.run } - alt_srv.wait_till_running - req = EchoMsg.new - n = 5 # arbitrary, use as many to ensure the server pool is exceeded - threads = [] - _1_failed_as_unavailable = false - n.times do |x| - threads << Thread.new do - cq = CompletionQueue.new - stub = SlowStub.new(@host, **@client_opts) - begin - stub.an_rpc(req) - rescue BadStatus => e - _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE - end + alt_srv = RpcServer.new(**opts) + alt_srv.handle(SlowService) + t = Thread.new { alt_srv.run } + alt_srv.wait_till_running + req = EchoMsg.new + n = 5 # arbitrary, use as many to ensure the server pool is exceeded + threads = [] + _1_failed_as_unavailable = false + n.times do |x| + threads << Thread.new do + cq = GRPC::Core::CompletionQueue.new + stub = SlowStub.new(@host, **@client_opts) + begin + stub.an_rpc(req) + rescue BadStatus => e + _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE end end - threads.each { |t| t.join } - alt_srv.stop - expect(_1_failed_as_unavailable).to be(true) end - + threads.each { |t| t.join } + alt_srv.stop + expect(_1_failed_as_unavailable).to be(true) end end |