diff options
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 8 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.c | 62 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.h | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 12 |
4 files changed, 57 insertions, 29 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index b43ad08eba..1b06273af9 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -722,10 +722,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { return result; } -static void run_batch_unblock_func(void *call) { - grpc_call_cancel((grpc_call*)call, NULL); -} - /* call-seq: cq = CompletionQueue.new ops = { @@ -776,9 +772,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_call_error_detail_of(err), err); return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout, - run_batch_unblock_func, - (void*)call); + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 4f671807eb..605c7408b4 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -52,21 +52,47 @@ typedef struct next_call_stack { grpc_event event; gpr_timespec timeout; void *tag; + volatile int interrupted; } next_call_stack; /* Calls grpc_completion_queue_next without holding the ruby GIL */ static void *grpc_rb_completion_queue_next_no_gil(void *param) { next_call_stack *const next_call = (next_call_stack*)param; - next_call->event = - grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL); + gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); + gpr_timespec deadline; + do { + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); + if (gpr_time_cmp(deadline, next_call->timeout) > 0) { + // Then we have run out of time + break; + } + next_call->event = grpc_completion_queue_next(next_call->cq, + deadline, NULL); + if (next_call->event.success) { + break; + } + } while (!next_call->interrupted); return NULL; } /* Calls grpc_completion_queue_pluck without holding the ruby GIL */ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { next_call_stack *const next_call = (next_call_stack*)param; - next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag, - next_call->timeout, NULL); + gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); + gpr_timespec deadline; + do { + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); + if (gpr_time_cmp(deadline, next_call->timeout) > 0) { + // Then we have run out of time + break; + } + next_call->event = grpc_completion_queue_pluck(next_call->cq, + next_call->tag, + deadline, NULL); + if (next_call->event.type != GRPC_QUEUE_TIMEOUT) { + break; + } + } while (!next_call->interrupted); return NULL; } @@ -139,12 +165,15 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) { return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq); } +static void unblock_func(void *param) { + next_call_stack *const next_call = (next_call_stack*)param; + next_call->interrupted = 1; +} + /* Blocks until the next event for given tag is available, and returns the * event. */ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, - VALUE timeout, - rb_unblock_function_t *ubf, - void *unblock_arg) { + VALUE timeout) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); TypedData_Get_Struct(self, grpc_completion_queue, @@ -160,8 +189,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, next_call.tag = ROBJECT(tag); } next_call.event.type = GRPC_QUEUE_TIMEOUT; - rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, - (void *)&next_call, ubf, unblock_arg); + /* Loop until we finish a pluck without an interruption. The internal + pluck function runs either until it is interrupted or it gets an + event, or time runs out. + + The basic reason we need this relatively complicated construction is that + we need to re-acquire the GVL when an interrupt comes in, so that the ruby + interpeter can do what it needs to do with the interrupt. But we also need + to get back to plucking when */ + do { + next_call.interrupted = 0; + rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, + (void *)&next_call, unblock_func, + (void *)&next_call); + /* If an interrupt prevented pluck from returning useful information, then + any plucks that did complete must have timed out */ + } while (next_call.interrupted && + next_call.event.type == GRPC_QUEUE_TIMEOUT); return next_call.event; } diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 4bd5739869..42de43c3fb 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -47,9 +47,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); * This avoids having code that holds the GIL repeated at multiple sites. */ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, - VALUE timeout, - rb_unblock_function_t *ubf, - void *unblock_arg); + VALUE timeout); /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index aa7fa0af8e..0899feb685 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -207,11 +207,6 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) { grpc_call_details_destroy(&st->details); } -static void request_call_unblock_func(void *ptr) { - grpc_rb_server *rb_srv = (grpc_rb_server*)ptr; - grpc_server_shutdown_and_notify(rb_srv->wrapped, rb_srv->queue, rb_srv); -} - /* call-seq: cq = CompletionQueue.new tag = Object.new @@ -249,9 +244,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout, - request_call_unblock_func, - (void*)s); + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_request_call_stack_cleanup(&st); return Qnil; @@ -314,8 +307,7 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) { if (s->wrapped != NULL) { grpc_server_shutdown_and_notify(s->wrapped, cq, NULL); - ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout, - NULL, NULL); + ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout); if (!ev.success) { rb_warn("server shutdown failed, cancelling the calls, objects may leak"); grpc_server_cancel_all_calls(s->wrapped); |