aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar apolcyn <apolcyn@google.com>2016-12-05 13:52:47 -0800
committerGravatar GitHub <noreply@github.com>2016-12-05 13:52:47 -0800
commit93904c0dc8315dee04af29091e7033ad50797918 (patch)
tree807c7952caf12066947f4c87f7464cb9955465f7
parent58e120c46a828e66f3bde57b17c4d00fcea3a25a (diff)
parenta868c0424e492cfa135390dacb76c30c525dd794 (diff)
Merge pull request #8843 from apolcyn/fix_ruby_shutdown
Fix ruby shutdown
-rw-r--r--src/ruby/ext/grpc/rb_server.c28
-rw-r--r--src/ruby/qps/client.rb6
2 files changed, 21 insertions, 13 deletions
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index bf26841fd2..55c745965b 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -37,6 +37,7 @@
#include "rb_server.h"
#include <grpc/grpc.h>
+#include <grpc/support/atm.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
#include "rb_call.h"
@@ -59,22 +60,26 @@ typedef struct grpc_rb_server {
/* The actual server */
grpc_server *wrapped;
grpc_completion_queue *queue;
+ gpr_atm shutdown_started;
} grpc_rb_server;
static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
grpc_event ev;
- if (server->wrapped != NULL) {
- grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
- ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
- if (ev.type == GRPC_QUEUE_TIMEOUT) {
- grpc_server_cancel_all_calls(server->wrapped);
- rb_completion_queue_pluck(server->queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ // This can be started by app or implicitly by GC. Avoid a race between these.
+ if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) {
+ if (server->wrapped != NULL) {
+ grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
+ ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
+ if (ev.type == GRPC_QUEUE_TIMEOUT) {
+ grpc_server_cancel_all_calls(server->wrapped);
+ rb_completion_queue_pluck(server->queue, NULL,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ }
+ grpc_server_destroy(server->wrapped);
+ grpc_rb_completion_queue_destroy(server->queue);
+ server->wrapped = NULL;
+ server->queue = NULL;
}
- grpc_server_destroy(server->wrapped);
- grpc_rb_completion_queue_destroy(server->queue);
- server->wrapped = NULL;
- server->queue = NULL;
}
}
@@ -115,6 +120,7 @@ static const rb_data_type_t grpc_rb_server_data_type = {
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server *wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
+ wrapper->shutdown_started = (gpr_atm)0;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}
diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb
index 8aed866da5..817192626b 100644
--- a/src/ruby/qps/client.rb
+++ b/src/ruby/qps/client.rb
@@ -134,6 +134,7 @@ class BenchmarkClient
resp = stub.streaming_call(q.each_item)
start = Time.now
q.push(req)
+ pushed_sentinal = false
resp.each do |r|
@histogram.add((Time.now-start)*1e9)
if !@done
@@ -141,8 +142,9 @@ class BenchmarkClient
start = Time.now
q.push(req)
else
- q.push(self)
- break
+ q.push(self) unless pushed_sentinal
+ # Continue polling on the responses to consume and release resources
+ pushed_sentinal = true
end
end
end