aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/ext')
-rw-r--r--src/ruby/ext/grpc/rb_call.c11
-rw-r--r--src/ruby/ext/grpc/rb_call.h2
-rw-r--r--src/ruby/ext/grpc/rb_channel.c23
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c7
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h2
-rw-r--r--src/ruby/ext/grpc/rb_server.c32
6 files changed, 31 insertions, 46 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index f2c567c7da..c1f5075b52 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -94,7 +94,6 @@ typedef struct grpc_rb_call {
} grpc_rb_call;
static void destroy_call(grpc_rb_call *call) {
- call = (grpc_rb_call *)p;
grpc_call_destroy(call->wrapped);
grpc_rb_completion_queue_destroy(call->queue);
}
@@ -783,8 +782,11 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
grpc_call_error_detail_of(err), err);
return Qnil;
}
- ev = grpc_rb_completion_queue_pluck(call->queue, tag, gpr_inf_future, NULL);
-
+ ev = rb_completion_queue_pluck(call->queue, tag,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ if (!ev.success) {
+ rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
+ }
/* Build and return the BatchResult struct result,
if there is an error, it's reflected in the status */
result = grpc_run_batch_stack_build_result(&st);
@@ -943,10 +945,11 @@ grpc_call *grpc_rb_get_wrapped_call(VALUE v) {
/* Obtains the wrapped object for a given call */
VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q) {
+ grpc_rb_call *wrapper;
if (c == NULL || q == NULL) {
return Qnil;
}
- grpc_rb_call *wrapper = ALLOC(grpc_rb_call);
+ wrapper = ALLOC(grpc_rb_call);
wrapper->wrapped = c;
wrapper->queue = q;
return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index 24adb3477b..56becdc5a4 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -42,7 +42,7 @@
grpc_call* grpc_rb_get_wrapped_call(VALUE v);
/* Gets the VALUE corresponding to given grpc_call. */
-VALUE grpc_rb_wrap_call(grpc_call* c);
+VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q);
/* Provides the details of an call error */
const char* grpc_call_error_detail_of(grpc_call_error err);
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 9d29b96a35..f5190b85f7 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -55,11 +55,6 @@ static ID id_channel;
* GCed before the channel */
static ID id_target;
-/* id_cqueue is the name of the hidden ivar that preserves a reference to the
- * completion queue used to create the call, preserved so that it does not get
- * GCed before the channel */
-static ID id_cqueue;
-
/* id_insecure_channel is used to indicate that a channel is insecure */
static VALUE id_insecure_channel;
@@ -233,10 +228,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
- VALUE parent, VALUE mask,
- VALUE method, VALUE host,
- VALUE deadline) {
+static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
+ VALUE mask, VALUE method,
+ VALUE host, VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
@@ -256,7 +250,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
parent_call = grpc_rb_get_wrapped_call(parent);
}
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ cq = grpc_completion_queue_create(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
@@ -273,15 +267,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
method_chars);
return Qnil;
}
- res = grpc_rb_wrap_call(call);
+ res = grpc_rb_wrap_call(call, cq);
/* Make this channel an instance attribute of the call so that it is not GCed
* before the call. */
rb_ivar_set(res, id_channel, self);
-
- /* Make the completion queue an instance attribute of the call so that it is
- * not GCed before the call. */
- rb_ivar_set(res, id_cqueue, cqueue);
return res;
}
@@ -368,13 +358,12 @@ void Init_grpc_channel() {
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
rb_define_method(grpc_rb_cChannel, "create_call",
- grpc_rb_channel_create_call, 6);
+ grpc_rb_channel_create_call, 5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
id_channel = rb_intern("__channel");
- id_cqueue = rb_intern("__cqueue");
id_target = rb_intern("__target");
rb_define_const(grpc_rb_cChannel, "SSL_TARGET",
ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 1ac2ef2f33..1a69a175c4 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -125,11 +125,6 @@ static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) {
/* Helper function to free a completion queue. */
void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
- grpc_completion_queue *cq = NULL;
- if (p == NULL) {
- return;
- }
- cq = (grpc_completion_queue *)p;
grpc_rb_completion_queue_shutdown_drain(cq);
grpc_completion_queue_destroy(cq);
}
@@ -141,7 +136,7 @@ static void unblock_func(void *param) {
/* Does the same thing as grpc_completion_queue_pluck, while properly releasing
the GVL and handling interrupts */
-grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag,
+grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
gpr_timespec deadline, void *reserved) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 3543e86275..9f8f6aa5ff 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -48,7 +48,7 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
*
* This avoids having code that holds the GIL repeated at multiple sites.
*/
-grpc_event rb_completion_queue_pluck(grpc_completion_queue queue, void *tag,
+grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
gpr_timespec deadline, void *reserved);
#endif /* GRPC_RB_COMPLETION_QUEUE_H_ */
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index cf430495c8..1ed2c35da3 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -70,7 +70,8 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
grpc_server_cancel_all_calls(server->wrapped);
- rb_completion_queue_pluck(server->queue, NULL, gpr_inf_future, NULL);
+ rb_completion_queue_pluck(server->queue, NULL,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
grpc_server_destroy(server->wrapped);
grpc_rb_completion_queue_destroy(server->queue);
@@ -82,13 +83,17 @@ static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
/* Destroys server instances. */
static void grpc_rb_server_free(void *p) {
grpc_rb_server *svr = NULL;
+ gpr_timespec deadline;
if (p == NULL) {
return;
};
svr = (grpc_rb_server *)p;
- // TODO(murgatroid99): Maybe don't wait forever for the server to shutdown
- destroy_server(svr, gpr_inf_future);
+ deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(2, GPR_TIMESPAN));
+
+ destroy_server(svr, deadline);
xfree(p);
}
@@ -154,9 +159,6 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
wrapper->wrapped = srv;
wrapper->queue = cq;
- /* Add the cq as the server's mark object. This ensures the ruby cq can't be
- GCed before the server */
- wrapper->mark = cqueue;
return self;
}
@@ -218,7 +220,7 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
}
ev = rb_completion_queue_pluck(s->queue, tag,
- gpr_inf_future, NULL);
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (!ev.success) {
grpc_request_call_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError, "request_call completion failed");
@@ -251,27 +253,23 @@ static VALUE grpc_rb_server_start(VALUE self) {
/*
call-seq:
- cq = CompletionQueue.new
- server = Server.new(cq, {'arg1': 'value1'})
+ server = Server.new({'arg1': 'value1'})
... // do stuff with server
...
... // to shutdown the server
- server.destroy(cq)
+ server.destroy()
... // to shutdown the server with a timeout
- server.destroy(cq, timeout)
+ server.destroy(timeout)
Destroys server instances. */
static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
- VALUE cqueue = Qnil;
VALUE timeout = Qnil;
- grpc_completion_queue *cq = NULL;
- grpc_event ev;
+ gpr_timespec deadline;
grpc_rb_server *s = NULL;
- /* "11" == 1 mandatory args, 1 (timeout) is optional */
- rb_scan_args(argc, argv, "11", &cqueue, &timeout);
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ /* "01" == 0 mandatory args, 1 (timeout) is optional */
+ rb_scan_args(argc, argv, "01", &timeout);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (TYPE(timeout) == T_NIL) {
deadline = gpr_inf_future(GPR_CLOCK_REALTIME);