aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_server.c
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2018-01-22 22:31:38 -0800
committerGravatar Alexander Polcyn <apolcyn@google.com>2018-01-22 23:43:39 -0800
commit7b87bab83263968e2b176d8ec7e8d85db715d306 (patch)
tree22d6450fcf57c5fc61709458700136ea195c7aba /src/ruby/ext/grpc/rb_server.c
parent6827d4473bc71601fdfc69cabe47a065109c416e (diff)
Refactor ruby server shutdown to fix a race
Diffstat (limited to 'src/ruby/ext/grpc/rb_server.c')
-rw-r--r--src/ruby/ext/grpc/rb_server.c80
1 files changed, 52 insertions, 28 deletions
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 160c1533ba..88e6a0cfd5 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -46,21 +46,38 @@ typedef struct grpc_rb_server {
/* The actual server */
grpc_server* wrapped;
grpc_completion_queue* queue;
- gpr_atm shutdown_started;
+ int shutdown_and_notify_done;
+ int destroy_done;
} grpc_rb_server;
-static void destroy_server(grpc_rb_server* server, gpr_timespec deadline) {
+static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
+ gpr_timespec deadline) {
grpc_event ev;
- // This can be started by app or implicitly by GC. Avoid a race between these.
- if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) {
+ void* tag = &ev;
+ if (!server->shutdown_and_notify_done) {
+ server->shutdown_and_notify_done = 1;
if (server->wrapped != NULL) {
- grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
- ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
+ grpc_server_shutdown_and_notify(server->wrapped, server->queue, tag);
+ ev = rb_completion_queue_pluck(server->queue, tag, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
- rb_completion_queue_pluck(server->queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ ev = rb_completion_queue_pluck(
+ server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ }
+ if (ev.type != GRPC_OP_COMPLETE) {
+ gpr_log(GPR_INFO,
+ "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
+ ev.type);
}
+ }
+ }
+}
+
+static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
+ // This can be started by app or implicitly by GC. Avoid a race between these.
+ if (!server->destroy_done) {
+ server->destroy_done = 1;
+ if (server->wrapped != NULL) {
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
server->wrapped = NULL;
@@ -81,7 +98,8 @@ static void grpc_rb_server_free(void* p) {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(2, GPR_TIMESPAN));
- destroy_server(svr, deadline);
+ grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
+ grpc_rb_server_maybe_destroy(svr);
xfree(p);
}
@@ -107,7 +125,8 @@ static const rb_data_type_t grpc_rb_server_data_type = {
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
- wrapper->shutdown_started = (gpr_atm)0;
+ wrapper->destroy_done = 0;
+ wrapper->shutdown_and_notify_done = 0;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}
@@ -232,25 +251,10 @@ static VALUE grpc_rb_server_start(VALUE self) {
return Qnil;
}
-/*
- call-seq:
- server = Server.new({'arg1': 'value1'})
- ... // do stuff with server
- ...
- ... // to shutdown the server
- server.destroy()
-
- ... // to shutdown the server with a timeout
- server.destroy(timeout)
-
- Destroys server instances. */
-static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) {
- VALUE timeout = Qnil;
+static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
gpr_timespec deadline;
grpc_rb_server* s = NULL;
- /* "01" == 0 mandatory args, 1 (timeout) is optional */
- rb_scan_args(argc, argv, "01", &timeout);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (TYPE(timeout) == T_NIL) {
deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
@@ -258,8 +262,26 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE* argv, VALUE self) {
deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
}
- destroy_server(s, deadline);
+ grpc_rb_server_maybe_shutdown_and_notify(s, deadline);
+
+ return Qnil;
+}
+
+/*
+ call-seq:
+ server = Server.new({'arg1': 'value1'})
+ ... // do stuff with server
+ ...
+ ... // initiate server shutdown
+ server.shutdown_and_notify(timeout)
+ ... // to shutdown the server
+ server.destroy()
+ Destroys server instances. */
+static VALUE grpc_rb_server_destroy(VALUE self) {
+ grpc_rb_server* s = NULL;
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
+ grpc_rb_server_maybe_destroy(s);
return Qnil;
}
@@ -326,7 +348,9 @@ void Init_grpc_server() {
rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
- rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
+ rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
+ grpc_rb_server_shutdown_and_notify, 1);
+ rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
grpc_rb_server_add_http2_port, 2);