diff options
author | Yuchen Zeng <y-zeng@users.noreply.github.com> | 2017-04-25 11:08:04 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-25 11:08:04 -0700 |
commit | 79759fea1abd09102d38af3e13a84b69e898124b (patch) | |
tree | 4ac7c385d528d08d9e2d6d44bb38eec8402f5802 /src/ruby/ext/grpc | |
parent | dc36f4df6aba60275a31227e1d29c4d20b6cadca (diff) | |
parent | b864e7c41c6d0363e23093fb090625f260994962 (diff) |
Merge branch 'master' into v1.3.x
Diffstat (limited to 'src/ruby/ext/grpc')
-rw-r--r-- | src/ruby/ext/grpc/rb_byte_buffer.c | 14 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 104 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.h | 7 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call_credentials.c | 45 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 74 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel_args.c | 20 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel_credentials.c | 28 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.c | 16 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_compression_options.c | 13 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_event_thread.c | 22 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_event_thread.h | 3 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc.c | 26 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc.h | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.c | 12 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 20 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 54 |
16 files changed, 242 insertions, 220 deletions
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index 65fa2f2cf6..f5be19298f 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -33,15 +33,15 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_byte_buffer.h" +#include "rb_grpc_imports.generated.h" -#include <grpc/grpc.h> #include <grpc/byte_buffer_reader.h> +#include <grpc/grpc.h> #include <grpc/slice.h> #include "rb_grpc.h" -grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) { +grpc_byte_buffer *grpc_rb_s_to_byte_buffer(char *string, size_t length) { grpc_slice slice = grpc_slice_from_copied_buffer(string, length); grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1); grpc_slice_unref(slice); @@ -61,7 +61,7 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { return Qnil; } while (grpc_byte_buffer_reader_next(&reader, &next) != 0) { - rb_str_cat(rb_string, (const char *) GRPC_SLICE_START_PTR(next), + rb_str_cat(rb_string, (const char *)GRPC_SLICE_START_PTR(next), GRPC_SLICE_LENGTH(next)); grpc_slice_unref(next); } @@ -71,7 +71,9 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { VALUE grpc_rb_slice_to_ruby_string(grpc_slice slice) { if (GRPC_SLICE_START_PTR(slice) == NULL) { - rb_raise(rb_eRuntimeError, "attempt to convert uninitialized grpc_slice to ruby string"); + rb_raise(rb_eRuntimeError, + "attempt to convert uninitialized grpc_slice to ruby string"); } - return rb_str_new((char*)GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice)); + return rb_str_new((char *)GRPC_SLICE_START_PTR(slice), + GRPC_SLICE_LENGTH(slice)); } diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 344cb941ff..0067e10866 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -33,12 +33,12 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_call.h" +#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/impl/codegen/compression_types.h> +#include <grpc/support/alloc.h> #include "rb_byte_buffer.h" #include "rb_call_credentials.h" @@ -101,7 +101,7 @@ typedef struct grpc_rb_call { static void destroy_call(grpc_rb_call *call) { /* Ensure that we only try to destroy the call once */ if (call->wrapped != NULL) { - grpc_call_destroy(call->wrapped); + grpc_call_unref(call->wrapped); call->wrapped = NULL; grpc_rb_completion_queue_destroy(call->queue); call->queue = NULL; @@ -113,7 +113,7 @@ static void grpc_rb_call_destroy(void *p) { if (p == NULL) { return; } - destroy_call((grpc_rb_call*)p); + destroy_call((grpc_rb_call *)p); } static size_t md_ary_datasize(const void *p) { @@ -130,7 +130,9 @@ static size_t md_ary_datasize(const void *p) { static const rb_data_type_t grpc_rb_md_ary_data_type = { "grpc_metadata_array", - {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize, + {GRPC_RB_GC_NOT_MARKED, + GRPC_RB_GC_DONT_FREE, + md_ary_datasize, {NULL, NULL}}, NULL, NULL, @@ -140,19 +142,20 @@ static const rb_data_type_t grpc_rb_md_ary_data_type = { * touches a hash object. * TODO(yugui) Directly use st_table and call the free function earlier? */ - 0, + 0, #endif }; /* Describes grpc_call struct for RTypedData */ -static const rb_data_type_t grpc_call_data_type = { - "grpc_call", - {GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE, - {NULL, NULL}}, - NULL, - NULL, +static const rb_data_type_t grpc_call_data_type = {"grpc_call", + {GRPC_RB_GC_NOT_MARKED, + grpc_rb_call_destroy, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY #endif }; @@ -175,7 +178,7 @@ static VALUE grpc_rb_call_cancel(VALUE self) { grpc_rb_call *call = NULL; grpc_call_error err; if (RTYPEDDATA_DATA(self) == NULL) { - //This call has been closed + // This call has been closed return Qnil; } @@ -196,7 +199,7 @@ static VALUE grpc_rb_call_cancel(VALUE self) { static VALUE grpc_rb_call_close(VALUE self) { grpc_rb_call *call = NULL; TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); - if(call != NULL) { + if (call != NULL) { destroy_call(call); RTYPEDDATA_DATA(self) = NULL; } @@ -238,8 +241,8 @@ static VALUE grpc_rb_call_get_peer_cert(VALUE self) { } { - grpc_auth_property_iterator it = - grpc_auth_context_find_properties_by_name(ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME); + grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name( + ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME); const grpc_auth_property *prop = grpc_auth_property_iterator_next(&it); if (prop == NULL) { return Qnil; @@ -388,21 +391,22 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { long i; grpc_slice key_slice; grpc_slice value_slice; - char* tmp_str; + char *tmp_str; if (TYPE(key) == T_SYMBOL) { key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key))); } else if (TYPE(key) == T_STRING) { - key_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key)); + key_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key)); } else { - rb_raise(rb_eTypeError, "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter"); + rb_raise(rb_eTypeError, + "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter"); } if (!grpc_header_key_is_legal(key_slice)) { tmp_str = grpc_slice_to_c_string(key_slice); rb_raise(rb_eArgError, - "'%s' is an invalid header key, must match [a-z0-9-_.]+", - tmp_str); + "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str); return ST_STOP; } @@ -414,13 +418,14 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { array_length = RARRAY_LEN(val); /* If the value is an array, add capacity for each value in the array */ for (i = 0; i < array_length; i++) { - value_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i))); + value_slice = grpc_slice_from_copied_buffer( + RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i))); if (!grpc_is_binary_header(key_slice) && !grpc_header_nonbin_value_is_legal(value_slice)) { // The value has invalid characters tmp_str = grpc_slice_to_c_string(value_slice); - rb_raise(rb_eArgError, - "Header value '%s' has invalid characters", tmp_str); + rb_raise(rb_eArgError, "Header value '%s' has invalid characters", + tmp_str); return ST_STOP; } md_ary->metadata[md_ary->count].key = key_slice; @@ -428,21 +433,21 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { md_ary->count += 1; } } else if (TYPE(val) == T_STRING) { - value_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val)); + value_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val)); if (!grpc_is_binary_header(key_slice) && !grpc_header_nonbin_value_is_legal(value_slice)) { // The value has invalid characters tmp_str = grpc_slice_to_c_string(value_slice); - rb_raise(rb_eArgError, - "Header value '%s' has invalid characters", tmp_str); + rb_raise(rb_eArgError, "Header value '%s' has invalid characters", + tmp_str); return ST_STOP; } md_ary->metadata[md_ary->count].key = key_slice; md_ary->metadata[md_ary->count].value = value_slice; md_ary->count += 1; } else { - rb_raise(rb_eArgError, - "Header values must be of type string or array"); + rb_raise(rb_eArgError, "Header values must be of type string or array"); return ST_STOP; } @@ -474,8 +479,7 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, /* grpc_rb_md_ary_convert converts a ruby metadata hash into a grpc_metadata_array. */ -void grpc_rb_md_ary_convert(VALUE md_ary_hash, - grpc_metadata_array *md_ary) { +void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { VALUE md_ary_obj = Qnil; if (md_ary_hash == Qnil) { return; /* Do nothing if the expected has value is nil */ @@ -511,12 +515,14 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) { rb_hash_aset(result, key, value); } else if (TYPE(value) == T_ARRAY) { /* Add the string to the returned array */ - rb_ary_push(value, grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value)); + rb_ary_push(value, + grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value)); } else { /* Add the current value with this key and the new one to an array */ new_ary = rb_ary_new(); rb_ary_push(new_ary, value); - rb_ary_push(new_ary, grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value)); + rb_ary_push(new_ary, + grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value)); rb_hash_aset(result, key, new_ary); } } @@ -556,10 +562,9 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, /* grpc_rb_op_update_status_from_server adds the values in a ruby status struct to the 'send_status_from_server' portion of an op. */ -static void grpc_rb_op_update_status_from_server(grpc_op *op, - grpc_metadata_array *md_ary, - grpc_slice *send_status_details, - VALUE status) { +static void grpc_rb_op_update_status_from_server( + grpc_op *op, grpc_metadata_array *md_ary, grpc_slice *send_status_details, + VALUE status) { VALUE code = rb_struct_aref(status, sym_code); VALUE details = rb_struct_aref(status, sym_details); VALUE metadata_hash = rb_struct_aref(status, sym_metadata); @@ -576,7 +581,8 @@ static void grpc_rb_op_update_status_from_server(grpc_op *op, return; } - *send_status_details = grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details)); + *send_status_details = + grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details)); op->data.send_status_from_server.status = NUM2INT(code); op->data.send_status_from_server.status_details = send_status_details; @@ -687,7 +693,8 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { /* N.B. later there is no need to explicitly delete the metadata keys * and values, they are references to data in ruby objects. */ grpc_rb_op_update_status_from_server( - &st->ops[st->op_num], &st->send_trailing_metadata, &st->send_status_details, this_value); + &st->ops[st->op_num], &st->send_trailing_metadata, + &st->send_status_details, this_value); break; case GRPC_OP_RECV_INITIAL_METADATA: st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata = @@ -749,12 +756,12 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { case GRPC_OP_RECV_STATUS_ON_CLIENT: rb_struct_aset( result, sym_status, - rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status), - (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL - ? Qnil - : grpc_rb_slice_to_ruby_string(st->recv_status_details)), - grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), - NULL)); + rb_struct_new( + grpc_rb_sStatus, UINT2NUM(st->recv_status), + (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL + ? Qnil + : grpc_rb_slice_to_ruby_string(st->recv_status_details)), + grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), NULL)); break; case GRPC_OP_RECV_CLOSE_ON_SERVER: rb_struct_aset(result, sym_send_close, Qtrue); @@ -791,7 +798,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { VALUE result = Qnil; VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); unsigned write_flag = 0; - void *tag = (void*)&st; + void *tag = (void *)&st; if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call"); @@ -919,7 +926,8 @@ static void Init_grpc_op_codes() { } static void Init_grpc_metadata_keys() { - VALUE grpc_rb_mMetadataKeys = rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys"); + VALUE grpc_rb_mMetadataKeys = + rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys"); rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM", rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY)); } diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index 56becdc5a4..dc597f7ba7 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -39,13 +39,13 @@ #include <grpc/grpc.h> /* Gets the wrapped call from a VALUE. */ -grpc_call* grpc_rb_get_wrapped_call(VALUE v); +grpc_call *grpc_rb_get_wrapped_call(VALUE v); /* Gets the VALUE corresponding to given grpc_call. */ VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q); /* Provides the details of an call error */ -const char* grpc_call_error_detail_of(grpc_call_error err); +const char *grpc_call_error_detail_of(grpc_call_error err); /* Converts a metadata array to a hash. */ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary); @@ -53,8 +53,7 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary); /* grpc_rb_md_ary_convert converts a ruby metadata hash into a grpc_metadata_array. */ -void grpc_rb_md_ary_convert(VALUE md_ary_hash, - grpc_metadata_array *md_ary); +void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary); /* grpc_rb_eCallError is the ruby class of the exception thrown during call operations. */ diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index fac2cbb04d..7a5a74580d 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -33,8 +33,8 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_call_credentials.h" +#include "rb_grpc_imports.generated.h" #include <ruby/thread.h> @@ -82,20 +82,18 @@ static VALUE grpc_rb_call_credentials_callback(VALUE callback_args) { static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args, VALUE exception_object) { VALUE result = rb_hash_new(); - VALUE backtrace = rb_funcall( - rb_funcall(exception_object, rb_intern("backtrace"), 0), - rb_intern("join"), - 1, rb_str_new2("\n\tfrom ")); - VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("inspect"), 0); + VALUE backtrace = + rb_funcall(rb_funcall(exception_object, rb_intern("backtrace"), 0), + rb_intern("join"), 1, rb_str_new2("\n\tfrom ")); + VALUE rb_exception_info = + rb_funcall(exception_object, rb_intern("inspect"), 0); (void)args; gpr_log(GPR_INFO, "Call credentials callback failed: %s\n%s", - StringValueCStr(rb_exception_info), - StringValueCStr(backtrace)); + StringValueCStr(rb_exception_info), StringValueCStr(backtrace)); rb_hash_aset(result, rb_str_new2("metadata"), Qnil); rb_hash_aset(result, rb_str_new2("status"), INT2NUM(GRPC_STATUS_UNAUTHENTICATED)); - rb_hash_aset(result, rb_str_new2("details"), - rb_exception_info); + rb_hash_aset(result, rb_str_new2("details"), rb_exception_info); return result; } @@ -118,7 +116,8 @@ static void grpc_rb_call_credentials_callback_with_gil(void *param) { result = rb_rescue(grpc_rb_call_credentials_callback, callback_args, grpc_rb_call_credentials_callback_rescue, Qnil); // Both callbacks return a hash, so result should be a hash - grpc_rb_md_ary_convert(rb_hash_aref(result, rb_str_new2("metadata")), &md_ary); + grpc_rb_md_ary_convert(rb_hash_aref(result, rb_str_new2("metadata")), + &md_ary); status = NUM2INT(rb_hash_aref(result, rb_str_new2("status"))); details = rb_hash_aref(result, rb_str_new2("details")); error_details = StringValueCStr(details); @@ -138,7 +137,7 @@ static void grpc_rb_call_credentials_plugin_get_metadata( params->callback = cb; grpc_rb_event_queue_enqueue(grpc_rb_call_credentials_callback_with_gil, - (void*)(params)); + (void *)(params)); } static void grpc_rb_call_credentials_plugin_destroy(void *state) { @@ -172,13 +171,15 @@ static void grpc_rb_call_credentials_mark(void *p) { } static rb_data_type_t grpc_rb_call_credentials_data_type = { - "grpc_call_credentials", - {grpc_rb_call_credentials_mark, grpc_rb_call_credentials_free, - GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, - NULL, - NULL, + "grpc_call_credentials", + {grpc_rb_call_credentials_mark, + grpc_rb_call_credentials_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY #endif }; @@ -188,7 +189,8 @@ static VALUE grpc_rb_call_credentials_alloc(VALUE cls) { grpc_rb_call_credentials *wrapper = ALLOC(grpc_rb_call_credentials); wrapper->wrapped = NULL; wrapper->mark = Qnil; - return TypedData_Wrap_Struct(cls, &grpc_rb_call_credentials_data_type, wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_call_credentials_data_type, + wrapper); } /* Creates a wrapping object for a given call credentials. This should only be @@ -232,7 +234,7 @@ static VALUE grpc_rb_call_credentials_init(VALUE self, VALUE proc) { rb_raise(rb_eTypeError, "Argument to CallCredentials#new must be a proc"); return Qnil; } - plugin.state = (void*)proc; + plugin.state = (void *)proc; plugin.type = ""; creds = grpc_metadata_credentials_create_from_plugin(plugin, NULL); @@ -289,7 +291,6 @@ void Init_grpc_call_credentials() { grpc_call_credentials *grpc_rb_get_wrapped_call_credentials(VALUE v) { grpc_rb_call_credentials *wrapper = NULL; TypedData_Get_Struct(v, grpc_rb_call_credentials, - &grpc_rb_call_credentials_data_type, - wrapper); + &grpc_rb_call_credentials_data_type, wrapper); return wrapper->wrapped; } diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index fb610f548e..a802183726 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -34,9 +34,9 @@ #include <ruby/ruby.h> #include <ruby/thread.h> -#include "rb_grpc_imports.generated.h" #include "rb_byte_buffer.h" #include "rb_channel.h" +#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> @@ -89,8 +89,8 @@ typedef struct grpc_rb_channel { static void grpc_rb_channel_try_register_connection_polling( grpc_rb_channel *wrapper); static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); -static void *wait_until_channel_polling_thread_started_no_gil(void*); -static void wait_until_channel_polling_thread_started_unblocking_func(void*); +static void *wait_until_channel_polling_thread_started_no_gil(void *); +static void wait_until_channel_polling_thread_started_unblocking_func(void *); static grpc_completion_queue *channel_polling_cq; static gpr_mu global_connection_polling_mu; @@ -171,8 +171,9 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { MEMZERO(&args, grpc_channel_args, 1); grpc_ruby_once_init(); - rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, NULL, - wait_until_channel_polling_thread_started_unblocking_func, NULL); + rb_thread_call_without_gvl( + wait_until_channel_polling_thread_started_no_gil, NULL, + wait_until_channel_polling_thread_started_unblocking_func, NULL); /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); @@ -204,14 +205,14 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { gpr_mu_lock(&wrapper->channel_mu); wrapper->abort_watch_connectivity_state = 0; - wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + wrapper->current_connectivity_state = + grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); - grpc_rb_channel_try_register_connection_polling(wrapper); if (args.args != NULL) { @@ -253,7 +254,8 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, + grpc_try_to_connect)); } typedef struct watch_state_stack { @@ -263,22 +265,21 @@ typedef struct watch_state_stack { } watch_state_stack; static void *watch_channel_state_without_gvl(void *arg) { - watch_state_stack *stack = (watch_state_stack*)arg; + watch_state_stack *stack = (watch_state_stack *)arg; gpr_timespec deadline = stack->deadline; grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; - void *return_value = (void*)0; + void *return_value = (void *)0; gpr_mu_lock(&wrapper->channel_mu); - while(wrapper->current_connectivity_state == last_state && - !wrapper->request_safe_destroy && - !wrapper->safe_to_destroy && - !wrapper->abort_watch_connectivity_state && - gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + while (wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); } if (wrapper->current_connectivity_state != last_state) { - return_value = (void*)1; + return_value = (void *)1; } gpr_mu_unlock(&wrapper->channel_mu); @@ -286,7 +287,7 @@ static void *watch_channel_state_without_gvl(void *arg) { } static void watch_channel_state_unblocking_func(void *arg) { - grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; + grpc_rb_channel *wrapper = (grpc_rb_channel *)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); gpr_mu_lock(&wrapper->channel_mu); wrapper->abort_watch_connectivity_state = 1; @@ -306,7 +307,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; - void* out; + void *out; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -316,14 +317,18 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, } if (!FIXNUM_P(last_state)) { - rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant"); + rb_raise( + rb_eTypeError, + "bad type for last_state. want a GRPC::Core::ChannelState constant"); return Qnil; } stack.wrapper = wrapper; stack.deadline = grpc_rb_time_timeval(deadline, 0); stack.last_state = NUM2LONG(last_state); - out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); + out = + rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, + watch_channel_state_unblocking_func, wrapper); if (out) { return Qtrue; } @@ -359,7 +364,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, parent_call = grpc_rb_get_wrapped_call(parent); } - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create_for_pluck(NULL); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch == NULL) { @@ -428,7 +433,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { // destroy. // Not safe to call while a channel's connection state is polled. static void grpc_rb_channel_try_register_connection_polling( - grpc_rb_channel *wrapper) { + grpc_rb_channel *wrapper) { grpc_connectivity_state conn_state; gpr_timespec sleep_time = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); @@ -501,11 +506,14 @@ static void *run_poll_channels_loop_no_gil(void *arg) { break; } if (event.type == GRPC_OP_COMPLETE) { - grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag); + grpc_rb_channel_try_register_connection_polling( + (grpc_rb_channel *)event.tag); } } grpc_completion_queue_destroy(channel_polling_cq); - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling " + "loop"); return NULL; } @@ -513,7 +521,9 @@ static void *run_poll_channels_loop_no_gil(void *arg) { static void run_poll_channels_loop_unblocking_func(void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling"); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting " + "connection polling"); abort_channel_polling = 1; grpc_completion_queue_shutdown(channel_polling_cq); gpr_mu_unlock(&global_connection_polling_mu); @@ -522,7 +532,9 @@ static void run_poll_channels_loop_unblocking_func(void *arg) { // Poll channel connectivity states in background thread without the GIL. static VALUE run_poll_channels_loop(VALUE arg) { (void)arg; - gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); + gpr_log( + GPR_DEBUG, + "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, run_poll_channels_loop_unblocking_func, NULL); @@ -542,10 +554,14 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { return NULL; } -static void wait_until_channel_polling_thread_started_unblocking_func(void* arg) { +static void wait_until_channel_polling_thread_started_unblocking_func( + void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); - gpr_log(GPR_DEBUG, "GRPC_RUBY: wait_until_channel_polling_thread_started_unblocking_func - begin aborting connection polling"); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: " + "wait_until_channel_polling_thread_started_unblocking_func - begin " + "aborting connection polling"); abort_channel_polling = 1; gpr_cv_broadcast(&global_connection_polling_cv); gpr_mu_unlock(&global_connection_polling_mu); @@ -571,7 +587,7 @@ void grpc_rb_channel_polling_thread_start() { gpr_mu_init(&global_connection_polling_mu); gpr_cv_init(&global_connection_polling_cv); - channel_polling_cq = grpc_completion_queue_create(NULL); + channel_polling_cq = grpc_completion_queue_create_for_next(NULL); background_thread = rb_thread_create(run_poll_channels_loop, NULL); if (!RTEST(background_thread)) { diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c index 87c0e0a705..fa9ddee372 100644 --- a/src/ruby/ext/grpc/rb_channel_args.c +++ b/src/ruby/ext/grpc/rb_channel_args.c @@ -33,8 +33,8 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_channel_args.h" +#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> @@ -42,9 +42,12 @@ static rb_data_type_t grpc_rb_channel_args_data_type = { "grpc_channel_args", - {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE, + {GRPC_RB_GC_NOT_MARKED, + GRPC_RB_GC_DONT_FREE, + GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, - NULL, NULL, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY RUBY_TYPED_FREE_IMMEDIATELY #endif @@ -137,11 +140,10 @@ static VALUE grpc_rb_hash_convert_to_channel_args0(VALUE as_value) { params->dst->num_args = num_args; params->dst->args = ALLOC_N(grpc_arg, num_args); MEMZERO(params->dst->args, grpc_arg, num_args); - rb_hash_foreach(params->src_hash, - grpc_rb_channel_create_in_process_add_args_hash_cb, - TypedData_Wrap_Struct(grpc_rb_cChannelArgs, - &grpc_rb_channel_args_data_type, - params->dst)); + rb_hash_foreach( + params->src_hash, grpc_rb_channel_create_in_process_add_args_hash_cb, + TypedData_Wrap_Struct(grpc_rb_cChannelArgs, + &grpc_rb_channel_args_data_type, params->dst)); /* reset num_args as grpc_rb_channel_create_in_process_add_args_hash_cb * decrements it during has processing */ params->dst->num_args = num_args; @@ -157,7 +159,7 @@ void grpc_rb_hash_convert_to_channel_args(VALUE src_hash, /* Make a protected call to grpc_rb_hash_convert_channel_args */ params.src_hash = src_hash; params.dst = dst; - rb_protect(grpc_rb_hash_convert_to_channel_args0, (VALUE) & params, &status); + rb_protect(grpc_rb_hash_convert_to_channel_args0, (VALUE)¶ms, &status); if (status != 0) { if (dst->args != NULL) { /* Free any allocated memory before propagating the error */ diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c index d334c09148..db713ed821 100644 --- a/src/ruby/ext/grpc/rb_channel_credentials.c +++ b/src/ruby/ext/grpc/rb_channel_credentials.c @@ -35,8 +35,8 @@ #include <string.h> -#include "rb_grpc_imports.generated.h" #include "rb_channel_credentials.h" +#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> @@ -91,8 +91,10 @@ static void grpc_rb_channel_credentials_mark(void *p) { static rb_data_type_t grpc_rb_channel_credentials_data_type = { "grpc_channel_credentials", - {grpc_rb_channel_credentials_mark, grpc_rb_channel_credentials_free, - GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, + {grpc_rb_channel_credentials_mark, + grpc_rb_channel_credentials_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, NULL, NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY @@ -106,13 +108,15 @@ static VALUE grpc_rb_channel_credentials_alloc(VALUE cls) { grpc_rb_channel_credentials *wrapper = ALLOC(grpc_rb_channel_credentials); wrapper->wrapped = NULL; wrapper->mark = Qnil; - return TypedData_Wrap_Struct(cls, &grpc_rb_channel_credentials_data_type, wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_channel_credentials_data_type, + wrapper); } /* Creates a wrapping object for a given channel credentials. This should only * be called with grpc_channel_credentials objects that are not already * associated with any Ruby object. */ -VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c, VALUE mark) { +VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c, + VALUE mark) { VALUE rb_wrapper; grpc_rb_channel_credentials *wrapper; if (c == NULL) { @@ -147,7 +151,8 @@ static ID id_pem_cert_chain; pem_private_key: (optional) PEM encoding of the client's private key pem_cert_chain: (optional) PEM encoding of the client's cert chain Initializes Credential instances. */ -static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv, VALUE self) { +static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv, + VALUE self) { VALUE pem_root_certs = Qnil; VALUE pem_private_key = Qnil; VALUE pem_cert_chain = Qnil; @@ -173,8 +178,8 @@ static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv, VALUE self) } else { key_cert_pair.private_key = RSTRING_PTR(pem_private_key); key_cert_pair.cert_chain = RSTRING_PTR(pem_cert_chain); - creds = grpc_ssl_credentials_create(pem_root_certs_cstr, - &key_cert_pair, NULL); + creds = + grpc_ssl_credentials_create(pem_root_certs_cstr, &key_cert_pair, NULL); } if (creds == NULL) { rb_raise(rb_eRuntimeError, "could not create a credentials, not sure why"); @@ -233,8 +238,8 @@ static VALUE grpc_rb_set_default_roots_pem(VALUE self, VALUE roots) { } void Init_grpc_channel_credentials() { - grpc_rb_cChannelCredentials = - rb_define_class_under(grpc_rb_mGrpcCore, "ChannelCredentials", rb_cObject); + grpc_rb_cChannelCredentials = rb_define_class_under( + grpc_rb_mGrpcCore, "ChannelCredentials", rb_cObject); /* Allocates an object managed by the ruby runtime */ rb_define_alloc_func(grpc_rb_cChannelCredentials, @@ -262,7 +267,6 @@ void Init_grpc_channel_credentials() { grpc_channel_credentials *grpc_rb_get_wrapped_channel_credentials(VALUE v) { grpc_rb_channel_credentials *wrapper = NULL; TypedData_Get_Struct(v, grpc_rb_channel_credentials, - &grpc_rb_channel_credentials_data_type, - wrapper); + &grpc_rb_channel_credentials_data_type, wrapper); return wrapper->wrapped; } diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index fd75d2f691..c9d67739a5 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -33,14 +33,14 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_completion_queue.h" +#include "rb_grpc_imports.generated.h" #include <ruby/thread.h> #include <grpc/grpc.h> -#include <grpc/support/time.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> #include "rb_grpc.h" /* Used to allow grpc_completion_queue_next call to release the GIL */ @@ -54,14 +54,13 @@ typedef struct next_call_stack { /* Calls grpc_completion_queue_pluck without holding the ruby GIL */ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { - next_call_stack *const next_call = (next_call_stack*)param; + next_call_stack *const next_call = (next_call_stack *)param; gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); gpr_timespec deadline; do { deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); - next_call->event = grpc_completion_queue_pluck(next_call->cq, - next_call->tag, - deadline, NULL); + next_call->event = grpc_completion_queue_pluck( + next_call->cq, next_call->tag, deadline, NULL); if (next_call->event.type != GRPC_QUEUE_TIMEOUT || gpr_time_cmp(deadline, next_call->timeout) > 0) { break; @@ -81,7 +80,7 @@ void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) { } static void unblock_func(void *param) { - next_call_stack *const next_call = (next_call_stack*)param; + next_call_stack *const next_call = (next_call_stack *)param; next_call->interrupted = 1; } @@ -111,7 +110,6 @@ grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag, (void *)&next_call); /* If an interrupt prevented pluck from returning useful information, then any plucks that did complete must have timed out */ - } while (next_call.interrupted && - next_call.event.type == GRPC_QUEUE_TIMEOUT); + } while (next_call.interrupted && next_call.event.type == GRPC_QUEUE_TIMEOUT); return next_call.event; } diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c index b23e82b0db..45c963dca6 100644 --- a/src/ruby/ext/grpc/rb_compression_options.c +++ b/src/ruby/ext/grpc/rb_compression_options.c @@ -33,15 +33,15 @@ #include <ruby/ruby.h> -#include "rb_compression_options.h" #include "rb_byte_buffer.h" +#include "rb_compression_options.h" #include "rb_grpc_imports.generated.h" #include <grpc/compression.h> #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/impl/codegen/compression_types.h> #include <grpc/impl/codegen/grpc_types.h> +#include <grpc/support/alloc.h> #include <string.h> #include "rb_grpc.h" @@ -182,15 +182,16 @@ void grpc_rb_compression_options_algorithm_name_to_value_internal( * correct C string out of it. */ algorithm_name_as_string = rb_funcall(algorithm_name, rb_intern("to_s"), 0); - name_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(algorithm_name_as_string), RSTRING_LEN(algorithm_name_as_string)); + name_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(algorithm_name_as_string), + RSTRING_LEN(algorithm_name_as_string)); /* Raise an error if the name isn't recognized as a compression algorithm by * the algorithm parse function * in GRPC core. */ - if(!grpc_compression_algorithm_parse(name_slice, algorithm_value)) { + if (!grpc_compression_algorithm_parse(name_slice, algorithm_value)) { tmp_str = grpc_slice_to_c_string(name_slice); - rb_raise(rb_eNameError, "Invalid compression algorithm name: %s", - tmp_str); + rb_raise(rb_eNameError, "Invalid compression algorithm name: %s", tmp_str); } grpc_slice_unref(name_slice); diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c index 9e85bbcfbf..9a3b56ddfb 100644 --- a/src/ruby/ext/grpc/rb_event_thread.c +++ b/src/ruby/ext/grpc/rb_event_thread.c @@ -33,20 +33,20 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_event_thread.h" +#include "rb_grpc_imports.generated.h" #include <stdbool.h> -#include <ruby/thread.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include <grpc/support/log.h> +#include <ruby/thread.h> typedef struct grpc_rb_event { // callback will be called with argument while holding the GVL - void (*callback)(void*); + void (*callback)(void *); void *argument; struct grpc_rb_event *next; @@ -65,8 +65,7 @@ typedef struct grpc_rb_event_queue { static grpc_rb_event_queue event_queue; -void grpc_rb_event_queue_enqueue(void (*callback)(void*), - void *argument) { +void grpc_rb_event_queue_enqueue(void (*callback)(void *), void *argument) { grpc_rb_event *event = gpr_malloc(sizeof(grpc_rb_event)); event->callback = callback; event->argument = argument; @@ -107,8 +106,7 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) { (void)param; gpr_mu_lock(&event_queue.mu); while ((event = grpc_rb_event_queue_dequeue()) == NULL) { - gpr_cv_wait(&event_queue.cv, - &event_queue.mu, + gpr_cv_wait(&event_queue.cv, &event_queue.mu, gpr_inf_future(GPR_CLOCK_REALTIME)); if (event_queue.abort) { gpr_mu_unlock(&event_queue.mu); @@ -132,10 +130,10 @@ static void grpc_rb_event_unblocking_func(void *arg) { static VALUE grpc_rb_event_thread(VALUE arg) { grpc_rb_event *event; (void)arg; - while(true) { - event = (grpc_rb_event*)rb_thread_call_without_gvl( - grpc_rb_wait_for_event_no_gil, NULL, - grpc_rb_event_unblocking_func, NULL); + while (true) { + event = (grpc_rb_event *)rb_thread_call_without_gvl( + grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func, + NULL); if (event == NULL) { // Indicates that the thread needs to shut down break; diff --git a/src/ruby/ext/grpc/rb_event_thread.h b/src/ruby/ext/grpc/rb_event_thread.h index 46638bfcf5..d7eff760a1 100644 --- a/src/ruby/ext/grpc/rb_event_thread.h +++ b/src/ruby/ext/grpc/rb_event_thread.h @@ -33,5 +33,4 @@ void grpc_rb_event_queue_thread_start(); -void grpc_rb_event_queue_enqueue(void (*callback)(void*), - void *argument); +void grpc_rb_event_queue_enqueue(void (*callback)(void *), void *argument); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 584b5dbc63..5be8861e0c 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -33,8 +33,8 @@ #include <ruby/ruby.h> -#include "rb_grpc_imports.generated.h" #include "rb_grpc.h" +#include "rb_grpc_imports.generated.h" #include <math.h> #include <ruby/vm.h> @@ -46,18 +46,19 @@ #include "rb_call_credentials.h" #include "rb_channel.h" #include "rb_channel_credentials.h" +#include "rb_compression_options.h" +#include "rb_event_thread.h" #include "rb_loader.h" #include "rb_server.h" #include "rb_server_credentials.h" -#include "rb_compression_options.h" -#include "rb_event_thread.h" -#include "rb_channel.h" static VALUE grpc_rb_cTimeVal = Qnil; static rb_data_type_t grpc_rb_timespec_data_type = { "gpr_timespec", - {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE, + {GRPC_RB_GC_NOT_MARKED, + GRPC_RB_GC_DONT_FREE, + GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, @@ -86,8 +87,7 @@ VALUE grpc_rb_cannot_init(VALUE self) { /* Init/Clone func that fails by raising an exception. */ VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self) { (void)self; - rb_raise(rb_eTypeError, - "Copy initialization of %s is not supported", + rb_raise(rb_eTypeError, "Copy initialization of %s is not supported", rb_obj_classname(copy)); return Qnil; } @@ -145,8 +145,7 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) { } t.tv_sec = (int64_t)f; if (f != t.tv_sec) { - rb_raise(rb_eRangeError, "%f out of Time range", - RFLOAT_VALUE(time)); + rb_raise(rb_eRangeError, "%f out of Time range", RFLOAT_VALUE(time)); } t.tv_nsec = (int)(d * 1e9 + 0.5); } @@ -271,9 +270,7 @@ static void Init_grpc_time_consts() { id_tv_nsec = rb_intern("tv_nsec"); } -static void grpc_rb_shutdown(void) { - grpc_shutdown(); -} +static void grpc_rb_shutdown(void) { grpc_shutdown(); } /* Initialize the GRPC module structs */ @@ -322,9 +319,8 @@ void Init_grpc_c() { grpc_rb_mGRPC = rb_define_module("GRPC"); grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core"); - grpc_rb_sNewServerRpc = - rb_struct_define("NewServerRpc", "method", "host", - "deadline", "metadata", "call", NULL); + grpc_rb_sNewServerRpc = rb_struct_define( + "NewServerRpc", "method", "host", "deadline", "metadata", "call", NULL); grpc_rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL); sym_code = ID2SYM(rb_intern("code")); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 4bf11e75df..8538a74211 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -34,8 +34,8 @@ #ifndef GRPC_RB_H_ #define GRPC_RB_H_ -#include <sys/time.h> #include <ruby/ruby.h> +#include <sys/time.h> #include <grpc/support/time.h> @@ -68,7 +68,7 @@ extern VALUE sym_metadata; /* GRPC_RB_MEMSIZE_UNAVAILABLE is used in rb_data_type_t to indicate that the * number of bytes used by the wrapped struct is not available. */ -#define GRPC_RB_MEMSIZE_UNAVAILABLE (size_t (*)(const void*))(NULL) +#define GRPC_RB_MEMSIZE_UNAVAILABLE (size_t(*)(const void*))(NULL) /* A ruby object alloc func that fails by raising an exception. */ VALUE grpc_rb_cannot_alloc(VALUE cls); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 063f92114c..a412277f6d 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -108,9 +108,9 @@ grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_ping_type grpc_channel_ping_import; grpc_channel_register_call_type grpc_channel_register_call_import; grpc_channel_create_registered_call_type grpc_channel_create_registered_call_import; +grpc_call_arena_alloc_type grpc_call_arena_alloc_import; grpc_call_start_batch_type grpc_call_start_batch_import; grpc_call_get_peer_type grpc_call_get_peer_import; -grpc_call_set_load_reporting_cost_context_type grpc_call_set_load_reporting_cost_context_import; grpc_census_call_set_context_type grpc_census_call_set_context_import; grpc_census_call_get_context_type grpc_census_call_get_context_import; grpc_channel_get_target_type grpc_channel_get_target_import; @@ -120,13 +120,13 @@ grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import; grpc_channel_destroy_type grpc_channel_destroy_import; grpc_call_cancel_type grpc_call_cancel_import; grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import; -grpc_call_destroy_type grpc_call_destroy_import; +grpc_call_ref_type grpc_call_ref_import; +grpc_call_unref_type grpc_call_unref_import; grpc_server_request_call_type grpc_server_request_call_import; grpc_server_register_method_type grpc_server_register_method_import; grpc_server_request_registered_call_type grpc_server_request_registered_call_import; grpc_server_create_type grpc_server_create_import; grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; -grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; grpc_server_start_type grpc_server_start_import; grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import; @@ -405,9 +405,9 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping"); grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call"); grpc_channel_create_registered_call_import = (grpc_channel_create_registered_call_type) GetProcAddress(library, "grpc_channel_create_registered_call"); + grpc_call_arena_alloc_import = (grpc_call_arena_alloc_type) GetProcAddress(library, "grpc_call_arena_alloc"); grpc_call_start_batch_import = (grpc_call_start_batch_type) GetProcAddress(library, "grpc_call_start_batch"); grpc_call_get_peer_import = (grpc_call_get_peer_type) GetProcAddress(library, "grpc_call_get_peer"); - grpc_call_set_load_reporting_cost_context_import = (grpc_call_set_load_reporting_cost_context_type) GetProcAddress(library, "grpc_call_set_load_reporting_cost_context"); grpc_census_call_set_context_import = (grpc_census_call_set_context_type) GetProcAddress(library, "grpc_census_call_set_context"); grpc_census_call_get_context_import = (grpc_census_call_get_context_type) GetProcAddress(library, "grpc_census_call_get_context"); grpc_channel_get_target_import = (grpc_channel_get_target_type) GetProcAddress(library, "grpc_channel_get_target"); @@ -417,13 +417,13 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy"); grpc_call_cancel_import = (grpc_call_cancel_type) GetProcAddress(library, "grpc_call_cancel"); grpc_call_cancel_with_status_import = (grpc_call_cancel_with_status_type) GetProcAddress(library, "grpc_call_cancel_with_status"); - grpc_call_destroy_import = (grpc_call_destroy_type) GetProcAddress(library, "grpc_call_destroy"); + grpc_call_ref_import = (grpc_call_ref_type) GetProcAddress(library, "grpc_call_ref"); + grpc_call_unref_import = (grpc_call_unref_type) GetProcAddress(library, "grpc_call_unref"); grpc_server_request_call_import = (grpc_server_request_call_type) GetProcAddress(library, "grpc_server_request_call"); grpc_server_register_method_import = (grpc_server_register_method_type) GetProcAddress(library, "grpc_server_register_method"); grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call"); grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create"); grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue"); - grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue"); grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port"); grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start"); grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index f5dcd68a8e..7df8dd69fc 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -233,7 +233,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f typedef grpc_completion_queue *(*grpc_completion_queue_create_for_pluck_type)(void *reserved); extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; #define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import -typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(void *reserved); +typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved); extern grpc_completion_queue_create_type grpc_completion_queue_create_import; #define grpc_completion_queue_create grpc_completion_queue_create_import typedef grpc_event(*grpc_completion_queue_next_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); @@ -275,15 +275,15 @@ extern grpc_channel_register_call_type grpc_channel_register_call_import; typedef grpc_call *(*grpc_channel_create_registered_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec deadline, void *reserved); extern grpc_channel_create_registered_call_type grpc_channel_create_registered_call_import; #define grpc_channel_create_registered_call grpc_channel_create_registered_call_import +typedef void *(*grpc_call_arena_alloc_type)(grpc_call *call, size_t size); +extern grpc_call_arena_alloc_type grpc_call_arena_alloc_import; +#define grpc_call_arena_alloc grpc_call_arena_alloc_import typedef grpc_call_error(*grpc_call_start_batch_type)(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved); extern grpc_call_start_batch_type grpc_call_start_batch_import; #define grpc_call_start_batch grpc_call_start_batch_import typedef char *(*grpc_call_get_peer_type)(grpc_call *call); extern grpc_call_get_peer_type grpc_call_get_peer_import; #define grpc_call_get_peer grpc_call_get_peer_import -typedef void(*grpc_call_set_load_reporting_cost_context_type)(grpc_call *call, struct grpc_load_reporting_cost_context *context); -extern grpc_call_set_load_reporting_cost_context_type grpc_call_set_load_reporting_cost_context_import; -#define grpc_call_set_load_reporting_cost_context grpc_call_set_load_reporting_cost_context_import typedef void(*grpc_census_call_set_context_type)(grpc_call *call, struct census_context *context); extern grpc_census_call_set_context_type grpc_census_call_set_context_import; #define grpc_census_call_set_context grpc_census_call_set_context_import @@ -311,9 +311,12 @@ extern grpc_call_cancel_type grpc_call_cancel_import; typedef grpc_call_error(*grpc_call_cancel_with_status_type)(grpc_call *call, grpc_status_code status, const char *description, void *reserved); extern grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import; #define grpc_call_cancel_with_status grpc_call_cancel_with_status_import -typedef void(*grpc_call_destroy_type)(grpc_call *call); -extern grpc_call_destroy_type grpc_call_destroy_import; -#define grpc_call_destroy grpc_call_destroy_import +typedef void(*grpc_call_ref_type)(grpc_call *call); +extern grpc_call_ref_type grpc_call_ref_import; +#define grpc_call_ref grpc_call_ref_import +typedef void(*grpc_call_unref_type)(grpc_call *call); +extern grpc_call_unref_type grpc_call_unref_import; +#define grpc_call_unref grpc_call_unref_import typedef grpc_call_error(*grpc_server_request_call_type)(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new); extern grpc_server_request_call_type grpc_server_request_call_import; #define grpc_server_request_call grpc_server_request_call_import @@ -329,9 +332,6 @@ extern grpc_server_create_type grpc_server_create_import; typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved); extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import -typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved); -extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; -#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr); extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 2286a99f24..d7408f683d 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -37,15 +37,15 @@ #include "rb_server.h" #include <grpc/grpc.h> -#include <grpc/support/atm.h> #include <grpc/grpc_security.h> +#include <grpc/support/atm.h> #include <grpc/support/log.h> +#include "rb_byte_buffer.h" #include "rb_call.h" #include "rb_channel_args.h" #include "rb_completion_queue.h" -#include "rb_server_credentials.h" -#include "rb_byte_buffer.h" #include "rb_grpc.h" +#include "rb_server_credentials.h" /* grpc_rb_cServer is the ruby class that proxies grpc_server. */ static VALUE grpc_rb_cServer = Qnil; @@ -93,9 +93,8 @@ static void grpc_rb_server_free(void *p) { }; svr = (grpc_rb_server *)p; - deadline = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(2, GPR_TIMESPAN)); + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(2, GPR_TIMESPAN)); destroy_server(svr, deadline); @@ -104,13 +103,15 @@ static void grpc_rb_server_free(void *p) { static const rb_data_type_t grpc_rb_server_data_type = { "grpc_server", - {GRPC_RB_GC_NOT_MARKED, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE, + {GRPC_RB_GC_NOT_MARKED, + grpc_rb_server_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block - * and we might want to unlock GVL + /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free + * function would block and we might want to unlock GVL * TODO(yugui) Unlock GVL? */ 0, @@ -139,7 +140,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) { grpc_ruby_once_init(); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create_for_pluck(NULL); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, wrapper); grpc_rb_hash_convert_to_channel_args(channel_args, &args); @@ -167,7 +168,7 @@ typedef struct request_call_stack { /* grpc_request_call_stack_init ensures the request_call_stack is properly * initialized */ -static void grpc_request_call_stack_init(request_call_stack* st) { +static void grpc_request_call_stack_init(request_call_stack *st) { MEMZERO(st, request_call_stack, 1); grpc_metadata_array_init(&st->md_ary); grpc_call_details_init(&st->details); @@ -175,7 +176,7 @@ static void grpc_request_call_stack_init(request_call_stack* st) { /* grpc_request_call_stack_cleanup ensures the request_call_stack is properly * cleaned up */ -static void grpc_request_call_stack_cleanup(request_call_stack* st) { +static void grpc_request_call_stack_cleanup(request_call_stack *st) { grpc_metadata_array_destroy(&st->md_ary); grpc_call_details_destroy(&st->details); } @@ -191,8 +192,9 @@ static VALUE grpc_rb_server_request_call(VALUE self) { grpc_call_error err; request_call_stack st; VALUE result; - void *tag = (void*)&st; - grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL); + void *tag = (void *)&st; + grpc_completion_queue *call_queue = + grpc_completion_queue_create_for_pluck(NULL); gpr_timespec deadline; TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); @@ -203,9 +205,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) { grpc_request_call_stack_init(&st); /* call grpc_server_request_call, then wait for it to complete using * pluck_event */ - err = grpc_server_request_call( - s->wrapped, &call, &st.details, &st.md_ary, - call_queue, s->queue, tag); + err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary, + call_queue, s->queue, tag); if (err != GRPC_CALL_OK) { grpc_request_call_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, @@ -301,8 +302,7 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port, return Qnil; } else if (TYPE(rb_creds) == T_SYMBOL) { if (id_insecure_server != SYM2ID(rb_creds)) { - rb_raise(rb_eTypeError, - "bad creds symbol, want :this_port_is_insecure"); + rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure"); return Qnil; } recvd_port = @@ -314,9 +314,8 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port, } } else { creds = grpc_rb_get_wrapped_server_credentials(rb_creds); - recvd_port = - grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port), - creds); + recvd_port = grpc_server_add_secure_http2_port( + s->wrapped, StringValueCStr(port), creds); if (recvd_port == 0) { rb_raise(rb_eRuntimeError, "could not add secure port %s to server, not sure why", @@ -335,18 +334,17 @@ void Init_grpc_server() { /* Provides a ruby constructor and support for dup/clone. */ rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1); - rb_define_method(grpc_rb_cServer, "initialize_copy", - grpc_rb_cannot_init_copy, 1); + rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy, + 1); /* Add the server methods. */ - rb_define_method(grpc_rb_cServer, "request_call", - grpc_rb_server_request_call, 0); + rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call, + 0); rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1); rb_define_alias(grpc_rb_cServer, "close", "destroy"); rb_define_method(grpc_rb_cServer, "add_http2_port", - grpc_rb_server_add_http2_port, - 2); + grpc_rb_server_add_http2_port, 2); id_at = rb_intern("at"); id_insecure_server = rb_intern("this_port_is_insecure"); } |