aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/ext/grpc/rb_call.c8
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c62
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h4
-rw-r--r--src/ruby/ext/grpc/rb_server.c12
4 files changed, 57 insertions, 29 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index b43ad08eba..1b06273af9 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -722,10 +722,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
return result;
}
-static void run_batch_unblock_func(void *call) {
- grpc_call_cancel((grpc_call*)call, NULL);
-}
-
/* call-seq:
cq = CompletionQueue.new
ops = {
@@ -776,9 +772,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_call_error_detail_of(err), err);
return Qnil;
}
- ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout,
- run_batch_unblock_func,
- (void*)call);
+ ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 4f671807eb..605c7408b4 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -52,21 +52,47 @@ typedef struct next_call_stack {
grpc_event event;
gpr_timespec timeout;
void *tag;
+ volatile int interrupted;
} next_call_stack;
/* Calls grpc_completion_queue_next without holding the ruby GIL */
static void *grpc_rb_completion_queue_next_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
- next_call->event =
- grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL);
+ gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+ gpr_timespec deadline;
+ do {
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+ if (gpr_time_cmp(deadline, next_call->timeout) > 0) {
+ // Then we have run out of time
+ break;
+ }
+ next_call->event = grpc_completion_queue_next(next_call->cq,
+ deadline, NULL);
+ if (next_call->event.success) {
+ break;
+ }
+ } while (!next_call->interrupted);
return NULL;
}
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
- next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
- next_call->timeout, NULL);
+ gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+ gpr_timespec deadline;
+ do {
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+ if (gpr_time_cmp(deadline, next_call->timeout) > 0) {
+ // Then we have run out of time
+ break;
+ }
+ next_call->event = grpc_completion_queue_pluck(next_call->cq,
+ next_call->tag,
+ deadline, NULL);
+ if (next_call->event.type != GRPC_QUEUE_TIMEOUT) {
+ break;
+ }
+ } while (!next_call->interrupted);
return NULL;
}
@@ -139,12 +165,15 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
+static void unblock_func(void *param) {
+ next_call_stack *const next_call = (next_call_stack*)param;
+ next_call->interrupted = 1;
+}
+
/* Blocks until the next event for given tag is available, and returns the
* event. */
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
- VALUE timeout,
- rb_unblock_function_t *ubf,
- void *unblock_arg) {
+ VALUE timeout) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
TypedData_Get_Struct(self, grpc_completion_queue,
@@ -160,8 +189,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
next_call.tag = ROBJECT(tag);
}
next_call.event.type = GRPC_QUEUE_TIMEOUT;
- rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
- (void *)&next_call, ubf, unblock_arg);
+ /* Loop until we finish a pluck without an interruption. The internal
+ pluck function runs either until it is interrupted or it gets an
+ event, or time runs out.
+
+ The basic reason we need this relatively complicated construction is that
+ we need to re-acquire the GVL when an interrupt comes in, so that the ruby
+ interpeter can do what it needs to do with the interrupt. But we also need
+ to get back to plucking when */
+ do {
+ next_call.interrupted = 0;
+ rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
+ (void *)&next_call, unblock_func,
+ (void *)&next_call);
+ /* If an interrupt prevented pluck from returning useful information, then
+ any plucks that did complete must have timed out */
+ } while (next_call.interrupted &&
+ next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 4bd5739869..42de43c3fb 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -47,9 +47,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
* This avoids having code that holds the GIL repeated at multiple sites.
*/
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
- VALUE timeout,
- rb_unblock_function_t *ubf,
- void *unblock_arg);
+ VALUE timeout);
/* Initializes the CompletionQueue class. */
void Init_grpc_completion_queue();
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index aa7fa0af8e..0899feb685 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -207,11 +207,6 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
grpc_call_details_destroy(&st->details);
}
-static void request_call_unblock_func(void *ptr) {
- grpc_rb_server *rb_srv = (grpc_rb_server*)ptr;
- grpc_server_shutdown_and_notify(rb_srv->wrapped, rb_srv->queue, rb_srv);
-}
-
/* call-seq:
cq = CompletionQueue.new
tag = Object.new
@@ -249,9 +244,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
return Qnil;
}
- ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout,
- request_call_unblock_func,
- (void*)s);
+ ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_request_call_stack_cleanup(&st);
return Qnil;
@@ -314,8 +307,7 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
if (s->wrapped != NULL) {
grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
- ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout,
- NULL, NULL);
+ ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
if (!ev.success) {
rb_warn("server shutdown failed, cancelling the calls, objects may leak");
grpc_server_cancel_all_calls(s->wrapped);