diff options
author | Alexander Polcyn <apolcyn@google.com> | 2018-01-22 22:31:38 -0800 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2018-01-22 23:43:39 -0800 |
commit | 7b87bab83263968e2b176d8ec7e8d85db715d306 (patch) | |
tree | 22d6450fcf57c5fc61709458700136ea195c7aba /src/ruby/ext/grpc/rb_server.c | |
parent | 6827d4473bc71601fdfc69cabe47a065109c416e (diff) |
Refactor ruby server shutdown to fix a race
Diffstat (limited to 'src/ruby/ext/grpc/rb_server.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 80 |
1 files changed, 52 insertions, 28 deletions
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 160c1533ba..88e6a0cfd5 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -46,21 +46,38 @@ typedef struct grpc_rb_server { /* The actual server */ grpc_server* wrapped; grpc_completion_queue* queue; - gpr_atm shutdown_started; + int shutdown_and_notify_done; + int destroy_done; } grpc_rb_server; -static void destroy_server(grpc_rb_server* server, gpr_timespec deadline) { +static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server, + gpr_timespec deadline) { grpc_event ev; - // This can be started by app or implicitly by GC. Avoid a race between these. - if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { + void* tag = &ev; + if (!server->shutdown_and_notify_done) { + server->shutdown_and_notify_done = 1; if (server->wrapped != NULL) { - grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); - ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); + grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag); + ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + ev = rb_completion_queue_pluck( + server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + } + if (ev.type != GRPC_OP_COMPLETE) { + gpr_log(GPR_INFO, + "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d", + ev.type); } + } + } +} + +static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) { + // This can be started by app or implicitly by GC. Avoid a race between these. + if (!server->destroy_done) { + server->destroy_done = 1; + if (server->wrapped != NULL) { grpc_server_destroy(server->wrapped); grpc_rb_completion_queue_destroy(server->queue); server->wrapped = NULL; @@ -81,7 +98,8 @@ static void grpc_rb_server_free(void* p) { deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(2, GPR_TIMESPAN)); - destroy_server(svr, deadline); + grpc_rb_server_maybe_shutdown_and_notify(svr, deadline); + grpc_rb_server_maybe_destroy(svr); xfree(p); } @@ -107,7 +125,8 @@ static const rb_data_type_t grpc_rb_server_data_type = { static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server* wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; - wrapper->shutdown_started = (gpr_atm)0; + wrapper->destroy_done = 0; + wrapper->shutdown_and_notify_done = 0; return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } @@ -232,25 +251,10 @@ static VALUE grpc_rb_server_start(VALUE self) { return Qnil; } -/* - call-seq: - server = Server.new({'arg1': 'value1'}) - ... // do stuff with server - ... - ... // to shutdown the server - server.destroy() - - ... // to shutdown the server with a timeout - server.destroy(timeout) - - Destroys server instances. */ -static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) { - VALUE timeout = Qnil; +static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) { gpr_timespec deadline; grpc_rb_server* s = NULL; - /* "01" == 0 mandatory args, 1 (timeout) is optional */ - rb_scan_args(argc, argv, "01", &timeout); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (TYPE(timeout) == T_NIL) { deadline = gpr_inf_future(GPR_CLOCK_REALTIME); @@ -258,8 +262,26 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) { deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); } - destroy_server(s, deadline); + grpc_rb_server_maybe_shutdown_and_notify(s, deadline); + + return Qnil; +} + +/* + call-seq: + server = Server.new({'arg1': 'value1'}) + ... // do stuff with server + ... + ... // initiate server shutdown + server.shutdown_and_notify(timeout) + ... // to shutdown the server + server.destroy() + Destroys server instances. */ +static VALUE grpc_rb_server_destroy(VALUE self) { + grpc_rb_server* s = NULL; + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); + grpc_rb_server_maybe_destroy(s); return Qnil; } @@ -326,7 +348,9 @@ void Init_grpc_server() { rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call, 0); rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); - rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1); + rb_define_method(grpc_rb_cServer, "shutdown_and_notify", + grpc_rb_server_shutdown_and_notify, 1); + rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0); rb_define_alias(grpc_rb_cServer, "close", "destroy"); rb_define_method(grpc_rb_cServer, "add_http2_port", grpc_rb_server_add_http2_port, 2); |