diff options
Diffstat (limited to 'src/ruby/ext')
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 11 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.h | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 23 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.c | 7 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.h | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 32 |
6 files changed, 31 insertions, 46 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index f2c567c7da..c1f5075b52 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -94,7 +94,6 @@ typedef struct grpc_rb_call { } grpc_rb_call; static void destroy_call(grpc_rb_call *call) { - call = (grpc_rb_call *)p; grpc_call_destroy(call->wrapped); grpc_rb_completion_queue_destroy(call->queue); } @@ -783,8 +782,11 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { grpc_call_error_detail_of(err), err); return Qnil; } - ev = grpc_rb_completion_queue_pluck(call->queue, tag, gpr_inf_future, NULL); - + ev = rb_completion_queue_pluck(call->queue, tag, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (!ev.success) { + rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow"); + } /* Build and return the BatchResult struct result, if there is an error, it's reflected in the status */ result = grpc_run_batch_stack_build_result(&st); @@ -943,10 +945,11 @@ grpc_call *grpc_rb_get_wrapped_call(VALUE v) { /* Obtains the wrapped object for a given call */ VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q) { + grpc_rb_call *wrapper; if (c == NULL || q == NULL) { return Qnil; } - grpc_rb_call *wrapper = ALLOC(grpc_rb_call); + wrapper = ALLOC(grpc_rb_call); wrapper->wrapped = c; wrapper->queue = q; return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper); diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index 24adb3477b..56becdc5a4 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -42,7 +42,7 @@ grpc_call* grpc_rb_get_wrapped_call(VALUE v); /* Gets the VALUE corresponding to given grpc_call. */ -VALUE grpc_rb_wrap_call(grpc_call* c); +VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q); /* Provides the details of an call error */ const char* grpc_call_error_detail_of(grpc_call_error err); diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 9d29b96a35..f5190b85f7 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -55,11 +55,6 @@ static ID id_channel; * GCed before the channel */ static ID id_target; -/* id_cqueue is the name of the hidden ivar that preserves a reference to the - * completion queue used to create the call, preserved so that it does not get - * GCed before the channel */ -static ID id_cqueue; - /* id_insecure_channel is used to indicate that a channel is insecure */ static VALUE id_insecure_channel; @@ -233,10 +228,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ -static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, - VALUE parent, VALUE mask, - VALUE method, VALUE host, - VALUE deadline) { +static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, + VALUE mask, VALUE method, + VALUE host, VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; @@ -256,7 +250,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, parent_call = grpc_rb_get_wrapped_call(parent); } - cq = grpc_rb_get_wrapped_completion_queue(cqueue); + cq = grpc_completion_queue_create(NULL); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch == NULL) { @@ -273,15 +267,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, method_chars); return Qnil; } - res = grpc_rb_wrap_call(call); + res = grpc_rb_wrap_call(call, cq); /* Make this channel an instance attribute of the call so that it is not GCed * before the call. */ rb_ivar_set(res, id_channel, self); - - /* Make the completion queue an instance attribute of the call so that it is - * not GCed before the call. */ - rb_ivar_set(res, id_cqueue, cqueue); return res; } @@ -368,13 +358,12 @@ void Init_grpc_channel() { rb_define_method(grpc_rb_cChannel, "watch_connectivity_state", grpc_rb_channel_watch_connectivity_state, 4); rb_define_method(grpc_rb_cChannel, "create_call", - grpc_rb_channel_create_call, 6); + grpc_rb_channel_create_call, 5); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_alias(grpc_rb_cChannel, "close", "destroy"); id_channel = rb_intern("__channel"); - id_cqueue = rb_intern("__cqueue"); id_target = rb_intern("__target"); rb_define_const(grpc_rb_cChannel, "SSL_TARGET", ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG))); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 1ac2ef2f33..1a69a175c4 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -125,11 +125,6 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) { /* Helper function to free a completion queue. */ void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) { - grpc_completion_queue *cq = NULL; - if (p == NULL) { - return; - } - cq = (grpc_completion_queue *)p; grpc_rb_completion_queue_shutdown_drain(cq); grpc_completion_queue_destroy(cq); } @@ -141,7 +136,7 @@ static void unblock_func(void *param) { /* Does the same thing as grpc_completion_queue_pluck, while properly releasing the GVL and handling interrupts */ -grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag, +grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag, gpr_timespec deadline, void *reserved) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 3543e86275..9f8f6aa5ff 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -48,7 +48,7 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq); * * This avoids having code that holds the GIL repeated at multiple sites. */ -grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag, +grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag, gpr_timespec deadline, void *reserved); #endif /* GRPC_RB_COMPLETION_QUEUE_H_ */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index cf430495c8..1ed2c35da3 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -70,7 +70,8 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, gpr_inf_future, NULL); + rb_completion_queue_pluck(server->queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); } grpc_server_destroy(server->wrapped); grpc_rb_completion_queue_destroy(server->queue); @@ -82,13 +83,17 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { /* Destroys server instances. */ static void grpc_rb_server_free(void *p) { grpc_rb_server *svr = NULL; + gpr_timespec deadline; if (p == NULL) { return; }; svr = (grpc_rb_server *)p; - // TODO(murgatroid99): Maybe don't wait forever for the server to shutdown - destroy_server(svr, gpr_inf_future); + deadline = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(2, GPR_TIMESPAN)); + + destroy_server(svr, deadline); xfree(p); } @@ -154,9 +159,6 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) { wrapper->wrapped = srv; wrapper->queue = cq; - /* Add the cq as the server's mark object. This ensures the ruby cq can't be - GCed before the server */ - wrapper->mark = cqueue; return self; } @@ -218,7 +220,7 @@ static VALUE grpc_rb_server_request_call(VALUE self) { } ev = rb_completion_queue_pluck(s->queue, tag, - gpr_inf_future, NULL); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); if (!ev.success) { grpc_request_call_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, "request_call completion failed"); @@ -251,27 +253,23 @@ static VALUE grpc_rb_server_start(VALUE self) { /* call-seq: - cq = CompletionQueue.new - server = Server.new(cq, {'arg1': 'value1'}) + server = Server.new({'arg1': 'value1'}) ... // do stuff with server ... ... // to shutdown the server - server.destroy(cq) + server.destroy() ... // to shutdown the server with a timeout - server.destroy(cq, timeout) + server.destroy(timeout) Destroys server instances. */ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) { - VALUE cqueue = Qnil; VALUE timeout = Qnil; - grpc_completion_queue *cq = NULL; - grpc_event ev; + gpr_timespec deadline; grpc_rb_server *s = NULL; - /* "11" == 1 mandatory args, 1 (timeout) is optional */ - rb_scan_args(argc, argv, "11", &cqueue, &timeout); - cq = grpc_rb_get_wrapped_completion_queue(cqueue); + /* "01" == 0 mandatory args, 1 (timeout) is optional */ + rb_scan_args(argc, argv, "01", &timeout); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (TYPE(timeout) == T_NIL) { deadline = gpr_inf_future(GPR_CLOCK_REALTIME); |