aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2018-01-22 22:31:38 -0800
committerGravatar Alexander Polcyn <apolcyn@google.com>2018-01-22 23:43:39 -0800
commit7b87bab83263968e2b176d8ec7e8d85db715d306 (patch)
tree22d6450fcf57c5fc61709458700136ea195c7aba
parent6827d4473bc71601fdfc69cabe47a065109c416e (diff)
Refactor ruby server shutdown to fix a race
-rw-r--r--src/ruby/ext/grpc/rb_server.c80
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb3
-rw-r--r--src/ruby/spec/client_server_spec.rb6
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb3
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb40
-rw-r--r--src/ruby/spec/server_spec.rb33
6 files changed, 118 insertions, 47 deletions
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 160c1533ba..88e6a0cfd5 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -46,21 +46,38 @@ typedef struct grpc_rb_server {
/* The actual server */
grpc_server* wrapped;
grpc_completion_queue* queue;
- gpr_atm shutdown_started;
+ int shutdown_and_notify_done;
+ int destroy_done;
} grpc_rb_server;
-static void destroy_server(grpc_rb_server* server, gpr_timespec deadline) {
+static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
+ gpr_timespec deadline) {
grpc_event ev;
- // This can be started by app or implicitly by GC. Avoid a race between these.
- if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) {
+ void* tag = &ev;
+ if (!server->shutdown_and_notify_done) {
+ server->shutdown_and_notify_done = 1;
if (server->wrapped != NULL) {
- grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
- ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
+ grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
+ ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
- rb_completion_queue_pluck(server->queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ ev = rb_completion_queue_pluck(
+ server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ }
+ if (ev.type != GRPC_OP_COMPLETE) {
+ gpr_log(GPR_INFO,
+ "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
+ ev.type);
}
+ }
+ }
+}
+
+static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
+ // This can be started by app or implicitly by GC. Avoid a race between these.
+ if (!server->destroy_done) {
+ server->destroy_done = 1;
+ if (server->wrapped != NULL) {
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
server->wrapped = NULL;
@@ -81,7 +98,8 @@ static void grpc_rb_server_free(void* p) {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(2, GPR_TIMESPAN));
- destroy_server(svr, deadline);
+ grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
+ grpc_rb_server_maybe_destroy(svr);
xfree(p);
}
@@ -107,7 +125,8 @@ static const rb_data_type_t grpc_rb_server_data_type = {
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
- wrapper->shutdown_started = (gpr_atm)0;
+ wrapper->destroy_done = 0;
+ wrapper->shutdown_and_notify_done = 0;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}
@@ -232,25 +251,10 @@ static VALUE grpc_rb_server_start(VALUE self) {
return Qnil;
}
-/*
- call-seq:
- server = Server.new({'arg1': 'value1'})
- ... // do stuff with server
- ...
- ... // to shutdown the server
- server.destroy()
-
- ... // to shutdown the server with a timeout
- server.destroy(timeout)
-
- Destroys server instances. */
-static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) {
- VALUE timeout = Qnil;
+static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
gpr_timespec deadline;
grpc_rb_server* s = NULL;
- /* "01" == 0 mandatory args, 1 (timeout) is optional */
- rb_scan_args(argc, argv, "01", &timeout);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (TYPE(timeout) == T_NIL) {
deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
@@ -258,8 +262,26 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) {
deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
}
- destroy_server(s, deadline);
+ grpc_rb_server_maybe_shutdown_and_notify(s, deadline);
+
+ return Qnil;
+}
+
+/*
+ call-seq:
+ server = Server.new({'arg1': 'value1'})
+ ... // do stuff with server
+ ...
+ ... // initiate server shutdown
+ server.shutdown_and_notify(timeout)
+ ... // to shutdown the server
+ server.destroy()
+ Destroys server instances. */
+static VALUE grpc_rb_server_destroy(VALUE self) {
+ grpc_rb_server* s = NULL;
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
+ grpc_rb_server_maybe_destroy(s);
return Qnil;
}
@@ -326,7 +348,9 @@ void Init_grpc_server() {
rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
- rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
+ rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
+ grpc_rb_server_shutdown_and_notify, 1);
+ rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
grpc_rb_server_add_http2_port, 2);
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index c80c7fcd32..640975109d 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -246,7 +246,7 @@ module GRPC
transition_running_state(:stopping)
end
deadline = from_relative_time(@poll_period)
- @server.close(deadline)
+ @server.shutdown_and_notify(deadline)
@pool.stop
end
@@ -418,6 +418,7 @@ module GRPC
# @running_state should be :stopping here
@run_mutex.synchronize { transition_running_state(:stopped) }
GRPC.logger.info("stopped: #{self}")
+ @server.close
end
def new_active_server_call(an_rpc)
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index 14ad369ac8..afbfb0bc43 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -550,7 +550,8 @@ describe 'the http client/server' do
after(:example) do
@ch.close
- @server.close(deadline)
+ @server.shutdown_and_notify(deadline)
+ @server.close
end
it_behaves_like 'basic GRPC message delivery is OK' do
@@ -583,7 +584,8 @@ describe 'the secure http client/server' do
end
after(:example) do
- @server.close(deadline)
+ @server.shutdown_and_notify(deadline)
+ @server.close
end
it_behaves_like 'basic GRPC message delivery is OK' do
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 135d1f28bf..6b44b22acf 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -48,7 +48,8 @@ describe GRPC::ActiveCall do
end
after(:each) do
- @server.close(deadline)
+ @server.shutdown_and_notify(deadline)
+ @server.close
end
describe 'restricted view methods' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 5353b534f4..d858c4e3fe 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -83,7 +83,12 @@ def sanity_check_values_of_accessors(op_view,
op_view.deadline.is_a?(Time)).to be(true)
end
-describe 'ClientStub' do
+def close_active_server_call(active_server_call)
+ active_server_call.send(:set_input_stream_done)
+ active_server_call.send(:set_output_stream_done)
+end
+
+describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
let(:noop) { proc { |x| x } }
before(:each) do
@@ -96,7 +101,10 @@ describe 'ClientStub' do
end
after(:each) do
- @server.close(from_relative_time(2)) unless @server.nil?
+ unless @server.nil?
+ @server.shutdown_and_notify(from_relative_time(2))
+ @server.close
+ end
end
describe '#new' do
@@ -230,7 +238,15 @@ describe 'ClientStub' do
it 'should receive UNAVAILABLE if call credentials plugin fails' do
server_port = create_secure_test_server
- th = run_request_response(@sent_msg, @resp, @pass)
+ server_started_notifier = GRPC::Notifier.new
+ th = Thread.new do
+ @server.start
+ server_started_notifier.notify(nil)
+ # Poll on the server so that the client connection can proceed.
+ # We don't expect the server to actually accept a call though.
+ expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
+ end
+ server_started_notifier.wait
certs = load_test_certs
secure_channel_creds = GRPC::Core::ChannelCredentials.new(
@@ -249,17 +265,18 @@ describe 'ClientStub' do
end
creds = GRPC::Core::CallCredentials.new(failing_auth)
- unauth_error_occured = false
+ unavailable_error_occured = false
begin
get_response(stub, credentials: creds)
rescue GRPC::Unavailable => e
- unauth_error_occured = true
+ unavailable_error_occured = true
expect(e.details.include?(error_message)).to be true
end
- expect(unauth_error_occured).to eq(true)
+ expect(unavailable_error_occured).to eq(true)
- # Kill the server thread so tests can complete
- th.kill
+ @server.shutdown_and_notify(Time.now + 3)
+ th.join
+ @server.close
end
it 'should raise ArgumentError if metadata contains invalid values' do
@@ -493,6 +510,7 @@ describe 'ClientStub' do
p 'remote_send failed (allowed because call expected to cancel)'
ensure
c.send_status(OK, 'OK', true)
+ close_active_server_call(c)
end
end
end
@@ -659,6 +677,7 @@ describe 'ClientStub' do
end
# can't fail since initial metadata already sent
server_call.send_status(@pass, 'OK', true)
+ close_active_server_call(server_call)
end
def verify_error_from_write_thread(stub, requests_to_push,
@@ -809,6 +828,7 @@ describe 'ClientStub' do
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
+ close_active_server_call(c)
end
end
@@ -819,6 +839,7 @@ describe 'ClientStub' do
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ close_active_server_call(c)
end
end
@@ -844,6 +865,7 @@ describe 'ClientStub' do
end
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
+ close_active_server_call(c)
end
end
@@ -862,6 +884,7 @@ describe 'ClientStub' do
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
+ close_active_server_call(c)
end
end
@@ -880,6 +903,7 @@ describe 'ClientStub' do
c.remote_send(resp)
c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
metadata: server_trailing_md)
+ close_active_server_call(c)
end
end
diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb
index a0d27b66f5..6eaac5ded1 100644
--- a/src/ruby/spec/server_spec.rb
+++ b/src/ruby/spec/server_spec.rb
@@ -36,45 +36,60 @@ describe Server do
it 'fails if the server is closed' do
s = new_core_server_for_testing(nil)
+ s.shutdown_and_notify(nil)
s.close
expect { s.start }.to raise_error(RuntimeError)
end
end
- describe '#destroy' do
+ describe '#shutdown_and_notify and #destroy' do
it 'destroys a server ok' do
s = start_a_server
- blk = proc { s.destroy }
+ blk = proc do
+ s.shutdown_and_notify(nil)
+ s.destroy
+ end
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
s = start_a_server
begin
- blk = proc { s.destroy }
+ blk = proc do
+ s.shutdown_and_notify(nil)
+ s.destroy
+ end
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
ensure
+ s.shutdown_and_notify(nil)
s.close
end
end
end
- describe '#close' do
+ describe '#shutdown_and_notify and #close' do
it 'closes a server ok' do
s = start_a_server
begin
- blk = proc { s.close }
+ blk = proc do
+ s.shutdown_and_notify(nil)
+ s.close
+ end
expect(&blk).to_not raise_error
ensure
- s.close(@cq)
+ s.shutdown_and_notify(nil)
+ s.close
end
end
it 'can be called more than once without error' do
s = start_a_server
- blk = proc { s.close }
+ blk = proc do
+ s.shutdown_and_notify(nil)
+ s.close
+ end
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
@@ -87,6 +102,7 @@ describe Server do
blk = proc do
s = new_core_server_for_testing(nil)
s.add_http2_port('localhost:0', :this_port_is_insecure)
+ s.shutdown_and_notify(nil)
s.close
end
expect(&blk).to_not raise_error
@@ -94,6 +110,7 @@ describe Server do
it 'fails if the server is closed' do
s = new_core_server_for_testing(nil)
+ s.shutdown_and_notify(nil)
s.close
blk = proc do
s.add_http2_port('localhost:0', :this_port_is_insecure)
@@ -108,6 +125,7 @@ describe Server do
blk = proc do
s = new_core_server_for_testing(nil)
s.add_http2_port('localhost:0', cert)
+ s.shutdown_and_notify(nil)
s.close
end
expect(&blk).to_not raise_error
@@ -115,6 +133,7 @@ describe Server do
it 'fails if the server is closed' do
s = new_core_server_for_testing(nil)
+ s.shutdown_and_notify(nil)
s.close
blk = proc { s.add_http2_port('localhost:0', cert) }
expect(&blk).to raise_error(RuntimeError)