diff options
author | vjpai <vpai@google.com> | 2015-05-18 09:20:43 -0700 |
---|---|---|
committer | vjpai <vpai@google.com> | 2015-05-18 09:20:43 -0700 |
commit | ccced5389de0258b9f1d37681efcb8be57a6ef10 (patch) | |
tree | 86eb0674d00268e333a7294c9a45bc49541e6d1e /src/ruby/ext | |
parent | 6a608020b0e10f258d64f7b2a5b5cc02643ed9bc (diff) | |
parent | cc1c37c3e5aa7dc5e0da761e1bff0fe8e1fbaaee (diff) |
Merge branch 'master' into poisson
Conflicts:
Makefile
Diffstat (limited to 'src/ruby/ext')
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 9 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.c | 56 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.h | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 15 |
4 files changed, 20 insertions, 64 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index e76bb930ee..c46af250cd 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -581,7 +581,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, VALUE timeout, VALUE ops_hash) { run_batch_stack st; grpc_call *call = NULL; - grpc_event *ev = NULL; + grpc_event ev; grpc_call_error err; VALUE result = Qnil; TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); @@ -605,15 +605,14 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, return Qnil; } ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); - if (ev == NULL) { + if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); return Qnil; } - if (ev->data.op_complete != GRPC_OP_OK) { + if (!ev.success) { grpc_run_batch_stack_cleanup(&st); - rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", - ev->data.op_complete); + rb_raise(grpc_rb_eCallError, "start_batch completion failed"); return Qnil; } diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 3cf6c313ee..fa4c566004 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -47,7 +47,7 @@ static VALUE grpc_rb_cCompletionQueue = Qnil; /* Used to allow grpc_completion_queue_next call to release the GIL */ typedef struct next_call_stack { grpc_completion_queue *cq; - grpc_event *event; + grpc_event event; gpr_timespec timeout; void *tag; } next_call_stack; @@ -80,7 +80,7 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) { grpc_completion_queue_shutdown(cq); next_call.cq = cq; - next_call.event = NULL; + next_call.event.type = GRPC_QUEUE_TIMEOUT; /* TODO: the timeout should be a module level constant that defaults * to gpr_inf_future. * @@ -95,16 +95,12 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) { do { rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil, (void *)&next_call, NULL, NULL); - if (next_call.event == NULL) { - break; - } - type = next_call.event->type; + type = next_call.event.type; + if (type == GRPC_QUEUE_TIMEOUT) break; if (type != GRPC_QUEUE_SHUTDOWN) { ++drained; rb_warning("completion queue shutdown: %d undrained events", drained); } - grpc_event_finish(next_call.event); - next_call.event = NULL; } while (type != GRPC_QUEUE_SHUTDOWN); } @@ -138,49 +134,19 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) { return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq); } -/* Blocks until the next event is available, and returns the event. */ -static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) { - next_call_stack next_call; - MEMZERO(&next_call, next_call_stack, 1); - TypedData_Get_Struct(self, grpc_completion_queue, - &grpc_rb_completion_queue_data_type, next_call.cq); - next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); - next_call.event = NULL; - rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil, - (void *)&next_call, NULL, NULL); - if (next_call.event == NULL) { - return Qnil; - } - return grpc_rb_new_event(next_call.event); -} - /* Blocks until the next event for given tag is available, and returns the * event. */ -VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag, - VALUE timeout) { - grpc_event *ev = grpc_rb_completion_queue_pluck_event(self, tag, timeout); - if (ev == NULL) { - return Qnil; - } - return grpc_rb_new_event(ev); -} - -/* Blocks until the next event for given tag is available, and returns the - * event. */ -grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, - VALUE timeout) { +grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, + VALUE timeout) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); TypedData_Get_Struct(self, grpc_completion_queue, &grpc_rb_completion_queue_data_type, next_call.cq); next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); next_call.tag = ROBJECT(tag); - next_call.event = NULL; + next_call.event.type = GRPC_QUEUE_TIMEOUT; rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, (void *)&next_call, NULL, NULL); - if (next_call.event == NULL) { - return NULL; - } return next_call.event; } @@ -193,14 +159,6 @@ void Init_grpc_completion_queue() { this func, so no separate initialization step is necessary. */ rb_define_alloc_func(grpc_rb_cCompletionQueue, grpc_rb_completion_queue_alloc); - - /* Add the next method that waits for the next event. */ - rb_define_method(grpc_rb_cCompletionQueue, "next", - grpc_rb_completion_queue_next, 1); - - /* Add the pluck method that waits for the next event of given tag */ - rb_define_method(grpc_rb_cCompletionQueue, "pluck", - grpc_rb_completion_queue_pluck, 2); } /* Gets the wrapped completion queue from the ruby wrapper */ diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 4d0f49ac47..e4d04b10c8 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -45,8 +45,8 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); * * This avoids having code that holds the GIL repeated at multiple sites. */ -grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag, - VALUE timeout); +grpc_event grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag, + VALUE timeout); /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index bc0878af05..0651c36c0b 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -123,7 +123,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) { TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, wrapper); grpc_rb_hash_convert_to_channel_args(channel_args, &args); - srv = grpc_server_create(cq, &args); + srv = grpc_server_create(&args); if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ @@ -131,6 +131,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) { if (srv == NULL) { rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why"); } + grpc_server_register_completion_queue(srv, cq); wrapper->wrapped = srv; /* Add the cq as the server's mark object. This ensures the ruby cq can't be @@ -203,7 +204,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, VALUE tag_new, VALUE timeout) { grpc_rb_server *s = NULL; grpc_call *call = NULL; - grpc_event *ev = NULL; + grpc_event ev; grpc_call_error err; request_call_stack st; VALUE result; @@ -218,6 +219,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, err = grpc_server_request_call( s->wrapped, &call, &st.details, &st.md_ary, grpc_rb_get_wrapped_completion_queue(cqueue), + grpc_rb_get_wrapped_completion_queue(cqueue), ROBJECT(tag_new)); if (err != GRPC_CALL_OK) { grpc_request_call_stack_cleanup(&st); @@ -227,15 +229,13 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, return Qnil; } ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); - if (ev == NULL) { + if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_request_call_stack_cleanup(&st); return Qnil; } - if (ev->data.op_complete != GRPC_OP_OK) { + if (!ev.success) { grpc_request_call_stack_cleanup(&st); - grpc_event_finish(ev); - rb_raise(grpc_rb_eCallError, "request_call completion failed: (code=%d)", - ev->data.op_complete); + rb_raise(grpc_rb_eCallError, "request_call completion failed"); return Qnil; } @@ -249,7 +249,6 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), NULL); - grpc_event_finish(ev); grpc_request_call_stack_cleanup(&st); return result; } |