diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2016-12-29 01:38:26 +0100 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2017-01-04 22:10:14 +0100 |
commit | 203860ac23b6229096c0e9d9c04f12cd607c1938 (patch) | |
tree | 43cdba091ef2ea2421a0633b4241e40135ec551a /src/ruby | |
parent | 19014deb841f25d14827147d4812b1ddcce2b693 (diff) | |
parent | ddebfa65f2bdff332902adf73606bc050014b498 (diff) |
Merge remote-tracking branch 'google/master' into bazel-take-2
Diffstat (limited to 'src/ruby')
25 files changed, 1083 insertions, 374 deletions
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index 61b7c30315..47fd6d9120 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -38,20 +38,20 @@ #include <grpc/grpc.h> #include <grpc/byte_buffer_reader.h> -#include <grpc/support/slice.h> +#include <grpc/slice.h> #include "rb_grpc.h" grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) { - gpr_slice slice = gpr_slice_from_copied_buffer(string, length); + grpc_slice slice = grpc_slice_from_copied_buffer(string, length); grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); + grpc_slice_unref(slice); return buffer; } VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { VALUE rb_string; grpc_byte_buffer_reader reader; - gpr_slice next; + grpc_slice next; if (buffer == NULL) { return Qnil; } @@ -61,9 +61,10 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { return Qnil; } while (grpc_byte_buffer_reader_next(&reader, &next) != 0) { - rb_str_cat(rb_string, (const char *) GPR_SLICE_START_PTR(next), - GPR_SLICE_LENGTH(next)); - gpr_slice_unref(next); + rb_str_cat(rb_string, (const char *) GRPC_SLICE_START_PTR(next), + GRPC_SLICE_LENGTH(next)); + grpc_slice_unref(next); } + grpc_byte_buffer_reader_destroy(&reader); return rb_string; } diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 9b6675da84..280f21c973 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -86,19 +86,16 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args, rb_funcall(exception_object, rb_intern("backtrace"), 0), rb_intern("join"), 1, rb_str_new2("\n\tfrom ")); - VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0); - const char *exception_classname = rb_obj_classname(exception_object); + VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("inspect"), 0); (void)args; - gpr_log(GPR_INFO, "Call credentials callback failed: %s: %s\n%s", - exception_classname, StringValueCStr(rb_exception_info), + gpr_log(GPR_INFO, "Call credentials callback failed: %s\n%s", + StringValueCStr(rb_exception_info), StringValueCStr(backtrace)); rb_hash_aset(result, rb_str_new2("metadata"), Qnil); - /* Currently only gives the exception class name. It should be possible get - more details */ rb_hash_aset(result, rb_str_new2("status"), - INT2NUM(GRPC_STATUS_PERMISSION_DENIED)); + INT2NUM(GRPC_STATUS_UNAUTHENTICATED)); rb_hash_aset(result, rb_str_new2("details"), - rb_str_new2(exception_classname)); + rb_exception_info); return result; } diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c index c5668fdab4..6200dbafeb 100644 --- a/src/ruby/ext/grpc/rb_compression_options.c +++ b/src/ruby/ext/grpc/rb_compression_options.c @@ -283,6 +283,8 @@ VALUE grpc_rb_compression_options_level_value_to_name_internal( rb_eArgError, "Failed to convert compression level value to name for value: %d", (int)compression_value); + /* return something to avoid compiler error about no return */ + return Qnil; } } diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index a6cad0db1a..6c36df9113 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -110,6 +110,7 @@ grpc_call_get_peer_type grpc_call_get_peer_import; grpc_census_call_set_context_type grpc_census_call_set_context_import; grpc_census_call_get_context_type grpc_census_call_get_context_import; grpc_channel_get_target_type grpc_channel_get_target_import; +grpc_channel_get_info_type grpc_channel_get_info_import; grpc_insecure_channel_create_type grpc_insecure_channel_create_import; grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import; grpc_channel_destroy_type grpc_channel_destroy_import; @@ -132,6 +133,11 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import; grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import; grpc_is_binary_header_type grpc_is_binary_header_import; grpc_call_error_to_string_type grpc_call_error_to_string_import; +grpc_resource_quota_create_type grpc_resource_quota_create_import; +grpc_resource_quota_ref_type grpc_resource_quota_ref_import; +grpc_resource_quota_unref_type grpc_resource_quota_unref_import; +grpc_resource_quota_resize_type grpc_resource_quota_resize_import; +grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import; grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; grpc_use_signal_type grpc_use_signal_import; @@ -167,6 +173,36 @@ grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import; grpc_call_set_credentials_type grpc_call_set_credentials_import; grpc_server_credentials_set_auth_metadata_processor_type grpc_server_credentials_set_auth_metadata_processor_import; +grpc_slice_ref_type grpc_slice_ref_import; +grpc_slice_unref_type grpc_slice_unref_import; +grpc_slice_new_type grpc_slice_new_import; +grpc_slice_new_with_user_data_type grpc_slice_new_with_user_data_import; +grpc_slice_new_with_len_type grpc_slice_new_with_len_import; +grpc_slice_malloc_type grpc_slice_malloc_import; +grpc_slice_from_copied_string_type grpc_slice_from_copied_string_import; +grpc_slice_from_copied_buffer_type grpc_slice_from_copied_buffer_import; +grpc_slice_from_static_string_type grpc_slice_from_static_string_import; +grpc_slice_sub_type grpc_slice_sub_import; +grpc_slice_sub_no_ref_type grpc_slice_sub_no_ref_import; +grpc_slice_split_tail_type grpc_slice_split_tail_import; +grpc_slice_split_head_type grpc_slice_split_head_import; +gpr_empty_slice_type gpr_empty_slice_import; +grpc_slice_cmp_type grpc_slice_cmp_import; +grpc_slice_str_cmp_type grpc_slice_str_cmp_import; +grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import; +grpc_slice_buffer_init_type grpc_slice_buffer_init_import; +grpc_slice_buffer_destroy_type grpc_slice_buffer_destroy_import; +grpc_slice_buffer_add_type grpc_slice_buffer_add_import; +grpc_slice_buffer_add_indexed_type grpc_slice_buffer_add_indexed_import; +grpc_slice_buffer_addn_type grpc_slice_buffer_addn_import; +grpc_slice_buffer_tiny_add_type grpc_slice_buffer_tiny_add_import; +grpc_slice_buffer_pop_type grpc_slice_buffer_pop_import; +grpc_slice_buffer_reset_and_unref_type grpc_slice_buffer_reset_and_unref_import; +grpc_slice_buffer_swap_type grpc_slice_buffer_swap_import; +grpc_slice_buffer_move_into_type grpc_slice_buffer_move_into_import; +grpc_slice_buffer_trim_end_type grpc_slice_buffer_trim_end_import; +grpc_slice_buffer_move_first_type grpc_slice_buffer_move_first_import; +grpc_slice_buffer_take_first_type grpc_slice_buffer_take_first_import; gpr_malloc_type gpr_malloc_import; gpr_free_type gpr_free_import; gpr_realloc_type gpr_realloc_import; @@ -216,35 +252,6 @@ gpr_set_log_verbosity_type gpr_set_log_verbosity_import; gpr_log_verbosity_init_type gpr_log_verbosity_init_import; gpr_set_log_function_type gpr_set_log_function_import; gpr_format_message_type gpr_format_message_import; -gpr_slice_ref_type gpr_slice_ref_import; -gpr_slice_unref_type gpr_slice_unref_import; -gpr_slice_new_type gpr_slice_new_import; -gpr_slice_new_with_user_data_type gpr_slice_new_with_user_data_import; -gpr_slice_new_with_len_type gpr_slice_new_with_len_import; -gpr_slice_malloc_type gpr_slice_malloc_import; -gpr_slice_from_copied_string_type gpr_slice_from_copied_string_import; -gpr_slice_from_copied_buffer_type gpr_slice_from_copied_buffer_import; -gpr_slice_from_static_string_type gpr_slice_from_static_string_import; -gpr_slice_sub_type gpr_slice_sub_import; -gpr_slice_sub_no_ref_type gpr_slice_sub_no_ref_import; -gpr_slice_split_tail_type gpr_slice_split_tail_import; -gpr_slice_split_head_type gpr_slice_split_head_import; -gpr_empty_slice_type gpr_empty_slice_import; -gpr_slice_cmp_type gpr_slice_cmp_import; -gpr_slice_str_cmp_type gpr_slice_str_cmp_import; -gpr_slice_buffer_init_type gpr_slice_buffer_init_import; -gpr_slice_buffer_destroy_type gpr_slice_buffer_destroy_import; -gpr_slice_buffer_add_type gpr_slice_buffer_add_import; -gpr_slice_buffer_add_indexed_type gpr_slice_buffer_add_indexed_import; -gpr_slice_buffer_addn_type gpr_slice_buffer_addn_import; -gpr_slice_buffer_tiny_add_type gpr_slice_buffer_tiny_add_import; -gpr_slice_buffer_pop_type gpr_slice_buffer_pop_import; -gpr_slice_buffer_reset_and_unref_type gpr_slice_buffer_reset_and_unref_import; -gpr_slice_buffer_swap_type gpr_slice_buffer_swap_import; -gpr_slice_buffer_move_into_type gpr_slice_buffer_move_into_import; -gpr_slice_buffer_trim_end_type gpr_slice_buffer_trim_end_import; -gpr_slice_buffer_move_first_type gpr_slice_buffer_move_first_import; -gpr_slice_buffer_take_first_type gpr_slice_buffer_take_first_import; gpr_strdup_type gpr_strdup_import; gpr_asprintf_type gpr_asprintf_import; gpr_subprocess_binary_extension_type gpr_subprocess_binary_extension_import; @@ -379,6 +386,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_census_call_set_context_import = (grpc_census_call_set_context_type) GetProcAddress(library, "grpc_census_call_set_context"); grpc_census_call_get_context_import = (grpc_census_call_get_context_type) GetProcAddress(library, "grpc_census_call_get_context"); grpc_channel_get_target_import = (grpc_channel_get_target_type) GetProcAddress(library, "grpc_channel_get_target"); + grpc_channel_get_info_import = (grpc_channel_get_info_type) GetProcAddress(library, "grpc_channel_get_info"); grpc_insecure_channel_create_import = (grpc_insecure_channel_create_type) GetProcAddress(library, "grpc_insecure_channel_create"); grpc_lame_client_channel_create_import = (grpc_lame_client_channel_create_type) GetProcAddress(library, "grpc_lame_client_channel_create"); grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy"); @@ -401,6 +409,11 @@ void grpc_rb_load_imports(HMODULE library) { grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal"); grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header"); grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string"); + grpc_resource_quota_create_import = (grpc_resource_quota_create_type) GetProcAddress(library, "grpc_resource_quota_create"); + grpc_resource_quota_ref_import = (grpc_resource_quota_ref_type) GetProcAddress(library, "grpc_resource_quota_ref"); + grpc_resource_quota_unref_import = (grpc_resource_quota_unref_type) GetProcAddress(library, "grpc_resource_quota_unref"); + grpc_resource_quota_resize_import = (grpc_resource_quota_resize_type) GetProcAddress(library, "grpc_resource_quota_resize"); + grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable"); grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd"); grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal"); @@ -436,6 +449,36 @@ void grpc_rb_load_imports(HMODULE library) { grpc_server_add_secure_http2_port_import = (grpc_server_add_secure_http2_port_type) GetProcAddress(library, "grpc_server_add_secure_http2_port"); grpc_call_set_credentials_import = (grpc_call_set_credentials_type) GetProcAddress(library, "grpc_call_set_credentials"); grpc_server_credentials_set_auth_metadata_processor_import = (grpc_server_credentials_set_auth_metadata_processor_type) GetProcAddress(library, "grpc_server_credentials_set_auth_metadata_processor"); + grpc_slice_ref_import = (grpc_slice_ref_type) GetProcAddress(library, "grpc_slice_ref"); + grpc_slice_unref_import = (grpc_slice_unref_type) GetProcAddress(library, "grpc_slice_unref"); + grpc_slice_new_import = (grpc_slice_new_type) GetProcAddress(library, "grpc_slice_new"); + grpc_slice_new_with_user_data_import = (grpc_slice_new_with_user_data_type) GetProcAddress(library, "grpc_slice_new_with_user_data"); + grpc_slice_new_with_len_import = (grpc_slice_new_with_len_type) GetProcAddress(library, "grpc_slice_new_with_len"); + grpc_slice_malloc_import = (grpc_slice_malloc_type) GetProcAddress(library, "grpc_slice_malloc"); + grpc_slice_from_copied_string_import = (grpc_slice_from_copied_string_type) GetProcAddress(library, "grpc_slice_from_copied_string"); + grpc_slice_from_copied_buffer_import = (grpc_slice_from_copied_buffer_type) GetProcAddress(library, "grpc_slice_from_copied_buffer"); + grpc_slice_from_static_string_import = (grpc_slice_from_static_string_type) GetProcAddress(library, "grpc_slice_from_static_string"); + grpc_slice_sub_import = (grpc_slice_sub_type) GetProcAddress(library, "grpc_slice_sub"); + grpc_slice_sub_no_ref_import = (grpc_slice_sub_no_ref_type) GetProcAddress(library, "grpc_slice_sub_no_ref"); + grpc_slice_split_tail_import = (grpc_slice_split_tail_type) GetProcAddress(library, "grpc_slice_split_tail"); + grpc_slice_split_head_import = (grpc_slice_split_head_type) GetProcAddress(library, "grpc_slice_split_head"); + gpr_empty_slice_import = (gpr_empty_slice_type) GetProcAddress(library, "gpr_empty_slice"); + grpc_slice_cmp_import = (grpc_slice_cmp_type) GetProcAddress(library, "grpc_slice_cmp"); + grpc_slice_str_cmp_import = (grpc_slice_str_cmp_type) GetProcAddress(library, "grpc_slice_str_cmp"); + grpc_slice_is_equivalent_import = (grpc_slice_is_equivalent_type) GetProcAddress(library, "grpc_slice_is_equivalent"); + grpc_slice_buffer_init_import = (grpc_slice_buffer_init_type) GetProcAddress(library, "grpc_slice_buffer_init"); + grpc_slice_buffer_destroy_import = (grpc_slice_buffer_destroy_type) GetProcAddress(library, "grpc_slice_buffer_destroy"); + grpc_slice_buffer_add_import = (grpc_slice_buffer_add_type) GetProcAddress(library, "grpc_slice_buffer_add"); + grpc_slice_buffer_add_indexed_import = (grpc_slice_buffer_add_indexed_type) GetProcAddress(library, "grpc_slice_buffer_add_indexed"); + grpc_slice_buffer_addn_import = (grpc_slice_buffer_addn_type) GetProcAddress(library, "grpc_slice_buffer_addn"); + grpc_slice_buffer_tiny_add_import = (grpc_slice_buffer_tiny_add_type) GetProcAddress(library, "grpc_slice_buffer_tiny_add"); + grpc_slice_buffer_pop_import = (grpc_slice_buffer_pop_type) GetProcAddress(library, "grpc_slice_buffer_pop"); + grpc_slice_buffer_reset_and_unref_import = (grpc_slice_buffer_reset_and_unref_type) GetProcAddress(library, "grpc_slice_buffer_reset_and_unref"); + grpc_slice_buffer_swap_import = (grpc_slice_buffer_swap_type) GetProcAddress(library, "grpc_slice_buffer_swap"); + grpc_slice_buffer_move_into_import = (grpc_slice_buffer_move_into_type) GetProcAddress(library, "grpc_slice_buffer_move_into"); + grpc_slice_buffer_trim_end_import = (grpc_slice_buffer_trim_end_type) GetProcAddress(library, "grpc_slice_buffer_trim_end"); + grpc_slice_buffer_move_first_import = (grpc_slice_buffer_move_first_type) GetProcAddress(library, "grpc_slice_buffer_move_first"); + grpc_slice_buffer_take_first_import = (grpc_slice_buffer_take_first_type) GetProcAddress(library, "grpc_slice_buffer_take_first"); gpr_malloc_import = (gpr_malloc_type) GetProcAddress(library, "gpr_malloc"); gpr_free_import = (gpr_free_type) GetProcAddress(library, "gpr_free"); gpr_realloc_import = (gpr_realloc_type) GetProcAddress(library, "gpr_realloc"); @@ -485,35 +528,6 @@ void grpc_rb_load_imports(HMODULE library) { gpr_log_verbosity_init_import = (gpr_log_verbosity_init_type) GetProcAddress(library, "gpr_log_verbosity_init"); gpr_set_log_function_import = (gpr_set_log_function_type) GetProcAddress(library, "gpr_set_log_function"); gpr_format_message_import = (gpr_format_message_type) GetProcAddress(library, "gpr_format_message"); - gpr_slice_ref_import = (gpr_slice_ref_type) GetProcAddress(library, "gpr_slice_ref"); - gpr_slice_unref_import = (gpr_slice_unref_type) GetProcAddress(library, "gpr_slice_unref"); - gpr_slice_new_import = (gpr_slice_new_type) GetProcAddress(library, "gpr_slice_new"); - gpr_slice_new_with_user_data_import = (gpr_slice_new_with_user_data_type) GetProcAddress(library, "gpr_slice_new_with_user_data"); - gpr_slice_new_with_len_import = (gpr_slice_new_with_len_type) GetProcAddress(library, "gpr_slice_new_with_len"); - gpr_slice_malloc_import = (gpr_slice_malloc_type) GetProcAddress(library, "gpr_slice_malloc"); - gpr_slice_from_copied_string_import = (gpr_slice_from_copied_string_type) GetProcAddress(library, "gpr_slice_from_copied_string"); - gpr_slice_from_copied_buffer_import = (gpr_slice_from_copied_buffer_type) GetProcAddress(library, "gpr_slice_from_copied_buffer"); - gpr_slice_from_static_string_import = (gpr_slice_from_static_string_type) GetProcAddress(library, "gpr_slice_from_static_string"); - gpr_slice_sub_import = (gpr_slice_sub_type) GetProcAddress(library, "gpr_slice_sub"); - gpr_slice_sub_no_ref_import = (gpr_slice_sub_no_ref_type) GetProcAddress(library, "gpr_slice_sub_no_ref"); - gpr_slice_split_tail_import = (gpr_slice_split_tail_type) GetProcAddress(library, "gpr_slice_split_tail"); - gpr_slice_split_head_import = (gpr_slice_split_head_type) GetProcAddress(library, "gpr_slice_split_head"); - gpr_empty_slice_import = (gpr_empty_slice_type) GetProcAddress(library, "gpr_empty_slice"); - gpr_slice_cmp_import = (gpr_slice_cmp_type) GetProcAddress(library, "gpr_slice_cmp"); - gpr_slice_str_cmp_import = (gpr_slice_str_cmp_type) GetProcAddress(library, "gpr_slice_str_cmp"); - gpr_slice_buffer_init_import = (gpr_slice_buffer_init_type) GetProcAddress(library, "gpr_slice_buffer_init"); - gpr_slice_buffer_destroy_import = (gpr_slice_buffer_destroy_type) GetProcAddress(library, "gpr_slice_buffer_destroy"); - gpr_slice_buffer_add_import = (gpr_slice_buffer_add_type) GetProcAddress(library, "gpr_slice_buffer_add"); - gpr_slice_buffer_add_indexed_import = (gpr_slice_buffer_add_indexed_type) GetProcAddress(library, "gpr_slice_buffer_add_indexed"); - gpr_slice_buffer_addn_import = (gpr_slice_buffer_addn_type) GetProcAddress(library, "gpr_slice_buffer_addn"); - gpr_slice_buffer_tiny_add_import = (gpr_slice_buffer_tiny_add_type) GetProcAddress(library, "gpr_slice_buffer_tiny_add"); - gpr_slice_buffer_pop_import = (gpr_slice_buffer_pop_type) GetProcAddress(library, "gpr_slice_buffer_pop"); - gpr_slice_buffer_reset_and_unref_import = (gpr_slice_buffer_reset_and_unref_type) GetProcAddress(library, "gpr_slice_buffer_reset_and_unref"); - gpr_slice_buffer_swap_import = (gpr_slice_buffer_swap_type) GetProcAddress(library, "gpr_slice_buffer_swap"); - gpr_slice_buffer_move_into_import = (gpr_slice_buffer_move_into_type) GetProcAddress(library, "gpr_slice_buffer_move_into"); - gpr_slice_buffer_trim_end_import = (gpr_slice_buffer_trim_end_type) GetProcAddress(library, "gpr_slice_buffer_trim_end"); - gpr_slice_buffer_move_first_import = (gpr_slice_buffer_move_first_type) GetProcAddress(library, "gpr_slice_buffer_move_first"); - gpr_slice_buffer_take_first_import = (gpr_slice_buffer_take_first_type) GetProcAddress(library, "gpr_slice_buffer_take_first"); gpr_strdup_import = (gpr_strdup_type) GetProcAddress(library, "gpr_strdup"); gpr_asprintf_import = (gpr_asprintf_type) GetProcAddress(library, "gpr_asprintf"); gpr_subprocess_binary_extension_import = (gpr_subprocess_binary_extension_type) GetProcAddress(library, "gpr_subprocess_binary_extension"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 00a67b0b2c..5745686adf 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -46,6 +46,8 @@ #include <grpc/grpc.h> #include <grpc/grpc_posix.h> #include <grpc/grpc_security.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/avl.h> #include <grpc/support/cmdline.h> @@ -54,18 +56,16 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/log_windows.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include <grpc/support/subprocess.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> -typedef grpc_byte_buffer *(*grpc_raw_byte_buffer_create_type)(gpr_slice *slices, size_t nslices); +typedef grpc_byte_buffer *(*grpc_raw_byte_buffer_create_type)(grpc_slice *slices, size_t nslices); extern grpc_raw_byte_buffer_create_type grpc_raw_byte_buffer_create_import; #define grpc_raw_byte_buffer_create grpc_raw_byte_buffer_create_import -typedef grpc_byte_buffer *(*grpc_raw_compressed_byte_buffer_create_type)(gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression); +typedef grpc_byte_buffer *(*grpc_raw_compressed_byte_buffer_create_type)(grpc_slice *slices, size_t nslices, grpc_compression_algorithm compression); extern grpc_raw_compressed_byte_buffer_create_type grpc_raw_compressed_byte_buffer_create_import; #define grpc_raw_compressed_byte_buffer_create grpc_raw_compressed_byte_buffer_create_import typedef grpc_byte_buffer *(*grpc_byte_buffer_copy_type)(grpc_byte_buffer *bb); @@ -83,10 +83,10 @@ extern grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import; typedef void(*grpc_byte_buffer_reader_destroy_type)(grpc_byte_buffer_reader *reader); extern grpc_byte_buffer_reader_destroy_type grpc_byte_buffer_reader_destroy_import; #define grpc_byte_buffer_reader_destroy grpc_byte_buffer_reader_destroy_import -typedef int(*grpc_byte_buffer_reader_next_type)(grpc_byte_buffer_reader *reader, gpr_slice *slice); +typedef int(*grpc_byte_buffer_reader_next_type)(grpc_byte_buffer_reader *reader, grpc_slice *slice); extern grpc_byte_buffer_reader_next_type grpc_byte_buffer_reader_next_import; #define grpc_byte_buffer_reader_next grpc_byte_buffer_reader_next_import -typedef gpr_slice(*grpc_byte_buffer_reader_readall_type)(grpc_byte_buffer_reader *reader); +typedef grpc_slice(*grpc_byte_buffer_reader_readall_type)(grpc_byte_buffer_reader *reader); extern grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_import; #define grpc_byte_buffer_reader_readall grpc_byte_buffer_reader_readall_import typedef grpc_byte_buffer *(*grpc_raw_byte_buffer_from_reader_type)(grpc_byte_buffer_reader *reader); @@ -281,6 +281,9 @@ extern grpc_census_call_get_context_type grpc_census_call_get_context_import; typedef char *(*grpc_channel_get_target_type)(grpc_channel *channel); extern grpc_channel_get_target_type grpc_channel_get_target_import; #define grpc_channel_get_target grpc_channel_get_target_import +typedef void(*grpc_channel_get_info_type)(grpc_channel *channel, const grpc_channel_info *channel_info); +extern grpc_channel_get_info_type grpc_channel_get_info_import; +#define grpc_channel_get_info grpc_channel_get_info_import typedef grpc_channel *(*grpc_insecure_channel_create_type)(const char *target, const grpc_channel_args *args, void *reserved); extern grpc_insecure_channel_create_type grpc_insecure_channel_create_import; #define grpc_insecure_channel_create grpc_insecure_channel_create_import @@ -347,6 +350,21 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import; typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error); extern grpc_call_error_to_string_type grpc_call_error_to_string_import; #define grpc_call_error_to_string grpc_call_error_to_string_import +typedef grpc_resource_quota *(*grpc_resource_quota_create_type)(const char *trace_name); +extern grpc_resource_quota_create_type grpc_resource_quota_create_import; +#define grpc_resource_quota_create grpc_resource_quota_create_import +typedef void(*grpc_resource_quota_ref_type)(grpc_resource_quota *resource_quota); +extern grpc_resource_quota_ref_type grpc_resource_quota_ref_import; +#define grpc_resource_quota_ref grpc_resource_quota_ref_import +typedef void(*grpc_resource_quota_unref_type)(grpc_resource_quota *resource_quota); +extern grpc_resource_quota_unref_type grpc_resource_quota_unref_import; +#define grpc_resource_quota_unref grpc_resource_quota_unref_import +typedef void(*grpc_resource_quota_resize_type)(grpc_resource_quota *resource_quota, size_t new_size); +extern grpc_resource_quota_resize_type grpc_resource_quota_resize_import; +#define grpc_resource_quota_resize grpc_resource_quota_resize_import +typedef const grpc_arg_pointer_vtable *(*grpc_resource_quota_arg_vtable_type)(void); +extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import; +#define grpc_resource_quota_arg_vtable grpc_resource_quota_arg_vtable_import typedef grpc_channel *(*grpc_insecure_channel_create_from_fd_type)(const char *target, int fd, const grpc_channel_args *args); extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; #define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import @@ -452,6 +470,96 @@ extern grpc_call_set_credentials_type grpc_call_set_credentials_import; typedef void(*grpc_server_credentials_set_auth_metadata_processor_type)(grpc_server_credentials *creds, grpc_auth_metadata_processor processor); extern grpc_server_credentials_set_auth_metadata_processor_type grpc_server_credentials_set_auth_metadata_processor_import; #define grpc_server_credentials_set_auth_metadata_processor grpc_server_credentials_set_auth_metadata_processor_import +typedef grpc_slice(*grpc_slice_ref_type)(grpc_slice s); +extern grpc_slice_ref_type grpc_slice_ref_import; +#define grpc_slice_ref grpc_slice_ref_import +typedef void(*grpc_slice_unref_type)(grpc_slice s); +extern grpc_slice_unref_type grpc_slice_unref_import; +#define grpc_slice_unref grpc_slice_unref_import +typedef grpc_slice(*grpc_slice_new_type)(void *p, size_t len, void (*destroy)(void *)); +extern grpc_slice_new_type grpc_slice_new_import; +#define grpc_slice_new grpc_slice_new_import +typedef grpc_slice(*grpc_slice_new_with_user_data_type)(void *p, size_t len, void (*destroy)(void *), void *user_data); +extern grpc_slice_new_with_user_data_type grpc_slice_new_with_user_data_import; +#define grpc_slice_new_with_user_data grpc_slice_new_with_user_data_import +typedef grpc_slice(*grpc_slice_new_with_len_type)(void *p, size_t len, void (*destroy)(void *, size_t)); +extern grpc_slice_new_with_len_type grpc_slice_new_with_len_import; +#define grpc_slice_new_with_len grpc_slice_new_with_len_import +typedef grpc_slice(*grpc_slice_malloc_type)(size_t length); +extern grpc_slice_malloc_type grpc_slice_malloc_import; +#define grpc_slice_malloc grpc_slice_malloc_import +typedef grpc_slice(*grpc_slice_from_copied_string_type)(const char *source); +extern grpc_slice_from_copied_string_type grpc_slice_from_copied_string_import; +#define grpc_slice_from_copied_string grpc_slice_from_copied_string_import +typedef grpc_slice(*grpc_slice_from_copied_buffer_type)(const char *source, size_t len); +extern grpc_slice_from_copied_buffer_type grpc_slice_from_copied_buffer_import; +#define grpc_slice_from_copied_buffer grpc_slice_from_copied_buffer_import +typedef grpc_slice(*grpc_slice_from_static_string_type)(const char *source); +extern grpc_slice_from_static_string_type grpc_slice_from_static_string_import; +#define grpc_slice_from_static_string grpc_slice_from_static_string_import +typedef grpc_slice(*grpc_slice_sub_type)(grpc_slice s, size_t begin, size_t end); +extern grpc_slice_sub_type grpc_slice_sub_import; +#define grpc_slice_sub grpc_slice_sub_import +typedef grpc_slice(*grpc_slice_sub_no_ref_type)(grpc_slice s, size_t begin, size_t end); +extern grpc_slice_sub_no_ref_type grpc_slice_sub_no_ref_import; +#define grpc_slice_sub_no_ref grpc_slice_sub_no_ref_import +typedef grpc_slice(*grpc_slice_split_tail_type)(grpc_slice *s, size_t split); +extern grpc_slice_split_tail_type grpc_slice_split_tail_import; +#define grpc_slice_split_tail grpc_slice_split_tail_import +typedef grpc_slice(*grpc_slice_split_head_type)(grpc_slice *s, size_t split); +extern grpc_slice_split_head_type grpc_slice_split_head_import; +#define grpc_slice_split_head grpc_slice_split_head_import +typedef grpc_slice(*gpr_empty_slice_type)(void); +extern gpr_empty_slice_type gpr_empty_slice_import; +#define gpr_empty_slice gpr_empty_slice_import +typedef int(*grpc_slice_cmp_type)(grpc_slice a, grpc_slice b); +extern grpc_slice_cmp_type grpc_slice_cmp_import; +#define grpc_slice_cmp grpc_slice_cmp_import +typedef int(*grpc_slice_str_cmp_type)(grpc_slice a, const char *b); +extern grpc_slice_str_cmp_type grpc_slice_str_cmp_import; +#define grpc_slice_str_cmp grpc_slice_str_cmp_import +typedef int(*grpc_slice_is_equivalent_type)(grpc_slice a, grpc_slice b); +extern grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import; +#define grpc_slice_is_equivalent grpc_slice_is_equivalent_import +typedef void(*grpc_slice_buffer_init_type)(grpc_slice_buffer *sb); +extern grpc_slice_buffer_init_type grpc_slice_buffer_init_import; +#define grpc_slice_buffer_init grpc_slice_buffer_init_import +typedef void(*grpc_slice_buffer_destroy_type)(grpc_slice_buffer *sb); +extern grpc_slice_buffer_destroy_type grpc_slice_buffer_destroy_import; +#define grpc_slice_buffer_destroy grpc_slice_buffer_destroy_import +typedef void(*grpc_slice_buffer_add_type)(grpc_slice_buffer *sb, grpc_slice slice); +extern grpc_slice_buffer_add_type grpc_slice_buffer_add_import; +#define grpc_slice_buffer_add grpc_slice_buffer_add_import +typedef size_t(*grpc_slice_buffer_add_indexed_type)(grpc_slice_buffer *sb, grpc_slice slice); +extern grpc_slice_buffer_add_indexed_type grpc_slice_buffer_add_indexed_import; +#define grpc_slice_buffer_add_indexed grpc_slice_buffer_add_indexed_import +typedef void(*grpc_slice_buffer_addn_type)(grpc_slice_buffer *sb, grpc_slice *slices, size_t n); +extern grpc_slice_buffer_addn_type grpc_slice_buffer_addn_import; +#define grpc_slice_buffer_addn grpc_slice_buffer_addn_import +typedef uint8_t *(*grpc_slice_buffer_tiny_add_type)(grpc_slice_buffer *sb, size_t len); +extern grpc_slice_buffer_tiny_add_type grpc_slice_buffer_tiny_add_import; +#define grpc_slice_buffer_tiny_add grpc_slice_buffer_tiny_add_import +typedef void(*grpc_slice_buffer_pop_type)(grpc_slice_buffer *sb); +extern grpc_slice_buffer_pop_type grpc_slice_buffer_pop_import; +#define grpc_slice_buffer_pop grpc_slice_buffer_pop_import +typedef void(*grpc_slice_buffer_reset_and_unref_type)(grpc_slice_buffer *sb); +extern grpc_slice_buffer_reset_and_unref_type grpc_slice_buffer_reset_and_unref_import; +#define grpc_slice_buffer_reset_and_unref grpc_slice_buffer_reset_and_unref_import +typedef void(*grpc_slice_buffer_swap_type)(grpc_slice_buffer *a, grpc_slice_buffer *b); +extern grpc_slice_buffer_swap_type grpc_slice_buffer_swap_import; +#define grpc_slice_buffer_swap grpc_slice_buffer_swap_import +typedef void(*grpc_slice_buffer_move_into_type)(grpc_slice_buffer *src, grpc_slice_buffer *dst); +extern grpc_slice_buffer_move_into_type grpc_slice_buffer_move_into_import; +#define grpc_slice_buffer_move_into grpc_slice_buffer_move_into_import +typedef void(*grpc_slice_buffer_trim_end_type)(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *garbage); +extern grpc_slice_buffer_trim_end_type grpc_slice_buffer_trim_end_import; +#define grpc_slice_buffer_trim_end grpc_slice_buffer_trim_end_import +typedef void(*grpc_slice_buffer_move_first_type)(grpc_slice_buffer *src, size_t n, grpc_slice_buffer *dst); +extern grpc_slice_buffer_move_first_type grpc_slice_buffer_move_first_import; +#define grpc_slice_buffer_move_first grpc_slice_buffer_move_first_import +typedef grpc_slice(*grpc_slice_buffer_take_first_type)(grpc_slice_buffer *src); +extern grpc_slice_buffer_take_first_type grpc_slice_buffer_take_first_import; +#define grpc_slice_buffer_take_first grpc_slice_buffer_take_first_import typedef void *(*gpr_malloc_type)(size_t size); extern gpr_malloc_type gpr_malloc_import; #define gpr_malloc gpr_malloc_import @@ -581,7 +689,7 @@ extern gpr_join_host_port_type gpr_join_host_port_import; typedef int(*gpr_split_host_port_type)(const char *name, char **host, char **port); extern gpr_split_host_port_type gpr_split_host_port_import; #define gpr_split_host_port gpr_split_host_port_import -typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(4, 5); +typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); extern gpr_log_type gpr_log_import; #define gpr_log gpr_log_import typedef void(*gpr_log_message_type)(const char *file, int line, gpr_log_severity severity, const char *message); @@ -599,97 +707,10 @@ extern gpr_set_log_function_type gpr_set_log_function_import; typedef char *(*gpr_format_message_type)(int messageid); extern gpr_format_message_type gpr_format_message_import; #define gpr_format_message gpr_format_message_import -typedef gpr_slice(*gpr_slice_ref_type)(gpr_slice s); -extern gpr_slice_ref_type gpr_slice_ref_import; -#define gpr_slice_ref gpr_slice_ref_import -typedef void(*gpr_slice_unref_type)(gpr_slice s); -extern gpr_slice_unref_type gpr_slice_unref_import; -#define gpr_slice_unref gpr_slice_unref_import -typedef gpr_slice(*gpr_slice_new_type)(void *p, size_t len, void (*destroy)(void *)); -extern gpr_slice_new_type gpr_slice_new_import; -#define gpr_slice_new gpr_slice_new_import -typedef gpr_slice(*gpr_slice_new_with_user_data_type)(void *p, size_t len, void (*destroy)(void *), void *user_data); -extern gpr_slice_new_with_user_data_type gpr_slice_new_with_user_data_import; -#define gpr_slice_new_with_user_data gpr_slice_new_with_user_data_import -typedef gpr_slice(*gpr_slice_new_with_len_type)(void *p, size_t len, void (*destroy)(void *, size_t)); -extern gpr_slice_new_with_len_type gpr_slice_new_with_len_import; -#define gpr_slice_new_with_len gpr_slice_new_with_len_import -typedef gpr_slice(*gpr_slice_malloc_type)(size_t length); -extern gpr_slice_malloc_type gpr_slice_malloc_import; -#define gpr_slice_malloc gpr_slice_malloc_import -typedef gpr_slice(*gpr_slice_from_copied_string_type)(const char *source); -extern gpr_slice_from_copied_string_type gpr_slice_from_copied_string_import; -#define gpr_slice_from_copied_string gpr_slice_from_copied_string_import -typedef gpr_slice(*gpr_slice_from_copied_buffer_type)(const char *source, size_t len); -extern gpr_slice_from_copied_buffer_type gpr_slice_from_copied_buffer_import; -#define gpr_slice_from_copied_buffer gpr_slice_from_copied_buffer_import -typedef gpr_slice(*gpr_slice_from_static_string_type)(const char *source); -extern gpr_slice_from_static_string_type gpr_slice_from_static_string_import; -#define gpr_slice_from_static_string gpr_slice_from_static_string_import -typedef gpr_slice(*gpr_slice_sub_type)(gpr_slice s, size_t begin, size_t end); -extern gpr_slice_sub_type gpr_slice_sub_import; -#define gpr_slice_sub gpr_slice_sub_import -typedef gpr_slice(*gpr_slice_sub_no_ref_type)(gpr_slice s, size_t begin, size_t end); -extern gpr_slice_sub_no_ref_type gpr_slice_sub_no_ref_import; -#define gpr_slice_sub_no_ref gpr_slice_sub_no_ref_import -typedef gpr_slice(*gpr_slice_split_tail_type)(gpr_slice *s, size_t split); -extern gpr_slice_split_tail_type gpr_slice_split_tail_import; -#define gpr_slice_split_tail gpr_slice_split_tail_import -typedef gpr_slice(*gpr_slice_split_head_type)(gpr_slice *s, size_t split); -extern gpr_slice_split_head_type gpr_slice_split_head_import; -#define gpr_slice_split_head gpr_slice_split_head_import -typedef gpr_slice(*gpr_empty_slice_type)(void); -extern gpr_empty_slice_type gpr_empty_slice_import; -#define gpr_empty_slice gpr_empty_slice_import -typedef int(*gpr_slice_cmp_type)(gpr_slice a, gpr_slice b); -extern gpr_slice_cmp_type gpr_slice_cmp_import; -#define gpr_slice_cmp gpr_slice_cmp_import -typedef int(*gpr_slice_str_cmp_type)(gpr_slice a, const char *b); -extern gpr_slice_str_cmp_type gpr_slice_str_cmp_import; -#define gpr_slice_str_cmp gpr_slice_str_cmp_import -typedef void(*gpr_slice_buffer_init_type)(gpr_slice_buffer *sb); -extern gpr_slice_buffer_init_type gpr_slice_buffer_init_import; -#define gpr_slice_buffer_init gpr_slice_buffer_init_import -typedef void(*gpr_slice_buffer_destroy_type)(gpr_slice_buffer *sb); -extern gpr_slice_buffer_destroy_type gpr_slice_buffer_destroy_import; -#define gpr_slice_buffer_destroy gpr_slice_buffer_destroy_import -typedef void(*gpr_slice_buffer_add_type)(gpr_slice_buffer *sb, gpr_slice slice); -extern gpr_slice_buffer_add_type gpr_slice_buffer_add_import; -#define gpr_slice_buffer_add gpr_slice_buffer_add_import -typedef size_t(*gpr_slice_buffer_add_indexed_type)(gpr_slice_buffer *sb, gpr_slice slice); -extern gpr_slice_buffer_add_indexed_type gpr_slice_buffer_add_indexed_import; -#define gpr_slice_buffer_add_indexed gpr_slice_buffer_add_indexed_import -typedef void(*gpr_slice_buffer_addn_type)(gpr_slice_buffer *sb, gpr_slice *slices, size_t n); -extern gpr_slice_buffer_addn_type gpr_slice_buffer_addn_import; -#define gpr_slice_buffer_addn gpr_slice_buffer_addn_import -typedef uint8_t *(*gpr_slice_buffer_tiny_add_type)(gpr_slice_buffer *sb, size_t len); -extern gpr_slice_buffer_tiny_add_type gpr_slice_buffer_tiny_add_import; -#define gpr_slice_buffer_tiny_add gpr_slice_buffer_tiny_add_import -typedef void(*gpr_slice_buffer_pop_type)(gpr_slice_buffer *sb); -extern gpr_slice_buffer_pop_type gpr_slice_buffer_pop_import; -#define gpr_slice_buffer_pop gpr_slice_buffer_pop_import -typedef void(*gpr_slice_buffer_reset_and_unref_type)(gpr_slice_buffer *sb); -extern gpr_slice_buffer_reset_and_unref_type gpr_slice_buffer_reset_and_unref_import; -#define gpr_slice_buffer_reset_and_unref gpr_slice_buffer_reset_and_unref_import -typedef void(*gpr_slice_buffer_swap_type)(gpr_slice_buffer *a, gpr_slice_buffer *b); -extern gpr_slice_buffer_swap_type gpr_slice_buffer_swap_import; -#define gpr_slice_buffer_swap gpr_slice_buffer_swap_import -typedef void(*gpr_slice_buffer_move_into_type)(gpr_slice_buffer *src, gpr_slice_buffer *dst); -extern gpr_slice_buffer_move_into_type gpr_slice_buffer_move_into_import; -#define gpr_slice_buffer_move_into gpr_slice_buffer_move_into_import -typedef void(*gpr_slice_buffer_trim_end_type)(gpr_slice_buffer *src, size_t n, gpr_slice_buffer *garbage); -extern gpr_slice_buffer_trim_end_type gpr_slice_buffer_trim_end_import; -#define gpr_slice_buffer_trim_end gpr_slice_buffer_trim_end_import -typedef void(*gpr_slice_buffer_move_first_type)(gpr_slice_buffer *src, size_t n, gpr_slice_buffer *dst); -extern gpr_slice_buffer_move_first_type gpr_slice_buffer_move_first_import; -#define gpr_slice_buffer_move_first gpr_slice_buffer_move_first_import -typedef gpr_slice(*gpr_slice_buffer_take_first_type)(gpr_slice_buffer *src); -extern gpr_slice_buffer_take_first_type gpr_slice_buffer_take_first_import; -#define gpr_slice_buffer_take_first gpr_slice_buffer_take_first_import typedef char *(*gpr_strdup_type)(const char *src); extern gpr_strdup_type gpr_strdup_import; #define gpr_strdup gpr_strdup_import -typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(2, 3); +typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPR_PRINT_FORMAT_CHECK(2, 3); extern gpr_asprintf_type gpr_asprintf_import; #define gpr_asprintf gpr_asprintf_import typedef const char *(*gpr_subprocess_binary_extension_type)(); diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 2a6a246e67..c7b112c94b 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -37,6 +37,7 @@ #include "rb_server.h" #include <grpc/grpc.h> +#include <grpc/support/atm.h> #include <grpc/grpc_security.h> #include <grpc/support/log.h> #include "rb_call.h" @@ -59,22 +60,26 @@ typedef struct grpc_rb_server { /* The actual server */ grpc_server *wrapped; grpc_completion_queue *queue; + gpr_atm shutdown_started; } grpc_rb_server; static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) { grpc_event ev; - if (server->wrapped != NULL) { - grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); - ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); - if (ev.type == GRPC_QUEUE_TIMEOUT) { - grpc_server_cancel_all_calls(server->wrapped); - rb_completion_queue_pluck(server->queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + // This can be started by app or implicitly by GC. Avoid a race between these. + if (gpr_atm_full_fetch_add(&server->shutdown_started, (gpr_atm)1) == 0) { + if (server->wrapped != NULL) { + grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL); + ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL); + if (ev.type == GRPC_QUEUE_TIMEOUT) { + grpc_server_cancel_all_calls(server->wrapped); + rb_completion_queue_pluck(server->queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + } + grpc_server_destroy(server->wrapped); + grpc_rb_completion_queue_destroy(server->queue); + server->wrapped = NULL; + server->queue = NULL; } - grpc_server_destroy(server->wrapped); - grpc_rb_completion_queue_destroy(server->queue); - server->wrapped = NULL; - server->queue = NULL; } } @@ -115,6 +120,7 @@ static const rb_data_type_t grpc_rb_server_data_type = { static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server *wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; + wrapper->shutdown_started = (gpr_atm)0; return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 23b2bb7e12..f6998e17c4 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -35,9 +35,18 @@ module GRPC # either end of a GRPC connection. When raised, it indicates that a status # error should be returned to the other end of a GRPC connection; when # caught it means that this end received a status error. + # + # There is also subclass of BadStatus in this module for each GRPC status. + # E.g., the GRPC::Cancelled class corresponds to status CANCELLED. + # + # See + # https://github.com/grpc/grpc/blob/master/include/grpc/impl/codegen/status.h + # for detailed descriptions of each status code. class BadStatus < StandardError attr_reader :code, :details, :metadata + include GRPC::Core::StatusCodes + # @param code [Numeric] the status code # @param details [String] the details of the exception # @param metadata [Hash] the error's metadata @@ -55,9 +64,152 @@ module GRPC def to_status Struct::Status.new(code, details, @metadata) end + + def self.new_status_exception(code, details = 'unkown cause', metadata = {}) + codes = {} + codes[OK] = Ok + codes[CANCELLED] = Cancelled + codes[UNKNOWN] = Unknown + codes[INVALID_ARGUMENT] = InvalidArgument + codes[DEADLINE_EXCEEDED] = DeadlineExceeded + codes[NOT_FOUND] = NotFound + codes[ALREADY_EXISTS] = AlreadyExists + codes[PERMISSION_DENIED] = PermissionDenied + codes[UNAUTHENTICATED] = Unauthenticated + codes[RESOURCE_EXHAUSTED] = ResourceExhausted + codes[FAILED_PRECONDITION] = FailedPrecondition + codes[ABORTED] = Aborted + codes[OUT_OF_RANGE] = OutOfRange + codes[UNIMPLEMENTED] = Unimplemented + codes[INTERNAL] = Internal + codes[UNIMPLEMENTED] = Unimplemented + codes[UNAVAILABLE] = Unavailable + codes[DATA_LOSS] = DataLoss + + if codes[code].nil? + BadStatus.new(code, details, metadata) + else + codes[code].new(details, metadata) + end + end + end + + # GRPC status code corresponding to status OK + class Ok < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::OK, details, metadata) + end end - # Cancelled is an exception class that indicates that an rpc was cancelled. - class Cancelled < StandardError + # GRPC status code corresponding to status CANCELLED + class Cancelled < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::CANCELLED, details, metadata) + end + end + + # GRPC status code corresponding to status UNKNOWN + class Unknown < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::UNKNOWN, details, metadata) + end + end + + # GRPC status code corresponding to status INVALID_ARGUMENT + class InvalidArgument < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::INVALID_ARGUMENT, details, metadata) + end + end + + # GRPC status code corresponding to status DEADLINE_EXCEEDED + class DeadlineExceeded < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::DEADLINE_EXCEEDED, details, metadata) + end + end + + # GRPC status code corresponding to status NOT_FOUND + class NotFound < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::NOT_FOUND, details, metadata) + end + end + + # GRPC status code corresponding to status ALREADY_EXISTS + class AlreadyExists < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::ALREADY_EXISTS, details, metadata) + end + end + + # GRPC status code corresponding to status PERMISSION_DENIED + class PermissionDenied < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::PERMISSION_DENIED, details, metadata) + end + end + + # GRPC status code corresponding to status UNAUTHENTICATED + class Unauthenticated < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::UNAUTHENTICATED, details, metadata) + end + end + + # GRPC status code corresponding to status RESOURCE_EXHAUSTED + class ResourceExhausted < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::RESOURCE_EXHAUSTED, details, metadata) + end + end + + # GRPC status code corresponding to status FAILED_PRECONDITION + class FailedPrecondition < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::FAILED_PRECONDITION, details, metadata) + end + end + + # GRPC status code corresponding to status ABORTED + class Aborted < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::ABORTED, details, metadata) + end + end + + # GRPC status code corresponding to status OUT_OF_RANGE + class OutOfRange < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::OUT_OF_RANGE, details, metadata) + end + end + + # GRPC status code corresponding to status UNIMPLEMENTED + class Unimplemented < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::UNIMPLEMENTED, details, metadata) + end + end + + # GRPC status code corresponding to status INTERNAL + class Internal < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::INTERNAL, details, metadata) + end + end + + # GRPC status code corresponding to status UNAVAILABLE + class Unavailable < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::UNAVAILABLE, details, metadata) + end + end + + # GRPC status code corresponding to status DATA_LOSS + class DataLoss < BadStatus + def initialize(details = 'unknown cause', metadata = {}) + super(Core::StatusCodes::DATA_LOSS, details, metadata) + end end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index dfc2644c46..3b31f77ec0 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -43,7 +43,8 @@ class Struct GRPC.logger.debug("Failing with status #{status}") # raise BadStatus, propagating the metadata if present. md = status.metadata - fail GRPC::BadStatus.new(status.code, status.details, md) + fail GRPC::BadStatus.new_status_exception( + status.code, status.details, md) end status end @@ -156,41 +157,25 @@ module GRPC Operation.new(self) end - # writes_done indicates that all writes are completed. - # - # It blocks until the remote endpoint acknowledges with at status unless - # assert_finished is set to false. Any calls to #remote_send after this - # call will fail. - # - # @param assert_finished [true, false] when true(default), waits for - # FINISHED. - def writes_done(assert_finished = true) - ops = { - SEND_CLOSE_FROM_CLIENT => nil - } - ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished - batch_result = @call.run_batch(ops) - return unless assert_finished - unless batch_result.status.nil? - @call.trailing_metadata = batch_result.status.metadata - end - @call.status = batch_result.status - op_is_done - batch_result.check_status - end - # finished waits until a client call is completed. # # It blocks until the remote endpoint acknowledges by sending a status. def finished batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) - unless batch_result.status.nil? - @call.trailing_metadata = batch_result.status.metadata + attach_status_results_and_complete_call(batch_result) + end + + def attach_status_results_and_complete_call(recv_status_batch_result) + unless recv_status_batch_result.status.nil? + @call.trailing_metadata = recv_status_batch_result.status.metadata end - @call.status = batch_result.status - op_is_done - batch_result.check_status + @call.status = recv_status_batch_result.status @call.close + op_is_done + + # The RECV_STATUS in run_batch always succeeds + # Check the status for a bad status or failed run batch + recv_status_batch_result.check_status end # remote_send sends a request to the remote endpoint. @@ -226,6 +211,23 @@ module GRPC nil end + def server_unary_response(req, trailing_metadata: {}, + code: Core::StatusCodes::OK, details: 'OK') + ops = {} + @send_initial_md_mutex.synchronize do + ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent + @metadata_sent = true + end + + payload = @marshal.call(req) + ops[SEND_MESSAGE] = payload + ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new( + code, details, trailing_metadata) + ops[RECV_CLOSE_ON_SERVER] = nil + + @call.run_batch(ops) + end + # remote_read reads a response from the remote endpoint. # # It blocks until the remote endpoint replies with a message or status. @@ -240,9 +242,13 @@ module GRPC @call.metadata = batch_result.metadata @metadata_received = true end - unless batch_result.nil? || batch_result.message.nil? - res = @unmarshal.call(batch_result.message) - return res + get_message_from_batch_result(batch_result) + end + + def get_message_from_batch_result(recv_message_batch_result) + unless recv_message_batch_result.nil? || + recv_message_batch_result.message.nil? + return @unmarshal.call(recv_message_batch_result.message) end GRPC.logger.debug('found nil; the final response has been sent') nil @@ -298,7 +304,6 @@ module GRPC return enum_for(:each_remote_read_then_finish) unless block_given? loop do resp = remote_read - break if resp.is_a? Struct::Status # is an OK status if resp.nil? # the last response was received, but not finished yet finished break @@ -315,15 +320,25 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) - merge_metadata_to_send(metadata) && send_initial_metadata - remote_send(req) - writes_done(false) - response = remote_read - finished unless response.is_a? Struct::Status - response - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e + ops = { + SEND_MESSAGE => @marshal.call(req), + SEND_CLOSE_FROM_CLIENT => nil, + RECV_INITIAL_METADATA => nil, + RECV_MESSAGE => nil, + RECV_STATUS_ON_CLIENT => nil + } + @send_initial_md_mutex.synchronize do + # Metadata might have already been sent if this is an operation view + unless @metadata_sent + ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata) + end + @metadata_sent = true + end + batch_result = @call.run_batch(ops) + + @call.metadata = batch_result.metadata + attach_status_results_and_complete_call(batch_result) + get_message_from_batch_result(batch_result) end # client_streamer sends a stream of requests to a GRPC server, and @@ -339,12 +354,20 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - merge_metadata_to_send(metadata) && send_initial_metadata - requests.each { |r| remote_send(r) } - writes_done(false) - response = remote_read - finished unless response.is_a? Struct::Status - response + # Metadata might have already been sent if this is an operation view + merge_metadata_and_send_if_not_already_sent(metadata) + + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + batch_result = @call.run_batch( + SEND_CLOSE_FROM_CLIENT => nil, + RECV_INITIAL_METADATA => nil, + RECV_MESSAGE => nil, + RECV_STATUS_ON_CLIENT => nil + ) + + @call.metadata = batch_result.metadata + attach_status_results_and_complete_call(batch_result) + get_message_from_batch_result(batch_result) rescue GRPC::Core::CallError => e finished # checks for Cancelled raise e @@ -365,9 +388,18 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) - merge_metadata_to_send(metadata) && send_initial_metadata - remote_send(req) - writes_done(false) + ops = { + SEND_MESSAGE => @marshal.call(req), + SEND_CLOSE_FROM_CLIENT => nil + } + @send_initial_md_mutex.synchronize do + # Metadata might have already been sent if this is an operation view + unless @metadata_sent + ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata) + end + @metadata_sent = true + end + @call.run_batch(ops) replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } @@ -404,7 +436,8 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) - merge_metadata_to_send(metadata) && send_initial_metadata + # Metadata might have already been sent if this is an operation view + merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @marshal, @unmarshal, @@ -457,6 +490,15 @@ module GRPC end end + def merge_metadata_and_send_if_not_already_sent(new_metadata = {}) + @send_initial_md_mutex.synchronize do + return if @metadata_sent + @metadata_to_send.merge!(new_metadata) + @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send) + @metadata_sent = true + end + end + private # Starts the call if not already started diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index d7cd9e6df2..8943f3f1fe 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -219,6 +219,10 @@ module GRPC GRPC.logger.debug('bidi-read-loop: finished') @reads_complete = true finished + # Make sure that the write loop is done done before finishing the call. + # Note that blocking is ok at this point because we've already received + # a status + @enq_th.join if is_client end end end diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 0d7c1f7805..6934257cbc 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -168,6 +168,7 @@ module GRPC # return the operation view of the active_call; define #execute as a # new method for this instance that invokes #request_response. + c.merge_metadata_to_send(metadata) op = c.operation op.define_singleton_method(:execute) do c.request_response(req, metadata: metadata) @@ -231,9 +232,10 @@ module GRPC # return the operation view of the active_call; define #execute as a # new method for this instance that invokes #client_streamer. + c.merge_metadata_to_send(metadata) op = c.operation op.define_singleton_method(:execute) do - c.client_streamer(requests, metadata: metadata) + c.client_streamer(requests) end op end @@ -309,9 +311,10 @@ module GRPC # return the operation view of the active_call; define #execute # as a new method for this instance that invokes #server_streamer + c.merge_metadata_to_send(metadata) op = c.operation op.define_singleton_method(:execute) do - c.server_streamer(req, metadata: metadata, &blk) + c.server_streamer(req, &blk) end op end @@ -417,15 +420,15 @@ module GRPC deadline: deadline, parent: parent, credentials: credentials) - return c.bidi_streamer(requests, metadata: metadata, &blk) unless return_op # return the operation view of the active_call; define #execute # as a new method for this instance that invokes #bidi_streamer + c.merge_metadata_to_send(metadata) op = c.operation op.define_singleton_method(:execute) do - c.bidi_streamer(requests, metadata: metadata, &blk) + c.bidi_streamer(requests, &blk) end op end @@ -445,7 +448,6 @@ module GRPC deadline: nil, parent: nil, credentials: nil) - deadline = from_relative_time(@timeout) if deadline.nil? # Provide each new client call with its own completion queue call = @ch.create_call(parent, # parent call diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 584fe78169..d46c4a1b5c 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -62,25 +62,44 @@ module GRPC proc { |o| unmarshal_class.method(unmarshal_method).call(o) } end + def handle_request_response(active_call, mth) + req = active_call.remote_read + resp = mth.call(req, active_call.single_req_view) + active_call.server_unary_response( + resp, trailing_metadata: active_call.output_metadata) + end + + def handle_client_streamer(active_call, mth) + resp = mth.call(active_call.multi_req_view) + active_call.server_unary_response( + resp, trailing_metadata: active_call.output_metadata) + end + + def handle_server_streamer(active_call, mth) + req = active_call.remote_read + replys = mth.call(req, active_call.single_req_view) + replys.each { |r| active_call.remote_send(r) } + send_status(active_call, OK, 'OK', active_call.output_metadata) + end + + def handle_bidi_streamer(active_call, mth) + active_call.run_server_bidi(mth) + send_status(active_call, OK, 'OK', active_call.output_metadata) + end + def run_server_method(active_call, mth) # While a server method is running, it might be cancelled, its deadline # might be reached, the handler could throw an unknown error, or a # well-behaved handler could throw a StatusError. if request_response? - req = active_call.remote_read - resp = mth.call(req, active_call.single_req_view) - active_call.remote_send(resp) + handle_request_response(active_call, mth) elsif client_streamer? - resp = mth.call(active_call.multi_req_view) - active_call.remote_send(resp) + handle_client_streamer(active_call, mth) elsif server_streamer? - req = active_call.remote_read - replys = mth.call(req, active_call.single_req_view) - replys.each { |r| active_call.remote_send(r) } + handle_server_streamer(active_call, mth) else # is a bidi_stream - active_call.run_server_bidi(mth) + handle_bidi_streamer(active_call, mth) end - send_status(active_call, OK, 'OK', active_call.output_metadata) rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error # code and detail message and some additional app-specific metadata. @@ -91,7 +110,7 @@ module GRPC # Log it, but don't notify the other endpoint.. GRPC.logger.warn("failed call: #{active_call}\n#{e}") rescue Core::OutOfTime - # This is raised when active_call#method.call exceeeds the deadline + # This is raised when active_call#method.call exceeds the deadline # event. Send a status of deadline exceeded GRPC.logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') @@ -100,7 +119,7 @@ module GRPC # Send back a UNKNOWN status to the client GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") GRPC.logger.warn(e) - send_status(active_call, UNKNOWN, 'no reason given') + send_status(active_call, UNKNOWN, "#{e.class}: #{e.message}") end def assert_arity_matches(mth) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 7dbcb7d479..00e0db71be 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -31,10 +31,133 @@ require_relative '../grpc' require_relative 'active_call' require_relative 'service' require 'thread' -require 'concurrent' # GRPC contains the General RPC module. module GRPC + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new # needs to be held when accessing @stopped + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + + # Each worker thread has its own queue to push and pull jobs + # these queues are put into @ready_queues when that worker is idle + @ready_workers = Queue.new + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + def ready_for_work? + # Busy worker threads are either doing work, or have a single job + # waiting on them. Workers that are idle with no jobs waiting + # have their "queues" in @ready_workers + !@ready_workers.empty? + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + return if blk.nil? + @stop_mutex.synchronize do + if @stopped + GRPC.logger.warn('did not schedule job, already stopped') + return + end + GRPC.logger.info('schedule another job') + fail 'No worker threads available' if @ready_workers.empty? + worker_queue = @ready_workers.pop + + fail 'worker already has a task waiting' unless worker_queue.empty? + worker_queue << [blk, args] + end + end + + # Starts running the jobs in the thread pool. + def start + @stop_mutex.synchronize do + fail 'already stopped' if @stopped + end + until @workers.size == @size.to_i + new_worker_queue = Queue.new + @ready_workers << new_worker_queue + next_thread = Thread.new(new_worker_queue) do |jobs| + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs(jobs) + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + GRPC.logger.info('stopping, will wait for all the workers to exit') + schedule { throw :exit } while ready_for_work? + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stopped = true + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + GRPC.logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + GRPC.logger.warn('error while terminating a worker') + GRPC.logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size.zero? + end + end + + def loop_execute_jobs(worker_queue) + loop do + begin + blk, args = worker_queue.pop + blk.call(*args) + rescue StandardError => e + GRPC.logger.warn('Error in worker thread') + GRPC.logger.warn(e) + end + # there shouldn't be any work given to this thread while its busy + fail('received a task while busy') unless worker_queue.empty? + @ready_workers << worker_queue + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer @@ -44,14 +167,11 @@ module GRPC def_delegators :@server, :add_http2_port - # Default max size of the thread pool size is 100 - DEFAULT_MAX_POOL_SIZE = 100 - - # Default minimum size of the thread pool is 5 - DEFAULT_MIN_POOL_SIZE = 5 + # Default thread pool size is 30 + DEFAULT_POOL_SIZE = 30 - # Default max_waiting_requests size is 60 - DEFAULT_MAX_WAITING_REQUESTS = 60 + # Deprecated due to internal changes to the thread pool + DEFAULT_MAX_WAITING_REQUESTS = 20 # Default poll period is 1s DEFAULT_POLL_PERIOD = 1 @@ -74,12 +194,12 @@ module GRPC # There are some specific keyword args used to configure the RpcServer # instance. # - # * pool_size: the maximum size of the thread pool that the server's - # thread pool can reach. + # * pool_size: the size of the thread pool the server uses to run its + # threads. No more concurrent requests can be made than the size + # of the thread pool # - # * max_waiting_requests: the maximum number of requests that are not - # being handled to allow. When this limit is exceeded, the server responds - # with not available to new requests + # * max_waiting_requests: Deprecated due to internal changes to the thread + # pool. This is still an argument for compatibility but is ignored. # # * poll_period: when present, the server polls for new events with this # period @@ -91,8 +211,7 @@ module GRPC # # * server_args: # A server arguments hash to be passed down to the underlying core server - def initialize(pool_size:DEFAULT_MAX_POOL_SIZE, - min_pool_size:DEFAULT_MIN_POOL_SIZE, + def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, connect_md_proc:nil, @@ -100,12 +219,8 @@ module GRPC @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @max_waiting_requests = max_waiting_requests @poll_period = poll_period - - @pool = Concurrent::ThreadPoolExecutor.new( - min_threads: [min_pool_size, pool_size].min, - max_threads: pool_size, - max_queue: max_waiting_requests, - fallback_policy: :discard) + @pool_size = pool_size + @pool = Pool.new(@pool_size) @run_cond = ConditionVariable.new @run_mutex = Mutex.new # running_state can take 4 values: :not_started, :running, :stopping, and @@ -126,8 +241,7 @@ module GRPC end deadline = from_relative_time(@poll_period) @server.close(deadline) - @pool.shutdown - @pool.wait_for_termination + @pool.stop end def running_state @@ -224,6 +338,7 @@ module GRPC def run @run_mutex.synchronize do fail 'cannot run without registering services' if rpc_descs.size.zero? + @pool.start @server.start transition_running_state(:running) @run_cond.broadcast @@ -235,12 +350,8 @@ module GRPC # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs def available?(an_rpc) - jobs_count, max = @pool.queue_length, @pool.max_queue - GRPC.logger.info("waiting: #{jobs_count}, max: #{max}") - - # remaining capacity for ThreadPoolExecutors is -1 if unbounded - return an_rpc if @pool.remaining_capacity != 0 - GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") + return an_rpc if @pool.ready_for_work? + GRPC.logger.warn('no free worker threads currently') noop = proc { |x| x } # Create a new active call that knows that metadata hasn't been @@ -275,7 +386,7 @@ module GRPC break if (!an_rpc.nil?) && an_rpc.call.nil? active_call = new_active_server_call(an_rpc) unless active_call.nil? - @pool.post(active_call) do |ac| + @pool.schedule(active_call) do |ac| c, mth = ac begin rpc_descs[mth].run_server_method(c, rpc_handlers[mth]) diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index 7cb9f1cc99..84f1ce7520 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -110,8 +110,9 @@ module GRPC rpc_descs[name] = RpcDesc.new(name, input, output, marshal_class_method, unmarshal_class_method) - define_method(name) do - fail GRPC::BadStatus, GRPC::Core::StatusCodes::UNIMPLEMENTED + define_method(GenericService.underscore(name.to_s).to_sym) do + fail GRPC::BadStatus.new_status_exception( + GRPC::Core::StatusCodes::UNIMPLEMENTED) end end diff --git a/src/ruby/pb/grpc/health/checker.rb b/src/ruby/pb/grpc/health/checker.rb index 4bce1744c4..6b2d852ebf 100644 --- a/src/ruby/pb/grpc/health/checker.rb +++ b/src/ruby/pb/grpc/health/checker.rb @@ -52,7 +52,9 @@ module Grpc @status_mutex.synchronize do status = @statuses["#{req.service}"] end - fail GRPC::BadStatus, StatusCodes::NOT_FOUND if status.nil? + if status.nil? + fail GRPC::BadStatus.new_status_exception(StatusCodes::NOT_FOUND) + end HealthCheckResponse.new(status: status) end diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index 1e3ae65630..f101f9d89e 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -459,11 +459,8 @@ class NamedTests deadline = GRPC::Core::TimeConsts::from_relative_time(1) resps = @stub.full_duplex_call(enum.each_item, deadline: deadline) resps.each { } # wait to receive each request (or timeout) - fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)' - rescue GRPC::BadStatus => e - assert("#{__callee__}: status was wrong") do - e.code == GRPC::Core::StatusCodes::DEADLINE_EXCEEDED - end + fail 'Should have raised GRPC::DeadlineExceeded' + rescue GRPC::DeadlineExceeded end def empty_stream diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 8aed866da5..817192626b 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -134,6 +134,7 @@ class BenchmarkClient resp = stub.streaming_call(q.each_item) start = Time.now q.push(req) + pushed_sentinal = false resp.each do |r| @histogram.add((Time.now-start)*1e9) if !@done @@ -141,8 +142,9 @@ class BenchmarkClient start = Time.now q.push(req) else - q.push(self) - break + q.push(self) unless pushed_sentinal + # Continue polling on the responses to consume and release resources + pushed_sentinal = true end end end diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index d0c2073dd1..6175855cd9 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -63,7 +63,9 @@ class BenchmarkServer cred = :this_port_is_insecure end # Make sure server can handle the large number of calls in benchmarks - @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100) + # TODO: @apolcyn, if scenario config increases total outstanding + # calls then will need to increase the pool size too + @server = GRPC::RpcServer.new(pool_size: 1024, max_waiting_requests: 1024) @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred) @server.handle(BenchmarkServiceImpl.new) @start_time = Time.now diff --git a/src/ruby/spec/error_sanity_spec.rb b/src/ruby/spec/error_sanity_spec.rb new file mode 100644 index 0000000000..77e94a8816 --- /dev/null +++ b/src/ruby/spec/error_sanity_spec.rb @@ -0,0 +1,64 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' + +StatusCodes = GRPC::Core::StatusCodes + +describe StatusCodes do + # convert upper snake-case to camel case. + # e.g., DEADLINE_EXCEEDED -> DeadlineExceeded + def upper_snake_to_camel(name) + name.to_s.split('_').map(&:downcase).map(&:capitalize).join('') + end + + StatusCodes.constants.each do |status_name| + it 'there is a subclass of BadStatus corresponding to StatusCode: ' \ + "#{status_name} that has code: #{StatusCodes.const_get(status_name)}" do + camel_case = upper_snake_to_camel(status_name) + error_class = GRPC.const_get(camel_case) + # expect the error class to be a subclass of BadStatus + expect(error_class < GRPC::BadStatus) + + error_object = error_class.new + # check that the code matches the int value of the error's constant + status_code = StatusCodes.const_get(status_name) + expect(error_object.code).to eq(status_code) + + # check default parameters + expect(error_object.details).to eq('unknown cause') + expect(error_object.metadata).to eq({}) + + # check that the BadStatus factory for creates the correct + # exception too + from_factory = GRPC::BadStatus.new_status_exception(status_code) + expect(from_factory.is_a?(error_class)).to be(true) + end + end +end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 5ae4f25537..aa51d9d7b1 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -402,7 +402,7 @@ describe GRPC::ActiveCall do @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) - client_call.writes_done(false) + call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') server_call.send_status(OK, 'OK') @@ -460,7 +460,7 @@ describe GRPC::ActiveCall do msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) - client_call.writes_done(false) + call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) server_call = expect_server_to_receive(msg) e = client_call.each_remote_read n = 3 # arbitrary value > 1 @@ -473,7 +473,7 @@ describe GRPC::ActiveCall do end end - describe '#writes_done' do + describe '#closing the call from the client' do it 'finishes ok if the server sends a status response' do call = make_test_call ActiveCall.client_invoke(call) @@ -481,7 +481,9 @@ describe GRPC::ActiveCall do @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) - expect { client_call.writes_done(false) }.to_not raise_error + expect do + call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + end.to_not raise_error server_call = expect_server_to_receive(msg) server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') @@ -500,11 +502,13 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') server_call.send_status(OK, 'status code is OK') expect(client_call.remote_read).to eq('server_response') - expect { client_call.writes_done(false) }.to_not raise_error + expect do + call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + end.to_not raise_error expect { client_call.finished }.to_not raise_error end - it 'finishes ok if writes_done is true' do + it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do call = make_test_call ActiveCall.client_invoke(call) client_call = ActiveCall.new(call, @pass_through, @@ -515,7 +519,11 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') server_call.send_status(OK, 'status code is OK') expect(client_call.remote_read).to eq('server_response') - expect { client_call.writes_done(true) }.to_not raise_error + expect do + call.run_batch( + CallOps::SEND_CLOSE_FROM_CLIENT => nil, + CallOps::RECV_STATUS_ON_CLIENT => nil) + end.to_not raise_error end end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 6034b5419c..b51b291cbd 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -168,42 +168,93 @@ describe 'ClientStub' do expect(&blk).to raise_error(GRPC::BadStatus) th.join end + + it 'should receive UNAUTHENTICATED if call credentials plugin fails' do + server_port = create_secure_test_server + th = run_request_response(@sent_msg, @resp, @pass) + + certs = load_test_certs + secure_channel_creds = GRPC::Core::ChannelCredentials.new( + certs[0], nil, nil) + secure_stub_opts = { + channel_args: { + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' + } + } + stub = GRPC::ClientStub.new("localhost:#{server_port}", + secure_channel_creds, **secure_stub_opts) + + error_message = 'Failing call credentials callback' + failing_auth = proc do + fail error_message + end + creds = GRPC::Core::CallCredentials.new(failing_auth) + + unauth_error_occured = false + begin + get_response(stub, credentials: creds) + rescue GRPC::Unauthenticated => e + unauth_error_occured = true + expect(e.details.include?(error_message)).to be true + end + expect(unauth_error_occured).to eq(true) + + # Kill the server thread so tests can complete + th.kill + end end describe 'without a call operation' do - def get_response(stub) + def get_response(stub, credentials: nil) + puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }) + metadata: { k1: 'v1', k2: 'v2' }, + credentials: credentials) end it_behaves_like 'request response' end describe 'via a call operation' do - def get_response(stub) + def get_response(stub, run_start_call_first: false, credentials: nil) op = stub.request_response(@method, @sent_msg, noop, noop, return_op: true, metadata: { k1: 'v1', k2: 'v2' }, - deadline: from_relative_time(2)) + deadline: from_relative_time(2), + credentials: credentials) expect(op).to be_a(GRPC::ActiveCall::Operation) - op.execute + op.start_call if run_start_call_first + result = op.execute + op.wait # make sure wait doesn't hang + result end it_behaves_like 'request response' - end - end - describe '#client_streamer' do - shared_examples 'client streaming' do - before(:each) do + it 'sends metadata to the server ok when running start_call first' do server_port = create_test_server host = "localhost:#{server_port}" - @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - @metadata = { k1: 'v1', k2: 'v2' } - @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } - @resp = 'a_reply' + th = run_request_response(@sent_msg, @resp, @pass, + k1: 'v1', k2: 'v2') + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect(get_response(stub)).to eq(@resp) + th.join end + end + end + + describe '#client_streamer' do + before(:each) do + Thread.abort_on_exception = true + server_port = create_test_server + host = "localhost:#{server_port}" + @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + @metadata = { k1: 'v1', k2: 'v2' } + @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } + @resp = 'a_reply' + end + shared_examples 'client streaming' do it 'should send requests to/receive a reply from a server' do th = run_client_streamer(@sent_msgs, @resp, @pass) expect(get_response(@stub)).to eq(@resp) @@ -242,24 +293,33 @@ describe 'ClientStub' do end describe 'via a call operation' do - def get_response(stub) + def get_response(stub, run_start_call_first: false) op = stub.client_streamer(@method, @sent_msgs, noop, noop, return_op: true, metadata: @metadata) expect(op).to be_a(GRPC::ActiveCall::Operation) - op.execute + op.start_call if run_start_call_first + result = op.execute + op.wait # make sure wait doesn't hang + result end it_behaves_like 'client streaming' + + it 'sends metadata to the server ok when running start_call first' do + th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) + th.join + end end end describe '#server_streamer' do - shared_examples 'server streaming' do - before(:each) do - @sent_msg = 'a_msg' - @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } - end + before(:each) do + @sent_msg = 'a_msg' + @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } + end + shared_examples 'server streaming' do it 'should send a request to/receive replies from a server' do server_port = create_test_server host = "localhost:#{server_port}" @@ -303,29 +363,44 @@ describe 'ClientStub' do end describe 'via a call operation' do - def get_responses(stub) - op = stub.server_streamer(@method, @sent_msg, noop, noop, - return_op: true, - metadata: { k1: 'v1', k2: 'v2' }) - expect(op).to be_a(GRPC::ActiveCall::Operation) - e = op.execute + after(:each) do + @op.wait # make sure wait doesn't hang + end + def get_responses(stub, run_start_call_first: false) + @op = stub.server_streamer(@method, @sent_msg, noop, noop, + return_op: true, + metadata: { k1: 'v1', k2: 'v2' }) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + e = @op.execute expect(e).to be_a(Enumerator) e end it_behaves_like 'server streaming' + + it 'should send metadata to the server ok when start_call is run first' do + server_port = create_test_server + host = "localhost:#{server_port}" + th = run_server_streamer(@sent_msg, @replys, @fail, + k1: 'v1', k2: 'v2') + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + e = get_responses(stub, run_start_call_first: true) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + th.join + end end end describe '#bidi_streamer' do - shared_examples 'bidi streaming' do - before(:each) do - @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } - @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } - server_port = create_test_server - @host = "localhost:#{server_port}" - end + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } + @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } + server_port = create_test_server + @host = "localhost:#{server_port}" + end + shared_examples 'bidi streaming' do it 'supports sending all the requests first', bidi: true do th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) @@ -363,16 +438,29 @@ describe 'ClientStub' do end describe 'via a call operation' do - def get_responses(stub) - op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, - return_op: true) - expect(op).to be_a(GRPC::ActiveCall::Operation) - e = op.execute + after(:each) do + @op.wait # make sure wait doesn't hang + end + def get_responses(stub, run_start_call_first: false) + @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + return_op: true) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + e = @op.execute expect(e).to be_a(Enumerator) e end it_behaves_like 'bidi streaming' + + it 'can run start_call before executing the call' do + th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, + @pass) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub, run_start_call_first: true) + expect(e.collect { |r| r }).to eq(@replys) + th.join + end end end @@ -441,6 +529,15 @@ describe 'ClientStub' do end end + def create_secure_test_server + certs = load_test_certs + secure_credentials = GRPC::Core::ServerCredentials.new( + nil, [{ private_key: certs[1], cert_chain: certs[2] }], false) + + @server = GRPC::Core::Server.new(nil) + @server.add_http2_port('0.0.0.0:0', secure_credentials) + end + def create_test_server @server = GRPC::Core::Server.new(nil) @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 1a895005bc..1ace7211e9 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -48,7 +48,6 @@ describe GRPC::RpcDesc do @bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new), Stream.new(Object.new), 'encode', 'decode') @bs_code = INTERNAL - @no_reason = 'no reason given' @ok_response = Object.new end @@ -62,8 +61,9 @@ describe GRPC::RpcDesc do it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, - false, metadata: {}) + expect(@call).to receive(:send_status).once.with(UNKNOWN, + arg_error_msg, + false, metadata: {}) this_desc.run_server_method(@call, method(:other_error)) end @@ -83,6 +83,7 @@ describe GRPC::RpcDesc do before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) + allow(@call).to receive(:output_metadata).and_return(@call) end it_behaves_like 'it handles errors' @@ -90,10 +91,10 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) - expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:output_metadata).and_return(fake_md) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true, - metadata: fake_md) + expect(@call).to receive(:output_metadata).once.and_return(fake_md) + expect(@call).to receive(:server_unary_response).once + .with(@ok_response, trailing_metadata: fake_md) + this_desc.run_server_method(@call, method(:fake_reqresp)) end end @@ -111,13 +112,15 @@ describe GRPC::RpcDesc do end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason, + expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg, false, metadata: {}) @client_streamer.run_server_method(@call, method(:other_error_alt)) end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_send).once.and_raise(CallError) + expect(@call).to receive(:server_unary_response).once.and_raise( + CallError) + allow(@call).to receive(:output_metadata).and_return({}) blk = proc do @client_streamer.run_server_method(@call, method(:fake_clstream)) end @@ -125,10 +128,11 @@ describe GRPC::RpcDesc do end it 'sends a response and closes the stream if there no errors' do - expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:output_metadata).and_return(fake_md) - expect(@call).to receive(:send_status).once.with(OK, 'OK', true, - metadata: fake_md) + expect(@call).to receive(:output_metadata).and_return( + fake_md) + expect(@call).to receive(:server_unary_response).once + .with(@ok_response, trailing_metadata: fake_md) + @client_streamer.run_server_method(@call, method(:fake_clstream)) end end @@ -170,8 +174,9 @@ describe GRPC::RpcDesc do end it 'sends status UNKNOWN if other StandardErrors are raised' do + error_msg = arg_error_msg(StandardError.new) expect(@call).to receive(:run_server_bidi).and_raise(StandardError) - expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason, + expect(@call).to receive(:send_status).once.with(UNKNOWN, error_msg, false, metadata: {}) @bidi_streamer.run_server_method(@call, method(:other_error_alt)) end @@ -338,4 +343,9 @@ describe GRPC::RpcDesc do def other_error_alt(_call) fail(ArgumentError, 'other error') end + + def arg_error_msg(error = nil) + error ||= ArgumentError.new('other error') + "#{error.class}: #{error.message}" + end end diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb new file mode 100644 index 0000000000..48ccaee510 --- /dev/null +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -0,0 +1,144 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' + +Thread.abort_on_exception = true + +describe GRPC::Pool do + Pool = GRPC::Pool + + describe '#new' do + it 'raises if a non-positive size is used' do + expect { Pool.new(0) }.to raise_error + expect { Pool.new(-1) }.to raise_error + expect { Pool.new(Object.new) }.to raise_error + end + + it 'is constructed OK with a positive size' do + expect { Pool.new(1) }.not_to raise_error + end + end + + describe '#ready_for_work?' do + it 'before start it is not ready' do + p = Pool.new(1) + expect(p.ready_for_work?).to be(false) + end + + it 'it stops being ready after all workers jobs waiting or running' do + p = Pool.new(5) + p.start + job = proc { sleep(3) } # sleep so workers busy when done scheduling + 5.times do + expect(p.ready_for_work?).to be(true) + p.schedule(&job) + end + expect(p.ready_for_work?).to be(false) + end + + it 'it becomes ready again after jobs complete' do + p = Pool.new(5) + p.start + job = proc {} + 5.times do + expect(p.ready_for_work?).to be(true) + p.schedule(&job) + end + expect(p.ready_for_work?).to be(false) + sleep 5 # give the pool time do get at least one task done + expect(p.ready_for_work?).to be(true) + end + end + + describe '#schedule' do + it 'return if the pool is already stopped' do + p = Pool.new(1) + p.stop + job = proc {} + expect { p.schedule(&job) }.to_not raise_error + end + + it 'adds jobs that get run by the pool' do + p = Pool.new(1) + p.start + o, q = Object.new, Queue.new + job = proc { q.push(o) } + p.schedule(&job) + expect(q.pop).to be(o) + p.stop + end + + it 'it throws an error if all of the workers have tasks to do' do + p = Pool.new(5) + p.start + job = proc {} + 5.times do + expect(p.ready_for_work?).to be(true) + p.schedule(&job) + end + expect { p.schedule(&job) }.to raise_error + expect { p.schedule(&job) }.to raise_error + end + end + + describe '#stop' do + it 'works when there are no scheduled tasks' do + p = Pool.new(1) + expect { p.stop }.not_to raise_error + end + + it 'stops jobs when there are long running jobs' do + p = Pool.new(1) + p.start + o, q = Object.new, Queue.new + job = proc do + sleep(5) # long running + q.push(o) + end + p.schedule(&job) + sleep(1) # should ensure the long job gets scheduled + expect { p.stop }.not_to raise_error + end + end + + describe '#start' do + it 'runs jobs as they are scheduled' do + p = Pool.new(5) + o, q = Object.new, Queue.new + p.start + n = 5 # arbitrary + n.times do + p.schedule(o, &q.method(:push)) + expect(q.pop).to be(o) + end + p.stop + end + end +end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index d362e48dee..806ea8ce9f 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -408,21 +408,21 @@ describe GRPC::RpcServer do req = EchoMsg.new n = 20 # arbitrary, use as many to ensure the server pool is exceeded threads = [] - bad_status_code = nil + one_failed_as_unavailable = false n.times do threads << Thread.new do stub = SlowStub.new(alt_host, :this_channel_is_insecure) begin stub.an_rpc(req) - rescue GRPC::BadStatus => e - bad_status_code = e.code + rescue GRPC::ResourceExhausted + one_failed_as_unavailable = true end end end threads.each(&:join) alt_srv.stop t.join - expect(bad_status_code).to be(StatusCodes::RESOURCE_EXHAUSTED) + expect(one_failed_as_unavailable).to be(true) end end @@ -462,6 +462,7 @@ describe GRPC::RpcServer do 'connect_k1' => 'connect_v1' } wanted_md.each do |key, value| + puts "key: #{key}" expect(op.metadata[key]).to eq(value) end @srv.stop diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb index 1b2fa96827..719510001c 100644 --- a/src/ruby/spec/pb/health/checker_spec.rb +++ b/src/ruby/spec/pb/health/checker_spec.rb @@ -97,15 +97,17 @@ describe Grpc::Health::Checker do context 'initialization' do it 'can be constructed with no args' do - expect(subject).to_not be(nil) + checker = Grpc::Health::Checker.new + expect(checker).to_not be(nil) end end context 'method `add_status` and `check`' do success_tests.each do |t| it "should succeed when #{t[:desc]}" do - subject.add_status(t[:service], ServingStatus::NOT_SERVING) - got = subject.check(HCReq.new(service: t[:service]), nil) + checker = Grpc::Health::Checker.new + checker.add_status(t[:service], ServingStatus::NOT_SERVING) + got = checker.check(HCReq.new(service: t[:service]), nil) want = HCResp.new(status: ServingStatus::NOT_SERVING) expect(got).to eq(want) end @@ -115,11 +117,12 @@ describe Grpc::Health::Checker do context 'method `check`' do success_tests.each do |t| it "should fail with NOT_FOUND when #{t[:desc]}" do + checker = Grpc::Health::Checker.new blk = proc do - subject.check(HCReq.new(service: t[:service]), nil) + checker.check(HCReq.new(service: t[:service]), nil) end expected_msg = /#{StatusCodes::NOT_FOUND}/ - expect(&blk).to raise_error GRPC::BadStatus, expected_msg + expect(&blk).to raise_error GRPC::NotFound, expected_msg end end end @@ -127,38 +130,40 @@ describe Grpc::Health::Checker do context 'method `clear_status`' do success_tests.each do |t| it "should fail after clearing status when #{t[:desc]}" do - subject.add_status(t[:service], ServingStatus::NOT_SERVING) - got = subject.check(HCReq.new(service: t[:service]), nil) + checker = Grpc::Health::Checker.new + checker.add_status(t[:service], ServingStatus::NOT_SERVING) + got = checker.check(HCReq.new(service: t[:service]), nil) want = HCResp.new(status: ServingStatus::NOT_SERVING) expect(got).to eq(want) - subject.clear_status(t[:service]) + checker.clear_status(t[:service]) blk = proc do - subject.check(HCReq.new(service: t[:service]), nil) + checker.check(HCReq.new(service: t[:service]), nil) end expected_msg = /#{StatusCodes::NOT_FOUND}/ - expect(&blk).to raise_error GRPC::BadStatus, expected_msg + expect(&blk).to raise_error GRPC::NotFound, expected_msg end end end context 'method `clear_all`' do it 'should return NOT_FOUND after being invoked' do + checker = Grpc::Health::Checker.new success_tests.each do |t| - subject.add_status(t[:service], ServingStatus::NOT_SERVING) - got = subject.check(HCReq.new(service: t[:service]), nil) + checker.add_status(t[:service], ServingStatus::NOT_SERVING) + got = checker.check(HCReq.new(service: t[:service]), nil) want = HCResp.new(status: ServingStatus::NOT_SERVING) expect(got).to eq(want) end - subject.clear_all + checker.clear_all success_tests.each do |t| blk = proc do - subject.check(HCReq.new(service: t[:service]), nil) + checker.check(HCReq.new(service: t[:service]), nil) end expected_msg = /#{StatusCodes::NOT_FOUND}/ - expect(&blk).to raise_error GRPC::BadStatus, expected_msg + expect(&blk).to raise_error GRPC::NotFound, expected_msg end end end @@ -184,8 +189,10 @@ describe Grpc::Health::Checker do end it 'should receive the correct status', server: true do - @srv.handle(subject) - subject.add_status('', ServingStatus::NOT_SERVING) + Thread.abort_on_exception = true + checker = Grpc::Health::Checker.new + @srv.handle(checker) + checker.add_status('', ServingStatus::NOT_SERVING) t = Thread.new { @srv.run } @srv.wait_till_running @@ -198,7 +205,8 @@ describe Grpc::Health::Checker do end it 'should fail on unknown services', server: true do - @srv.handle(subject) + checker = Grpc::Health::Checker.new + @srv.handle(checker) t = Thread.new { @srv.run } @srv.wait_till_running blk = proc do @@ -206,7 +214,7 @@ describe Grpc::Health::Checker do stub.check(HCReq.new(service: 'unknown')) end expected_msg = /#{StatusCodes::NOT_FOUND}/ - expect(&blk).to raise_error GRPC::BadStatus, expected_msg + expect(&blk).to raise_error GRPC::NotFound, expected_msg @srv.stop t.join end diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb index c891c1bf5e..c2be0afa72 100644 --- a/src/ruby/spec/spec_helper.rb +++ b/src/ruby/spec/spec_helper.rb @@ -67,3 +67,5 @@ RSpec.configure do |config| end RSpec::Expectations.configuration.warn_about_potential_false_positives = false + +Thread.abort_on_exception = true |