aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c25
1 files changed, 19 insertions, 6 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 18a15d0125..e6d30a174b 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -40,6 +40,7 @@
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/time.h>
#include "rb_grpc.h"
#include "rb_call.h"
#include "rb_channel_args.h"
@@ -71,6 +72,7 @@ typedef struct grpc_rb_channel {
/* The actual channel */
grpc_channel *wrapped;
+ grpc_completion_queue *queue;
} grpc_rb_channel;
/* Destroys Channel instances. */
@@ -83,6 +85,7 @@ static void grpc_rb_channel_free(void *p) {
if (ch->wrapped != NULL) {
grpc_channel_destroy(ch->wrapped);
+ grpc_rb_completion_queue_destroy(ch->queue);
}
xfree(p);
@@ -165,6 +168,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
rb_ivar_set(self, id_target, target);
wrapper->wrapped = ch;
+ wrapper->queue = grpc_completion_queue_create(NULL);
return self;
}
@@ -203,16 +207,18 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
the completion queue with success=0 */
static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
- VALUE cqueue,
- VALUE deadline,
- VALUE tag) {
+ VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ void *tag = wrapper;
+
+ grpc_event event;
+
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
+ cq = wrapper->queue;
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
@@ -222,9 +228,16 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
(grpc_connectivity_state)NUM2LONG(last_state),
grpc_rb_time_timeval(deadline, /* absolute time */ 0),
cq,
- ROBJECT(tag));
+ tag);
- return Qnil;
+ event = rb_completion_queue_pluck(cq, tag,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+
+ if (event.success) {
+ return Qtrue;
+ } else {
+ return Qfalse;
+ }
}
/* Create a call given a grpc_channel, in order to call method. The request