aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/ext/grpc/rb_completion_queue.c')
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c65
1 files changed, 59 insertions, 6 deletions
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 4bb615f8be..9466402db0 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -52,21 +52,41 @@ typedef struct next_call_stack {
grpc_event event;
gpr_timespec timeout;
void *tag;
+ volatile int interrupted;
} next_call_stack;
/* Calls grpc_completion_queue_next without holding the ruby GIL */
static void *grpc_rb_completion_queue_next_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
- next_call->event =
- grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL);
+ gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+ gpr_timespec deadline;
+ do {
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+ next_call->event = grpc_completion_queue_next(next_call->cq,
+ deadline, NULL);
+ if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
+ gpr_time_cmp(deadline, next_call->timeout) > 0) {
+ break;
+ }
+ } while (!next_call->interrupted);
return NULL;
}
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
- next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
- next_call->timeout, NULL);
+ gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+ gpr_timespec deadline;
+ do {
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+ next_call->event = grpc_completion_queue_pluck(next_call->cq,
+ next_call->tag,
+ deadline, NULL);
+ if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
+ gpr_time_cmp(deadline, next_call->timeout) > 0) {
+ break;
+ }
+ } while (!next_call->interrupted);
return NULL;
}
@@ -130,6 +150,14 @@ static rb_data_type_t grpc_rb_completion_queue_data_type = {
#endif
};
+/* Releases the c-level resources associated with a completion queue */
+static VALUE grpc_rb_completion_queue_close(VALUE self) {
+ grpc_completion_queue* cq = grpc_rb_get_wrapped_completion_queue(self);
+ grpc_rb_completion_queue_destroy(cq);
+ RTYPEDDATA_DATA(self) = NULL;
+ return Qnil;
+}
+
/* Allocates a completion queue. */
static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
@@ -139,6 +167,11 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
+static void unblock_func(void *param) {
+ next_call_stack *const next_call = (next_call_stack*)param;
+ next_call->interrupted = 1;
+}
+
/* Blocks until the next event for given tag is available, and returns the
* event. */
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
@@ -158,8 +191,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
next_call.tag = ROBJECT(tag);
}
next_call.event.type = GRPC_QUEUE_TIMEOUT;
- rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
- (void *)&next_call, NULL, NULL);
+ /* Loop until we finish a pluck without an interruption. The internal
+ pluck function runs either until it is interrupted or it gets an
+ event, or time runs out.
+
+ The basic reason we need this relatively complicated construction is that
+ we need to re-acquire the GVL when an interrupt comes in, so that the ruby
+ interpreter can do what it needs to do with the interrupt. But we also need
+ to get back to plucking when the interrupt has been handled. */
+ do {
+ next_call.interrupted = 0;
+ rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
+ (void *)&next_call, unblock_func,
+ (void *)&next_call);
+ /* If an interrupt prevented pluck from returning useful information, then
+ any plucks that did complete must have timed out */
+ } while (next_call.interrupted &&
+ next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}
@@ -172,6 +220,11 @@ void Init_grpc_completion_queue() {
this func, so no separate initialization step is necessary. */
rb_define_alloc_func(grpc_rb_cCompletionQueue,
grpc_rb_completion_queue_alloc);
+
+ /* close: Provides a way to close the underlying file descriptor without
+ waiting for ruby garbage collection. */
+ rb_define_method(grpc_rb_cCompletionQueue, "close",
+ grpc_rb_completion_queue_close, 0);
}
/* Gets the wrapped completion queue from the ruby wrapper */