diff options
author | apolcyn <apolcyn@google.com> | 2018-02-09 18:03:30 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-09 18:03:30 -0800 |
commit | 90dde8f1360fcc251ac16cd5fb8f53b5f372659a (patch) | |
tree | 0a84caa4bc14af022d7d91c80b0d60dc2c67a0e6 /src | |
parent | 823c9c47173a56ecb69261a0548066cd004d8a8d (diff) | |
parent | d60ed8f1f88a2d0320d2fe7e3cb3ae71645f8400 (diff) |
Merge pull request #14134 from apolcyn/fix_ruby_shutdown_race
Refactor ruby server shutdown to fix a race in tests
Diffstat (limited to 'src')
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 80 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 11 | ||||
-rw-r--r-- | src/ruby/spec/client_server_spec.rb | 6 | ||||
-rw-r--r-- | src/ruby/spec/generic/active_call_spec.rb | 3 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 40 | ||||
-rw-r--r-- | src/ruby/spec/server_spec.rb | 33 |
6 files changed, 123 insertions, 50 deletions
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 160c1533ba..88e6a0cfd5 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -46,21 +46,38 @@ typedef struct grpc_rb_server { /* The actual server */ grpc_server* wrapped; grpc_completion_queue* queue; - gpr_atm shutdown_started; + int shutdown_and_notify_done; + int destroy_done; } grpc_rb_server; -static void destroy_server(grpc_rb_server* server, gpr_timespec deadline) { +static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server, + gpr_timespec deadline) { grpc_event ev; - // This can be started by app or implicitly by GC. Avoid a race between these. - if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { + void* tag = &ev; + if (!server->shutdown_and_notify_done) { + server->shutdown_and_notify_done = 1; if (server->wrapped != NULL) { - grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); - ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); + grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag); + ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + ev = rb_completion_queue_pluck( + server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + } + if (ev.type != GRPC_OP_COMPLETE) { + gpr_log(GPR_INFO, + "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d", + ev.type); } + } + } +} + +static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) { + // This can be started by app or implicitly by GC. Avoid a race between these. + if (!server->destroy_done) { + server->destroy_done = 1; + if (server->wrapped != NULL) { grpc_server_destroy(server->wrapped); grpc_rb_completion_queue_destroy(server->queue); server->wrapped = NULL; @@ -81,7 +98,8 @@ static void grpc_rb_server_free(void* p) { deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(2, GPR_TIMESPAN)); - destroy_server(svr, deadline); + grpc_rb_server_maybe_shutdown_and_notify(svr, deadline); + grpc_rb_server_maybe_destroy(svr); xfree(p); } @@ -107,7 +125,8 @@ static const rb_data_type_t grpc_rb_server_data_type = { static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server* wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; - wrapper->shutdown_started = (gpr_atm)0; + wrapper->destroy_done = 0; + wrapper->shutdown_and_notify_done = 0; return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } @@ -232,25 +251,10 @@ static VALUE grpc_rb_server_start(VALUE self) { return Qnil; } -/* - call-seq: - server = Server.new({'arg1': 'value1'}) - ... // do stuff with server - ... - ... // to shutdown the server - server.destroy() - - ... // to shutdown the server with a timeout - server.destroy(timeout) - - Destroys server instances. */ -static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) { - VALUE timeout = Qnil; +static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) { gpr_timespec deadline; grpc_rb_server* s = NULL; - /* "01" == 0 mandatory args, 1 (timeout) is optional */ - rb_scan_args(argc, argv, "01", &timeout); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (TYPE(timeout) == T_NIL) { deadline = gpr_inf_future(GPR_CLOCK_REALTIME); @@ -258,8 +262,26 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) { deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); } - destroy_server(s, deadline); + grpc_rb_server_maybe_shutdown_and_notify(s, deadline); + + return Qnil; +} + +/* + call-seq: + server = Server.new({'arg1': 'value1'}) + ... // do stuff with server + ... + ... // initiate server shutdown + server.shutdown_and_notify(timeout) + ... // to shutdown the server + server.destroy() + Destroys server instances. */ +static VALUE grpc_rb_server_destroy(VALUE self) { + grpc_rb_server* s = NULL; + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); + grpc_rb_server_maybe_destroy(s); return Qnil; } @@ -326,7 +348,9 @@ void Init_grpc_server() { rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call, 0); rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); - rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1); + rb_define_method(grpc_rb_cServer, "shutdown_and_notify", + grpc_rb_server_shutdown_and_notify, 1); + rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0); rb_define_alias(grpc_rb_cServer, "close", "destroy"); rb_define_method(grpc_rb_cServer, "add_http2_port", grpc_rb_server_add_http2_port, 2); diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index c80c7fcd32..d96e677f20 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -244,9 +244,9 @@ module GRPC fail 'Cannot stop before starting' if @running_state == :not_started return if @running_state != :running transition_running_state(:stopping) + deadline = from_relative_time(@poll_period) + @server.shutdown_and_notify(deadline) end - deadline = from_relative_time(@poll_period) - @server.close(deadline) @pool.stop end @@ -416,8 +416,11 @@ module GRPC end end # @running_state should be :stopping here - @run_mutex.synchronize { transition_running_state(:stopped) } - GRPC.logger.info("stopped: #{self}") + @run_mutex.synchronize do + transition_running_state(:stopped) + GRPC.logger.info("stopped: #{self}") + @server.close + end end def new_active_server_call(an_rpc) diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 14ad369ac8..afbfb0bc43 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -550,7 +550,8 @@ describe 'the http client/server' do after(:example) do @ch.close - @server.close(deadline) + @server.shutdown_and_notify(deadline) + @server.close end it_behaves_like 'basic GRPC message delivery is OK' do @@ -583,7 +584,8 @@ describe 'the secure http client/server' do end after(:example) do - @server.close(deadline) + @server.shutdown_and_notify(deadline) + @server.close end it_behaves_like 'basic GRPC message delivery is OK' do diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 135d1f28bf..6b44b22acf 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -48,7 +48,8 @@ describe GRPC::ActiveCall do end after(:each) do - @server.close(deadline) + @server.shutdown_and_notify(deadline) + @server.close end describe 'restricted view methods' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 5353b534f4..d858c4e3fe 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -83,7 +83,12 @@ def sanity_check_values_of_accessors(op_view, op_view.deadline.is_a?(Time)).to be(true) end -describe 'ClientStub' do +def close_active_server_call(active_server_call) + active_server_call.send(:set_input_stream_done) + active_server_call.send(:set_output_stream_done) +end + +describe 'ClientStub' do # rubocop:disable Metrics/BlockLength let(:noop) { proc { |x| x } } before(:each) do @@ -96,7 +101,10 @@ describe 'ClientStub' do end after(:each) do - @server.close(from_relative_time(2)) unless @server.nil? + unless @server.nil? + @server.shutdown_and_notify(from_relative_time(2)) + @server.close + end end describe '#new' do @@ -230,7 +238,15 @@ describe 'ClientStub' do it 'should receive UNAVAILABLE if call credentials plugin fails' do server_port = create_secure_test_server - th = run_request_response(@sent_msg, @resp, @pass) + server_started_notifier = GRPC::Notifier.new + th = Thread.new do + @server.start + server_started_notifier.notify(nil) + # Poll on the server so that the client connection can proceed. + # We don't expect the server to actually accept a call though. + expect { @server.request_call }.to raise_error(GRPC::Core::CallError) + end + server_started_notifier.wait certs = load_test_certs secure_channel_creds = GRPC::Core::ChannelCredentials.new( @@ -249,17 +265,18 @@ describe 'ClientStub' do end creds = GRPC::Core::CallCredentials.new(failing_auth) - unauth_error_occured = false + unavailable_error_occured = false begin get_response(stub, credentials: creds) rescue GRPC::Unavailable => e - unauth_error_occured = true + unavailable_error_occured = true expect(e.details.include?(error_message)).to be true end - expect(unauth_error_occured).to eq(true) + expect(unavailable_error_occured).to eq(true) - # Kill the server thread so tests can complete - th.kill + @server.shutdown_and_notify(Time.now + 3) + th.join + @server.close end it 'should raise ArgumentError if metadata contains invalid values' do @@ -493,6 +510,7 @@ describe 'ClientStub' do p 'remote_send failed (allowed because call expected to cancel)' ensure c.send_status(OK, 'OK', true) + close_active_server_call(c) end end end @@ -659,6 +677,7 @@ describe 'ClientStub' do end # can't fail since initial metadata already sent server_call.send_status(@pass, 'OK', true) + close_active_server_call(server_call) end def verify_error_from_write_thread(stub, requests_to_push, @@ -809,6 +828,7 @@ describe 'ClientStub' do replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) + close_active_server_call(c) end end @@ -819,6 +839,7 @@ describe 'ClientStub' do expected_inputs.each { |i| expect(c.remote_read).to eq(i) } replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + close_active_server_call(c) end end @@ -844,6 +865,7 @@ describe 'ClientStub' do end c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) + close_active_server_call(c) end end @@ -862,6 +884,7 @@ describe 'ClientStub' do c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) + close_active_server_call(c) end end @@ -880,6 +903,7 @@ describe 'ClientStub' do c.remote_send(resp) c.send_status(status, status == @pass ? 'OK' : 'NOK', true, metadata: server_trailing_md) + close_active_server_call(c) end end diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb index a0d27b66f5..6eaac5ded1 100644 --- a/src/ruby/spec/server_spec.rb +++ b/src/ruby/spec/server_spec.rb @@ -36,45 +36,60 @@ describe Server do it 'fails if the server is closed' do s = new_core_server_for_testing(nil) + s.shutdown_and_notify(nil) s.close expect { s.start }.to raise_error(RuntimeError) end end - describe '#destroy' do + describe '#shutdown_and_notify and #destroy' do it 'destroys a server ok' do s = start_a_server - blk = proc { s.destroy } + blk = proc do + s.shutdown_and_notify(nil) + s.destroy + end expect(&blk).to_not raise_error end it 'can be called more than once without error' do s = start_a_server begin - blk = proc { s.destroy } + blk = proc do + s.shutdown_and_notify(nil) + s.destroy + end expect(&blk).to_not raise_error blk.call expect(&blk).to_not raise_error ensure + s.shutdown_and_notify(nil) s.close end end end - describe '#close' do + describe '#shutdown_and_notify and #close' do it 'closes a server ok' do s = start_a_server begin - blk = proc { s.close } + blk = proc do + s.shutdown_and_notify(nil) + s.close + end expect(&blk).to_not raise_error ensure - s.close(@cq) + s.shutdown_and_notify(nil) + s.close end end it 'can be called more than once without error' do s = start_a_server - blk = proc { s.close } + blk = proc do + s.shutdown_and_notify(nil) + s.close + end expect(&blk).to_not raise_error blk.call expect(&blk).to_not raise_error @@ -87,6 +102,7 @@ describe Server do blk = proc do s = new_core_server_for_testing(nil) s.add_http2_port('localhost:0', :this_port_is_insecure) + s.shutdown_and_notify(nil) s.close end expect(&blk).to_not raise_error @@ -94,6 +110,7 @@ describe Server do it 'fails if the server is closed' do s = new_core_server_for_testing(nil) + s.shutdown_and_notify(nil) s.close blk = proc do s.add_http2_port('localhost:0', :this_port_is_insecure) @@ -108,6 +125,7 @@ describe Server do blk = proc do s = new_core_server_for_testing(nil) s.add_http2_port('localhost:0', cert) + s.shutdown_and_notify(nil) s.close end expect(&blk).to_not raise_error @@ -115,6 +133,7 @@ describe Server do it 'fails if the server is closed' do s = new_core_server_for_testing(nil) + s.shutdown_and_notify(nil) s.close blk = proc { s.add_http2_port('localhost:0', cert) } expect(&blk).to raise_error(RuntimeError) |