aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <y-zeng@users.noreply.github.com>2017-04-25 11:08:04 -0700
committerGravatar GitHub <noreply@github.com>2017-04-25 11:08:04 -0700
commit79759fea1abd09102d38af3e13a84b69e898124b (patch)
tree4ac7c385d528d08d9e2d6d44bb38eec8402f5802 /src/ruby/ext/grpc
parentdc36f4df6aba60275a31227e1d29c4d20b6cadca (diff)
parentb864e7c41c6d0363e23093fb090625f260994962 (diff)
Merge branch 'master' into v1.3.x
Diffstat (limited to 'src/ruby/ext/grpc')
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c14
-rw-r--r--src/ruby/ext/grpc/rb_call.c104
-rw-r--r--src/ruby/ext/grpc/rb_call.h7
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c45
-rw-r--r--src/ruby/ext/grpc/rb_channel.c74
-rw-r--r--src/ruby/ext/grpc/rb_channel_args.c20
-rw-r--r--src/ruby/ext/grpc/rb_channel_credentials.c28
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c16
-rw-r--r--src/ruby/ext/grpc/rb_compression_options.c13
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c22
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.h3
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c26
-rw-r--r--src/ruby/ext/grpc/rb_grpc.h4
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c12
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h20
-rw-r--r--src/ruby/ext/grpc/rb_server.c54
16 files changed, 242 insertions, 220 deletions
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index 65fa2f2cf6..f5be19298f 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -33,15 +33,15 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
+#include "rb_grpc_imports.generated.h"
-#include <grpc/grpc.h>
#include <grpc/byte_buffer_reader.h>
+#include <grpc/grpc.h>
#include <grpc/slice.h>
#include "rb_grpc.h"
-grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) {
+grpc_byte_buffer *grpc_rb_s_to_byte_buffer(char *string, size_t length) {
grpc_slice slice = grpc_slice_from_copied_buffer(string, length);
grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
grpc_slice_unref(slice);
@@ -61,7 +61,7 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
return Qnil;
}
while (grpc_byte_buffer_reader_next(&reader, &next) != 0) {
- rb_str_cat(rb_string, (const char *) GRPC_SLICE_START_PTR(next),
+ rb_str_cat(rb_string, (const char *)GRPC_SLICE_START_PTR(next),
GRPC_SLICE_LENGTH(next));
grpc_slice_unref(next);
}
@@ -71,7 +71,9 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
VALUE grpc_rb_slice_to_ruby_string(grpc_slice slice) {
if (GRPC_SLICE_START_PTR(slice) == NULL) {
- rb_raise(rb_eRuntimeError, "attempt to convert uninitialized grpc_slice to ruby string");
+ rb_raise(rb_eRuntimeError,
+ "attempt to convert uninitialized grpc_slice to ruby string");
}
- return rb_str_new((char*)GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice));
+ return rb_str_new((char *)GRPC_SLICE_START_PTR(slice),
+ GRPC_SLICE_LENGTH(slice));
}
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 344cb941ff..0067e10866 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -33,12 +33,12 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_call.h"
+#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
#include <grpc/impl/codegen/compression_types.h>
+#include <grpc/support/alloc.h>
#include "rb_byte_buffer.h"
#include "rb_call_credentials.h"
@@ -101,7 +101,7 @@ typedef struct grpc_rb_call {
static void destroy_call(grpc_rb_call *call) {
/* Ensure that we only try to destroy the call once */
if (call->wrapped != NULL) {
- grpc_call_destroy(call->wrapped);
+ grpc_call_unref(call->wrapped);
call->wrapped = NULL;
grpc_rb_completion_queue_destroy(call->queue);
call->queue = NULL;
@@ -113,7 +113,7 @@ static void grpc_rb_call_destroy(void *p) {
if (p == NULL) {
return;
}
- destroy_call((grpc_rb_call*)p);
+ destroy_call((grpc_rb_call *)p);
}
static size_t md_ary_datasize(const void *p) {
@@ -130,7 +130,9 @@ static size_t md_ary_datasize(const void *p) {
static const rb_data_type_t grpc_rb_md_ary_data_type = {
"grpc_metadata_array",
- {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize,
+ {GRPC_RB_GC_NOT_MARKED,
+ GRPC_RB_GC_DONT_FREE,
+ md_ary_datasize,
{NULL, NULL}},
NULL,
NULL,
@@ -140,19 +142,20 @@ static const rb_data_type_t grpc_rb_md_ary_data_type = {
* touches a hash object.
* TODO(yugui) Directly use st_table and call the free function earlier?
*/
- 0,
+ 0,
#endif
};
/* Describes grpc_call struct for RTypedData */
-static const rb_data_type_t grpc_call_data_type = {
- "grpc_call",
- {GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE,
- {NULL, NULL}},
- NULL,
- NULL,
+static const rb_data_type_t grpc_call_data_type = {"grpc_call",
+ {GRPC_RB_GC_NOT_MARKED,
+ grpc_rb_call_destroy,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {NULL, NULL}},
+ NULL,
+ NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- RUBY_TYPED_FREE_IMMEDIATELY
+ RUBY_TYPED_FREE_IMMEDIATELY
#endif
};
@@ -175,7 +178,7 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
grpc_rb_call *call = NULL;
grpc_call_error err;
if (RTYPEDDATA_DATA(self) == NULL) {
- //This call has been closed
+ // This call has been closed
return Qnil;
}
@@ -196,7 +199,7 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
static VALUE grpc_rb_call_close(VALUE self) {
grpc_rb_call *call = NULL;
TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
- if(call != NULL) {
+ if (call != NULL) {
destroy_call(call);
RTYPEDDATA_DATA(self) = NULL;
}
@@ -238,8 +241,8 @@ static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
}
{
- grpc_auth_property_iterator it =
- grpc_auth_context_find_properties_by_name(ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
+ grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name(
+ ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
const grpc_auth_property *prop = grpc_auth_property_iterator_next(&it);
if (prop == NULL) {
return Qnil;
@@ -388,21 +391,22 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
long i;
grpc_slice key_slice;
grpc_slice value_slice;
- char* tmp_str;
+ char *tmp_str;
if (TYPE(key) == T_SYMBOL) {
key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
} else if (TYPE(key) == T_STRING) {
- key_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
+ key_slice =
+ grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
} else {
- rb_raise(rb_eTypeError, "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
+ rb_raise(rb_eTypeError,
+ "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
}
if (!grpc_header_key_is_legal(key_slice)) {
tmp_str = grpc_slice_to_c_string(key_slice);
rb_raise(rb_eArgError,
- "'%s' is an invalid header key, must match [a-z0-9-_.]+",
- tmp_str);
+ "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str);
return ST_STOP;
}
@@ -414,13 +418,14 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
array_length = RARRAY_LEN(val);
/* If the value is an array, add capacity for each value in the array */
for (i = 0; i < array_length; i++) {
- value_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
+ value_slice = grpc_slice_from_copied_buffer(
+ RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
if (!grpc_is_binary_header(key_slice) &&
!grpc_header_nonbin_value_is_legal(value_slice)) {
// The value has invalid characters
tmp_str = grpc_slice_to_c_string(value_slice);
- rb_raise(rb_eArgError,
- "Header value '%s' has invalid characters", tmp_str);
+ rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
+ tmp_str);
return ST_STOP;
}
md_ary->metadata[md_ary->count].key = key_slice;
@@ -428,21 +433,21 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
md_ary->count += 1;
}
} else if (TYPE(val) == T_STRING) {
- value_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
+ value_slice =
+ grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
if (!grpc_is_binary_header(key_slice) &&
!grpc_header_nonbin_value_is_legal(value_slice)) {
// The value has invalid characters
tmp_str = grpc_slice_to_c_string(value_slice);
- rb_raise(rb_eArgError,
- "Header value '%s' has invalid characters", tmp_str);
+ rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
+ tmp_str);
return ST_STOP;
}
md_ary->metadata[md_ary->count].key = key_slice;
md_ary->metadata[md_ary->count].value = value_slice;
md_ary->count += 1;
} else {
- rb_raise(rb_eArgError,
- "Header values must be of type string or array");
+ rb_raise(rb_eArgError, "Header values must be of type string or array");
return ST_STOP;
}
@@ -474,8 +479,7 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
/* grpc_rb_md_ary_convert converts a ruby metadata hash into
a grpc_metadata_array.
*/
-void grpc_rb_md_ary_convert(VALUE md_ary_hash,
- grpc_metadata_array *md_ary) {
+void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
VALUE md_ary_obj = Qnil;
if (md_ary_hash == Qnil) {
return; /* Do nothing if the expected has value is nil */
@@ -511,12 +515,14 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
rb_hash_aset(result, key, value);
} else if (TYPE(value) == T_ARRAY) {
/* Add the string to the returned array */
- rb_ary_push(value, grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
+ rb_ary_push(value,
+ grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
} else {
/* Add the current value with this key and the new one to an array */
new_ary = rb_ary_new();
rb_ary_push(new_ary, value);
- rb_ary_push(new_ary, grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
+ rb_ary_push(new_ary,
+ grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
rb_hash_aset(result, key, new_ary);
}
}
@@ -556,10 +562,9 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
/* grpc_rb_op_update_status_from_server adds the values in a ruby status
struct to the 'send_status_from_server' portion of an op.
*/
-static void grpc_rb_op_update_status_from_server(grpc_op *op,
- grpc_metadata_array *md_ary,
- grpc_slice *send_status_details,
- VALUE status) {
+static void grpc_rb_op_update_status_from_server(
+ grpc_op *op, grpc_metadata_array *md_ary, grpc_slice *send_status_details,
+ VALUE status) {
VALUE code = rb_struct_aref(status, sym_code);
VALUE details = rb_struct_aref(status, sym_details);
VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
@@ -576,7 +581,8 @@ static void grpc_rb_op_update_status_from_server(grpc_op *op,
return;
}
- *send_status_details = grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
+ *send_status_details =
+ grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
op->data.send_status_from_server.status = NUM2INT(code);
op->data.send_status_from_server.status_details = send_status_details;
@@ -687,7 +693,8 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
grpc_rb_op_update_status_from_server(
- &st->ops[st->op_num], &st->send_trailing_metadata, &st->send_status_details, this_value);
+ &st->ops[st->op_num], &st->send_trailing_metadata,
+ &st->send_status_details, this_value);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
@@ -749,12 +756,12 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
case GRPC_OP_RECV_STATUS_ON_CLIENT:
rb_struct_aset(
result, sym_status,
- rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status),
- (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
- ? Qnil
- : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
- grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
- NULL));
+ rb_struct_new(
+ grpc_rb_sStatus, UINT2NUM(st->recv_status),
+ (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
+ ? Qnil
+ : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
+ grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), NULL));
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
rb_struct_aset(result, sym_send_close, Qtrue);
@@ -791,7 +798,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
unsigned write_flag = 0;
- void *tag = (void*)&st;
+ void *tag = (void *)&st;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
@@ -919,7 +926,8 @@ static void Init_grpc_op_codes() {
}
static void Init_grpc_metadata_keys() {
- VALUE grpc_rb_mMetadataKeys = rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
+ VALUE grpc_rb_mMetadataKeys =
+ rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
}
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index 56becdc5a4..dc597f7ba7 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -39,13 +39,13 @@
#include <grpc/grpc.h>
/* Gets the wrapped call from a VALUE. */
-grpc_call* grpc_rb_get_wrapped_call(VALUE v);
+grpc_call *grpc_rb_get_wrapped_call(VALUE v);
/* Gets the VALUE corresponding to given grpc_call. */
VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q);
/* Provides the details of an call error */
-const char* grpc_call_error_detail_of(grpc_call_error err);
+const char *grpc_call_error_detail_of(grpc_call_error err);
/* Converts a metadata array to a hash. */
VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary);
@@ -53,8 +53,7 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary);
/* grpc_rb_md_ary_convert converts a ruby metadata hash into
a grpc_metadata_array.
*/
-void grpc_rb_md_ary_convert(VALUE md_ary_hash,
- grpc_metadata_array *md_ary);
+void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary);
/* grpc_rb_eCallError is the ruby class of the exception thrown during call
operations. */
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index fac2cbb04d..7a5a74580d 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -33,8 +33,8 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_call_credentials.h"
+#include "rb_grpc_imports.generated.h"
#include <ruby/thread.h>
@@ -82,20 +82,18 @@ static VALUE grpc_rb_call_credentials_callback(VALUE callback_args) {
static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
VALUE exception_object) {
VALUE result = rb_hash_new();
- VALUE backtrace = rb_funcall(
- rb_funcall(exception_object, rb_intern("backtrace"), 0),
- rb_intern("join"),
- 1, rb_str_new2("\n\tfrom "));
- VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("inspect"), 0);
+ VALUE backtrace =
+ rb_funcall(rb_funcall(exception_object, rb_intern("backtrace"), 0),
+ rb_intern("join"), 1, rb_str_new2("\n\tfrom "));
+ VALUE rb_exception_info =
+ rb_funcall(exception_object, rb_intern("inspect"), 0);
(void)args;
gpr_log(GPR_INFO, "Call credentials callback failed: %s\n%s",
- StringValueCStr(rb_exception_info),
- StringValueCStr(backtrace));
+ StringValueCStr(rb_exception_info), StringValueCStr(backtrace));
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
rb_hash_aset(result, rb_str_new2("status"),
INT2NUM(GRPC_STATUS_UNAUTHENTICATED));
- rb_hash_aset(result, rb_str_new2("details"),
- rb_exception_info);
+ rb_hash_aset(result, rb_str_new2("details"), rb_exception_info);
return result;
}
@@ -118,7 +116,8 @@ static void grpc_rb_call_credentials_callback_with_gil(void *param) {
result = rb_rescue(grpc_rb_call_credentials_callback, callback_args,
grpc_rb_call_credentials_callback_rescue, Qnil);
// Both callbacks return a hash, so result should be a hash
- grpc_rb_md_ary_convert(rb_hash_aref(result, rb_str_new2("metadata")), &md_ary);
+ grpc_rb_md_ary_convert(rb_hash_aref(result, rb_str_new2("metadata")),
+ &md_ary);
status = NUM2INT(rb_hash_aref(result, rb_str_new2("status")));
details = rb_hash_aref(result, rb_str_new2("details"));
error_details = StringValueCStr(details);
@@ -138,7 +137,7 @@ static void grpc_rb_call_credentials_plugin_get_metadata(
params->callback = cb;
grpc_rb_event_queue_enqueue(grpc_rb_call_credentials_callback_with_gil,
- (void*)(params));
+ (void *)(params));
}
static void grpc_rb_call_credentials_plugin_destroy(void *state) {
@@ -172,13 +171,15 @@ static void grpc_rb_call_credentials_mark(void *p) {
}
static rb_data_type_t grpc_rb_call_credentials_data_type = {
- "grpc_call_credentials",
- {grpc_rb_call_credentials_mark, grpc_rb_call_credentials_free,
- GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}},
- NULL,
- NULL,
+ "grpc_call_credentials",
+ {grpc_rb_call_credentials_mark,
+ grpc_rb_call_credentials_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {NULL, NULL}},
+ NULL,
+ NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- RUBY_TYPED_FREE_IMMEDIATELY
+ RUBY_TYPED_FREE_IMMEDIATELY
#endif
};
@@ -188,7 +189,8 @@ static VALUE grpc_rb_call_credentials_alloc(VALUE cls) {
grpc_rb_call_credentials *wrapper = ALLOC(grpc_rb_call_credentials);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
- return TypedData_Wrap_Struct(cls, &grpc_rb_call_credentials_data_type, wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_call_credentials_data_type,
+ wrapper);
}
/* Creates a wrapping object for a given call credentials. This should only be
@@ -232,7 +234,7 @@ static VALUE grpc_rb_call_credentials_init(VALUE self, VALUE proc) {
rb_raise(rb_eTypeError, "Argument to CallCredentials#new must be a proc");
return Qnil;
}
- plugin.state = (void*)proc;
+ plugin.state = (void *)proc;
plugin.type = "";
creds = grpc_metadata_credentials_create_from_plugin(plugin, NULL);
@@ -289,7 +291,6 @@ void Init_grpc_call_credentials() {
grpc_call_credentials *grpc_rb_get_wrapped_call_credentials(VALUE v) {
grpc_rb_call_credentials *wrapper = NULL;
TypedData_Get_Struct(v, grpc_rb_call_credentials,
- &grpc_rb_call_credentials_data_type,
- wrapper);
+ &grpc_rb_call_credentials_data_type, wrapper);
return wrapper->wrapped;
}
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index fb610f548e..a802183726 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -34,9 +34,9 @@
#include <ruby/ruby.h>
#include <ruby/thread.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
#include "rb_channel.h"
+#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
@@ -89,8 +89,8 @@ typedef struct grpc_rb_channel {
static void grpc_rb_channel_try_register_connection_polling(
grpc_rb_channel *wrapper);
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
-static void *wait_until_channel_polling_thread_started_no_gil(void*);
-static void wait_until_channel_polling_thread_started_unblocking_func(void*);
+static void *wait_until_channel_polling_thread_started_no_gil(void *);
+static void wait_until_channel_polling_thread_started_unblocking_func(void *);
static grpc_completion_queue *channel_polling_cq;
static gpr_mu global_connection_polling_mu;
@@ -171,8 +171,9 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
- rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, NULL,
- wait_until_channel_polling_thread_started_unblocking_func, NULL);
+ rb_thread_call_without_gvl(
+ wait_until_channel_polling_thread_started_no_gil, NULL,
+ wait_until_channel_polling_thread_started_unblocking_func, NULL);
/* "3" == 3 mandatory args */
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
@@ -204,14 +205,14 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_lock(&wrapper->channel_mu);
wrapper->abort_watch_connectivity_state = 0;
- wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+ wrapper->current_connectivity_state =
+ grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
gpr_cv_broadcast(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
-
grpc_rb_channel_try_register_connection_polling(wrapper);
if (args.args != NULL) {
@@ -253,7 +254,8 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
+ return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped,
+ grpc_try_to_connect));
}
typedef struct watch_state_stack {
@@ -263,22 +265,21 @@ typedef struct watch_state_stack {
} watch_state_stack;
static void *watch_channel_state_without_gvl(void *arg) {
- watch_state_stack *stack = (watch_state_stack*)arg;
+ watch_state_stack *stack = (watch_state_stack *)arg;
gpr_timespec deadline = stack->deadline;
grpc_rb_channel *wrapper = stack->wrapper;
int last_state = stack->last_state;
- void *return_value = (void*)0;
+ void *return_value = (void *)0;
gpr_mu_lock(&wrapper->channel_mu);
- while(wrapper->current_connectivity_state == last_state &&
- !wrapper->request_safe_destroy &&
- !wrapper->safe_to_destroy &&
- !wrapper->abort_watch_connectivity_state &&
- gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
+ while (wrapper->current_connectivity_state == last_state &&
+ !wrapper->request_safe_destroy && !wrapper->safe_to_destroy &&
+ !wrapper->abort_watch_connectivity_state &&
+ gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
}
if (wrapper->current_connectivity_state != last_state) {
- return_value = (void*)1;
+ return_value = (void *)1;
}
gpr_mu_unlock(&wrapper->channel_mu);
@@ -286,7 +287,7 @@ static void *watch_channel_state_without_gvl(void *arg) {
}
static void watch_channel_state_unblocking_func(void *arg) {
- grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
+ grpc_rb_channel *wrapper = (grpc_rb_channel *)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
gpr_mu_lock(&wrapper->channel_mu);
wrapper->abort_watch_connectivity_state = 1;
@@ -306,7 +307,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
- void* out;
+ void *out;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -316,14 +317,18 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
}
if (!FIXNUM_P(last_state)) {
- rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
+ rb_raise(
+ rb_eTypeError,
+ "bad type for last_state. want a GRPC::Core::ChannelState constant");
return Qnil;
}
stack.wrapper = wrapper;
stack.deadline = grpc_rb_time_timeval(deadline, 0);
stack.last_state = NUM2LONG(last_state);
- out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
+ out =
+ rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack,
+ watch_channel_state_unblocking_func, wrapper);
if (out) {
return Qtrue;
}
@@ -359,7 +364,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
parent_call = grpc_rb_get_wrapped_call(parent);
}
- cq = grpc_completion_queue_create(NULL);
+ cq = grpc_completion_queue_create_for_pluck(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
@@ -428,7 +433,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
// destroy.
// Not safe to call while a channel's connection state is polled.
static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper) {
+ grpc_rb_channel *wrapper) {
grpc_connectivity_state conn_state;
gpr_timespec sleep_time = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
@@ -501,11 +506,14 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
break;
}
if (event.type == GRPC_OP_COMPLETE) {
- grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
+ grpc_rb_channel_try_register_connection_polling(
+ (grpc_rb_channel *)event.tag);
}
}
grpc_completion_queue_destroy(channel_polling_cq);
- gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
+ gpr_log(GPR_DEBUG,
+ "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling "
+ "loop");
return NULL;
}
@@ -513,7 +521,9 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
static void run_poll_channels_loop_unblocking_func(void *arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
- gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling");
+ gpr_log(GPR_DEBUG,
+ "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting "
+ "connection polling");
abort_channel_polling = 1;
grpc_completion_queue_shutdown(channel_polling_cq);
gpr_mu_unlock(&global_connection_polling_mu);
@@ -522,7 +532,9 @@ static void run_poll_channels_loop_unblocking_func(void *arg) {
// Poll channel connectivity states in background thread without the GIL.
static VALUE run_poll_channels_loop(VALUE arg) {
(void)arg;
- gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
+ gpr_log(
+ GPR_DEBUG,
+ "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
run_poll_channels_loop_unblocking_func, NULL);
@@ -542,10 +554,14 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
return NULL;
}
-static void wait_until_channel_polling_thread_started_unblocking_func(void* arg) {
+static void wait_until_channel_polling_thread_started_unblocking_func(
+ void *arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
- gpr_log(GPR_DEBUG, "GRPC_RUBY: wait_until_channel_polling_thread_started_unblocking_func - begin aborting connection polling");
+ gpr_log(GPR_DEBUG,
+ "GRPC_RUBY: "
+ "wait_until_channel_polling_thread_started_unblocking_func - begin "
+ "aborting connection polling");
abort_channel_polling = 1;
gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
@@ -571,7 +587,7 @@ void grpc_rb_channel_polling_thread_start() {
gpr_mu_init(&global_connection_polling_mu);
gpr_cv_init(&global_connection_polling_cv);
- channel_polling_cq = grpc_completion_queue_create(NULL);
+ channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
background_thread = rb_thread_create(run_poll_channels_loop, NULL);
if (!RTEST(background_thread)) {
diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c
index 87c0e0a705..fa9ddee372 100644
--- a/src/ruby/ext/grpc/rb_channel_args.c
+++ b/src/ruby/ext/grpc/rb_channel_args.c
@@ -33,8 +33,8 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_channel_args.h"
+#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
@@ -42,9 +42,12 @@
static rb_data_type_t grpc_rb_channel_args_data_type = {
"grpc_channel_args",
- {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {GRPC_RB_GC_NOT_MARKED,
+ GRPC_RB_GC_DONT_FREE,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
- NULL, NULL,
+ NULL,
+ NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
RUBY_TYPED_FREE_IMMEDIATELY
#endif
@@ -137,11 +140,10 @@ static VALUE grpc_rb_hash_convert_to_channel_args0(VALUE as_value) {
params->dst->num_args = num_args;
params->dst->args = ALLOC_N(grpc_arg, num_args);
MEMZERO(params->dst->args, grpc_arg, num_args);
- rb_hash_foreach(params->src_hash,
- grpc_rb_channel_create_in_process_add_args_hash_cb,
- TypedData_Wrap_Struct(grpc_rb_cChannelArgs,
- &grpc_rb_channel_args_data_type,
- params->dst));
+ rb_hash_foreach(
+ params->src_hash, grpc_rb_channel_create_in_process_add_args_hash_cb,
+ TypedData_Wrap_Struct(grpc_rb_cChannelArgs,
+ &grpc_rb_channel_args_data_type, params->dst));
/* reset num_args as grpc_rb_channel_create_in_process_add_args_hash_cb
* decrements it during has processing */
params->dst->num_args = num_args;
@@ -157,7 +159,7 @@ void grpc_rb_hash_convert_to_channel_args(VALUE src_hash,
/* Make a protected call to grpc_rb_hash_convert_channel_args */
params.src_hash = src_hash;
params.dst = dst;
- rb_protect(grpc_rb_hash_convert_to_channel_args0, (VALUE) & params, &status);
+ rb_protect(grpc_rb_hash_convert_to_channel_args0, (VALUE)&params, &status);
if (status != 0) {
if (dst->args != NULL) {
/* Free any allocated memory before propagating the error */
diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c
index d334c09148..db713ed821 100644
--- a/src/ruby/ext/grpc/rb_channel_credentials.c
+++ b/src/ruby/ext/grpc/rb_channel_credentials.c
@@ -35,8 +35,8 @@
#include <string.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_channel_credentials.h"
+#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
@@ -91,8 +91,10 @@ static void grpc_rb_channel_credentials_mark(void *p) {
static rb_data_type_t grpc_rb_channel_credentials_data_type = {
"grpc_channel_credentials",
- {grpc_rb_channel_credentials_mark, grpc_rb_channel_credentials_free,
- GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}},
+ {grpc_rb_channel_credentials_mark,
+ grpc_rb_channel_credentials_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {NULL, NULL}},
NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
@@ -106,13 +108,15 @@ static VALUE grpc_rb_channel_credentials_alloc(VALUE cls) {
grpc_rb_channel_credentials *wrapper = ALLOC(grpc_rb_channel_credentials);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
- return TypedData_Wrap_Struct(cls, &grpc_rb_channel_credentials_data_type, wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_channel_credentials_data_type,
+ wrapper);
}
/* Creates a wrapping object for a given channel credentials. This should only
* be called with grpc_channel_credentials objects that are not already
* associated with any Ruby object. */
-VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c, VALUE mark) {
+VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c,
+ VALUE mark) {
VALUE rb_wrapper;
grpc_rb_channel_credentials *wrapper;
if (c == NULL) {
@@ -147,7 +151,8 @@ static ID id_pem_cert_chain;
pem_private_key: (optional) PEM encoding of the client's private key
pem_cert_chain: (optional) PEM encoding of the client's cert chain
Initializes Credential instances. */
-static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv, VALUE self) {
+static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv,
+ VALUE self) {
VALUE pem_root_certs = Qnil;
VALUE pem_private_key = Qnil;
VALUE pem_cert_chain = Qnil;
@@ -173,8 +178,8 @@ static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv, VALUE self)
} else {
key_cert_pair.private_key = RSTRING_PTR(pem_private_key);
key_cert_pair.cert_chain = RSTRING_PTR(pem_cert_chain);
- creds = grpc_ssl_credentials_create(pem_root_certs_cstr,
- &key_cert_pair, NULL);
+ creds =
+ grpc_ssl_credentials_create(pem_root_certs_cstr, &key_cert_pair, NULL);
}
if (creds == NULL) {
rb_raise(rb_eRuntimeError, "could not create a credentials, not sure why");
@@ -233,8 +238,8 @@ static VALUE grpc_rb_set_default_roots_pem(VALUE self, VALUE roots) {
}
void Init_grpc_channel_credentials() {
- grpc_rb_cChannelCredentials =
- rb_define_class_under(grpc_rb_mGrpcCore, "ChannelCredentials", rb_cObject);
+ grpc_rb_cChannelCredentials = rb_define_class_under(
+ grpc_rb_mGrpcCore, "ChannelCredentials", rb_cObject);
/* Allocates an object managed by the ruby runtime */
rb_define_alloc_func(grpc_rb_cChannelCredentials,
@@ -262,7 +267,6 @@ void Init_grpc_channel_credentials() {
grpc_channel_credentials *grpc_rb_get_wrapped_channel_credentials(VALUE v) {
grpc_rb_channel_credentials *wrapper = NULL;
TypedData_Get_Struct(v, grpc_rb_channel_credentials,
- &grpc_rb_channel_credentials_data_type,
- wrapper);
+ &grpc_rb_channel_credentials_data_type, wrapper);
return wrapper->wrapped;
}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index fd75d2f691..c9d67739a5 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -33,14 +33,14 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_completion_queue.h"
+#include "rb_grpc_imports.generated.h"
#include <ruby/thread.h>
#include <grpc/grpc.h>
-#include <grpc/support/time.h>
#include <grpc/support/log.h>
+#include <grpc/support/time.h>
#include "rb_grpc.h"
/* Used to allow grpc_completion_queue_next call to release the GIL */
@@ -54,14 +54,13 @@ typedef struct next_call_stack {
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
- next_call_stack *const next_call = (next_call_stack*)param;
+ next_call_stack *const next_call = (next_call_stack *)param;
gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
gpr_timespec deadline;
do {
deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
- next_call->event = grpc_completion_queue_pluck(next_call->cq,
- next_call->tag,
- deadline, NULL);
+ next_call->event = grpc_completion_queue_pluck(
+ next_call->cq, next_call->tag, deadline, NULL);
if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
gpr_time_cmp(deadline, next_call->timeout) > 0) {
break;
@@ -81,7 +80,7 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
}
static void unblock_func(void *param) {
- next_call_stack *const next_call = (next_call_stack*)param;
+ next_call_stack *const next_call = (next_call_stack *)param;
next_call->interrupted = 1;
}
@@ -111,7 +110,6 @@ grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
(void *)&next_call);
/* If an interrupt prevented pluck from returning useful information, then
any plucks that did complete must have timed out */
- } while (next_call.interrupted &&
- next_call.event.type == GRPC_QUEUE_TIMEOUT);
+ } while (next_call.interrupted && next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}
diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c
index b23e82b0db..45c963dca6 100644
--- a/src/ruby/ext/grpc/rb_compression_options.c
+++ b/src/ruby/ext/grpc/rb_compression_options.c
@@ -33,15 +33,15 @@
#include <ruby/ruby.h>
-#include "rb_compression_options.h"
#include "rb_byte_buffer.h"
+#include "rb_compression_options.h"
#include "rb_grpc_imports.generated.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/alloc.h>
#include <string.h>
#include "rb_grpc.h"
@@ -182,15 +182,16 @@ void grpc_rb_compression_options_algorithm_name_to_value_internal(
* correct C string out of it. */
algorithm_name_as_string = rb_funcall(algorithm_name, rb_intern("to_s"), 0);
- name_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(algorithm_name_as_string), RSTRING_LEN(algorithm_name_as_string));
+ name_slice =
+ grpc_slice_from_copied_buffer(RSTRING_PTR(algorithm_name_as_string),
+ RSTRING_LEN(algorithm_name_as_string));
/* Raise an error if the name isn't recognized as a compression algorithm by
* the algorithm parse function
* in GRPC core. */
- if(!grpc_compression_algorithm_parse(name_slice, algorithm_value)) {
+ if (!grpc_compression_algorithm_parse(name_slice, algorithm_value)) {
tmp_str = grpc_slice_to_c_string(name_slice);
- rb_raise(rb_eNameError, "Invalid compression algorithm name: %s",
- tmp_str);
+ rb_raise(rb_eNameError, "Invalid compression algorithm name: %s", tmp_str);
}
grpc_slice_unref(name_slice);
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 9e85bbcfbf..9a3b56ddfb 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -33,20 +33,20 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_event_thread.h"
+#include "rb_grpc_imports.generated.h"
#include <stdbool.h>
-#include <ruby/thread.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-#include <grpc/support/log.h>
+#include <ruby/thread.h>
typedef struct grpc_rb_event {
// callback will be called with argument while holding the GVL
- void (*callback)(void*);
+ void (*callback)(void *);
void *argument;
struct grpc_rb_event *next;
@@ -65,8 +65,7 @@ typedef struct grpc_rb_event_queue {
static grpc_rb_event_queue event_queue;
-void grpc_rb_event_queue_enqueue(void (*callback)(void*),
- void *argument) {
+void grpc_rb_event_queue_enqueue(void (*callback)(void *), void *argument) {
grpc_rb_event *event = gpr_malloc(sizeof(grpc_rb_event));
event->callback = callback;
event->argument = argument;
@@ -107,8 +106,7 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
(void)param;
gpr_mu_lock(&event_queue.mu);
while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
- gpr_cv_wait(&event_queue.cv,
- &event_queue.mu,
+ gpr_cv_wait(&event_queue.cv, &event_queue.mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
if (event_queue.abort) {
gpr_mu_unlock(&event_queue.mu);
@@ -132,10 +130,10 @@ static void grpc_rb_event_unblocking_func(void *arg) {
static VALUE grpc_rb_event_thread(VALUE arg) {
grpc_rb_event *event;
(void)arg;
- while(true) {
- event = (grpc_rb_event*)rb_thread_call_without_gvl(
- grpc_rb_wait_for_event_no_gil, NULL,
- grpc_rb_event_unblocking_func, NULL);
+ while (true) {
+ event = (grpc_rb_event *)rb_thread_call_without_gvl(
+ grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
+ NULL);
if (event == NULL) {
// Indicates that the thread needs to shut down
break;
diff --git a/src/ruby/ext/grpc/rb_event_thread.h b/src/ruby/ext/grpc/rb_event_thread.h
index 46638bfcf5..d7eff760a1 100644
--- a/src/ruby/ext/grpc/rb_event_thread.h
+++ b/src/ruby/ext/grpc/rb_event_thread.h
@@ -33,5 +33,4 @@
void grpc_rb_event_queue_thread_start();
-void grpc_rb_event_queue_enqueue(void (*callback)(void*),
- void *argument);
+void grpc_rb_event_queue_enqueue(void (*callback)(void *), void *argument);
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 584b5dbc63..5be8861e0c 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -33,8 +33,8 @@
#include <ruby/ruby.h>
-#include "rb_grpc_imports.generated.h"
#include "rb_grpc.h"
+#include "rb_grpc_imports.generated.h"
#include <math.h>
#include <ruby/vm.h>
@@ -46,18 +46,19 @@
#include "rb_call_credentials.h"
#include "rb_channel.h"
#include "rb_channel_credentials.h"
+#include "rb_compression_options.h"
+#include "rb_event_thread.h"
#include "rb_loader.h"
#include "rb_server.h"
#include "rb_server_credentials.h"
-#include "rb_compression_options.h"
-#include "rb_event_thread.h"
-#include "rb_channel.h"
static VALUE grpc_rb_cTimeVal = Qnil;
static rb_data_type_t grpc_rb_timespec_data_type = {
"gpr_timespec",
- {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {GRPC_RB_GC_NOT_MARKED,
+ GRPC_RB_GC_DONT_FREE,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL,
NULL,
@@ -86,8 +87,7 @@ VALUE grpc_rb_cannot_init(VALUE self) {
/* Init/Clone func that fails by raising an exception. */
VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self) {
(void)self;
- rb_raise(rb_eTypeError,
- "Copy initialization of %s is not supported",
+ rb_raise(rb_eTypeError, "Copy initialization of %s is not supported",
rb_obj_classname(copy));
return Qnil;
}
@@ -145,8 +145,7 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) {
}
t.tv_sec = (int64_t)f;
if (f != t.tv_sec) {
- rb_raise(rb_eRangeError, "%f out of Time range",
- RFLOAT_VALUE(time));
+ rb_raise(rb_eRangeError, "%f out of Time range", RFLOAT_VALUE(time));
}
t.tv_nsec = (int)(d * 1e9 + 0.5);
}
@@ -271,9 +270,7 @@ static void Init_grpc_time_consts() {
id_tv_nsec = rb_intern("tv_nsec");
}
-static void grpc_rb_shutdown(void) {
- grpc_shutdown();
-}
+static void grpc_rb_shutdown(void) { grpc_shutdown(); }
/* Initialize the GRPC module structs */
@@ -322,9 +319,8 @@ void Init_grpc_c() {
grpc_rb_mGRPC = rb_define_module("GRPC");
grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
- grpc_rb_sNewServerRpc =
- rb_struct_define("NewServerRpc", "method", "host",
- "deadline", "metadata", "call", NULL);
+ grpc_rb_sNewServerRpc = rb_struct_define(
+ "NewServerRpc", "method", "host", "deadline", "metadata", "call", NULL);
grpc_rb_sStatus =
rb_struct_define("Status", "code", "details", "metadata", NULL);
sym_code = ID2SYM(rb_intern("code"));
diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h
index 4bf11e75df..8538a74211 100644
--- a/src/ruby/ext/grpc/rb_grpc.h
+++ b/src/ruby/ext/grpc/rb_grpc.h
@@ -34,8 +34,8 @@
#ifndef GRPC_RB_H_
#define GRPC_RB_H_
-#include <sys/time.h>
#include <ruby/ruby.h>
+#include <sys/time.h>
#include <grpc/support/time.h>
@@ -68,7 +68,7 @@ extern VALUE sym_metadata;
/* GRPC_RB_MEMSIZE_UNAVAILABLE is used in rb_data_type_t to indicate that the
* number of bytes used by the wrapped struct is not available. */
-#define GRPC_RB_MEMSIZE_UNAVAILABLE (size_t (*)(const void*))(NULL)
+#define GRPC_RB_MEMSIZE_UNAVAILABLE (size_t(*)(const void*))(NULL)
/* A ruby object alloc func that fails by raising an exception. */
VALUE grpc_rb_cannot_alloc(VALUE cls);
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 063f92114c..a412277f6d 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -108,9 +108,9 @@ grpc_channel_create_call_type grpc_channel_create_call_import;
grpc_channel_ping_type grpc_channel_ping_import;
grpc_channel_register_call_type grpc_channel_register_call_import;
grpc_channel_create_registered_call_type grpc_channel_create_registered_call_import;
+grpc_call_arena_alloc_type grpc_call_arena_alloc_import;
grpc_call_start_batch_type grpc_call_start_batch_import;
grpc_call_get_peer_type grpc_call_get_peer_import;
-grpc_call_set_load_reporting_cost_context_type grpc_call_set_load_reporting_cost_context_import;
grpc_census_call_set_context_type grpc_census_call_set_context_import;
grpc_census_call_get_context_type grpc_census_call_get_context_import;
grpc_channel_get_target_type grpc_channel_get_target_import;
@@ -120,13 +120,13 @@ grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import;
grpc_channel_destroy_type grpc_channel_destroy_import;
grpc_call_cancel_type grpc_call_cancel_import;
grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import;
-grpc_call_destroy_type grpc_call_destroy_import;
+grpc_call_ref_type grpc_call_ref_import;
+grpc_call_unref_type grpc_call_unref_import;
grpc_server_request_call_type grpc_server_request_call_import;
grpc_server_register_method_type grpc_server_register_method_import;
grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
grpc_server_create_type grpc_server_create_import;
grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
-grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
grpc_server_start_type grpc_server_start_import;
grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@@ -405,9 +405,9 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping");
grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call");
grpc_channel_create_registered_call_import = (grpc_channel_create_registered_call_type) GetProcAddress(library, "grpc_channel_create_registered_call");
+ grpc_call_arena_alloc_import = (grpc_call_arena_alloc_type) GetProcAddress(library, "grpc_call_arena_alloc");
grpc_call_start_batch_import = (grpc_call_start_batch_type) GetProcAddress(library, "grpc_call_start_batch");
grpc_call_get_peer_import = (grpc_call_get_peer_type) GetProcAddress(library, "grpc_call_get_peer");
- grpc_call_set_load_reporting_cost_context_import = (grpc_call_set_load_reporting_cost_context_type) GetProcAddress(library, "grpc_call_set_load_reporting_cost_context");
grpc_census_call_set_context_import = (grpc_census_call_set_context_type) GetProcAddress(library, "grpc_census_call_set_context");
grpc_census_call_get_context_import = (grpc_census_call_get_context_type) GetProcAddress(library, "grpc_census_call_get_context");
grpc_channel_get_target_import = (grpc_channel_get_target_type) GetProcAddress(library, "grpc_channel_get_target");
@@ -417,13 +417,13 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy");
grpc_call_cancel_import = (grpc_call_cancel_type) GetProcAddress(library, "grpc_call_cancel");
grpc_call_cancel_with_status_import = (grpc_call_cancel_with_status_type) GetProcAddress(library, "grpc_call_cancel_with_status");
- grpc_call_destroy_import = (grpc_call_destroy_type) GetProcAddress(library, "grpc_call_destroy");
+ grpc_call_ref_import = (grpc_call_ref_type) GetProcAddress(library, "grpc_call_ref");
+ grpc_call_unref_import = (grpc_call_unref_type) GetProcAddress(library, "grpc_call_unref");
grpc_server_request_call_import = (grpc_server_request_call_type) GetProcAddress(library, "grpc_server_request_call");
grpc_server_register_method_import = (grpc_server_register_method_type) GetProcAddress(library, "grpc_server_register_method");
grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
- grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index f5dcd68a8e..7df8dd69fc 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -233,7 +233,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f
typedef grpc_completion_queue *(*grpc_completion_queue_create_for_pluck_type)(void *reserved);
extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import;
#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import
-typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(void *reserved);
+typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved);
extern grpc_completion_queue_create_type grpc_completion_queue_create_import;
#define grpc_completion_queue_create grpc_completion_queue_create_import
typedef grpc_event(*grpc_completion_queue_next_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved);
@@ -275,15 +275,15 @@ extern grpc_channel_register_call_type grpc_channel_register_call_import;
typedef grpc_call *(*grpc_channel_create_registered_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec deadline, void *reserved);
extern grpc_channel_create_registered_call_type grpc_channel_create_registered_call_import;
#define grpc_channel_create_registered_call grpc_channel_create_registered_call_import
+typedef void *(*grpc_call_arena_alloc_type)(grpc_call *call, size_t size);
+extern grpc_call_arena_alloc_type grpc_call_arena_alloc_import;
+#define grpc_call_arena_alloc grpc_call_arena_alloc_import
typedef grpc_call_error(*grpc_call_start_batch_type)(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved);
extern grpc_call_start_batch_type grpc_call_start_batch_import;
#define grpc_call_start_batch grpc_call_start_batch_import
typedef char *(*grpc_call_get_peer_type)(grpc_call *call);
extern grpc_call_get_peer_type grpc_call_get_peer_import;
#define grpc_call_get_peer grpc_call_get_peer_import
-typedef void(*grpc_call_set_load_reporting_cost_context_type)(grpc_call *call, struct grpc_load_reporting_cost_context *context);
-extern grpc_call_set_load_reporting_cost_context_type grpc_call_set_load_reporting_cost_context_import;
-#define grpc_call_set_load_reporting_cost_context grpc_call_set_load_reporting_cost_context_import
typedef void(*grpc_census_call_set_context_type)(grpc_call *call, struct census_context *context);
extern grpc_census_call_set_context_type grpc_census_call_set_context_import;
#define grpc_census_call_set_context grpc_census_call_set_context_import
@@ -311,9 +311,12 @@ extern grpc_call_cancel_type grpc_call_cancel_import;
typedef grpc_call_error(*grpc_call_cancel_with_status_type)(grpc_call *call, grpc_status_code status, const char *description, void *reserved);
extern grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import;
#define grpc_call_cancel_with_status grpc_call_cancel_with_status_import
-typedef void(*grpc_call_destroy_type)(grpc_call *call);
-extern grpc_call_destroy_type grpc_call_destroy_import;
-#define grpc_call_destroy grpc_call_destroy_import
+typedef void(*grpc_call_ref_type)(grpc_call *call);
+extern grpc_call_ref_type grpc_call_ref_import;
+#define grpc_call_ref grpc_call_ref_import
+typedef void(*grpc_call_unref_type)(grpc_call *call);
+extern grpc_call_unref_type grpc_call_unref_import;
+#define grpc_call_unref grpc_call_unref_import
typedef grpc_call_error(*grpc_server_request_call_type)(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new);
extern grpc_server_request_call_type grpc_server_request_call_import;
#define grpc_server_request_call grpc_server_request_call_import
@@ -329,9 +332,6 @@ extern grpc_server_create_type grpc_server_create_import;
typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
#define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
-typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
-extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
-#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
#define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 2286a99f24..d7408f683d 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -37,15 +37,15 @@
#include "rb_server.h"
#include <grpc/grpc.h>
-#include <grpc/support/atm.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include "rb_byte_buffer.h"
#include "rb_call.h"
#include "rb_channel_args.h"
#include "rb_completion_queue.h"
-#include "rb_server_credentials.h"
-#include "rb_byte_buffer.h"
#include "rb_grpc.h"
+#include "rb_server_credentials.h"
/* grpc_rb_cServer is the ruby class that proxies grpc_server. */
static VALUE grpc_rb_cServer = Qnil;
@@ -93,9 +93,8 @@ static void grpc_rb_server_free(void *p) {
};
svr = (grpc_rb_server *)p;
- deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(2, GPR_TIMESPAN));
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(2, GPR_TIMESPAN));
destroy_server(svr, deadline);
@@ -104,13 +103,15 @@ static void grpc_rb_server_free(void *p) {
static const rb_data_type_t grpc_rb_server_data_type = {
"grpc_server",
- {GRPC_RB_GC_NOT_MARKED, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {GRPC_RB_GC_NOT_MARKED,
+ grpc_rb_server_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block
- * and we might want to unlock GVL
+ /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
+ * function would block and we might want to unlock GVL
* TODO(yugui) Unlock GVL?
*/
0,
@@ -139,7 +140,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
grpc_ruby_once_init();
- cq = grpc_completion_queue_create(NULL);
+ cq = grpc_completion_queue_create_for_pluck(NULL);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
wrapper);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
@@ -167,7 +168,7 @@ typedef struct request_call_stack {
/* grpc_request_call_stack_init ensures the request_call_stack is properly
* initialized */
-static void grpc_request_call_stack_init(request_call_stack* st) {
+static void grpc_request_call_stack_init(request_call_stack *st) {
MEMZERO(st, request_call_stack, 1);
grpc_metadata_array_init(&st->md_ary);
grpc_call_details_init(&st->details);
@@ -175,7 +176,7 @@ static void grpc_request_call_stack_init(request_call_stack* st) {
/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
* cleaned up */
-static void grpc_request_call_stack_cleanup(request_call_stack* st) {
+static void grpc_request_call_stack_cleanup(request_call_stack *st) {
grpc_metadata_array_destroy(&st->md_ary);
grpc_call_details_destroy(&st->details);
}
@@ -191,8 +192,9 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_call_error err;
request_call_stack st;
VALUE result;
- void *tag = (void*)&st;
- grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL);
+ void *tag = (void *)&st;
+ grpc_completion_queue *call_queue =
+ grpc_completion_queue_create_for_pluck(NULL);
gpr_timespec deadline;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
@@ -203,9 +205,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_request_call_stack_init(&st);
/* call grpc_server_request_call, then wait for it to complete using
* pluck_event */
- err = grpc_server_request_call(
- s->wrapped, &call, &st.details, &st.md_ary,
- call_queue, s->queue, tag);
+ err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
+ call_queue, s->queue, tag);
if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError,
@@ -301,8 +302,7 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
return Qnil;
} else if (TYPE(rb_creds) == T_SYMBOL) {
if (id_insecure_server != SYM2ID(rb_creds)) {
- rb_raise(rb_eTypeError,
- "bad creds symbol, want :this_port_is_insecure");
+ rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
return Qnil;
}
recvd_port =
@@ -314,9 +314,8 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
}
} else {
creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
- recvd_port =
- grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port),
- creds);
+ recvd_port = grpc_server_add_secure_http2_port(
+ s->wrapped, StringValueCStr(port), creds);
if (recvd_port == 0) {
rb_raise(rb_eRuntimeError,
"could not add secure port %s to server, not sure why",
@@ -335,18 +334,17 @@ void Init_grpc_server() {
/* Provides a ruby constructor and support for dup/clone. */
rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
- rb_define_method(grpc_rb_cServer, "initialize_copy",
- grpc_rb_cannot_init_copy, 1);
+ rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
+ 1);
/* Add the server methods. */
- rb_define_method(grpc_rb_cServer, "request_call",
- grpc_rb_server_request_call, 0);
+ rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
+ 0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
- grpc_rb_server_add_http2_port,
- 2);
+ grpc_rb_server_add_http2_port, 2);
id_at = rb_intern("at");
id_insecure_server = rb_intern("this_port_is_insecure");
}