diff options
author | apolcyn <apolcyn@google.com> | 2016-12-05 13:52:47 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-12-05 13:52:47 -0800 |
commit | 93904c0dc8315dee04af29091e7033ad50797918 (patch) | |
tree | 807c7952caf12066947f4c87f7464cb9955465f7 | |
parent | 58e120c46a828e66f3bde57b17c4d00fcea3a25a (diff) | |
parent | a868c0424e492cfa135390dacb76c30c525dd794 (diff) |
Merge pull request #8843 from apolcyn/fix_ruby_shutdown
Fix ruby shutdown
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 28 | ||||
-rw-r--r-- | src/ruby/qps/client.rb | 6 |
2 files changed, 21 insertions, 13 deletions
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index bf26841fd2..55c745965b 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -37,6 +37,7 @@ #include "rb_server.h" #include <grpc/grpc.h> +#include <grpc/support/atm.h> #include <grpc/grpc_security.h> #include <grpc/support/log.h> #include "rb_call.h" @@ -59,22 +60,26 @@ typedef struct grpc_rb_server { /* The actual server */ grpc_server *wrapped; grpc_completion_queue *queue; + gpr_atm shutdown_started; } grpc_rb_server; static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { grpc_event ev; - if (server->wrapped != NULL) { - grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); - ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); - if (ev.type == GRPC_QUEUE_TIMEOUT) { - grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + // This can be started by app or implicitly by GC. Avoid a race between these. + if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { + if (server->wrapped != NULL) { + grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); + ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); + if (ev.type == GRPC_QUEUE_TIMEOUT) { + grpc_server_cancel_all_calls(server->wrapped); + rb_completion_queue_pluck(server->queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + } + grpc_server_destroy(server->wrapped); + grpc_rb_completion_queue_destroy(server->queue); + server->wrapped = NULL; + server->queue = NULL; } - grpc_server_destroy(server->wrapped); - grpc_rb_completion_queue_destroy(server->queue); - server->wrapped = NULL; - server->queue = NULL; } } @@ -115,6 +120,7 @@ static const rb_data_type_t grpc_rb_server_data_type = { static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server *wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; + wrapper->shutdown_started = (gpr_atm)0; return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 8aed866da5..817192626b 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -134,6 +134,7 @@ class BenchmarkClient resp = stub.streaming_call(q.each_item) start = Time.now q.push(req) + pushed_sentinal = false resp.each do |r| @histogram.add((Time.now-start)*1e9) if !@done @@ -141,8 +142,9 @@ class BenchmarkClient start = Time.now q.push(req) else - q.push(self) - break + q.push(self) unless pushed_sentinal + # Continue polling on the responses to consume and release resources + pushed_sentinal = true end end end |