aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-03-03 18:28:47 -0800
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-03-03 18:28:47 -0800
commit98ab52003a7afbf1dc193d80ebecbb9d503078a9 (patch)
tree5dde32c46f459ce4f9f6725dea53fd28f1dfa6db /src
parent39bc495a1631a332a551e217de2bb6d69e8bbced (diff)
Ruby: Completion queue creation API changes
Diffstat (limited to 'src')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c80
-rw-r--r--src/ruby/ext/grpc/rb_server.c57
2 files changed, 67 insertions, 70 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 84e43d3f7b..2b9f03abd4 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -33,20 +33,20 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
-#include "rb_channel.h"
#include "rb_byte_buffer.h"
+#include "rb_channel.h"
+#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
-#include "rb_grpc.h"
#include "rb_call.h"
#include "rb_channel_args.h"
#include "rb_channel_credentials.h"
#include "rb_completion_queue.h"
+#include "rb_grpc.h"
#include "rb_server.h"
/* id_channel is the name of the hidden ivar that preserves a reference to the
@@ -104,13 +104,15 @@ static void grpc_rb_channel_mark(void *p) {
}
}
-static rb_data_type_t grpc_channel_data_type = {
- "grpc_channel",
- {grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
- {NULL, NULL}},
- NULL, NULL,
+static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
+ {grpc_rb_channel_mark,
+ grpc_rb_channel_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {NULL, NULL}},
+ NULL,
+ NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- RUBY_TYPED_FREE_IMMEDIATELY
+ RUBY_TYPED_FREE_IMMEDIATELY
#endif
};
@@ -169,7 +171,8 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
rb_ivar_set(self, id_target, target);
wrapper->wrapped = ch;
- wrapper->queue = grpc_completion_queue_create(NULL);
+ wrapper->queue = grpc_completion_queue_create(GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING, NULL);
return self;
}
@@ -225,14 +228,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
grpc_channel_watch_connectivity_state(
- ch,
- (grpc_connectivity_state)NUM2LONG(last_state),
- grpc_rb_time_timeval(deadline, /* absolute time */ 0),
- cq,
- tag);
+ ch, (grpc_connectivity_state)NUM2LONG(last_state),
+ grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
- event = rb_completion_queue_pluck(cq, tag,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
if (event.success) {
return Qtrue;
@@ -243,9 +243,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
- VALUE mask, VALUE method,
- VALUE host, VALUE deadline) {
+static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
+ VALUE method, VALUE host,
+ VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
@@ -256,10 +256,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
grpc_slice method_slice;
grpc_slice host_slice;
grpc_slice *host_slice_ptr = NULL;
- char* tmp_str = NULL;
+ char *tmp_str = NULL;
if (host != Qnil) {
- host_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));
+ host_slice =
+ grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host));
host_slice_ptr = &host_slice;
}
if (mask != Qnil) {
@@ -269,7 +270,8 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
parent_call = grpc_rb_get_wrapped_call(parent);
}
- cq = grpc_completion_queue_create(NULL);
+ cq = grpc_completion_queue_create(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
+ NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
@@ -277,17 +279,18 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
return Qnil;
}
- method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
+ method_slice =
+ grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
- host_slice_ptr, grpc_rb_time_timeval(
- deadline,
- /* absolute time */ 0), NULL);
+ host_slice_ptr,
+ grpc_rb_time_timeval(deadline,
+ /* absolute time */ 0),
+ NULL);
if (call == NULL) {
tmp_str = grpc_slice_to_c_string(method_slice);
- rb_raise(rb_eRuntimeError, "cannot create call with method %s",
- tmp_str);
+ rb_raise(rb_eRuntimeError, "cannot create call with method %s", tmp_str);
return Qnil;
}
@@ -304,7 +307,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
return res;
}
-
/* Closes the channel, calling it's destroy method */
static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
@@ -320,12 +322,11 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
return Qnil;
}
-
/* Called to obtain the target that this channel accesses. */
static VALUE grpc_rb_channel_get_target(VALUE self) {
grpc_rb_channel *wrapper = NULL;
VALUE res = Qnil;
- char* target = NULL;
+ char *target = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
target = grpc_channel_get_target(wrapper->wrapped);
@@ -337,8 +338,8 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */
- VALUE grpc_rb_mPropagateMasks = rb_define_module_under(
- grpc_rb_mGrpcCore, "PropagateMasks");
+ VALUE grpc_rb_mPropagateMasks =
+ rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks");
rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
UINT2NUM(GRPC_PROPAGATE_DEADLINE));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
@@ -353,8 +354,8 @@ static void Init_grpc_propagate_masks() {
static void Init_grpc_connectivity_states() {
/* Constants representing call propagation masks in grpc.h */
- VALUE grpc_rb_mConnectivityStates = rb_define_module_under(
- grpc_rb_mGrpcCore, "ConnectivityStates");
+ VALUE grpc_rb_mConnectivityStates =
+ rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates");
rb_define_const(grpc_rb_mConnectivityStates, "IDLE",
LONG2NUM(GRPC_CHANNEL_IDLE));
rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING",
@@ -382,12 +383,11 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "connectivity_state",
- grpc_rb_channel_get_connectivity_state,
- -1);
+ grpc_rb_channel_get_connectivity_state, -1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
- rb_define_method(grpc_rb_cChannel, "create_call",
- grpc_rb_channel_create_call, 5);
+ rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
+ 5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 7b2f5774aa..a0e8f15294 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -37,15 +37,15 @@
#include "rb_server.h"
#include <grpc/grpc.h>
-#include <grpc/support/atm.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include "rb_byte_buffer.h"
#include "rb_call.h"
#include "rb_channel_args.h"
#include "rb_completion_queue.h"
-#include "rb_server_credentials.h"
-#include "rb_byte_buffer.h"
#include "rb_grpc.h"
+#include "rb_server_credentials.h"
/* grpc_rb_cServer is the ruby class that proxies grpc_server. */
static VALUE grpc_rb_cServer = Qnil;
@@ -93,9 +93,8 @@ static void grpc_rb_server_free(void *p) {
};
svr = (grpc_rb_server *)p;
- deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(2, GPR_TIMESPAN));
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(2, GPR_TIMESPAN));
destroy_server(svr, deadline);
@@ -104,13 +103,15 @@ static void grpc_rb_server_free(void *p) {
static const rb_data_type_t grpc_rb_server_data_type = {
"grpc_server",
- {GRPC_RB_GC_NOT_MARKED, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {GRPC_RB_GC_NOT_MARKED,
+ grpc_rb_server_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block
- * and we might want to unlock GVL
+ /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
+ * function would block and we might want to unlock GVL
* TODO(yugui) Unlock GVL?
*/
0,
@@ -131,7 +132,8 @@ static VALUE grpc_rb_server_alloc(VALUE cls) {
Initializes server instances. */
static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
- grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
+ grpc_completion_queue *cq = grpc_completion_queue_create(
+ GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, NULL);
grpc_rb_server *wrapper = NULL;
grpc_server *srv = NULL;
grpc_channel_args args;
@@ -163,7 +165,7 @@ typedef struct request_call_stack {
/* grpc_request_call_stack_init ensures the request_call_stack is properly
* initialized */
-static void grpc_request_call_stack_init(request_call_stack* st) {
+static void grpc_request_call_stack_init(request_call_stack *st) {
MEMZERO(st, request_call_stack, 1);
grpc_metadata_array_init(&st->md_ary);
grpc_call_details_init(&st->details);
@@ -171,7 +173,7 @@ static void grpc_request_call_stack_init(request_call_stack* st) {
/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
* cleaned up */
-static void grpc_request_call_stack_cleanup(request_call_stack* st) {
+static void grpc_request_call_stack_cleanup(request_call_stack *st) {
grpc_metadata_array_destroy(&st->md_ary);
grpc_call_details_destroy(&st->details);
}
@@ -187,8 +189,9 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_call_error err;
request_call_stack st;
VALUE result;
- void *tag = (void*)&st;
- grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL);
+ void *tag = (void *)&st;
+ grpc_completion_queue *call_queue = grpc_completion_queue_create(
+ GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, NULL);
gpr_timespec deadline;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
@@ -199,9 +202,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_request_call_stack_init(&st);
/* call grpc_server_request_call, then wait for it to complete using
* pluck_event */
- err = grpc_server_request_call(
- s->wrapped, &call, &st.details, &st.md_ary,
- call_queue, s->queue, tag);
+ err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
+ call_queue, s->queue, tag);
if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError,
@@ -218,8 +220,6 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
return Qnil;
}
-
-
/* build the NewServerRpc struct result */
deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
result = rb_struct_new(
@@ -299,8 +299,7 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
return Qnil;
} else if (TYPE(rb_creds) == T_SYMBOL) {
if (id_insecure_server != SYM2ID(rb_creds)) {
- rb_raise(rb_eTypeError,
- "bad creds symbol, want :this_port_is_insecure");
+ rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
return Qnil;
}
recvd_port =
@@ -312,9 +311,8 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
}
} else {
creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
- recvd_port =
- grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port),
- creds);
+ recvd_port = grpc_server_add_secure_http2_port(
+ s->wrapped, StringValueCStr(port), creds);
if (recvd_port == 0) {
rb_raise(rb_eRuntimeError,
"could not add secure port %s to server, not sure why",
@@ -333,18 +331,17 @@ void Init_grpc_server() {
/* Provides a ruby constructor and support for dup/clone. */
rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
- rb_define_method(grpc_rb_cServer, "initialize_copy",
- grpc_rb_cannot_init_copy, 1);
+ rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
+ 1);
/* Add the server methods. */
- rb_define_method(grpc_rb_cServer, "request_call",
- grpc_rb_server_request_call, 0);
+ rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
+ 0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
- grpc_rb_server_add_http2_port,
- 2);
+ grpc_rb_server_add_http2_port, 2);
id_at = rb_intern("at");
id_insecure_server = rb_intern("this_port_is_insecure");
}