aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/ext/grpc/rb_call.c1
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c12
-rw-r--r--src/ruby/ext/grpc/rb_server.c64
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb13
-rw-r--r--src/ruby/spec/client_server_spec.rb9
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb2
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb3
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb12
-rw-r--r--src/ruby/spec/server_spec.rb22
9 files changed, 92 insertions, 46 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 29f870f929..33bfd006da 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -507,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
+ st->ops[st->op_num].flags = 0;
st->op_num++;
}
}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index fa4c566004..8fb3949b3d 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -142,8 +142,16 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
MEMZERO(&next_call, next_call_stack, 1);
TypedData_Get_Struct(self, grpc_completion_queue,
&grpc_rb_completion_queue_data_type, next_call.cq);
- next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
- next_call.tag = ROBJECT(tag);
+ if (TYPE(timeout) == T_NIL) {
+ next_call.timeout = gpr_inf_future;
+ } else {
+ next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
+ }
+ if (TYPE(tag) == T_NIL) {
+ next_call.tag = NULL;
+ } else {
+ next_call.tag = ROBJECT(tag);
+ }
next_call.event.type = GRPC_QUEUE_TIMEOUT;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, NULL, NULL);
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 837ca3b5e8..9c0d24bf8f 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -210,7 +210,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
VALUE result;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "closed!");
+ rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil;
} else {
grpc_request_call_stack_init(&st);
@@ -259,21 +259,69 @@ static VALUE grpc_rb_server_start(VALUE self) {
grpc_rb_server *s = NULL;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "closed!");
+ rb_raise(rb_eRuntimeError, "destroyed!");
} else {
grpc_server_start(s->wrapped);
}
return Qnil;
}
-static VALUE grpc_rb_server_destroy(VALUE self) {
+/*
+ call-seq:
+ cq = CompletionQueue.new
+ server = Server.new(cq, {'arg1': 'value1'})
+ ... // do stuff with server
+ ...
+ ... // to shutdown the server
+ server.destroy(cq)
+
+ ... // to shutdown the server with a timeout
+ server.destroy(cq, timeout)
+
+ Destroys server instances. */
+static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
+ VALUE cqueue = Qnil;
+ VALUE timeout = Qnil;
+ grpc_completion_queue *cq = NULL;
+ grpc_event ev;
grpc_rb_server *s = NULL;
+
+ /* "11" == 1 mandatory args, 1 (timeout) is optional */
+ rb_scan_args(argc, argv, "11", &cqueue, &timeout);
+ cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
+
if (s->wrapped != NULL) {
- grpc_server_shutdown(s->wrapped);
+ grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
+ ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
+
+ if (!ev.success) {
+ rb_warn("server shutdown failed, there will be a LEAKED object warning");
+ return Qnil;
+ /*
+ TODO: renable the rb_raise below.
+
+ At the moment if the timeout is INFINITE_FUTURE as recommended, the
+ pluck blocks forever, even though
+
+ the outstanding server_request_calls correctly fail on the other
+ thread that they are running on.
+
+ it's almost as if calls that fail on the other thread do not get
+ cleaned up by shutdown request, even though it caused htem to
+ terminate.
+
+ rb_raise(rb_eRuntimeError, "grpc server shutdown did not succeed");
+ return Qnil;
+
+ The workaround is just to use a timeout and return without really
+ shutting down the server, and rely on the grpc core garbage collection
+ it down as a 'LEAKED OBJECT'.
+
+ */
+ }
grpc_server_destroy(s->wrapped);
s->wrapped = NULL;
- s->mark = Qnil;
}
return Qnil;
}
@@ -302,7 +350,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "closed!");
+ rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil;
} else if (rb_creds == Qnil) {
recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port));
@@ -315,7 +363,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
recvd_port =
grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port),
- creds);
+ creds);
if (recvd_port == 0) {
rb_raise(rb_eRuntimeError,
"could not add secure port %s to server, not sure why",
@@ -341,7 +389,7 @@ void Init_grpc_server() {
rb_define_method(grpc_rb_cServer, "request_call",
grpc_rb_server_request_call, 3);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
- rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
+ rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
grpc_rb_server_add_http2_port,
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index dcb11bfbef..a7e20d6b82 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -278,7 +278,9 @@ module GRPC
@stopped = true
end
@pool.stop
- @server.close
+ deadline = from_relative_time(@poll_period)
+
+ @server.close(@cq, deadline)
end
# determines if the server has been stopped
@@ -410,17 +412,18 @@ module GRPC
# handles calls to the server
def loop_handle_server_calls
fail 'not running' unless @running
- request_call_tag = Object.new
+ loop_tag = Object.new
until stopped?
deadline = from_relative_time(@poll_period)
begin
- an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ an_rpc = @server.request_call(@cq, loop_tag, deadline)
+ c = new_active_server_call(an_rpc)
rescue Core::CallError, RuntimeError => e
- # can happen during server shutdown
+ # these might happen for various reasonse. The correct behaviour of
+ # the server is to log them and continue.
GRPC.logger.warn("server call failed: #{e}")
next
end
- c = new_active_server_call(an_rpc)
unless c.nil?
mth = an_rpc.method.to_sym
@pool.schedule(c) do |call|
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index b247882241..0e85441209 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -42,11 +42,8 @@ shared_context 'setup: tags' do
let(:sent_message) { 'sent message' }
let(:reply_text) { 'the reply' }
before(:example) do
- @server_finished_tag = Object.new
- @client_finished_tag = Object.new
- @client_metadata_tag = Object.new
+ @client_tag = Object.new
@server_tag = Object.new
- @tag = Object.new
end
def deadline
@@ -395,7 +392,7 @@ describe 'the http client/server' do
after(:example) do
@ch.close
- @server.close
+ @server.close(@server_queue, deadline)
end
it_behaves_like 'basic GRPC message delivery is OK' do
@@ -421,7 +418,7 @@ describe 'the secure http client/server' do
end
after(:example) do
- @server.close
+ @server.close(@server_queue, deadline)
end
it_behaves_like 'basic GRPC message delivery is OK' do
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 575871afb1..bc3bee3d44 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -51,7 +51,7 @@ describe GRPC::ActiveCall do
end
after(:each) do
- @server.close
+ @server.close(@server_queue, deadline)
end
describe 'restricted view methods' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 98d68ccfbb..68d4b11790 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -54,6 +54,7 @@ describe 'ClientStub' do
before(:each) do
Thread.abort_on_exception = true
@server = nil
+ @server_queue = nil
@method = 'an_rpc_method'
@pass = OK
@fail = INTERNAL
@@ -61,7 +62,7 @@ describe 'ClientStub' do
end
after(:each) do
- @server.close unless @server.nil?
+ @server.close(@server_queue) unless @server_queue.nil?
end
describe '#new' do
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index e60a8b27c3..f2403de77c 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -136,10 +136,6 @@ describe GRPC::RpcServer do
@ch = GRPC::Core::Channel.new(@host, nil)
end
- after(:each) do
- @server.close
- end
-
describe '#new' do
it 'can be created with just some args' do
opts = { a_channel_arg: 'an_arg' }
@@ -344,10 +340,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
- after(:each) do
- @srv.stop
- end
-
it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -527,10 +519,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
- after(:each) do
- @srv.stop
- end
-
it 'should send connect metadata to the client', server: true do
service = EchoService.new
@srv.handle(service)
diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb
index bb566d1b1f..47fe575343 100644
--- a/src/ruby/spec/server_spec.rb
+++ b/src/ruby/spec/server_spec.rb
@@ -54,7 +54,7 @@ describe Server do
it 'fails if the server is closed' do
s = Server.new(@cq, nil)
- s.close
+ s.close(@cq)
expect { s.start }.to raise_error(RuntimeError)
end
end
@@ -62,19 +62,19 @@ describe Server do
describe '#destroy' do
it 'destroys a server ok' do
s = start_a_server
- blk = proc { s.destroy }
+ blk = proc { s.destroy(@cq) }
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
s = start_a_server
begin
- blk = proc { s.destroy }
+ blk = proc { s.destroy(@cq) }
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
ensure
- s.close
+ s.close(@cq)
end
end
end
@@ -83,16 +83,16 @@ describe Server do
it 'closes a server ok' do
s = start_a_server
begin
- blk = proc { s.close }
+ blk = proc { s.close(@cq) }
expect(&blk).to_not raise_error
ensure
- s.close
+ s.close(@cq)
end
end
it 'can be called more than once without error' do
s = start_a_server
- blk = proc { s.close }
+ blk = proc { s.close(@cq) }
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
@@ -105,14 +105,14 @@ describe Server do
blk = proc do
s = Server.new(@cq, nil)
s.add_http2_port('localhost:0')
- s.close
+ s.close(@cq)
end
expect(&blk).to_not raise_error
end
it 'fails if the server is closed' do
s = Server.new(@cq, nil)
- s.close
+ s.close(@cq)
expect { s.add_http2_port('localhost:0') }.to raise_error(RuntimeError)
end
end
@@ -123,14 +123,14 @@ describe Server do
blk = proc do
s = Server.new(@cq, nil)
s.add_http2_port('localhost:0', cert)
- s.close
+ s.close(@cq)
end
expect(&blk).to_not raise_error
end
it 'fails if the server is closed' do
s = Server.new(@cq, nil)
- s.close
+ s.close(@cq)
blk = proc { s.add_http2_port('localhost:0', cert) }
expect(&blk).to raise_error(RuntimeError)
end