aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2015-05-18 09:20:43 -0700
committerGravatar vjpai <vpai@google.com>2015-05-18 09:20:43 -0700
commitccced5389de0258b9f1d37681efcb8be57a6ef10 (patch)
tree86eb0674d00268e333a7294c9a45bc49541e6d1e /src/ruby/ext
parent6a608020b0e10f258d64f7b2a5b5cc02643ed9bc (diff)
parentcc1c37c3e5aa7dc5e0da761e1bff0fe8e1fbaaee (diff)
Merge branch 'master' into poisson
Conflicts: Makefile
Diffstat (limited to 'src/ruby/ext')
-rw-r--r--src/ruby/ext/grpc/rb_call.c9
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c56
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h4
-rw-r--r--src/ruby/ext/grpc/rb_server.c15
4 files changed, 20 insertions, 64 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index e76bb930ee..c46af250cd 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -581,7 +581,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
VALUE timeout, VALUE ops_hash) {
run_batch_stack st;
grpc_call *call = NULL;
- grpc_event *ev = NULL;
+ grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
@@ -605,15 +605,14 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
- if (ev == NULL) {
+ if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
return Qnil;
}
- if (ev->data.op_complete != GRPC_OP_OK) {
+ if (!ev.success) {
grpc_run_batch_stack_cleanup(&st);
- rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)",
- ev->data.op_complete);
+ rb_raise(grpc_rb_eCallError, "start_batch completion failed");
return Qnil;
}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 3cf6c313ee..fa4c566004 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -47,7 +47,7 @@ static VALUE grpc_rb_cCompletionQueue = Qnil;
/* Used to allow grpc_completion_queue_next call to release the GIL */
typedef struct next_call_stack {
grpc_completion_queue *cq;
- grpc_event *event;
+ grpc_event event;
gpr_timespec timeout;
void *tag;
} next_call_stack;
@@ -80,7 +80,7 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) {
grpc_completion_queue_shutdown(cq);
next_call.cq = cq;
- next_call.event = NULL;
+ next_call.event.type = GRPC_QUEUE_TIMEOUT;
/* TODO: the timeout should be a module level constant that defaults
* to gpr_inf_future.
*
@@ -95,16 +95,12 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) {
do {
rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil,
(void *)&next_call, NULL, NULL);
- if (next_call.event == NULL) {
- break;
- }
- type = next_call.event->type;
+ type = next_call.event.type;
+ if (type == GRPC_QUEUE_TIMEOUT) break;
if (type != GRPC_QUEUE_SHUTDOWN) {
++drained;
rb_warning("completion queue shutdown: %d undrained events", drained);
}
- grpc_event_finish(next_call.event);
- next_call.event = NULL;
} while (type != GRPC_QUEUE_SHUTDOWN);
}
@@ -138,49 +134,19 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
-/* Blocks until the next event is available, and returns the event. */
-static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) {
- next_call_stack next_call;
- MEMZERO(&next_call, next_call_stack, 1);
- TypedData_Get_Struct(self, grpc_completion_queue,
- &grpc_rb_completion_queue_data_type, next_call.cq);
- next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
- next_call.event = NULL;
- rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil,
- (void *)&next_call, NULL, NULL);
- if (next_call.event == NULL) {
- return Qnil;
- }
- return grpc_rb_new_event(next_call.event);
-}
-
/* Blocks until the next event for given tag is available, and returns the
* event. */
-VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag,
- VALUE timeout) {
- grpc_event *ev = grpc_rb_completion_queue_pluck_event(self, tag, timeout);
- if (ev == NULL) {
- return Qnil;
- }
- return grpc_rb_new_event(ev);
-}
-
-/* Blocks until the next event for given tag is available, and returns the
- * event. */
-grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
- VALUE timeout) {
+grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
+ VALUE timeout) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
TypedData_Get_Struct(self, grpc_completion_queue,
&grpc_rb_completion_queue_data_type, next_call.cq);
next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
next_call.tag = ROBJECT(tag);
- next_call.event = NULL;
+ next_call.event.type = GRPC_QUEUE_TIMEOUT;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, NULL, NULL);
- if (next_call.event == NULL) {
- return NULL;
- }
return next_call.event;
}
@@ -193,14 +159,6 @@ void Init_grpc_completion_queue() {
this func, so no separate initialization step is necessary. */
rb_define_alloc_func(grpc_rb_cCompletionQueue,
grpc_rb_completion_queue_alloc);
-
- /* Add the next method that waits for the next event. */
- rb_define_method(grpc_rb_cCompletionQueue, "next",
- grpc_rb_completion_queue_next, 1);
-
- /* Add the pluck method that waits for the next event of given tag */
- rb_define_method(grpc_rb_cCompletionQueue, "pluck",
- grpc_rb_completion_queue_pluck, 2);
}
/* Gets the wrapped completion queue from the ruby wrapper */
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 4d0f49ac47..e4d04b10c8 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -45,8 +45,8 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
*
* This avoids having code that holds the GIL repeated at multiple sites.
*/
-grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
- VALUE timeout);
+grpc_event grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
+ VALUE timeout);
/* Initializes the CompletionQueue class. */
void Init_grpc_completion_queue();
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index bc0878af05..0651c36c0b 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -123,7 +123,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
wrapper);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
- srv = grpc_server_create(cq, &args);
+ srv = grpc_server_create(&args);
if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@@ -131,6 +131,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
if (srv == NULL) {
rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
}
+ grpc_server_register_completion_queue(srv, cq);
wrapper->wrapped = srv;
/* Add the cq as the server's mark object. This ensures the ruby cq can't be
@@ -203,7 +204,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
VALUE tag_new, VALUE timeout) {
grpc_rb_server *s = NULL;
grpc_call *call = NULL;
- grpc_event *ev = NULL;
+ grpc_event ev;
grpc_call_error err;
request_call_stack st;
VALUE result;
@@ -218,6 +219,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
err = grpc_server_request_call(
s->wrapped, &call, &st.details, &st.md_ary,
grpc_rb_get_wrapped_completion_queue(cqueue),
+ grpc_rb_get_wrapped_completion_queue(cqueue),
ROBJECT(tag_new));
if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st);
@@ -227,15 +229,13 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
- if (ev == NULL) {
+ if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_request_call_stack_cleanup(&st);
return Qnil;
}
- if (ev->data.op_complete != GRPC_OP_OK) {
+ if (!ev.success) {
grpc_request_call_stack_cleanup(&st);
- grpc_event_finish(ev);
- rb_raise(grpc_rb_eCallError, "request_call completion failed: (code=%d)",
- ev->data.op_complete);
+ rb_raise(grpc_rb_eCallError, "request_call completion failed");
return Qnil;
}
@@ -249,7 +249,6 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
grpc_rb_md_ary_to_h(&st.md_ary),
grpc_rb_wrap_call(call),
NULL);
- grpc_event_finish(ev);
grpc_request_call_stack_cleanup(&st);
return result;
}