aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/spec/generic/rpc_server_spec.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/spec/generic/rpc_server_spec.rb')
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb498
1 files changed, 263 insertions, 235 deletions
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 4e7379bc45..fc579a6c3f 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -35,8 +35,14 @@ require 'grpc/generic/service'
require 'xray/thread_dump_signal_handler'
require_relative '../port_picker'
+def load_test_certs
+ test_root = File.join(File.parent(File.dirname(__FILE__)), 'testdata')
+ files = ['ca.pem', 'server1.key', 'server1.pem']
+ files.map { |f| File.open(File.join(test_root, f)).read }
+end
+
class EchoMsg
- def marshal
+ def self.marshal(o)
''
end
@@ -86,302 +92,324 @@ end
SlowStub = SlowService.rpc_stub_class
-module GRPC
+describe GRPC::RpcServer do
- describe RpcServer do
+ RpcServer = GRPC::RpcServer
- before(:each) do
- @method = 'an_rpc_method'
- @pass = 0
- @fail = 1
- @noop = Proc.new { |x| x }
-
- @server_queue = CompletionQueue.new
- port = find_unused_tcp_port
- @host = "localhost:#{port}"
- @server = GRPC::Server.new(@server_queue, nil)
- @server.add_http2_port(@host)
- @ch = GRPC::Channel.new(@host, nil)
- end
+ before(:each) do
+ @method = 'an_rpc_method'
+ @pass = 0
+ @fail = 1
+ @noop = Proc.new { |x| x }
+
+ @server_queue = GRPC::Core::CompletionQueue.new
+ port = find_unused_tcp_port
+ @host = "localhost:#{port}"
+ @server = GRPC::Core::Server.new(@server_queue, nil)
+ @server.add_http2_port(@host)
+ @ch = GRPC::Core::Channel.new(@host, nil)
+ end
+
+ after(:each) do
+ @server.close
+ end
+
+ describe '#new' do
- after(:each) do
- @server.close
+ it 'can be created with just some args' do
+ opts = {:a_channel_arg => 'an_arg'}
+ blk = Proc.new do
+ RpcServer.new(**opts)
+ end
+ expect(&blk).not_to raise_error
end
- describe '#new' do
+ it 'can be created with a default deadline' do
+ opts = {:a_channel_arg => 'an_arg', :deadline => 5}
+ blk = Proc.new do
+ RpcServer.new(**opts)
+ end
+ expect(&blk).not_to raise_error
+ end
- it 'can be created with just some args' do
- opts = {:a_channel_arg => 'an_arg'}
- blk = Proc.new do
- RpcServer.new(**opts)
- end
- expect(&blk).not_to raise_error
+ it 'can be created with a completion queue override' do
+ opts = {
+ :a_channel_arg => 'an_arg',
+ :completion_queue_override => @server_queue
+ }
+ blk = Proc.new do
+ RpcServer.new(**opts)
end
+ expect(&blk).not_to raise_error
+ end
- it 'can be created with a default deadline' do
- opts = {:a_channel_arg => 'an_arg', :deadline => 5}
- blk = Proc.new do
- RpcServer.new(**opts)
- end
- expect(&blk).not_to raise_error
+ it 'cannot be created with a bad completion queue override' do
+ blk = Proc.new do
+ opts = {
+ :a_channel_arg => 'an_arg',
+ :completion_queue_override => Object.new
+ }
+ RpcServer.new(**opts)
end
+ expect(&blk).to raise_error
+ end
- it 'can be created with a completion queue override' do
+ it 'cannot be created with invalid ServerCredentials' do
+ blk = Proc.new do
opts = {
:a_channel_arg => 'an_arg',
- :completion_queue_override => @server_queue
+ :creds => Object.new
}
- blk = Proc.new do
- RpcServer.new(**opts)
- end
- expect(&blk).not_to raise_error
+ RpcServer.new(**opts)
end
+ expect(&blk).to raise_error
+ end
- it 'cannot be created with a bad completion queue override' do
- blk = Proc.new do
- opts = {
- :a_channel_arg => 'an_arg',
- :completion_queue_override => Object.new
- }
- RpcServer.new(**opts)
- end
- expect(&blk).to raise_error
+ it 'can be created with the creds as valid ServerCedentials' do
+ certs = load_test_certs
+ server_creds = GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2])
+ blk = Proc.new do
+ opts = {
+ :a_channel_arg => 'an_arg',
+ :creds => server_creds
+ }
+ RpcServer.new(**opts)
end
+ expect(&blk).to_not raise_error
+ end
- it 'can be created with a server override' do
- opts = {:a_channel_arg => 'an_arg', :server_override => @server}
- blk = Proc.new do
- RpcServer.new(**opts)
- end
- expect(&blk).not_to raise_error
+ it 'can be created with a server override' do
+ opts = {:a_channel_arg => 'an_arg', :server_override => @server}
+ blk = Proc.new do
+ RpcServer.new(**opts)
end
+ expect(&blk).not_to raise_error
+ end
- it 'cannot be created with a bad server override' do
- blk = Proc.new do
- opts = {
- :a_channel_arg => 'an_arg',
- :server_override => Object.new
- }
- RpcServer.new(**opts)
- end
- expect(&blk).to raise_error
+ it 'cannot be created with a bad server override' do
+ blk = Proc.new do
+ opts = {
+ :a_channel_arg => 'an_arg',
+ :server_override => Object.new
+ }
+ RpcServer.new(**opts)
end
+ expect(&blk).to raise_error
+ end
+
+ end
+ describe '#stopped?' do
+
+ before(:each) do
+ opts = {:a_channel_arg => 'an_arg', :poll_period => 1}
+ @srv = RpcServer.new(**opts)
end
- describe '#stopped?' do
+ it 'starts out false' do
+ expect(@srv.stopped?).to be(false)
+ end
- before(:each) do
- opts = {:a_channel_arg => 'an_arg', :poll_period => 1}
- @srv = RpcServer.new(**opts)
- end
+ it 'stays false after a #stop is called before #run' do
+ @srv.stop
+ expect(@srv.stopped?).to be(false)
+ end
- it 'starts out false' do
- expect(@srv.stopped?).to be(false)
- end
+ it 'stays false after the server starts running' do
+ @srv.handle(EchoService)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ expect(@srv.stopped?).to be(false)
+ @srv.stop
+ t.join
+ end
- it 'stays false after a #stop is called before #run' do
- @srv.stop
- expect(@srv.stopped?).to be(false)
- end
+ it 'is true after a running server is stopped' do
+ @srv.handle(EchoService)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ @srv.stop
+ expect(@srv.stopped?).to be(true)
+ t.join
+ end
- it 'stays false after the server starts running' do
- @srv.handle(EchoService)
- t = Thread.new { @srv.run }
- @srv.wait_till_running
- expect(@srv.stopped?).to be(false)
- @srv.stop
- t.join
- end
+ end
- it 'is true after a running server is stopped' do
- @srv.handle(EchoService)
- t = Thread.new { @srv.run }
- @srv.wait_till_running
- @srv.stop
- expect(@srv.stopped?).to be(true)
- t.join
- end
+ describe '#running?' do
+
+ it 'starts out false' do
+ opts = {:a_channel_arg => 'an_arg', :server_override => @server}
+ r = RpcServer.new(**opts)
+ expect(r.running?).to be(false)
+ end
+ it 'is false after run is called with no services registered' do
+ opts = {
+ :a_channel_arg => 'an_arg',
+ :poll_period => 1,
+ :server_override => @server
+ }
+ r = RpcServer.new(**opts)
+ r.run()
+ expect(r.running?).to be(false)
end
- describe '#running?' do
+ it 'is true after run is called with a registered service' do
+ opts = {
+ :a_channel_arg => 'an_arg',
+ :poll_period => 1,
+ :server_override => @server
+ }
+ r = RpcServer.new(**opts)
+ r.handle(EchoService)
+ t = Thread.new { r.run }
+ r.wait_till_running
+ expect(r.running?).to be(true)
+ r.stop
+ t.join
+ end
- it 'starts out false' do
- opts = {:a_channel_arg => 'an_arg', :server_override => @server}
- r = RpcServer.new(**opts)
- expect(r.running?).to be(false)
- end
+ end
- it 'is false after run is called with no services registered' do
- opts = {
- :a_channel_arg => 'an_arg',
- :poll_period => 1,
- :server_override => @server
- }
- r = RpcServer.new(**opts)
- r.run()
- expect(r.running?).to be(false)
- end
+ describe '#handle' do
- it 'is true after run is called with a registered service' do
- opts = {
- :a_channel_arg => 'an_arg',
- :poll_period => 1,
- :server_override => @server
- }
- r = RpcServer.new(**opts)
- r.handle(EchoService)
- t = Thread.new { r.run }
- r.wait_till_running
- expect(r.running?).to be(true)
- r.stop
- t.join
- end
+ before(:each) do
+ @opts = {:a_channel_arg => 'an_arg', :poll_period => 1}
+ @srv = RpcServer.new(**@opts)
+ end
+ it 'raises if #run has already been called' do
+ @srv.handle(EchoService)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ expect { @srv.handle(EchoService) }.to raise_error
+ @srv.stop
+ t.join
end
- describe '#handle' do
+ it 'raises if the server has been run and stopped' do
+ @srv.handle(EchoService)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ @srv.stop
+ t.join
+ expect { @srv.handle(EchoService) }.to raise_error
+ end
- before(:each) do
- @opts = {:a_channel_arg => 'an_arg', :poll_period => 1}
- @srv = RpcServer.new(**@opts)
- end
+ it 'raises if the service does not include GenericService ' do
+ expect { @srv.handle(Object) }.to raise_error
+ end
+
+ it 'raises if the service does not declare any rpc methods' do
+ expect { @srv.handle(EmptyService) }.to raise_error
+ end
+
+ it 'raises if the service does not define its rpc methods' do
+ expect { @srv.handle(NoRpcImplementation) }.to raise_error
+ end
+
+ it 'raises if a handler method is already registered' do
+ @srv.handle(EchoService)
+ expect { r.handle(EchoService) }.to raise_error
+ end
+
+ end
+
+ describe '#run' do
+
+ before(:each) do
+ @client_opts = {
+ :channel_override => @ch
+ }
+ @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc
+ @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output)
+ server_opts = {
+ :server_override => @server,
+ :completion_queue_override => @server_queue,
+ :poll_period => 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
- it 'raises if #run has already been called' do
+ describe 'when running' do
+
+ it 'should return NOT_FOUND status for requests on unknown methods' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
- expect { @srv.handle(EchoService) }.to raise_error
+ req = EchoMsg.new
+ blk = Proc.new do
+ cq = GRPC::Core::CompletionQueue.new
+ stub = GRPC::ClientStub.new(@host, cq, **@client_opts)
+ stub.request_response('/unknown', req, @marshal, @unmarshal)
+ end
+ expect(&blk).to raise_error BadStatus
@srv.stop
t.join
end
- it 'raises if the server has been run and stopped' do
+ it 'should obtain responses for multiple sequential requests' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
+ req = EchoMsg.new
+ n = 5 # arbitrary
+ stub = EchoStub.new(@host, **@client_opts)
+ n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) }
@srv.stop
t.join
- expect { @srv.handle(EchoService) }.to raise_error
- end
-
- it 'raises if the service does not include GenericService ' do
- expect { @srv.handle(Object) }.to raise_error
- end
-
- it 'raises if the service does not declare any rpc methods' do
- expect { @srv.handle(EmptyService) }.to raise_error
- end
-
- it 'raises if the service does not define its rpc methods' do
- expect { @srv.handle(NoRpcImplementation) }.to raise_error
end
- it 'raises if a handler method is already registered' do
+ it 'should obtain responses for multiple parallel requests' do
@srv.handle(EchoService)
- expect { r.handle(EchoService) }.to raise_error
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req, q = EchoMsg.new, Queue.new
+ n = 5 # arbitrary
+ threads = []
+ n.times do |x|
+ cq = GRPC::Core::CompletionQueue.new
+ threads << Thread.new do
+ stub = EchoStub.new(@host, **@client_opts)
+ q << stub.an_rpc(req)
+ end
+ end
+ n.times { expect(q.pop).to be_a(EchoMsg) }
+ @srv.stop
+ threads.each { |t| t.join }
end
- end
-
- describe '#run' do
-
- before(:each) do
- @client_opts = {
- :channel_override => @ch
- }
- @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc
- @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output)
- server_opts = {
+ it 'should return UNAVAILABLE status if there too many jobs' do
+ opts = {
+ :a_channel_arg => 'an_arg',
:server_override => @server,
:completion_queue_override => @server_queue,
- :poll_period => 1
+ :pool_size => 1,
+ :poll_period => 1,
+ :max_waiting_requests => 0
}
- @srv = RpcServer.new(**server_opts)
- end
-
- describe 'when running' do
-
- it 'should return NOT_FOUND status for requests on unknown methods' do
- @srv.handle(EchoService)
- t = Thread.new { @srv.run }
- @srv.wait_till_running
- req = EchoMsg.new
- blk = Proc.new do
- cq = CompletionQueue.new
- stub = ClientStub.new(@host, cq, **@client_opts)
- stub.request_response('/unknown', req, @marshal, @unmarshal)
- end
- expect(&blk).to raise_error BadStatus
- @srv.stop
- t.join
- end
-
- it 'should obtain responses for multiple sequential requests' do
- @srv.handle(EchoService)
- t = Thread.new { @srv.run }
- @srv.wait_till_running
- req = EchoMsg.new
- n = 5 # arbitrary
- stub = EchoStub.new(@host, **@client_opts)
- n.times { |x| expect(stub.an_rpc(req)).to be_a(EchoMsg) }
- @srv.stop
- t.join
- end
-
- it 'should obtain responses for multiple parallel requests' do
- @srv.handle(EchoService)
- t = Thread.new { @srv.run }
- @srv.wait_till_running
- req, q = EchoMsg.new, Queue.new
- n = 5 # arbitrary
- threads = []
- n.times do |x|
- cq = CompletionQueue.new
- threads << Thread.new do
- stub = EchoStub.new(@host, **@client_opts)
- q << stub.an_rpc(req)
- end
- end
- n.times { expect(q.pop).to be_a(EchoMsg) }
- @srv.stop
- threads.each { |t| t.join }
- end
-
- it 'should return UNAVAILABLE status if there too many jobs' do
- opts = {
- :a_channel_arg => 'an_arg',
- :server_override => @server,
- :completion_queue_override => @server_queue,
- :pool_size => 1,
- :poll_period => 1,
- :max_waiting_requests => 0
- }
- alt_srv = RpcServer.new(**opts)
- alt_srv.handle(SlowService)
- t = Thread.new { alt_srv.run }
- alt_srv.wait_till_running
- req = EchoMsg.new
- n = 5 # arbitrary, use as many to ensure the server pool is exceeded
- threads = []
- _1_failed_as_unavailable = false
- n.times do |x|
- threads << Thread.new do
- cq = CompletionQueue.new
- stub = SlowStub.new(@host, **@client_opts)
- begin
- stub.an_rpc(req)
- rescue BadStatus => e
- _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
- end
+ alt_srv = RpcServer.new(**opts)
+ alt_srv.handle(SlowService)
+ t = Thread.new { alt_srv.run }
+ alt_srv.wait_till_running
+ req = EchoMsg.new
+ n = 5 # arbitrary, use as many to ensure the server pool is exceeded
+ threads = []
+ _1_failed_as_unavailable = false
+ n.times do |x|
+ threads << Thread.new do
+ cq = GRPC::Core::CompletionQueue.new
+ stub = SlowStub.new(@host, **@client_opts)
+ begin
+ stub.an_rpc(req)
+ rescue BadStatus => e
+ _1_failed_as_unavailable = e.code == StatusCodes::UNAVAILABLE
end
end
- threads.each { |t| t.join }
- alt_srv.stop
- expect(_1_failed_as_unavailable).to be(true)
end
-
+ threads.each { |t| t.join }
+ alt_srv.stop
+ expect(_1_failed_as_unavailable).to be(true)
end
end