diff options
Diffstat (limited to 'src/ruby')
55 files changed, 2070 insertions, 2653 deletions
diff --git a/src/ruby/.rspec b/src/ruby/.rspec index 60a4aad5a2..dd579f7a13 100755 --- a/src/ruby/.rspec +++ b/src/ruby/.rspec @@ -1 +1,2 @@ -I. +--require spec_helper diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index d5bb55e5a8..ed4a4438b3 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,42 +1,30 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-01-16 02:30:04 -0800 using RuboCop version 0.28.0. +# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new # versions of RuboCop, may require this file to be generated again. -# Offense count: 3 -# Lint/UselessAssignment: -# Enabled: false - -# Offense count: 33 +# Offense count: 30 Metrics/AbcSize: - Max: 39 + Max: 40 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 231 - -# Offense count: 2 -Metrics/CyclomaticComplexity: - Max: 8 + Max: 184 -# Offense count: 36 +# Offense count: 35 # Configuration parameters: CountComments. Metrics/MethodLength: - Max: 37 + Max: 36 -# Offense count: 8 +# Offense count: 7 # Configuration parameters: CountKeywordArgs. Metrics/ParameterLists: Max: 8 -# Offense count: 2 -Metrics/PerceivedComplexity: - Max: 10 - -# Offense count: 7 +# Offense count: 9 # Configuration parameters: AllowedVariables. Style/GlobalVars: Enabled: false @@ -50,3 +38,7 @@ Style/Next: # Configuration parameters: Methods. Style/SingleLineBlockParams: Enabled: false + +# Offense count: 1 +Style/StructInheritance: + Enabled: false diff --git a/src/ruby/CHANGELOG.md b/src/ruby/CHANGELOG.md new file mode 100644 index 0000000000..8ec6e3cfdb --- /dev/null +++ b/src/ruby/CHANGELOG.md @@ -0,0 +1,11 @@ +## 0.6.1 (2015-04-14) + +### Changes + +* Begins this ChangeLog ([@tbetbetbe][]) +* Updates to version 0.4 of googleauth. ([@tbetbetbe][]) +* Switch the extension to use the call API. ([@tbetbetbe][]) +* Refactor the C extension to avoid identifiers used by ruby ([@yugui][]) + +[@tbetbetbe]: https://github.com/tbetbetbe +[@yugui]: https://github.com/yugui diff --git a/src/ruby/Rakefile b/src/ruby/Rakefile index afb354e922..02af9a84b8 100755 --- a/src/ruby/Rakefile +++ b/src/ruby/Rakefile @@ -26,6 +26,7 @@ namespace :suite do SPEC_SUITES.each do |suite| desc "Run all specs in the #{suite[:title]} spec suite" RSpec::Core::RakeTask.new(suite[:id]) do |t| + ENV['COVERAGE_NAME'] = suite[:id].to_s spec_files = [] suite[:files].each { |f| spec_files += Dir[f] } if suite[:files] diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb index 9bb324ff64..6d69b0f21e 100755 --- a/src/ruby/bin/apis/pubsub_demo.rb +++ b/src/ruby/bin/apis/pubsub_demo.rb @@ -71,7 +71,7 @@ end # Builds the metadata authentication update proc. def auth_proc(opts) - auth_creds = Google::Auth.get_application_default(opts.oauth_scope) + auth_creds = Google::Auth.get_application_default return auth_creds.updater_proc end @@ -213,17 +213,14 @@ class NamedActions end # Args is used to hold the command line info. -Args = Struct.new(:host, :oauth_scope, :port, :action, :project_id, :topic_name, +Args = Struct.new(:host, :port, :action, :project_id, :topic_name, :sub_name) # validates the the command line options, returning them as an Arg. def parse_args args = Args.new('pubsub-staging.googleapis.com', - 'https://www.googleapis.com/auth/pubsub', 443, 'list_some_topics', 'stoked-keyword-656') OptionParser.new do |opts| - opts.on('--oauth_scope scope', - 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } opts.on('--server_host SERVER_HOST', 'server hostname') do |v| args.host = v end @@ -250,7 +247,7 @@ def parse_args end def _check_args(args) - %w(host port action oauth_scope).each do |a| + %w(host port action).each do |a| if args[a].nil? raise OptionParser::MissingArgument.new("please specify --#{a}") end diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index b2a8711c79..a388924722 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -110,6 +110,11 @@ def create_stub(opts) end end + if opts.test_case == 'jwt_token_creds' # don't use a scope + auth_creds = Google::Auth.get_application_default + stub_opts[:update_metadata] = auth_creds.updater_proc + end + logger.info("... connecting securely to #{address}") Grpc::Testing::TestService::Stub.new(address, **stub_opts) else @@ -131,12 +136,14 @@ class PingPongPlayer include Grpc::Testing::PayloadType attr_accessor :assertions # required by Minitest::Assertions attr_accessor :queue + attr_accessor :canceller_op # reqs is the enumerator over the requests def initialize(msg_sizes) @queue = Queue.new @msg_sizes = msg_sizes @assertions = 0 # required by Minitest::Assertions + @canceller_op = nil # used to cancel after the first response end def each_item @@ -150,12 +157,15 @@ class PingPongPlayer response_parameters: [p_cls.new(size: resp_size)]) yield req resp = @queue.pop - assert_equal(:COMPRESSABLE, resp.payload.type, - 'payload type is wrong') + assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong') assert_equal(resp_size, resp.payload.body.length, - 'payload body #{i} has the wrong length') + "payload body #{count} has the wrong length") p "OK: ping_pong #{count}" count += 1 + unless @canceller_op.nil? + canceller_op.cancel + break + end end end end @@ -201,6 +211,15 @@ class NamedTests p 'OK: service_account_creds' end + def jwt_token_creds + json_key = File.read(ENV[AUTH_ENV]) + wanted_email = MultiJson.load(json_key)['client_email'] + resp = perform_large_unary(fill_username: true) + assert_equal(wanted_email, resp.username, + 'service_account_creds: incorrect username') + p 'OK: jwt_token_creds' + end + def compute_engine_creds resp = perform_large_unary(fill_username: true, fill_oauth_scope: true) @@ -246,6 +265,27 @@ class NamedTests p 'OK: ping_pong' end + def cancel_after_begin + msg_sizes = [27_182, 8, 1828, 45_904] + reqs = msg_sizes.map do |x| + req = Payload.new(body: nulls(x)) + StreamingInputCallRequest.new(payload: req) + end + op = @stub.streaming_input_call(reqs, return_op: true) + op.cancel + assert_raises(GRPC::Cancelled) { op.execute } + p 'OK: cancel_after_begin' + end + + def cancel_after_first_response + msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] + ppp = PingPongPlayer.new(msg_sizes) + op = @stub.full_duplex_call(ppp.each_item, return_op: true) + ppp.canceller_op = op # causes ppp to cancel after the 1st message + assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } + p 'OK: cancel_after_first_response' + end + def all all_methods = NamedTests.instance_methods(false).map(&:to_s) all_methods.each do |m| diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb index 0819ba9bbc..72570d92f3 100755 --- a/src/ruby/bin/interop/interop_server.rb +++ b/src/ruby/bin/interop/interop_server.rb @@ -185,7 +185,7 @@ def main logger.info("... running insecurely on #{host}") end s.handle(TestTarget) - s.run + s.run_till_terminated end main diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb index 5cc7613489..1bfe253b85 100755 --- a/src/ruby/bin/math_server.rb +++ b/src/ruby/bin/math_server.rb @@ -183,7 +183,7 @@ def main end s.handle(Calculator) - s.run + s.run_till_terminated end main diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb index 9979cb7ebb..f71daeadb3 100755 --- a/src/ruby/bin/noproto_server.rb +++ b/src/ruby/bin/noproto_server.rb @@ -105,7 +105,7 @@ def main end s.handle(NoProto) - s.run + s.run_till_terminated end main diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c index ff5a114de5..e3a5277f54 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.c +++ b/src/ruby/ext/grpc/rb_byte_buffer.c @@ -39,203 +39,29 @@ #include <grpc/support/slice.h> #include "rb_grpc.h" -/* grpc_rb_byte_buffer wraps a grpc_byte_buffer. It provides a peer ruby - * object, 'mark' to minimize copying when a byte_buffer is created from - * ruby. */ -typedef struct grpc_rb_byte_buffer { - /* Holder of ruby objects involved in constructing the status */ - VALUE mark; - /* The actual status */ - grpc_byte_buffer *wrapped; -} grpc_rb_byte_buffer; - -/* Destroys ByteBuffer instances. */ -static void grpc_rb_byte_buffer_free(void *p) { - grpc_rb_byte_buffer *bb = NULL; - if (p == NULL) { - return; - }; - bb = (grpc_rb_byte_buffer *)p; - - /* Deletes the wrapped object if the mark object is Qnil, which indicates - * that no other object is the actual owner. */ - if (bb->wrapped != NULL && bb->mark == Qnil) { - grpc_byte_buffer_destroy(bb->wrapped); - } - - xfree(p); -} - -/* Protects the mark object from GC */ -static void grpc_rb_byte_buffer_mark(void *p) { - grpc_rb_byte_buffer *bb = NULL; - if (p == NULL) { - return; - } - bb = (grpc_rb_byte_buffer *)p; - - /* If it's not already cleaned up, mark the mark object */ - if (bb->mark != Qnil && BUILTIN_TYPE(bb->mark) != T_NONE) { - rb_gc_mark(bb->mark); - } +grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) { + gpr_slice slice = gpr_slice_from_copied_buffer(string, length); + grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + return buffer; } -/* id_source is the name of the hidden ivar the preserves the original - * byte_buffer source string */ -static ID id_source; - -/* Allocates ByteBuffer instances. - - Provides safe default values for the byte_buffer fields. */ -static VALUE grpc_rb_byte_buffer_alloc(VALUE cls) { - grpc_rb_byte_buffer *wrapper = ALLOC(grpc_rb_byte_buffer); - wrapper->wrapped = NULL; - wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_byte_buffer_mark, - grpc_rb_byte_buffer_free, wrapper); -} - -/* Clones ByteBuffer instances. - - Gives ByteBuffer a consistent implementation of Ruby's object copy/dup - protocol. */ -static VALUE grpc_rb_byte_buffer_init_copy(VALUE copy, VALUE orig) { - grpc_rb_byte_buffer *orig_bb = NULL; - grpc_rb_byte_buffer *copy_bb = NULL; - - if (copy == orig) { - return copy; - } - - /* Raise an error if orig is not a metadata object or a subclass. */ - if (TYPE(orig) != T_DATA || - RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_byte_buffer_free) { - rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cByteBuffer)); - } - - Data_Get_Struct(orig, grpc_rb_byte_buffer, orig_bb); - Data_Get_Struct(copy, grpc_rb_byte_buffer, copy_bb); - - /* use ruby's MEMCPY to make a byte-for-byte copy of the metadata wrapper - * object. */ - MEMCPY(copy_bb, orig_bb, grpc_rb_byte_buffer, 1); - return copy; -} - -/* id_empty is used to return the empty string from to_s when necessary. */ -static ID id_empty; - -static VALUE grpc_rb_byte_buffer_to_s(VALUE self) { - grpc_rb_byte_buffer *wrapper = NULL; - grpc_byte_buffer *bb = NULL; - grpc_byte_buffer_reader *reader = NULL; - char *output = NULL; +VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) { size_t length = 0; + char *string = NULL; size_t offset = 0; - VALUE output_obj = Qnil; + grpc_byte_buffer_reader *reader = NULL; gpr_slice next; + if (buffer == NULL) { + return Qnil; - Data_Get_Struct(self, grpc_rb_byte_buffer, wrapper); - output_obj = rb_ivar_get(wrapper->mark, id_source); - if (output_obj != Qnil) { - /* From ruby, ByteBuffers are immutable so if a source is set, return that - * as the to_s value */ - return output_obj; - } - - /* Read the bytes. */ - bb = wrapper->wrapped; - if (bb == NULL) { - return rb_id2str(id_empty); - } - length = grpc_byte_buffer_length(bb); - if (length == 0) { - return rb_id2str(id_empty); } - reader = grpc_byte_buffer_reader_create(bb); - output = xmalloc(length); + length = grpc_byte_buffer_length(buffer); + string = xmalloc(length + 1); + reader = grpc_byte_buffer_reader_create(buffer); while (grpc_byte_buffer_reader_next(reader, &next) != 0) { - memcpy(output + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next)); + memcpy(string + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next)); offset += GPR_SLICE_LENGTH(next); } - output_obj = rb_str_new(output, length); - - /* Save a references to the computed string in the mark object so that the - * calling to_s does not do any allocations. */ - wrapper->mark = rb_class_new_instance(0, NULL, rb_cObject); - rb_ivar_set(wrapper->mark, id_source, output_obj); - - return output_obj; -} - -/* Initializes ByteBuffer instances. */ -static VALUE grpc_rb_byte_buffer_init(VALUE self, VALUE src) { - gpr_slice a_slice; - grpc_rb_byte_buffer *wrapper = NULL; - grpc_byte_buffer *byte_buffer = NULL; - - if (TYPE(src) != T_STRING) { - rb_raise(rb_eTypeError, "bad byte_buffer arg: got <%s>, want <String>", - rb_obj_classname(src)); - return Qnil; - } - Data_Get_Struct(self, grpc_rb_byte_buffer, wrapper); - a_slice = gpr_slice_malloc(RSTRING_LEN(src)); - memcpy(GPR_SLICE_START_PTR(a_slice), RSTRING_PTR(src), RSTRING_LEN(src)); - byte_buffer = grpc_byte_buffer_create(&a_slice, 1); - gpr_slice_unref(a_slice); - - if (byte_buffer == NULL) { - rb_raise(rb_eArgError, "could not create a byte_buffer, not sure why"); - return Qnil; - } - wrapper->wrapped = byte_buffer; - - /* Save a references to the original string in the mark object so that the - * pointers used there is valid for the lifetime of the object. */ - wrapper->mark = rb_class_new_instance(0, NULL, rb_cObject); - rb_ivar_set(wrapper->mark, id_source, src); - - return self; -} - -/* rb_cByteBuffer is the ruby class that proxies grpc_byte_buffer. */ -VALUE rb_cByteBuffer = Qnil; - -void Init_grpc_byte_buffer() { - rb_cByteBuffer = - rb_define_class_under(rb_mGrpcCore, "ByteBuffer", rb_cObject); - - /* Allocates an object managed by the ruby runtime */ - rb_define_alloc_func(rb_cByteBuffer, grpc_rb_byte_buffer_alloc); - - /* Provides a ruby constructor and support for dup/clone. */ - rb_define_method(rb_cByteBuffer, "initialize", grpc_rb_byte_buffer_init, 1); - rb_define_method(rb_cByteBuffer, "initialize_copy", - grpc_rb_byte_buffer_init_copy, 1); - - /* Provides a to_s method that returns the buffer value */ - rb_define_method(rb_cByteBuffer, "to_s", grpc_rb_byte_buffer_to_s, 0); - - id_source = rb_intern("__source"); - id_empty = rb_intern(""); -} - -VALUE grpc_rb_byte_buffer_create_with_mark(VALUE mark, grpc_byte_buffer *bb) { - grpc_rb_byte_buffer *byte_buffer = NULL; - if (bb == NULL) { - return Qnil; - } - byte_buffer = ALLOC(grpc_rb_byte_buffer); - byte_buffer->wrapped = bb; - byte_buffer->mark = mark; - return Data_Wrap_Struct(rb_cByteBuffer, grpc_rb_byte_buffer_mark, - grpc_rb_byte_buffer_free, byte_buffer); -} - -/* Gets the wrapped byte_buffer from the ruby wrapper */ -grpc_byte_buffer *grpc_rb_get_wrapped_byte_buffer(VALUE v) { - grpc_rb_byte_buffer *wrapper = NULL; - Data_Get_Struct(v, grpc_rb_byte_buffer, wrapper); - return wrapper->wrapped; + return rb_str_new(string, length); } diff --git a/src/ruby/ext/grpc/rb_byte_buffer.h b/src/ruby/ext/grpc/rb_byte_buffer.h index 6ef72f3e75..96b9009dae 100644 --- a/src/ruby/ext/grpc/rb_byte_buffer.h +++ b/src/ruby/ext/grpc/rb_byte_buffer.h @@ -37,18 +37,10 @@ #include <grpc/grpc.h> #include <ruby.h> -/* rb_cByteBuffer is the ByteBuffer class whose instances proxy - grpc_byte_buffer. */ -extern VALUE rb_cByteBuffer; +/* Converts a char* with a length to a grpc_byte_buffer */ +grpc_byte_buffer *grpc_rb_s_to_byte_buffer(char *string, size_t length); -/* Initializes the ByteBuffer class. */ -void Init_grpc_byte_buffer(); - -/* grpc_rb_byte_buffer_create_with_mark creates a grpc_rb_byte_buffer with a - * ruby mark object that will be kept alive while the byte_buffer is alive. */ -VALUE grpc_rb_byte_buffer_create_with_mark(VALUE mark, grpc_byte_buffer* bb); - -/* Gets the wrapped byte_buffer from its ruby object. */ -grpc_byte_buffer* grpc_rb_get_wrapped_byte_buffer(VALUE v); +/* Converts a grpc_byte_buffer to a ruby string */ +VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer); #endif /* GRPC_RB_BYTE_BUFFER_H_ */ diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index b5a256d5a6..e76bb930ee 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -36,11 +36,31 @@ #include <ruby.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> + #include "rb_byte_buffer.h" #include "rb_completion_queue.h" -#include "rb_metadata.h" #include "rb_grpc.h" +/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */ +static VALUE grpc_rb_cCall; + +/* grpc_rb_eCallError is the ruby class of the exception thrown during call + operations; */ +VALUE grpc_rb_eCallError = Qnil; + +/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate + a timeout. */ +static VALUE grpc_rb_eOutOfTime = Qnil; + +/* grpc_rb_sBatchResult is struct class used to hold the results of a batch + * call. */ +static VALUE grpc_rb_sBatchResult; + +/* grpc_rb_cMdAry is the MetadataArray class whose instances proxy + * grpc_metadata_array. */ +static VALUE grpc_rb_cMdAry; + /* id_cq is the name of the hidden ivar that preserves a reference to a * completion queue */ static ID id_cq; @@ -62,13 +82,22 @@ static ID id_metadata; * received by the call and subsequently saved on it. */ static ID id_status; +/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */ +static VALUE sym_send_message; +static VALUE sym_send_metadata; +static VALUE sym_send_close; +static VALUE sym_send_status; +static VALUE sym_message; +static VALUE sym_status; +static VALUE sym_cancelled; + /* hash_all_calls is a hash of Call address -> reference count that is used to * track the creation and destruction of rb_call instances. */ static VALUE hash_all_calls; /* Destroys a Call. */ -void grpc_rb_call_destroy(void *p) { +static void grpc_rb_call_destroy(void *p) { grpc_call *call = NULL; VALUE ref_count = Qnil; if (p == NULL) { @@ -88,6 +117,38 @@ void grpc_rb_call_destroy(void *p) { } } +static size_t md_ary_datasize(const void *p) { + const grpc_metadata_array *const ary = (grpc_metadata_array *)p; + size_t i, datasize = sizeof(grpc_metadata_array); + for (i = 0; i < ary->count; ++i) { + const grpc_metadata *const md = &ary->metadata[i]; + datasize += strlen(md->key); + datasize += md->value_length; + } + datasize += ary->capacity * sizeof(grpc_metadata); + return datasize; +} + +static const rb_data_type_t grpc_rb_md_ary_data_type = { + "grpc_metadata_array", + {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize}, + NULL, + NULL, + 0}; + +/* Describes grpc_call struct for RTypedData */ +static const rb_data_type_t grpc_call_data_type = { + "grpc_call", + {GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, + NULL, + /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because + * grpc_rb_call_destroy + * touches a hash object. + * TODO(yugui) Directly use st_table and call the free function earlier? + */ + 0}; + /* Error code details is a hash containing text strings describing errors */ VALUE rb_error_code_details; @@ -101,93 +162,15 @@ const char *grpc_call_error_detail_of(grpc_call_error err) { return detail; } -/* grpc_rb_call_add_metadata_hash_cb is the hash iteration callback used by - grpc_rb_call_add_metadata. -*/ -int grpc_rb_call_add_metadata_hash_cb(VALUE key, VALUE val, VALUE call_obj) { - grpc_call *call = NULL; - grpc_metadata *md = NULL; - VALUE md_obj = Qnil; - VALUE md_obj_args[2]; - VALUE flags = rb_ivar_get(call_obj, id_flags); - grpc_call_error err; - int array_length; - int i; - - /* Construct a metadata object from key and value and add it */ - Data_Get_Struct(call_obj, grpc_call, call); - md_obj_args[0] = key; - - if (TYPE(val) == T_ARRAY) { - /* If the value is an array, add each value in the array separately */ - array_length = RARRAY_LEN(val); - for (i = 0; i < array_length; i++) { - md_obj_args[1] = rb_ary_entry(val, i); - md_obj = rb_class_new_instance(2, md_obj_args, rb_cMetadata); - md = grpc_rb_get_wrapped_metadata(md_obj); - err = grpc_call_add_metadata_old(call, md, NUM2UINT(flags)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "add metadata failed: %s (code=%d)", - grpc_call_error_detail_of(err), err); - return ST_STOP; - } - } - } else { - md_obj_args[1] = val; - md_obj = rb_class_new_instance(2, md_obj_args, rb_cMetadata); - md = grpc_rb_get_wrapped_metadata(md_obj); - err = grpc_call_add_metadata_old(call, md, NUM2UINT(flags)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "add metadata failed: %s (code=%d)", - grpc_call_error_detail_of(err), err); - return ST_STOP; - } - } - - return ST_CONTINUE; -} - -/* - call-seq: - call.add_metadata(completion_queue, hash_elements, flags=nil) - - Add metadata elements to the call from a ruby hash, to be sent upon - invocation. flags is a bit-field combination of the write flags defined - above. REQUIRES: grpc_call_invoke/grpc_call_accept have not been - called on this call. Produces no events. */ - -static VALUE grpc_rb_call_add_metadata(int argc, VALUE *argv, VALUE self) { - VALUE metadata; - VALUE flags = Qnil; - ID id_size = rb_intern("size"); - - /* "11" == 1 mandatory args, 1 (flags) is optional */ - rb_scan_args(argc, argv, "11", &metadata, &flags); - if (NIL_P(flags)) { - flags = UINT2NUM(0); /* Default to no flags */ - } - if (TYPE(metadata) != T_HASH) { - rb_raise(rb_eTypeError, "add metadata failed: metadata should be a hash"); - return Qnil; - } - if (NUM2UINT(rb_funcall(metadata, id_size, 0)) == 0) { - return Qnil; - } - rb_ivar_set(self, id_flags, flags); - rb_ivar_set(self, id_input_md, metadata); - rb_hash_foreach(metadata, grpc_rb_call_add_metadata_hash_cb, self); - return Qnil; -} - /* Called by clients to cancel an RPC on the server. Can be called multiple times, from any thread. */ static VALUE grpc_rb_call_cancel(VALUE self) { grpc_call *call = NULL; grpc_call_error err; - Data_Get_Struct(self, grpc_call, call); + TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); err = grpc_call_cancel(call); if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "cancel failed: %s (code=%d)", + rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)", grpc_call_error_detail_of(err), err); } @@ -196,77 +179,20 @@ static VALUE grpc_rb_call_cancel(VALUE self) { /* call-seq: - call.invoke(completion_queue, tag, flags=nil) - - Invoke the RPC. Starts sending metadata and request headers on the wire. - flags is a bit-field combination of the write flags defined above. - REQUIRES: Can be called at most once per call. - Can only be called on the client. - Produces a GRPC_INVOKE_ACCEPTED event on completion. */ -static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) { - VALUE cqueue = Qnil; - VALUE metadata_read_tag = Qnil; - VALUE finished_tag = Qnil; - VALUE flags = Qnil; - grpc_call *call = NULL; - grpc_completion_queue *cq = NULL; - grpc_call_error err; + status = call.status - /* "31" == 3 mandatory args, 1 (flags) is optional */ - rb_scan_args(argc, argv, "31", &cqueue, &metadata_read_tag, &finished_tag, - &flags); - if (NIL_P(flags)) { - flags = UINT2NUM(0); /* Default to no flags */ - } - cq = grpc_rb_get_wrapped_completion_queue(cqueue); - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_invoke_old(call, cq, ROBJECT(metadata_read_tag), - ROBJECT(finished_tag), NUM2UINT(flags)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "invoke failed: %s (code=%d)", - grpc_call_error_detail_of(err), err); - } - - /* Add the completion queue as an instance attribute, prevents it from being - * GCed until this call object is GCed */ - rb_ivar_set(self, id_cq, cqueue); - - return Qnil; -} - -/* Initiate a read on a call. Output event contains a byte buffer with the - result of the read. - REQUIRES: No other reads are pending on the call. It is only safe to start - the next read after the corresponding read event is received. */ -static VALUE grpc_rb_call_start_read(VALUE self, VALUE tag) { - grpc_call *call = NULL; - grpc_call_error err; - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_start_read_old(call, ROBJECT(tag)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "start read failed: %s (code=%d)", - grpc_call_error_detail_of(err), err); - } - - return Qnil; -} - -/* - call-seq: - status = call.status - - Gets the status object saved the call. */ + Gets the status object saved the call. */ static VALUE grpc_rb_call_get_status(VALUE self) { return rb_ivar_get(self, id_status); } /* call-seq: - call.status = status + call.status = status - Saves a status object on the call. */ + Saves a status object on the call. */ static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) { - if (!NIL_P(status) && rb_obj_class(status) != rb_sStatus) { + if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) { rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>", rb_obj_classname(status)); return Qnil; @@ -277,18 +203,18 @@ static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) { /* call-seq: - metadata = call.metadata + metadata = call.metadata - Gets the metadata object saved the call. */ + Gets the metadata object saved the call. */ static VALUE grpc_rb_call_get_metadata(VALUE self) { return rb_ivar_get(self, id_metadata); } /* call-seq: - call.metadata = metadata + call.metadata = metadata - Saves the metadata hash on the call. */ + Saves the metadata hash on the call. */ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) { if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) { rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>", @@ -299,176 +225,425 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) { return rb_ivar_set(self, id_metadata, metadata); } -/* - call-seq: - call.start_write(byte_buffer, tag, flags=nil) - - Queue a byte buffer for writing. - flags is a bit-field combination of the write flags defined above. - A write with byte_buffer null is allowed, and will not send any bytes on the - wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides - a mechanism to flush any previously buffered writes to outgoing flow control. - REQUIRES: No other writes are pending on the call. It is only safe to - start the next write after the corresponding write_accepted event - is received. - GRPC_INVOKE_ACCEPTED must have been received by the application - prior to calling this on the client. On the server, - grpc_call_accept must have been called successfully. - Produces a GRPC_WRITE_ACCEPTED event. */ -static VALUE grpc_rb_call_start_write(int argc, VALUE *argv, VALUE self) { - VALUE byte_buffer = Qnil; - VALUE tag = Qnil; - VALUE flags = Qnil; - grpc_call *call = NULL; - grpc_byte_buffer *bfr = NULL; - grpc_call_error err; +/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used + to fill grpc_metadata_array. + + it's capacity should have been computed via a prior call to + grpc_rb_md_ary_fill_hash_cb +*/ +static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { + grpc_metadata_array *md_ary = NULL; + int array_length; + int i; + + /* Construct a metadata object from key and value and add it */ + TypedData_Get_Struct(md_ary_obj, grpc_metadata_array, + &grpc_rb_md_ary_data_type, md_ary); - /* "21" == 2 mandatory args, 1 (flags) is optional */ - rb_scan_args(argc, argv, "21", &byte_buffer, &tag, &flags); - if (NIL_P(flags)) { - flags = UINT2NUM(0); /* Default to no flags */ + if (TYPE(val) == T_ARRAY) { + /* If the value is an array, add capacity for each value in the array */ + array_length = RARRAY_LEN(val); + for (i = 0; i < array_length; i++) { + if (TYPE(key) == T_SYMBOL) { + md_ary->metadata[md_ary->count].key = (char *)rb_id2name(SYM2ID(key)); + } else { /* StringValueCStr does all other type exclusions for us */ + md_ary->metadata[md_ary->count].key = StringValueCStr(key); + } + md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i)); + md_ary->metadata[md_ary->count].value_length = + RSTRING_LEN(rb_ary_entry(val, i)); + md_ary->count += 1; + } + } else { + if (TYPE(key) == T_SYMBOL) { + md_ary->metadata[md_ary->count].key = (char *)rb_id2name(SYM2ID(key)); + } else { /* StringValueCStr does all other type exclusions for us */ + md_ary->metadata[md_ary->count].key = StringValueCStr(key); + } + md_ary->metadata[md_ary->count].value = RSTRING_PTR(val); + md_ary->metadata[md_ary->count].value_length = RSTRING_LEN(val); + md_ary->count += 1; } - bfr = grpc_rb_get_wrapped_byte_buffer(byte_buffer); - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_start_write_old(call, bfr, ROBJECT(tag), NUM2UINT(flags)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "start write failed: %s (code=%d)", - grpc_call_error_detail_of(err), err); + + return ST_CONTINUE; +} + +/* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used + to pre-compute the capacity a grpc_metadata_array. +*/ +static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, + VALUE md_ary_obj) { + grpc_metadata_array *md_ary = NULL; + + /* Construct a metadata object from key and value and add it */ + TypedData_Get_Struct(md_ary_obj, grpc_metadata_array, + &grpc_rb_md_ary_data_type, md_ary); + + if (TYPE(val) == T_ARRAY) { + /* If the value is an array, add capacity for each value in the array */ + md_ary->capacity += RARRAY_LEN(val); + } else { + md_ary->capacity += 1; } + return ST_CONTINUE; +} - return Qnil; +/* grpc_rb_md_ary_convert converts a ruby metadata hash into + a grpc_metadata_array. +*/ +static void grpc_rb_md_ary_convert(VALUE md_ary_hash, + grpc_metadata_array *md_ary) { + VALUE md_ary_obj = Qnil; + if (md_ary_hash == Qnil) { + return; /* Do nothing if the expected has value is nil */ + } + if (TYPE(md_ary_hash) != T_HASH) { + rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>", + rb_obj_classname(md_ary_hash)); + return; + } + + /* Initialize the array, compute it's capacity, then fill it. */ + grpc_metadata_array_init(md_ary); + md_ary_obj = + TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary); + rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj); + md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata)); + rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj); } -/* Queue a status for writing. - - call-seq: - tag = Object.new - call.write_status(200, "OK", tag) - - REQUIRES: No other writes are pending on the call. It is only safe to - start the next write after the corresponding write_accepted event - is received. - GRPC_INVOKE_ACCEPTED must have been received by the application - prior to calling this. - Only callable on the server. - Produces a GRPC_FINISHED event when the status is sent and the stream is - fully closed */ -static VALUE grpc_rb_call_start_write_status(VALUE self, VALUE code, - VALUE status, VALUE tag) { - grpc_call *call = NULL; - grpc_call_error err; - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_start_write_status_old(call, NUM2UINT(code), - StringValueCStr(status), ROBJECT(tag)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "start write status: %s (code=%d)", - grpc_call_error_detail_of(err), err); +/* Converts a metadata array to a hash. */ +VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) { + VALUE key = Qnil; + VALUE new_ary = Qnil; + VALUE value = Qnil; + VALUE result = rb_hash_new(); + size_t i; + + for (i = 0; i < md_ary->count; i++) { + key = rb_str_new2(md_ary->metadata[i].key); + value = rb_hash_aref(result, key); + if (value == Qnil) { + value = rb_str_new(md_ary->metadata[i].value, + md_ary->metadata[i].value_length); + rb_hash_aset(result, key, value); + } else if (TYPE(value) == T_ARRAY) { + /* Add the string to the returned array */ + rb_ary_push(value, rb_str_new(md_ary->metadata[i].value, + md_ary->metadata[i].value_length)); + } else { + /* Add the current value with this key and the new one to an array */ + new_ary = rb_ary_new(); + rb_ary_push(new_ary, value); + rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value, + md_ary->metadata[i].value_length)); + rb_hash_aset(result, key, new_ary); + } } + return result; +} - return Qnil; +/* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks + each key of an ops hash is valid. +*/ +static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, + VALUE ops_ary) { + /* Update the capacity; the value is an array, add capacity for each value in + * the array */ + if (TYPE(key) != T_FIXNUM) { + rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>", + rb_obj_classname(key)); + return ST_STOP; + } + switch (NUM2INT(key)) { + case GRPC_OP_SEND_INITIAL_METADATA: + case GRPC_OP_SEND_MESSAGE: + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + case GRPC_OP_SEND_STATUS_FROM_SERVER: + case GRPC_OP_RECV_INITIAL_METADATA: + case GRPC_OP_RECV_MESSAGE: + case GRPC_OP_RECV_STATUS_ON_CLIENT: + case GRPC_OP_RECV_CLOSE_ON_SERVER: + rb_ary_push(ops_ary, key); + return ST_CONTINUE; + default: + rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key)); + }; + return ST_STOP; } -/* No more messages to send. - REQUIRES: No other writes are pending on the call. */ -static VALUE grpc_rb_call_writes_done(VALUE self, VALUE tag) { - grpc_call *call = NULL; - grpc_call_error err; - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_writes_done_old(call, ROBJECT(tag)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "writes done: %s (code=%d)", - grpc_call_error_detail_of(err), err); +/* grpc_rb_op_update_status_from_server adds the values in a ruby status + struct to the 'send_status_from_server' portion of an op. +*/ +static void grpc_rb_op_update_status_from_server(grpc_op *op, + grpc_metadata_array *md_ary, + VALUE status) { + VALUE code = rb_struct_aref(status, sym_code); + VALUE details = rb_struct_aref(status, sym_details); + VALUE metadata_hash = rb_struct_aref(status, sym_metadata); + + /* TODO: add check to ensure status is the correct struct type */ + if (TYPE(code) != T_FIXNUM) { + rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>", + rb_obj_classname(code)); + return; } + if (TYPE(details) != T_STRING) { + rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>", + rb_obj_classname(code)); + return; + } + op->data.send_status_from_server.status = NUM2INT(code); + op->data.send_status_from_server.status_details = StringValueCStr(details); + grpc_rb_md_ary_convert(metadata_hash, md_ary); + op->data.send_status_from_server.trailing_metadata_count = md_ary->count; + op->data.send_status_from_server.trailing_metadata = md_ary->metadata; +} - return Qnil; +/* run_batch_stack holds various values used by the + * grpc_rb_call_run_batch function */ +typedef struct run_batch_stack { + /* The batch ops */ + grpc_op ops[8]; /* 8 is the maximum number of operations */ + size_t op_num; /* tracks the last added operation */ + + /* Data being sent */ + grpc_metadata_array send_metadata; + grpc_metadata_array send_trailing_metadata; + + /* Data being received */ + grpc_byte_buffer *recv_message; + grpc_metadata_array recv_metadata; + grpc_metadata_array recv_trailing_metadata; + int recv_cancelled; + grpc_status_code recv_status; + char *recv_status_details; + size_t recv_status_details_capacity; +} run_batch_stack; + +/* grpc_run_batch_stack_init ensures the run_batch_stack is properly + * initialized */ +static void grpc_run_batch_stack_init(run_batch_stack *st) { + MEMZERO(st, run_batch_stack, 1); + grpc_metadata_array_init(&st->send_metadata); + grpc_metadata_array_init(&st->send_trailing_metadata); + grpc_metadata_array_init(&st->recv_metadata); + grpc_metadata_array_init(&st->recv_trailing_metadata); + st->op_num = 0; } -/* call-seq: - call.server_end_initial_metadata(flag) - - Only to be called on servers, before sending messages. - flags is a bit-field combination of the write flags defined above. - - REQUIRES: Can be called at most once per call. - Can only be called on the server, must be called after - grpc_call_server_accept - Produces no events */ -static VALUE grpc_rb_call_server_end_initial_metadata(int argc, VALUE *argv, - VALUE self) { - VALUE flags = Qnil; - grpc_call *call = NULL; - grpc_call_error err; +/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly + * cleaned up */ +static void grpc_run_batch_stack_cleanup(run_batch_stack *st) { + grpc_metadata_array_destroy(&st->send_metadata); + grpc_metadata_array_destroy(&st->send_trailing_metadata); + grpc_metadata_array_destroy(&st->recv_metadata); + grpc_metadata_array_destroy(&st->recv_trailing_metadata); + if (st->recv_status_details != NULL) { + gpr_free(st->recv_status_details); + } +} - /* "01" == 1 (flags) is optional */ - rb_scan_args(argc, argv, "01", &flags); - if (NIL_P(flags)) { - flags = UINT2NUM(0); /* Default to no flags */ +/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from + * ops_hash */ +static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { + VALUE this_op = Qnil; + VALUE this_value = Qnil; + VALUE ops_ary = rb_ary_new(); + size_t i = 0; + + /* Create a ruby array with just the operation keys */ + rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary); + + /* Fill the ops array */ + for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) { + this_op = rb_ary_entry(ops_ary, i); + this_value = rb_hash_aref(ops_hash, this_op); + switch (NUM2INT(this_op)) { + case GRPC_OP_SEND_INITIAL_METADATA: + /* N.B. later there is no need to explicitly delete the metadata keys + * and values, they are references to data in ruby objects. */ + grpc_rb_md_ary_convert(this_value, &st->send_metadata); + st->ops[st->op_num].data.send_initial_metadata.count = + st->send_metadata.count; + st->ops[st->op_num].data.send_initial_metadata.metadata = + st->send_metadata.metadata; + break; + case GRPC_OP_SEND_MESSAGE: + st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer( + RSTRING_PTR(this_value), RSTRING_LEN(this_value)); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + /* N.B. later there is no need to explicitly delete the metadata keys + * and values, they are references to data in ruby objects. */ + grpc_rb_op_update_status_from_server( + &st->ops[st->op_num], &st->send_trailing_metadata, this_value); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata; + break; + case GRPC_OP_RECV_MESSAGE: + st->ops[st->op_num].data.recv_message = &st->recv_message; + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + st->ops[st->op_num].data.recv_status_on_client.trailing_metadata = + &st->recv_trailing_metadata; + st->ops[st->op_num].data.recv_status_on_client.status = + &st->recv_status; + st->ops[st->op_num].data.recv_status_on_client.status_details = + &st->recv_status_details; + st->ops[st->op_num].data.recv_status_on_client.status_details_capacity = + &st->recv_status_details_capacity; + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + st->ops[st->op_num].data.recv_close_on_server.cancelled = + &st->recv_cancelled; + break; + default: + grpc_run_batch_stack_cleanup(st); + rb_raise(rb_eTypeError, "invalid operation : bad value %d", + NUM2INT(this_op)); + }; + st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op); + st->op_num++; } - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_server_end_initial_metadata_old(call, NUM2UINT(flags)); - if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "end_initial_metadata failed: %s (code=%d)", - grpc_call_error_detail_of(err), err); +} + +/* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct + after the results have run */ +static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { + size_t i = 0; + VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil, + Qnil, Qnil, Qnil, Qnil, NULL); + for (i = 0; i < st->op_num; i++) { + switch (st->ops[i].op) { + case GRPC_OP_SEND_INITIAL_METADATA: + rb_struct_aset(result, sym_send_metadata, Qtrue); + break; + case GRPC_OP_SEND_MESSAGE: + rb_struct_aset(result, sym_send_message, Qtrue); + grpc_byte_buffer_destroy(st->ops[i].data.send_message); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + rb_struct_aset(result, sym_send_close, Qtrue); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + rb_struct_aset(result, sym_send_status, Qtrue); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + rb_struct_aset(result, sym_metadata, + grpc_rb_md_ary_to_h(&st->recv_metadata)); + case GRPC_OP_RECV_MESSAGE: + rb_struct_aset(result, sym_message, + grpc_rb_byte_buffer_to_s(st->recv_message)); + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + rb_struct_aset( + result, sym_status, + rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status), + (st->recv_status_details == NULL + ? Qnil + : rb_str_new2(st->recv_status_details)), + grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), + NULL)); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + rb_struct_aset(result, sym_send_close, Qtrue); + break; + default: + break; + } } - return Qnil; + return result; } /* call-seq: - call.server_accept(completion_queue, finished_tag) - - Accept an incoming RPC, binding a completion queue to it. - To be called before sending or receiving messages. - - REQUIRES: Can be called at most once per call. - Can only be called on the server. - Produces a GRPC_FINISHED event with finished_tag when the call has been - completed (there may be other events for the call pending at this - time) */ -static VALUE grpc_rb_call_server_accept(VALUE self, VALUE cqueue, - VALUE finished_tag) { + cq = CompletionQueue.new + ops = { + GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>, + GRPC::Core::CallOps::SEND_MESSAGE => <op_value>, + ... + } + tag = Object.new + timeout = 10 + call.start_batch(cqueue, tag, timeout, ops) + + Start a batch of operations defined in the array ops; when complete, post a + completion of type 'tag' to the completion queue bound to the call. + + Also waits for the batch to complete, until timeout is reached. + The order of ops specified in the batch has no significance. + Only one operation of each type can be active at once in any given + batch */ +static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, + VALUE timeout, VALUE ops_hash) { + run_batch_stack st; grpc_call *call = NULL; - grpc_completion_queue *cq = grpc_rb_get_wrapped_completion_queue(cqueue); + grpc_event *ev = NULL; grpc_call_error err; - Data_Get_Struct(self, grpc_call, call); - err = grpc_call_server_accept_old(call, cq, ROBJECT(finished_tag)); + VALUE result = Qnil; + TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); + + /* Validate the ops args, adding them to a ruby array */ + if (TYPE(ops_hash) != T_HASH) { + rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash"); + return Qnil; + } + grpc_run_batch_stack_init(&st); + grpc_run_batch_stack_fill_ops(&st, ops_hash); + + /* call grpc_call_start_batch, then wait for it to complete using + * pluck_event */ + err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag)); if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "server_accept failed: %s (code=%d)", + grpc_run_batch_stack_cleanup(&st); + rb_raise(grpc_rb_eCallError, + "grpc_call_start_batch failed with %s (code=%d)", grpc_call_error_detail_of(err), err); + return Qnil; + } + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); + if (ev == NULL) { + grpc_run_batch_stack_cleanup(&st); + rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); + return Qnil; + } + if (ev->data.op_complete != GRPC_OP_OK) { + grpc_run_batch_stack_cleanup(&st); + rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", + ev->data.op_complete); + return Qnil; } - /* Add the completion queue as an instance attribute, prevents it from being - * GCed until this call object is GCed */ - rb_ivar_set(self, id_cq, cqueue); - return Qnil; + /* Build and return the BatchResult struct result */ + result = grpc_run_batch_stack_build_result(&st); + grpc_run_batch_stack_cleanup(&st); + return result; } -/* rb_cCall is the ruby class that proxies grpc_call. */ -VALUE rb_cCall = Qnil; - -/* rb_eCallError is the ruby class of the exception thrown during call - operations; */ -VALUE rb_eCallError = Qnil; - -void Init_grpc_error_codes() { +static void Init_grpc_error_codes() { /* Constants representing the error codes of grpc_call_error in grpc.h */ - VALUE rb_RpcErrors = rb_define_module_under(rb_mGrpcCore, "RpcErrors"); - rb_define_const(rb_RpcErrors, "OK", UINT2NUM(GRPC_CALL_OK)); - rb_define_const(rb_RpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR)); - rb_define_const(rb_RpcErrors, "NOT_ON_SERVER", + VALUE grpc_rb_mRpcErrors = + rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors"); + rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK)); + rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR)); + rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER", UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER)); - rb_define_const(rb_RpcErrors, "NOT_ON_CLIENT", + rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT", UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT)); - rb_define_const(rb_RpcErrors, "ALREADY_ACCEPTED", + rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED", UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED)); - rb_define_const(rb_RpcErrors, "ALREADY_INVOKED", + rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED", UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED)); - rb_define_const(rb_RpcErrors, "NOT_INVOKED", + rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED", UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED)); - rb_define_const(rb_RpcErrors, "ALREADY_FINISHED", + rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED", UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED)); - rb_define_const(rb_RpcErrors, "TOO_MANY_OPERATIONS", + rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS", UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS)); - rb_define_const(rb_RpcErrors, "INVALID_FLAGS", + rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS", UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS)); /* Add the detail strings to a Hash */ @@ -496,37 +671,54 @@ void Init_grpc_error_codes() { rb_str_new2("outstanding read or write present")); rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS), rb_str_new2("a bad flag was given")); - rb_define_const(rb_RpcErrors, "ErrorMessages", rb_error_code_details); + rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details); rb_obj_freeze(rb_error_code_details); } +static void Init_grpc_op_codes() { + /* Constants representing operation type codes in grpc.h */ + VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps"); + rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA", + UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA)); + rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE", + UINT2NUM(GRPC_OP_SEND_MESSAGE)); + rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT", + UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT)); + rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER", + UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER)); + rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA", + UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA)); + rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE", + UINT2NUM(GRPC_OP_RECV_MESSAGE)); + rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT", + UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT)); + rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER", + UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER)); +} + void Init_grpc_call() { /* CallError inherits from Exception to signal that it is non-recoverable */ - rb_eCallError = - rb_define_class_under(rb_mGrpcCore, "CallError", rb_eException); - rb_cCall = rb_define_class_under(rb_mGrpcCore, "Call", rb_cObject); + grpc_rb_eCallError = + rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException); + grpc_rb_eOutOfTime = + rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException); + grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject); + grpc_rb_cMdAry = + rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject); /* Prevent allocation or inialization of the Call class */ - rb_define_alloc_func(rb_cCall, grpc_rb_cannot_alloc); - rb_define_method(rb_cCall, "initialize", grpc_rb_cannot_init, 0); - rb_define_method(rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy, 1); + rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc); + rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0); + rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy, + 1); /* Add ruby analogues of the Call methods. */ - rb_define_method(rb_cCall, "server_accept", grpc_rb_call_server_accept, 2); - rb_define_method(rb_cCall, "server_end_initial_metadata", - grpc_rb_call_server_end_initial_metadata, -1); - rb_define_method(rb_cCall, "add_metadata", grpc_rb_call_add_metadata, -1); - rb_define_method(rb_cCall, "cancel", grpc_rb_call_cancel, 0); - rb_define_method(rb_cCall, "invoke", grpc_rb_call_invoke, -1); - rb_define_method(rb_cCall, "start_read", grpc_rb_call_start_read, 1); - rb_define_method(rb_cCall, "start_write", grpc_rb_call_start_write, -1); - rb_define_method(rb_cCall, "start_write_status", - grpc_rb_call_start_write_status, 3); - rb_define_method(rb_cCall, "writes_done", grpc_rb_call_writes_done, 1); - rb_define_method(rb_cCall, "status", grpc_rb_call_get_status, 0); - rb_define_method(rb_cCall, "status=", grpc_rb_call_set_status, 1); - rb_define_method(rb_cCall, "metadata", grpc_rb_call_get_metadata, 0); - rb_define_method(rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1); + rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4); + rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0); + rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0); + rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1); + rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0); + rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1); /* Ids used to support call attributes */ id_metadata = rb_intern("metadata"); @@ -537,18 +729,33 @@ void Init_grpc_call() { id_flags = rb_intern("__flags"); id_input_md = rb_intern("__input_md"); + /* Ids used in constructing the batch result. */ + sym_send_message = ID2SYM(rb_intern("send_message")); + sym_send_metadata = ID2SYM(rb_intern("send_metadata")); + sym_send_close = ID2SYM(rb_intern("send_close")); + sym_send_status = ID2SYM(rb_intern("send_status")); + sym_message = ID2SYM(rb_intern("message")); + sym_status = ID2SYM(rb_intern("status")); + sym_cancelled = ID2SYM(rb_intern("cancelled")); + + /* The Struct used to return the run_batch result. */ + grpc_rb_sBatchResult = rb_struct_define( + "BatchResult", "send_message", "send_metadata", "send_close", + "send_status", "message", "metadata", "status", "cancelled", NULL); + /* The hash for reference counting calls, to ensure they can't be destroyed * more than once */ hash_all_calls = rb_hash_new(); - rb_define_const(rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls); + rb_define_const(grpc_rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls); Init_grpc_error_codes(); + Init_grpc_op_codes(); } /* Gets the call from the ruby object */ grpc_call *grpc_rb_get_wrapped_call(VALUE v) { grpc_call *c = NULL; - Data_Get_Struct(v, grpc_call, c); + TypedData_Get_Struct(v, grpc_call, &grpc_call_data_type, c); return c; } @@ -565,5 +772,5 @@ VALUE grpc_rb_wrap_call(grpc_call *c) { rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)c), UINT2NUM(NUM2UINT(obj) + 1)); } - return Data_Wrap_Struct(rb_cCall, GC_NOT_MARKED, grpc_rb_call_destroy, c); + return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c); } diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index bb51759a46..003ce0429e 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -46,12 +46,12 @@ VALUE grpc_rb_wrap_call(grpc_call* c); /* Provides the details of an call error */ const char* grpc_call_error_detail_of(grpc_call_error err); -/* rb_cCall is the Call class whose instances proxy grpc_call. */ -extern VALUE rb_cCall; +/* Converts a metadata array to a hash. */ +VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary); -/* rb_cCallError is the ruby class of the exception thrown during call +/* grpc_rb_eCallError is the ruby class of the exception thrown during call operations. */ -extern VALUE rb_eCallError; +extern VALUE grpc_rb_eCallError; /* Initializes the Call class. */ void Init_grpc_call(); diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 2a48f46ce2..214675af92 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -49,12 +49,20 @@ static ID id_channel; /* id_target is the name of the hidden ivar that preserves a reference to the - * target string used to create the call, preserved so that is does not get + * target string used to create the call, preserved so that it does not get * GCed before the channel */ static ID id_target; +/* id_cqueue is the name of the hidden ivar that preserves a reference to the + * completion queue used to create the call, preserved so that it does not get + * GCed before the channel */ +static ID id_cqueue; + +/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ +static VALUE grpc_rb_cChannel = Qnil; + /* Used during the conversion of a hash to channel args during channel setup */ -static VALUE rb_cChannelArgs; +static VALUE grpc_rb_cChannelArgs; /* grpc_rb_channel wraps a grpc_channel. It provides a peer ruby object, * 'mark' to minimize copying when a channel is created from ruby. */ @@ -97,13 +105,19 @@ static void grpc_rb_channel_mark(void *p) { } } +static rb_data_type_t grpc_channel_data_type = { + "grpc_channel", + {grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, NULL, + RUBY_TYPED_FREE_IMMEDIATELY +}; + /* Allocates grpc_rb_channel instances. */ static VALUE grpc_rb_channel_alloc(VALUE cls) { grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel); wrapper->wrapped = NULL; wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_channel_mark, grpc_rb_channel_free, - wrapper); + return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper); } /* @@ -127,7 +141,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { /* "21" == 2 mandatory args, 1 (credentials) is optional */ rb_scan_args(argc, argv, "21", &target, &channel_args, &credentials); - Data_Get_Struct(self, grpc_rb_channel, wrapper); + TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (credentials == Qnil) { @@ -142,6 +156,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { if (ch == NULL) { rb_raise(rb_eRuntimeError, "could not create an rpc channel to target:%s", target_chars); + return Qnil; } rb_ivar_set(self, id_target, target); wrapper->wrapped = ch; @@ -163,11 +178,12 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) { /* Raise an error if orig is not a channel object or a subclass. */ if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_channel_free) { - rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cChannel)); + rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cChannel)); + return Qnil; } - Data_Get_Struct(orig, grpc_rb_channel, orig_ch); - Data_Get_Struct(copy, grpc_rb_channel, copy_ch); + TypedData_Get_Struct(orig, grpc_rb_channel, &grpc_channel_data_type, orig_ch); + TypedData_Get_Struct(copy, grpc_rb_channel, &grpc_channel_data_type, copy_ch); /* use ruby's MEMCPY to make a byte-for-byte copy of the channel wrapper * object. */ @@ -177,34 +193,42 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) { /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ -static VALUE grpc_rb_channel_create_call(VALUE self, VALUE method, VALUE host, - VALUE deadline) { +static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method, + VALUE host, VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; grpc_call *call = NULL; + grpc_channel *ch = NULL; + grpc_completion_queue *cq = NULL; char *method_chars = StringValueCStr(method); char *host_chars = StringValueCStr(host); - Data_Get_Struct(self, grpc_rb_channel, wrapper); + cq = grpc_rb_get_wrapped_completion_queue(cqueue); + TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch == NULL) { rb_raise(rb_eRuntimeError, "closed!"); + return Qnil; } call = - grpc_channel_create_call_old(ch, method_chars, host_chars, - grpc_rb_time_timeval(deadline, - /* absolute time */ 0)); + grpc_channel_create_call(ch, cq, method_chars, host_chars, + grpc_rb_time_timeval(deadline, + /* absolute time */ 0)); if (call == NULL) { rb_raise(rb_eRuntimeError, "cannot create call with method %s", method_chars); + return Qnil; } res = grpc_rb_wrap_call(call); - /* Make this channel an instance attribute of the call so that is is not GCed + /* Make this channel an instance attribute of the call so that it is not GCed * before the call. */ rb_ivar_set(res, id_channel, self); + + /* Make the completion queue an instance attribute of the call so that it is + * not GCed before the call. */ + rb_ivar_set(res, id_cqueue, cqueue); return res; } @@ -213,7 +237,7 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { grpc_rb_channel *wrapper = NULL; grpc_channel *ch = NULL; - Data_Get_Struct(self, grpc_rb_channel, wrapper); + TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch != NULL) { grpc_channel_destroy(ch); @@ -224,41 +248,41 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { return Qnil; } -/* rb_cChannel is the ruby class that proxies grpc_channel. */ -VALUE rb_cChannel = Qnil; - void Init_grpc_channel() { - rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); - rb_cChannel = rb_define_class_under(rb_mGrpcCore, "Channel", rb_cObject); + grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); + grpc_rb_cChannel = + rb_define_class_under(grpc_rb_mGrpcCore, "Channel", rb_cObject); /* Allocates an object managed by the ruby runtime */ - rb_define_alloc_func(rb_cChannel, grpc_rb_channel_alloc); + rb_define_alloc_func(grpc_rb_cChannel, grpc_rb_channel_alloc); /* Provides a ruby constructor and support for dup/clone. */ - rb_define_method(rb_cChannel, "initialize", grpc_rb_channel_init, -1); - rb_define_method(rb_cChannel, "initialize_copy", grpc_rb_channel_init_copy, - 1); + rb_define_method(grpc_rb_cChannel, "initialize", grpc_rb_channel_init, -1); + rb_define_method(grpc_rb_cChannel, "initialize_copy", + grpc_rb_channel_init_copy, 1); /* Add ruby analogues of the Channel methods. */ - rb_define_method(rb_cChannel, "create_call", grpc_rb_channel_create_call, 3); - rb_define_method(rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); - rb_define_alias(rb_cChannel, "close", "destroy"); + rb_define_method(grpc_rb_cChannel, "create_call", + grpc_rb_channel_create_call, 4); + rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); + rb_define_alias(grpc_rb_cChannel, "close", "destroy"); id_channel = rb_intern("__channel"); + id_cqueue = rb_intern("__cqueue"); id_target = rb_intern("__target"); - rb_define_const(rb_cChannel, "SSL_TARGET", + rb_define_const(grpc_rb_cChannel, "SSL_TARGET", ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG))); - rb_define_const(rb_cChannel, "ENABLE_CENSUS", + rb_define_const(grpc_rb_cChannel, "ENABLE_CENSUS", ID2SYM(rb_intern(GRPC_ARG_ENABLE_CENSUS))); - rb_define_const(rb_cChannel, "MAX_CONCURRENT_STREAMS", + rb_define_const(grpc_rb_cChannel, "MAX_CONCURRENT_STREAMS", ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS))); - rb_define_const(rb_cChannel, "MAX_MESSAGE_LENGTH", + rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH", ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH))); } /* Gets the wrapped channel from the ruby wrapper */ grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) { grpc_rb_channel *wrapper = NULL; - Data_Get_Struct(v, grpc_rb_channel, wrapper); + TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper); return wrapper->wrapped; } diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h index a582869cda..6e3c087689 100644 --- a/src/ruby/ext/grpc/rb_channel.h +++ b/src/ruby/ext/grpc/rb_channel.h @@ -37,9 +37,6 @@ #include <ruby.h> #include <grpc/grpc.h> -/* rb_cChannel is the Channel class whose instances proxy grpc_channel. */ -extern VALUE rb_cChannel; - /* Initializes the Channel class. */ void Init_grpc_channel(); diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c index 532ee5e785..acd545f5d2 100644 --- a/src/ruby/ext/grpc/rb_channel_args.c +++ b/src/ruby/ext/grpc/rb_channel_args.c @@ -38,6 +38,13 @@ #include "rb_grpc.h" +static rb_data_type_t grpc_rb_channel_args_data_type = { + "grpc_channel_args", + {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, NULL, + RUBY_TYPED_FREE_IMMEDIATELY +}; + /* A callback the processes the hash key values in channel_args hash */ static int grpc_rb_channel_create_in_process_add_args_hash_cb(VALUE key, VALUE val, @@ -60,7 +67,8 @@ static int grpc_rb_channel_create_in_process_add_args_hash_cb(VALUE key, return ST_STOP; } - Data_Get_Struct(args_obj, grpc_channel_args, args); + TypedData_Get_Struct(args_obj, grpc_channel_args, + &grpc_rb_channel_args_data_type, args); if (args->num_args <= 0) { rb_raise(rb_eRuntimeError, "hash_cb bug: num_args is %lu for key:%s", args->num_args, StringValueCStr(key)); @@ -109,7 +117,7 @@ typedef struct channel_convert_params { static VALUE grpc_rb_hash_convert_to_channel_args0(VALUE as_value) { ID id_size = rb_intern("size"); - VALUE rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); + VALUE grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); channel_convert_params* params = (channel_convert_params*)as_value; size_t num_args = 0; @@ -126,8 +134,9 @@ static VALUE grpc_rb_hash_convert_to_channel_args0(VALUE as_value) { MEMZERO(params->dst->args, grpc_arg, num_args); rb_hash_foreach(params->src_hash, grpc_rb_channel_create_in_process_add_args_hash_cb, - Data_Wrap_Struct(rb_cChannelArgs, GC_NOT_MARKED, - GC_DONT_FREE, params->dst)); + TypedData_Wrap_Struct(grpc_rb_cChannelArgs, + &grpc_rb_channel_args_data_type, + params->dst)); /* reset num_args as grpc_rb_channel_create_in_process_add_args_hash_cb * decrements it during has processing */ params->dst->num_args = num_args; diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 3fdbdd837a..3cf6c313ee 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -33,12 +33,16 @@ #include "rb_completion_queue.h" -#include <ruby.h> +#include <ruby/ruby.h> +#include <ruby/thread.h> #include <grpc/grpc.h> #include <grpc/support/time.h> #include "rb_grpc.h" -#include "rb_event.h" + +/* grpc_rb_cCompletionQueue is the ruby class that proxies + * grpc_completion_queue. */ +static VALUE grpc_rb_cCompletionQueue = Qnil; /* Used to allow grpc_completion_queue_next call to release the GIL */ typedef struct next_call_stack { @@ -49,14 +53,16 @@ typedef struct next_call_stack { } next_call_stack; /* Calls grpc_completion_queue_next without holding the ruby GIL */ -static void *grpc_rb_completion_queue_next_no_gil(next_call_stack *next_call) { +static void *grpc_rb_completion_queue_next_no_gil(void *param) { + next_call_stack *const next_call = (next_call_stack*)param; next_call->event = grpc_completion_queue_next(next_call->cq, next_call->timeout); return NULL; } /* Calls grpc_completion_queue_pluck without holding the ruby GIL */ -static void *grpc_rb_completion_queue_pluck_no_gil(next_call_stack *next_call) { +static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { + next_call_stack *const next_call = (next_call_stack*)param; next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag, next_call->timeout); return NULL; @@ -113,21 +119,31 @@ static void grpc_rb_completion_queue_destroy(void *p) { grpc_completion_queue_destroy(cq); } +static rb_data_type_t grpc_rb_completion_queue_data_type = { + "grpc_completion_queue", + {GRPC_RB_GC_NOT_MARKED, grpc_rb_completion_queue_destroy, + GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, NULL, + /* cannot immediately free because grpc_rb_completion_queue_shutdown_drain + * calls rb_thread_call_without_gvl. */ + 0 +}; + /* Allocates a completion queue. */ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) { grpc_completion_queue *cq = grpc_completion_queue_create(); if (cq == NULL) { rb_raise(rb_eArgError, "could not create a completion queue: not sure why"); } - return Data_Wrap_Struct(cls, GC_NOT_MARKED, grpc_rb_completion_queue_destroy, - cq); + return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq); } /* Blocks until the next event is available, and returns the event. */ static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); - Data_Get_Struct(self, grpc_completion_queue, next_call.cq); + TypedData_Get_Struct(self, grpc_completion_queue, + &grpc_rb_completion_queue_data_type, next_call.cq); next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); next_call.event = NULL; rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil, @@ -140,46 +156,57 @@ static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) { /* Blocks until the next event for given tag is available, and returns the * event. */ -static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag, - VALUE timeout) { +VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag, + VALUE timeout) { + grpc_event *ev = grpc_rb_completion_queue_pluck_event(self, tag, timeout); + if (ev == NULL) { + return Qnil; + } + return grpc_rb_new_event(ev); +} + +/* Blocks until the next event for given tag is available, and returns the + * event. */ +grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, + VALUE timeout) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); - Data_Get_Struct(self, grpc_completion_queue, next_call.cq); + TypedData_Get_Struct(self, grpc_completion_queue, + &grpc_rb_completion_queue_data_type, next_call.cq); next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0); next_call.tag = ROBJECT(tag); next_call.event = NULL; rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, (void *)&next_call, NULL, NULL); if (next_call.event == NULL) { - return Qnil; + return NULL; } - return grpc_rb_new_event(next_call.event); + return next_call.event; } -/* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */ -VALUE rb_cCompletionQueue = Qnil; - void Init_grpc_completion_queue() { - rb_cCompletionQueue = - rb_define_class_under(rb_mGrpcCore, "CompletionQueue", rb_cObject); + grpc_rb_cCompletionQueue = + rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject); /* constructor: uses an alloc func without an initializer. Using a simple alloc func works here as the grpc header does not specify any args for this func, so no separate initialization step is necessary. */ - rb_define_alloc_func(rb_cCompletionQueue, grpc_rb_completion_queue_alloc); + rb_define_alloc_func(grpc_rb_cCompletionQueue, + grpc_rb_completion_queue_alloc); /* Add the next method that waits for the next event. */ - rb_define_method(rb_cCompletionQueue, "next", grpc_rb_completion_queue_next, - 1); + rb_define_method(grpc_rb_cCompletionQueue, "next", + grpc_rb_completion_queue_next, 1); /* Add the pluck method that waits for the next event of given tag */ - rb_define_method(rb_cCompletionQueue, "pluck", grpc_rb_completion_queue_pluck, - 2); + rb_define_method(grpc_rb_cCompletionQueue, "pluck", + grpc_rb_completion_queue_pluck, 2); } /* Gets the wrapped completion queue from the ruby wrapper */ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v) { grpc_completion_queue *cq = NULL; - Data_Get_Struct(v, grpc_completion_queue, cq); + TypedData_Get_Struct(v, grpc_completion_queue, + &grpc_rb_completion_queue_data_type, cq); return cq; } diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 38025ea2d2..4d0f49ac47 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -40,9 +40,13 @@ /* Gets the wrapped completion queue from the ruby wrapper */ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); -/* rb_cCompletionQueue is the CompletionQueue class whose instances proxy - grpc_completion_queue. */ -extern VALUE rb_cCompletionQueue; +/** + * Makes the implementation of CompletionQueue#pluck available in other files + * + * This avoids having code that holds the GIL repeated at multiple sites. + */ +grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag, + VALUE timeout); /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c index 4ee5d6b51c..1ec88914e4 100644 --- a/src/ruby/ext/grpc/rb_credentials.c +++ b/src/ruby/ext/grpc/rb_credentials.c @@ -40,6 +40,9 @@ #include "rb_grpc.h" +/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */ +static VALUE grpc_rb_cCredentials = Qnil; + /* grpc_rb_credentials wraps a grpc_credentials. It provides a * peer ruby object, 'mark' to minimize copying when a credential is * created from ruby. */ @@ -83,14 +86,21 @@ static void grpc_rb_credentials_mark(void *p) { } } +static rb_data_type_t grpc_rb_credentials_data_type = { + "grpc_credentials", + {grpc_rb_credentials_mark, grpc_rb_credentials_free, + GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, + NULL, + RUBY_TYPED_FREE_IMMEDIATELY}; + /* Allocates Credential instances. Provides safe initial defaults for the instance fields. */ static VALUE grpc_rb_credentials_alloc(VALUE cls) { grpc_rb_credentials *wrapper = ALLOC(grpc_rb_credentials); wrapper->wrapped = NULL; wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_credentials_mark, - grpc_rb_credentials_free, wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_credentials_data_type, wrapper); } /* Clones Credentials instances. @@ -107,11 +117,13 @@ static VALUE grpc_rb_credentials_init_copy(VALUE copy, VALUE orig) { /* Raise an error if orig is not a credentials object or a subclass. */ if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_credentials_free) { - rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cCredentials)); + rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cCredentials)); } - Data_Get_Struct(orig, grpc_rb_credentials, orig_cred); - Data_Get_Struct(copy, grpc_rb_credentials, copy_cred); + TypedData_Get_Struct(orig, grpc_rb_credentials, + &grpc_rb_credentials_data_type, orig_cred); + TypedData_Get_Struct(copy, grpc_rb_credentials, + &grpc_rb_credentials_data_type, copy_cred); /* use ruby's MEMCPY to make a byte-for-byte copy of the credentials * wrapper object. */ @@ -133,8 +145,7 @@ static VALUE grpc_rb_default_credentials_create(VALUE cls) { } wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_credentials_mark, - grpc_rb_credentials_free, wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_credentials_data_type, wrapper); } /* @@ -151,8 +162,7 @@ static VALUE grpc_rb_compute_engine_credentials_create(VALUE cls) { } wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_credentials_mark, - grpc_rb_credentials_free, wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_credentials_data_type, wrapper); } /* @@ -166,8 +176,10 @@ static VALUE grpc_rb_composite_credentials_create(VALUE self, VALUE other) { grpc_rb_credentials *other_wrapper = NULL; grpc_rb_credentials *wrapper = NULL; - Data_Get_Struct(self, grpc_rb_credentials, self_wrapper); - Data_Get_Struct(other, grpc_rb_credentials, other_wrapper); + TypedData_Get_Struct(self, grpc_rb_credentials, + &grpc_rb_credentials_data_type, self_wrapper); + TypedData_Get_Struct(other, grpc_rb_credentials, + &grpc_rb_credentials_data_type, other_wrapper); wrapper = ALLOC(grpc_rb_credentials); wrapper->wrapped = grpc_composite_credentials_create(self_wrapper->wrapped, other_wrapper->wrapped); @@ -178,8 +190,8 @@ static VALUE grpc_rb_composite_credentials_create(VALUE self, VALUE other) { } wrapper->mark = Qnil; - return Data_Wrap_Struct(rb_cCredentials, grpc_rb_credentials_mark, - grpc_rb_credentials_free, wrapper); + return TypedData_Wrap_Struct(grpc_rb_cCredentials, + &grpc_rb_credentials_data_type, wrapper); } /* The attribute used on the mark object to hold the pem_root_certs. */ @@ -214,7 +226,8 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) { rb_scan_args(argc, argv, "12", &pem_root_certs, &pem_private_key, &pem_cert_chain); - Data_Get_Struct(self, grpc_rb_credentials, wrapper); + TypedData_Get_Struct(self, grpc_rb_credentials, + &grpc_rb_credentials_data_type, wrapper); if (pem_root_certs == Qnil) { rb_raise(rb_eRuntimeError, "could not create a credential: nil pem_root_certs"); @@ -225,8 +238,8 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) { } else { key_cert_pair.private_key = RSTRING_PTR(pem_private_key); key_cert_pair.cert_chain = RSTRING_PTR(pem_cert_chain); - creds = grpc_ssl_credentials_create( - RSTRING_PTR(pem_root_certs), &key_cert_pair); + creds = grpc_ssl_credentials_create(RSTRING_PTR(pem_root_certs), + &key_cert_pair); } if (creds == NULL) { rb_raise(rb_eRuntimeError, "could not create a credentials, not sure why"); @@ -242,30 +255,28 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) { return self; } -/* rb_cCredentials is the ruby class that proxies grpc_credentials. */ -VALUE rb_cCredentials = Qnil; - void Init_grpc_credentials() { - rb_cCredentials = - rb_define_class_under(rb_mGrpcCore, "Credentials", rb_cObject); + grpc_rb_cCredentials = + rb_define_class_under(grpc_rb_mGrpcCore, "Credentials", rb_cObject); /* Allocates an object managed by the ruby runtime */ - rb_define_alloc_func(rb_cCredentials, grpc_rb_credentials_alloc); + rb_define_alloc_func(grpc_rb_cCredentials, grpc_rb_credentials_alloc); /* Provides a ruby constructor and support for dup/clone. */ - rb_define_method(rb_cCredentials, "initialize", grpc_rb_credentials_init, -1); - rb_define_method(rb_cCredentials, "initialize_copy", + rb_define_method(grpc_rb_cCredentials, "initialize", grpc_rb_credentials_init, + -1); + rb_define_method(grpc_rb_cCredentials, "initialize_copy", grpc_rb_credentials_init_copy, 1); /* Provide static funcs that create new special instances. */ - rb_define_singleton_method(rb_cCredentials, "default", + rb_define_singleton_method(grpc_rb_cCredentials, "default", grpc_rb_default_credentials_create, 0); - rb_define_singleton_method(rb_cCredentials, "compute_engine", + rb_define_singleton_method(grpc_rb_cCredentials, "compute_engine", grpc_rb_compute_engine_credentials_create, 0); /* Provide other methods. */ - rb_define_method(rb_cCredentials, "compose", + rb_define_method(grpc_rb_cCredentials, "compose", grpc_rb_composite_credentials_create, 1); id_pem_cert_chain = rb_intern("__pem_cert_chain"); @@ -276,6 +287,7 @@ void Init_grpc_credentials() { /* Gets the wrapped grpc_credentials from the ruby wrapper */ grpc_credentials *grpc_rb_get_wrapped_credentials(VALUE v) { grpc_rb_credentials *wrapper = NULL; - Data_Get_Struct(v, grpc_rb_credentials, wrapper); + TypedData_Get_Struct(v, grpc_rb_credentials, &grpc_rb_credentials_data_type, + wrapper); return wrapper->wrapped; } diff --git a/src/ruby/ext/grpc/rb_credentials.h b/src/ruby/ext/grpc/rb_credentials.h index 3b24397173..e7c43c9c78 100644 --- a/src/ruby/ext/grpc/rb_credentials.h +++ b/src/ruby/ext/grpc/rb_credentials.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc_security.h> -/* rb_cCredentials is the ruby class whose instances proxy - grpc_credentials. */ -extern VALUE rb_cCredentials; - /* Initializes the ruby Credentials class. */ void Init_grpc_credentials(); diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c deleted file mode 100644 index 2e64af4c84..0000000000 --- a/src/ruby/ext/grpc/rb_event.c +++ /dev/null @@ -1,361 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "rb_event.h" - -#include <ruby.h> - -#include <grpc/grpc.h> -#include "rb_grpc.h" -#include "rb_byte_buffer.h" -#include "rb_call.h" -#include "rb_metadata.h" - -/* grpc_rb_event wraps a grpc_event. It provides a peer ruby object, - * 'mark' to minimize copying when an event is created from ruby. */ -typedef struct grpc_rb_event { - /* Holder of ruby objects involved in constructing the channel */ - VALUE mark; - /* The actual event */ - grpc_event *wrapped; -} grpc_rb_event; - -/* rb_mCompletionType is a ruby module that holds the completion type values */ -VALUE rb_mCompletionType = Qnil; - -/* Destroys Event instances. */ -static void grpc_rb_event_free(void *p) { - grpc_rb_event *ev = NULL; - if (p == NULL) { - return; - }; - ev = (grpc_rb_event *)p; - - /* Deletes the wrapped object if the mark object is Qnil, which indicates - * that no other object is the actual owner. */ - if (ev->wrapped != NULL && ev->mark == Qnil) { - grpc_event_finish(ev->wrapped); - rb_warning("event gc: destroyed the c event"); - } else { - rb_warning("event gc: did not destroy the c event"); - } - - xfree(p); -} - -/* Protects the mark object from GC */ -static void grpc_rb_event_mark(void *p) { - grpc_rb_event *event = NULL; - if (p == NULL) { - return; - } - event = (grpc_rb_event *)p; - if (event->mark != Qnil) { - rb_gc_mark(event->mark); - } -} - -static VALUE grpc_rb_event_result(VALUE self); - -/* Obtains the type of an event. */ -static VALUE grpc_rb_event_type(VALUE self) { - grpc_event *event = NULL; - grpc_rb_event *wrapper = NULL; - Data_Get_Struct(self, grpc_rb_event, wrapper); - if (wrapper->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "finished!"); - return Qnil; - } - - event = wrapper->wrapped; - switch (event->type) { - case GRPC_QUEUE_SHUTDOWN: - return rb_const_get(rb_mCompletionType, rb_intern("QUEUE_SHUTDOWN")); - - case GRPC_READ: - return rb_const_get(rb_mCompletionType, rb_intern("READ")); - - case GRPC_WRITE_ACCEPTED: - grpc_rb_event_result(self); /* validates the result */ - return rb_const_get(rb_mCompletionType, rb_intern("WRITE_ACCEPTED")); - - case GRPC_FINISH_ACCEPTED: - grpc_rb_event_result(self); /* validates the result */ - return rb_const_get(rb_mCompletionType, rb_intern("FINISH_ACCEPTED")); - - case GRPC_CLIENT_METADATA_READ: - return rb_const_get(rb_mCompletionType, - rb_intern("CLIENT_METADATA_READ")); - - case GRPC_FINISHED: - return rb_const_get(rb_mCompletionType, rb_intern("FINISHED")); - - case GRPC_SERVER_RPC_NEW: - return rb_const_get(rb_mCompletionType, rb_intern("SERVER_RPC_NEW")); - - default: - rb_raise(rb_eRuntimeError, "unrecognized event code for an rpc event:%d", - event->type); - } - return Qnil; /* should not be reached */ -} - -/* Obtains the tag associated with an event. */ -static VALUE grpc_rb_event_tag(VALUE self) { - grpc_event *event = NULL; - grpc_rb_event *wrapper = NULL; - Data_Get_Struct(self, grpc_rb_event, wrapper); - if (wrapper->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "finished!"); - return Qnil; - } - - event = wrapper->wrapped; - if (event->tag == NULL) { - return Qnil; - } - return (VALUE)event->tag; -} - -/* Obtains the call associated with an event. */ -static VALUE grpc_rb_event_call(VALUE self) { - grpc_event *event = NULL; - grpc_rb_event *wrapper = NULL; - Data_Get_Struct(self, grpc_rb_event, wrapper); - if (wrapper->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "finished!"); - return Qnil; - } - - event = wrapper->wrapped; - if (event->call != NULL) { - return grpc_rb_wrap_call(event->call); - } - return Qnil; -} - -/* Obtains the metadata associated with an event. */ -static VALUE grpc_rb_event_metadata(VALUE self) { - grpc_event *event = NULL; - grpc_rb_event *wrapper = NULL; - grpc_metadata *metadata = NULL; - VALUE key = Qnil; - VALUE new_ary = Qnil; - VALUE result = Qnil; - VALUE value = Qnil; - size_t count = 0; - size_t i = 0; - Data_Get_Struct(self, grpc_rb_event, wrapper); - if (wrapper->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "finished!"); - return Qnil; - } - - /* Figure out which metadata to read. */ - event = wrapper->wrapped; - switch (event->type) { - case GRPC_CLIENT_METADATA_READ: - count = event->data.client_metadata_read.count; - metadata = event->data.client_metadata_read.elements; - break; - - case GRPC_FINISHED: - count = event->data.finished.metadata_count; - metadata = event->data.finished.metadata_elements; - break; - - case GRPC_SERVER_RPC_NEW: - count = event->data.server_rpc_new.metadata_count; - metadata = event->data.server_rpc_new.metadata_elements; - break; - - default: - rb_raise(rb_eRuntimeError, - "bug: bad event type metadata. got %d; want %d|%d:%d", - event->type, GRPC_CLIENT_METADATA_READ, GRPC_FINISHED, - GRPC_SERVER_RPC_NEW); - return Qnil; - } - - result = rb_hash_new(); - for (i = 0; i < count; i++) { - key = rb_str_new2(metadata[i].key); - value = rb_hash_aref(result, key); - if (value == Qnil) { - value = rb_str_new(metadata[i].value, metadata[i].value_length); - rb_hash_aset(result, key, value); - } else if (TYPE(value) == T_ARRAY) { - /* Add the string to the returned array */ - rb_ary_push(value, - rb_str_new(metadata[i].value, metadata[i].value_length)); - } else { - /* Add the current value with this key and the new one to an array */ - new_ary = rb_ary_new(); - rb_ary_push(new_ary, value); - rb_ary_push(new_ary, - rb_str_new(metadata[i].value, metadata[i].value_length)); - rb_hash_aset(result, key, new_ary); - } - } - return result; -} - -/* Obtains the data associated with an event. */ -static VALUE grpc_rb_event_result(VALUE self) { - grpc_event *event = NULL; - grpc_rb_event *wrapper = NULL; - Data_Get_Struct(self, grpc_rb_event, wrapper); - if (wrapper->wrapped == NULL) { - rb_raise(rb_eRuntimeError, "finished!"); - return Qnil; - } - event = wrapper->wrapped; - - switch (event->type) { - case GRPC_QUEUE_SHUTDOWN: - return Qnil; - - case GRPC_READ: - return grpc_rb_byte_buffer_create_with_mark(self, event->data.read); - - case GRPC_FINISH_ACCEPTED: - if (event->data.finish_accepted == GRPC_OP_OK) { - return Qnil; - } - rb_raise(rb_eEventError, "finish failed, not sure why (code=%d)", - event->data.finish_accepted); - break; - - case GRPC_WRITE_ACCEPTED: - if (event->data.write_accepted == GRPC_OP_OK) { - return Qnil; - } - rb_raise(rb_eEventError, "write failed, not sure why (code=%d)", - event->data.write_accepted); - break; - - case GRPC_CLIENT_METADATA_READ: - return grpc_rb_event_metadata(self); - - case GRPC_FINISHED: - return rb_struct_new(rb_sStatus, UINT2NUM(event->data.finished.status), - (event->data.finished.details == NULL - ? Qnil - : rb_str_new2(event->data.finished.details)), - grpc_rb_event_metadata(self), NULL); - break; - - case GRPC_SERVER_RPC_NEW: - return rb_struct_new( - rb_sNewServerRpc, rb_str_new2(event->data.server_rpc_new.method), - rb_str_new2(event->data.server_rpc_new.host), - Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE, - (void *)&event->data.server_rpc_new.deadline), - grpc_rb_event_metadata(self), NULL); - - default: - rb_raise(rb_eRuntimeError, "unrecognized event code for an rpc event:%d", - event->type); - } - - return Qfalse; -} - -static VALUE grpc_rb_event_finish(VALUE self) { - grpc_event *event = NULL; - grpc_rb_event *wrapper = NULL; - Data_Get_Struct(self, grpc_rb_event, wrapper); - if (wrapper->wrapped == NULL) { /* already closed */ - return Qnil; - } - event = wrapper->wrapped; - grpc_event_finish(event); - wrapper->wrapped = NULL; - wrapper->mark = Qnil; - return Qnil; -} - -/* rb_cEvent is the Event class whose instances proxy grpc_event */ -VALUE rb_cEvent = Qnil; - -/* rb_eEventError is the ruby class of the exception thrown on failures during - rpc event processing. */ -VALUE rb_eEventError = Qnil; - -void Init_grpc_event() { - rb_eEventError = - rb_define_class_under(rb_mGrpcCore, "EventError", rb_eStandardError); - rb_cEvent = rb_define_class_under(rb_mGrpcCore, "Event", rb_cObject); - - /* Prevent allocation or inialization from ruby. */ - rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc); - rb_define_method(rb_cEvent, "initialize", grpc_rb_cannot_init, 0); - rb_define_method(rb_cEvent, "initialize_copy", grpc_rb_cannot_init_copy, 1); - - /* Accessors for the data available in an event. */ - rb_define_method(rb_cEvent, "call", grpc_rb_event_call, 0); - rb_define_method(rb_cEvent, "result", grpc_rb_event_result, 0); - rb_define_method(rb_cEvent, "tag", grpc_rb_event_tag, 0); - rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0); - rb_define_method(rb_cEvent, "finish", grpc_rb_event_finish, 0); - rb_define_alias(rb_cEvent, "close", "finish"); - - /* Constants representing the completion types */ - rb_mCompletionType = - rb_define_module_under(rb_mGrpcCore, "CompletionType"); - rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN", - INT2NUM(GRPC_QUEUE_SHUTDOWN)); - rb_define_const(rb_mCompletionType, "OP_COMPLETE", INT2NUM(GRPC_OP_COMPLETE)); - rb_define_const(rb_mCompletionType, "READ", INT2NUM(GRPC_READ)); - rb_define_const(rb_mCompletionType, "WRITE_ACCEPTED", - INT2NUM(GRPC_WRITE_ACCEPTED)); - rb_define_const(rb_mCompletionType, "FINISH_ACCEPTED", - INT2NUM(GRPC_FINISH_ACCEPTED)); - rb_define_const(rb_mCompletionType, "CLIENT_METADATA_READ", - INT2NUM(GRPC_CLIENT_METADATA_READ)); - rb_define_const(rb_mCompletionType, "FINISHED", INT2NUM(GRPC_FINISHED)); - rb_define_const(rb_mCompletionType, "SERVER_RPC_NEW", - INT2NUM(GRPC_SERVER_RPC_NEW)); - rb_define_const(rb_mCompletionType, "SERVER_SHUTDOWN", - INT2NUM(GRPC_SERVER_SHUTDOWN)); - rb_define_const(rb_mCompletionType, "RESERVED", - INT2NUM(GRPC_COMPLETION_DO_NOT_USE)); -} - -VALUE grpc_rb_new_event(grpc_event *ev) { - grpc_rb_event *wrapper = ALLOC(grpc_rb_event); - wrapper->wrapped = ev; - wrapper->mark = Qnil; - return Data_Wrap_Struct(rb_cEvent, grpc_rb_event_mark, grpc_rb_event_free, - wrapper); -} diff --git a/src/ruby/ext/grpc/rb_event.h b/src/ruby/ext/grpc/rb_event.h deleted file mode 100644 index 3105934b11..0000000000 --- a/src/ruby/ext/grpc/rb_event.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_RB_EVENT_H_ -#define GRPC_RB_EVENT_H_ - -#include <ruby.h> -#include <grpc/grpc.h> - -/* rb_cEvent is the Event class whose instances proxy grpc_event. */ -extern VALUE rb_cEvent; - -/* rb_cEventError is the ruby class that acts the exception thrown during rpc - event processing. */ -extern VALUE rb_eEventError; - -/* Used to create new ruby event objects */ -VALUE grpc_rb_new_event(grpc_event *ev); - -/* Initializes the Event and EventError classes. */ -void Init_grpc_event(); - -#endif /* GRPC_RB_EVENT_H_ */ diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 400efd0dfa..699548b940 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -34,26 +34,27 @@ #include "rb_grpc.h" #include <math.h> -#include <ruby.h> +#include <ruby/ruby.h> +#include <ruby/vm.h> #include <sys/time.h> #include <grpc/grpc.h> #include <grpc/support/time.h> -#include "rb_byte_buffer.h" #include "rb_call.h" #include "rb_channel.h" #include "rb_completion_queue.h" -#include "rb_event.h" -#include "rb_metadata.h" #include "rb_server.h" #include "rb_credentials.h" #include "rb_server_credentials.h" -/* Define common vars and funcs declared in rb.h */ -const RUBY_DATA_FUNC GC_NOT_MARKED = NULL; -const RUBY_DATA_FUNC GC_DONT_FREE = NULL; +static VALUE grpc_rb_cTimeVal = Qnil; -VALUE rb_cTimeVal = Qnil; +static rb_data_type_t grpc_rb_timespec_data_type = { + "gpr_timespec", + {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, + NULL, + RUBY_TYPED_FREE_IMMEDIATELY}; /* Alloc func that blocks allocation of a given object by raising an * exception. */ @@ -99,8 +100,9 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) { switch (TYPE(time)) { case T_DATA: - if (CLASS_OF(time) == rb_cTimeVal) { - Data_Get_Struct(time, gpr_timespec, time_const); + if (CLASS_OF(time) == grpc_rb_cTimeVal) { + TypedData_Get_Struct(time, gpr_timespec, &grpc_rb_timespec_data_type, + time_const); t = *time_const; } else if (CLASS_OF(time) == rb_cTime) { t.tv_sec = NUM2INT(rb_funcall(time, id_tv_sec, 0)); @@ -153,37 +155,43 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) { return t; } -void Init_grpc_status_codes() { +static void Init_grpc_status_codes() { /* Constants representing the status codes or grpc_status_code in status.h */ - VALUE rb_mStatusCodes = - rb_define_module_under(rb_mGrpcCore, "StatusCodes"); - rb_define_const(rb_mStatusCodes, "OK", INT2NUM(GRPC_STATUS_OK)); - rb_define_const(rb_mStatusCodes, "CANCELLED", INT2NUM(GRPC_STATUS_CANCELLED)); - rb_define_const(rb_mStatusCodes, "UNKNOWN", INT2NUM(GRPC_STATUS_UNKNOWN)); - rb_define_const(rb_mStatusCodes, "INVALID_ARGUMENT", + VALUE grpc_rb_mStatusCodes = + rb_define_module_under(grpc_rb_mGrpcCore, "StatusCodes"); + rb_define_const(grpc_rb_mStatusCodes, "OK", INT2NUM(GRPC_STATUS_OK)); + rb_define_const(grpc_rb_mStatusCodes, "CANCELLED", + INT2NUM(GRPC_STATUS_CANCELLED)); + rb_define_const(grpc_rb_mStatusCodes, "UNKNOWN", + INT2NUM(GRPC_STATUS_UNKNOWN)); + rb_define_const(grpc_rb_mStatusCodes, "INVALID_ARGUMENT", INT2NUM(GRPC_STATUS_INVALID_ARGUMENT)); - rb_define_const(rb_mStatusCodes, "DEADLINE_EXCEEDED", + rb_define_const(grpc_rb_mStatusCodes, "DEADLINE_EXCEEDED", INT2NUM(GRPC_STATUS_DEADLINE_EXCEEDED)); - rb_define_const(rb_mStatusCodes, "NOT_FOUND", INT2NUM(GRPC_STATUS_NOT_FOUND)); - rb_define_const(rb_mStatusCodes, "ALREADY_EXISTS", + rb_define_const(grpc_rb_mStatusCodes, "NOT_FOUND", + INT2NUM(GRPC_STATUS_NOT_FOUND)); + rb_define_const(grpc_rb_mStatusCodes, "ALREADY_EXISTS", INT2NUM(GRPC_STATUS_ALREADY_EXISTS)); - rb_define_const(rb_mStatusCodes, "PERMISSION_DENIED", + rb_define_const(grpc_rb_mStatusCodes, "PERMISSION_DENIED", INT2NUM(GRPC_STATUS_PERMISSION_DENIED)); - rb_define_const(rb_mStatusCodes, "UNAUTHENTICATED", + rb_define_const(grpc_rb_mStatusCodes, "UNAUTHENTICATED", INT2NUM(GRPC_STATUS_UNAUTHENTICATED)); - rb_define_const(rb_mStatusCodes, "RESOURCE_EXHAUSTED", + rb_define_const(grpc_rb_mStatusCodes, "RESOURCE_EXHAUSTED", INT2NUM(GRPC_STATUS_RESOURCE_EXHAUSTED)); - rb_define_const(rb_mStatusCodes, "FAILED_PRECONDITION", + rb_define_const(grpc_rb_mStatusCodes, "FAILED_PRECONDITION", INT2NUM(GRPC_STATUS_FAILED_PRECONDITION)); - rb_define_const(rb_mStatusCodes, "ABORTED", INT2NUM(GRPC_STATUS_ABORTED)); - rb_define_const(rb_mStatusCodes, "OUT_OF_RANGE", + rb_define_const(grpc_rb_mStatusCodes, "ABORTED", + INT2NUM(GRPC_STATUS_ABORTED)); + rb_define_const(grpc_rb_mStatusCodes, "OUT_OF_RANGE", INT2NUM(GRPC_STATUS_OUT_OF_RANGE)); - rb_define_const(rb_mStatusCodes, "UNIMPLEMENTED", + rb_define_const(grpc_rb_mStatusCodes, "UNIMPLEMENTED", INT2NUM(GRPC_STATUS_UNIMPLEMENTED)); - rb_define_const(rb_mStatusCodes, "INTERNAL", INT2NUM(GRPC_STATUS_INTERNAL)); - rb_define_const(rb_mStatusCodes, "UNAVAILABLE", + rb_define_const(grpc_rb_mStatusCodes, "INTERNAL", + INT2NUM(GRPC_STATUS_INTERNAL)); + rb_define_const(grpc_rb_mStatusCodes, "UNAVAILABLE", INT2NUM(GRPC_STATUS_UNAVAILABLE)); - rb_define_const(rb_mStatusCodes, "DATA_LOSS", INT2NUM(GRPC_STATUS_DATA_LOSS)); + rb_define_const(grpc_rb_mStatusCodes, "DATA_LOSS", + INT2NUM(GRPC_STATUS_DATA_LOSS)); } /* id_at is the constructor method of the ruby standard Time class. */ @@ -195,42 +203,46 @@ static ID id_inspect; /* id_to_s is the to_s method found on various ruby objects. */ static ID id_to_s; -/* Converts `a wrapped time constant to a standard time. */ -VALUE grpc_rb_time_val_to_time(VALUE self) { +/* Converts a wrapped time constant to a standard time. */ +static VALUE grpc_rb_time_val_to_time(VALUE self) { gpr_timespec *time_const = NULL; - Data_Get_Struct(self, gpr_timespec, time_const); + TypedData_Get_Struct(self, gpr_timespec, &grpc_rb_timespec_data_type, + time_const); return rb_funcall(rb_cTime, id_at, 2, INT2NUM(time_const->tv_sec), INT2NUM(time_const->tv_nsec)); } /* Invokes inspect on the ctime version of the time val. */ -VALUE grpc_rb_time_val_inspect(VALUE self) { +static VALUE grpc_rb_time_val_inspect(VALUE self) { return rb_funcall(grpc_rb_time_val_to_time(self), id_inspect, 0); } /* Invokes to_s on the ctime version of the time val. */ -VALUE grpc_rb_time_val_to_s(VALUE self) { +static VALUE grpc_rb_time_val_to_s(VALUE self) { return rb_funcall(grpc_rb_time_val_to_time(self), id_to_s, 0); } /* Adds a module with constants that map to gpr's static timeval structs. */ -void Init_grpc_time_consts() { - VALUE rb_mTimeConsts = - rb_define_module_under(rb_mGrpcCore, "TimeConsts"); - rb_cTimeVal = - rb_define_class_under(rb_mGrpcCore, "TimeSpec", rb_cObject); - rb_define_const(rb_mTimeConsts, "ZERO", - Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE, - (void *)&gpr_time_0)); - rb_define_const(rb_mTimeConsts, "INFINITE_FUTURE", - Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE, - (void *)&gpr_inf_future)); - rb_define_const(rb_mTimeConsts, "INFINITE_PAST", - Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE, - (void *)&gpr_inf_past)); - rb_define_method(rb_cTimeVal, "to_time", grpc_rb_time_val_to_time, 0); - rb_define_method(rb_cTimeVal, "inspect", grpc_rb_time_val_inspect, 0); - rb_define_method(rb_cTimeVal, "to_s", grpc_rb_time_val_to_s, 0); +static void Init_grpc_time_consts() { + VALUE grpc_rb_mTimeConsts = + rb_define_module_under(grpc_rb_mGrpcCore, "TimeConsts"); + grpc_rb_cTimeVal = + rb_define_class_under(grpc_rb_mGrpcCore, "TimeSpec", rb_cObject); + rb_define_const( + grpc_rb_mTimeConsts, "ZERO", + TypedData_Wrap_Struct(grpc_rb_cTimeVal, &grpc_rb_timespec_data_type, + (void *)&gpr_time_0)); + rb_define_const( + grpc_rb_mTimeConsts, "INFINITE_FUTURE", + TypedData_Wrap_Struct(grpc_rb_cTimeVal, &grpc_rb_timespec_data_type, + (void *)&gpr_inf_future)); + rb_define_const( + grpc_rb_mTimeConsts, "INFINITE_PAST", + TypedData_Wrap_Struct(grpc_rb_cTimeVal, &grpc_rb_timespec_data_type, + (void *)&gpr_inf_past)); + rb_define_method(grpc_rb_cTimeVal, "to_time", grpc_rb_time_val_to_time, 0); + rb_define_method(grpc_rb_cTimeVal, "inspect", grpc_rb_time_val_inspect, 0); + rb_define_method(grpc_rb_cTimeVal, "to_s", grpc_rb_time_val_to_s, 0); id_at = rb_intern("at"); id_inspect = rb_intern("inspect"); id_to_s = rb_intern("to_s"); @@ -238,35 +250,42 @@ void Init_grpc_time_consts() { id_tv_nsec = rb_intern("tv_nsec"); } -void grpc_rb_shutdown(void *vm) { grpc_shutdown(); } +static void grpc_rb_shutdown(ruby_vm_t *vm) { grpc_shutdown(); } /* Initialize the GRPC module structs */ -/* rb_sNewServerRpc is the struct that holds new server rpc details. */ -VALUE rb_sNewServerRpc = Qnil; -/* rb_sStatus is the struct that holds status details. */ -VALUE rb_sStatus = Qnil; +/* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */ +VALUE grpc_rb_sNewServerRpc = Qnil; +/* grpc_rb_sStatus is the struct that holds status details. */ +VALUE grpc_rb_sStatus = Qnil; /* Initialize the GRPC module. */ -VALUE rb_mGRPC = Qnil; -VALUE rb_mGrpcCore = Qnil; +VALUE grpc_rb_mGRPC = Qnil; +VALUE grpc_rb_mGrpcCore = Qnil; + +/* cached Symbols for members in Status struct */ +VALUE sym_code = Qundef; +VALUE sym_details = Qundef; +VALUE sym_metadata = Qundef; void Init_grpc() { grpc_init(); ruby_vm_at_exit(grpc_rb_shutdown); - rb_mGRPC = rb_define_module("GRPC"); - rb_mGrpcCore = rb_define_module_under(rb_mGRPC, "Core"); - rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host", - "deadline", "metadata", NULL); - rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL); + grpc_rb_mGRPC = rb_define_module("GRPC"); + grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core"); + grpc_rb_sNewServerRpc = + rb_struct_define("NewServerRpc", "method", "host", + "deadline", "metadata", "call", NULL); + grpc_rb_sStatus = + rb_struct_define("Status", "code", "details", "metadata", NULL); + sym_code = ID2SYM(rb_intern("code")); + sym_details = ID2SYM(rb_intern("details")); + sym_metadata = ID2SYM(rb_intern("metadata")); - Init_grpc_byte_buffer(); - Init_grpc_event(); Init_grpc_channel(); Init_grpc_completion_queue(); Init_grpc_call(); Init_grpc_credentials(); - Init_grpc_metadata(); Init_grpc_server(); Init_grpc_server_credentials(); Init_grpc_status_codes(); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 851f5ee69f..a502273de1 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -38,26 +38,36 @@ #include <ruby.h> #include <grpc/support/time.h> -/* rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */ -extern VALUE rb_mGrpcCore; +/* grpc_rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */ +extern VALUE grpc_rb_mGrpcCore; -/* Class used to wrap timeval structs. */ -extern VALUE rb_cTimeVal; +/* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */ +extern VALUE grpc_rb_sNewServerRpc; -/* rb_sNewServerRpc is the struct that holds new server rpc details. */ -extern VALUE rb_sNewServerRpc; +/* grpc_rb_sStruct is the struct that holds status details. */ +extern VALUE grpc_rb_sStatus; -/* rb_sStruct is the struct that holds status details. */ -extern VALUE rb_sStatus; +/* sym_code is the symbol for the code attribute of grpc_rb_sStatus. */ +extern VALUE sym_code; + +/* sym_details is the symbol for the details attribute of grpc_rb_sStatus. */ +extern VALUE sym_details; + +/* sym_metadata is the symbol for the metadata attribute of grpc_rb_sStatus. */ +extern VALUE sym_metadata; /* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the wrapped struct does not need to participate in ruby gc. */ -extern const RUBY_DATA_FUNC GC_NOT_MARKED; +#define GRPC_RB_GC_NOT_MARKED (RUBY_DATA_FUNC)(NULL) /* GC_DONT_FREED is used in calls to Data_Wrap_Struct to indicate that the wrapped struct should not be freed the wrapped ruby object is released by the garbage collector. */ -extern const RUBY_DATA_FUNC GC_DONT_FREE; +#define GRPC_RB_GC_DONT_FREE (RUBY_DATA_FUNC)(NULL) + +/* GRPC_RB_MEMSIZE_UNAVAILABLE is used in rb_data_type_t to indicate that the + * number of bytes used by the wrapped struct is not available. */ +#define GRPC_RB_MEMSIZE_UNAVAILABLE (size_t (*)(const void*))(NULL) /* A ruby object alloc func that fails by raising an exception. */ VALUE grpc_rb_cannot_alloc(VALUE cls); diff --git a/src/ruby/ext/grpc/rb_metadata.c b/src/ruby/ext/grpc/rb_metadata.c deleted file mode 100644 index 7622a8c57e..0000000000 --- a/src/ruby/ext/grpc/rb_metadata.c +++ /dev/null @@ -1,215 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "rb_metadata.h" - -#include <ruby.h> -#include <string.h> - -#include <grpc/grpc.h> -#include "rb_grpc.h" - -/* grpc_rb_metadata wraps a grpc_metadata. It provides a peer ruby object, - * 'mark' to minimize copying when a metadata is created from ruby. */ -typedef struct grpc_rb_metadata { - /* Holder of ruby objects involved in constructing the metadata */ - VALUE mark; - /* The actual metadata */ - grpc_metadata *wrapped; -} grpc_rb_metadata; - -/* Destroys Metadata instances. */ -static void grpc_rb_metadata_free(void *p) { - if (p == NULL) { - return; - }; - - /* Because metadata is only created during a call to grpc_call_add_metadata, - * and the call takes ownership of the metadata, this does not free the - * wrapped struct, only the wrapper */ - xfree(p); -} - -/* Protects the mark object from GC */ -static void grpc_rb_metadata_mark(void *p) { - grpc_rb_metadata *md = NULL; - if (p == NULL) { - return; - } - - md = (grpc_rb_metadata *)p; - /* If it's not already cleaned up, mark the mark object */ - if (md->mark != Qnil && BUILTIN_TYPE(md->mark) != T_NONE) { - rb_gc_mark(md->mark); - } -} - -/* Allocates Metadata instances. - - Provides safe default values for the Metadata fields. */ -static VALUE grpc_rb_metadata_alloc(VALUE cls) { - grpc_rb_metadata *wrapper = ALLOC(grpc_rb_metadata); - wrapper->wrapped = NULL; - wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_metadata_mark, grpc_rb_metadata_free, - wrapper); -} - -/* id_key and id_value are the names of the hidden ivars that preserve the - * original byte_buffer source string */ -static ID id_key; -static ID id_value; - -/* Initializes Metadata instances. */ -static VALUE grpc_rb_metadata_init(VALUE self, VALUE key, VALUE value) { - grpc_rb_metadata *wrapper = NULL; - grpc_metadata *md = ALLOC(grpc_metadata); - - /* Use direct pointers to the strings wrapped by the ruby object to avoid - * copying */ - Data_Get_Struct(self, grpc_rb_metadata, wrapper); - wrapper->wrapped = md; - if (TYPE(key) == T_SYMBOL) { - md->key = (char *)rb_id2name(SYM2ID(key)); - } else { /* StringValueCStr does all other type exclusions for us */ - md->key = StringValueCStr(key); - } - md->value = RSTRING_PTR(value); - md->value_length = RSTRING_LEN(value); - - /* Save references to the original values on the mark object so that the - * pointers used there are valid for the lifetime of the object. */ - wrapper->mark = rb_class_new_instance(0, NULL, rb_cObject); - rb_ivar_set(wrapper->mark, id_key, key); - rb_ivar_set(wrapper->mark, id_value, value); - - return self; -} - -/* Clones Metadata instances. - - Gives Metadata a consistent implementation of Ruby's object copy/dup - protocol. */ -static VALUE grpc_rb_metadata_init_copy(VALUE copy, VALUE orig) { - grpc_rb_metadata *orig_md = NULL; - grpc_rb_metadata *copy_md = NULL; - - if (copy == orig) { - return copy; - } - - /* Raise an error if orig is not a metadata object or a subclass. */ - if (TYPE(orig) != T_DATA || - RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_metadata_free) { - rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cMetadata)); - } - - Data_Get_Struct(orig, grpc_rb_metadata, orig_md); - Data_Get_Struct(copy, grpc_rb_metadata, copy_md); - - /* use ruby's MEMCPY to make a byte-for-byte copy of the metadata wrapper - * object. */ - MEMCPY(copy_md, orig_md, grpc_rb_metadata, 1); - return copy; -} - -/* Gets the key from a metadata instance. */ -static VALUE grpc_rb_metadata_key(VALUE self) { - VALUE key = Qnil; - grpc_rb_metadata *wrapper = NULL; - grpc_metadata *md = NULL; - - Data_Get_Struct(self, grpc_rb_metadata, wrapper); - if (wrapper->mark != Qnil) { - key = rb_ivar_get(wrapper->mark, id_key); - if (key != Qnil) { - return key; - } - } - - md = wrapper->wrapped; - if (md == NULL || md->key == NULL) { - return Qnil; - } - return rb_str_new2(md->key); -} - -/* Gets the value from a metadata instance. */ -static VALUE grpc_rb_metadata_value(VALUE self) { - VALUE val = Qnil; - grpc_rb_metadata *wrapper = NULL; - grpc_metadata *md = NULL; - - Data_Get_Struct(self, grpc_rb_metadata, wrapper); - if (wrapper->mark != Qnil) { - val = rb_ivar_get(wrapper->mark, id_value); - if (val != Qnil) { - return val; - } - } - - md = wrapper->wrapped; - if (md == NULL || md->value == NULL) { - return Qnil; - } - return rb_str_new2(md->value); -} - -/* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */ -VALUE rb_cMetadata = Qnil; -void Init_grpc_metadata() { - rb_cMetadata = - rb_define_class_under(rb_mGrpcCore, "Metadata", rb_cObject); - - /* Allocates an object managed by the ruby runtime */ - rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc); - - /* Provides a ruby constructor and support for dup/clone. */ - rb_define_method(rb_cMetadata, "initialize", grpc_rb_metadata_init, 2); - rb_define_method(rb_cMetadata, "initialize_copy", grpc_rb_metadata_init_copy, - 1); - - /* Provides accessors for the code and details. */ - rb_define_method(rb_cMetadata, "key", grpc_rb_metadata_key, 0); - rb_define_method(rb_cMetadata, "value", grpc_rb_metadata_value, 0); - - id_key = rb_intern("__key"); - id_value = rb_intern("__value"); -} - -/* Gets the wrapped metadata from the ruby wrapper */ -grpc_metadata *grpc_rb_get_wrapped_metadata(VALUE v) { - grpc_rb_metadata *wrapper = NULL; - Data_Get_Struct(v, grpc_rb_metadata, wrapper); - return wrapper->wrapped; -} diff --git a/src/ruby/ext/grpc/rb_metadata.h b/src/ruby/ext/grpc/rb_metadata.h deleted file mode 100644 index 251072f658..0000000000 --- a/src/ruby/ext/grpc/rb_metadata.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_RB_METADATA_H_ -#define GRPC_RB_METADATA_H_ - -#include <grpc/grpc.h> -#include <ruby.h> - -/* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */ -extern VALUE rb_cMetadata; - -/* grpc_rb_metadata_create_with_mark creates a grpc_rb_metadata with a ruby mark - * object that will be kept alive while the metadata is alive. */ -extern VALUE grpc_rb_metadata_create_with_mark(VALUE mark, grpc_metadata* md); - -/* Gets the wrapped metadata from the ruby wrapper */ -grpc_metadata* grpc_rb_get_wrapped_metadata(VALUE v); - -/* Initializes the Metadata class. */ -void Init_grpc_metadata(); - -#endif /* GRPC_RB_METADATA_H_ */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index c54f02e87a..bc0878af05 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -43,8 +43,11 @@ #include "rb_server_credentials.h" #include "rb_grpc.h" -/* rb_cServer is the ruby class that proxies grpc_server. */ -VALUE rb_cServer = Qnil; +/* grpc_rb_cServer is the ruby class that proxies grpc_server. */ +static VALUE grpc_rb_cServer = Qnil; + +/* id_at is the constructor method of the ruby standard Time class. */ +static ID id_at; /* grpc_rb_server wraps a grpc_server. It provides a peer ruby object, 'mark' to minimize copying when a server is created from ruby. */ @@ -85,13 +88,23 @@ static void grpc_rb_server_mark(void *p) { } } +static const rb_data_type_t grpc_rb_server_data_type = { + "grpc_server", + {grpc_rb_server_mark, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, + NULL, + /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block + * and we might want to unlock GVL + * TODO(yugui) Unlock GVL? + */ + 0}; + /* Allocates grpc_rb_server instances. */ static VALUE grpc_rb_server_alloc(VALUE cls) { grpc_rb_server *wrapper = ALLOC(grpc_rb_server); wrapper->wrapped = NULL; wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_server_mark, grpc_rb_server_free, - wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper); } /* @@ -107,7 +120,8 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) { grpc_channel_args args; MEMZERO(&args, grpc_channel_args, 1); cq = grpc_rb_get_wrapped_completion_queue(cqueue); - Data_Get_Struct(self, grpc_rb_server, wrapper); + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, + wrapper); grpc_rb_hash_convert_to_channel_args(channel_args, &args); srv = grpc_server_create(cq, &args); @@ -140,11 +154,13 @@ static VALUE grpc_rb_server_init_copy(VALUE copy, VALUE orig) { /* Raise an error if orig is not a server object or a subclass. */ if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_server_free) { - rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cServer)); + rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cServer)); } - Data_Get_Struct(orig, grpc_rb_server, orig_srv); - Data_Get_Struct(copy, grpc_rb_server, copy_srv); + TypedData_Get_Struct(orig, grpc_rb_server, &grpc_rb_server_data_type, + orig_srv); + TypedData_Get_Struct(copy, grpc_rb_server, &grpc_rb_server_data_type, + copy_srv); /* use ruby's MEMCPY to make a byte-for-byte copy of the server wrapper object. */ @@ -152,25 +168,97 @@ static VALUE grpc_rb_server_init_copy(VALUE copy, VALUE orig) { return copy; } -static VALUE grpc_rb_server_request_call(VALUE self, VALUE tag_new) { - grpc_call_error err; +/* request_call_stack holds various values used by the + * grpc_rb_server_request_call function */ +typedef struct request_call_stack { + grpc_call_details details; + grpc_metadata_array md_ary; +} request_call_stack; + +/* grpc_request_call_stack_init ensures the request_call_stack is properly + * initialized */ +static void grpc_request_call_stack_init(request_call_stack* st) { + MEMZERO(st, request_call_stack, 1); + grpc_metadata_array_init(&st->md_ary); + grpc_call_details_init(&st->details); + st->details.method = NULL; + st->details.host = NULL; +} + +/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly + * cleaned up */ +static void grpc_request_call_stack_cleanup(request_call_stack* st) { + grpc_metadata_array_destroy(&st->md_ary); + grpc_call_details_destroy(&st->details); +} + +/* call-seq: + cq = CompletionQueue.new + tag = Object.new + timeout = 10 + server.request_call(cqueue, tag, timeout) + + Requests notification of a new call on a server. */ +static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, + VALUE tag_new, VALUE timeout) { grpc_rb_server *s = NULL; - Data_Get_Struct(self, grpc_rb_server, s); + grpc_call *call = NULL; + grpc_event *ev = NULL; + grpc_call_error err; + request_call_stack st; + VALUE result; + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); + return Qnil; } else { - err = grpc_server_request_call_old(s->wrapped, ROBJECT(tag_new)); + grpc_request_call_stack_init(&st); + /* call grpc_server_request_call, then wait for it to complete using + * pluck_event */ + err = grpc_server_request_call( + s->wrapped, &call, &st.details, &st.md_ary, + grpc_rb_get_wrapped_completion_queue(cqueue), + ROBJECT(tag_new)); if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "server request failed: %s (code=%d)", + grpc_request_call_stack_cleanup(&st); + rb_raise(grpc_rb_eCallError, + "grpc_server_request_call failed: %s (code=%d)", grpc_call_error_detail_of(err), err); + return Qnil; } + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); + if (ev == NULL) { + grpc_request_call_stack_cleanup(&st); + return Qnil; + } + if (ev->data.op_complete != GRPC_OP_OK) { + grpc_request_call_stack_cleanup(&st); + grpc_event_finish(ev); + rb_raise(grpc_rb_eCallError, "request_call completion failed: (code=%d)", + ev->data.op_complete); + return Qnil; + } + + /* build the NewServerRpc struct result */ + result = rb_struct_new( + grpc_rb_sNewServerRpc, + rb_str_new2(st.details.method), + rb_str_new2(st.details.host), + rb_funcall(rb_cTime, id_at, 2, INT2NUM(st.details.deadline.tv_sec), + INT2NUM(st.details.deadline.tv_nsec)), + grpc_rb_md_ary_to_h(&st.md_ary), + grpc_rb_wrap_call(call), + NULL); + grpc_event_finish(ev); + grpc_request_call_stack_cleanup(&st); + return result; } return Qnil; } static VALUE grpc_rb_server_start(VALUE self) { grpc_rb_server *s = NULL; - Data_Get_Struct(self, grpc_rb_server, s); + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); } else { @@ -181,7 +269,7 @@ static VALUE grpc_rb_server_start(VALUE self) { static VALUE grpc_rb_server_destroy(VALUE self) { grpc_rb_server *s = NULL; - Data_Get_Struct(self, grpc_rb_server, s); + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped != NULL) { grpc_server_shutdown(s->wrapped); grpc_server_destroy(s->wrapped); @@ -213,7 +301,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { /* "11" == 1 mandatory args, 1 (rb_creds) is optional */ rb_scan_args(argc, argv, "11", &port, &rb_creds); - Data_Get_Struct(self, grpc_rb_server, s); + TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; @@ -239,27 +327,32 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { } void Init_grpc_server() { - rb_cServer = rb_define_class_under(rb_mGrpcCore, "Server", rb_cObject); + grpc_rb_cServer = + rb_define_class_under(grpc_rb_mGrpcCore, "Server", rb_cObject); /* Allocates an object managed by the ruby runtime */ - rb_define_alloc_func(rb_cServer, grpc_rb_server_alloc); + rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc); /* Provides a ruby constructor and support for dup/clone. */ - rb_define_method(rb_cServer, "initialize", grpc_rb_server_init, 2); - rb_define_method(rb_cServer, "initialize_copy", grpc_rb_server_init_copy, 1); + rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 2); + rb_define_method(grpc_rb_cServer, "initialize_copy", + grpc_rb_server_init_copy, 1); /* Add the server methods. */ - rb_define_method(rb_cServer, "request_call", grpc_rb_server_request_call, 1); - rb_define_method(rb_cServer, "start", grpc_rb_server_start, 0); - rb_define_method(rb_cServer, "destroy", grpc_rb_server_destroy, 0); - rb_define_alias(rb_cServer, "close", "destroy"); - rb_define_method(rb_cServer, "add_http2_port", grpc_rb_server_add_http2_port, + rb_define_method(grpc_rb_cServer, "request_call", + grpc_rb_server_request_call, 3); + rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0); + rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0); + rb_define_alias(grpc_rb_cServer, "close", "destroy"); + rb_define_method(grpc_rb_cServer, "add_http2_port", + grpc_rb_server_add_http2_port, -1); + id_at = rb_intern("at"); } /* Gets the wrapped server from the ruby wrapper */ grpc_server *grpc_rb_get_wrapped_server(VALUE v) { grpc_rb_server *wrapper = NULL; - Data_Get_Struct(v, grpc_rb_server, wrapper); + TypedData_Get_Struct(v, grpc_rb_server, &grpc_rb_server_data_type, wrapper); return wrapper->wrapped; } diff --git a/src/ruby/ext/grpc/rb_server.h b/src/ruby/ext/grpc/rb_server.h index 2726b9a50a..5e4b711d35 100644 --- a/src/ruby/ext/grpc/rb_server.h +++ b/src/ruby/ext/grpc/rb_server.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc.h> -/* rb_cServer is the Server class whose instances proxy - grpc_byte_buffer. */ -extern VALUE rb_cServer; - /* Initializes the Server class. */ void Init_grpc_server(); diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c index fb02987870..a86389445f 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.c +++ b/src/ruby/ext/grpc/rb_server_credentials.c @@ -40,6 +40,10 @@ #include "rb_grpc.h" +/* grpc_rb_cServerCredentials is the ruby class that proxies + grpc_server_credentials. */ +static VALUE grpc_rb_cServerCredentials = Qnil; + /* grpc_rb_server_credentials wraps a grpc_server_credentials. It provides a peer ruby object, 'mark' to minimize copying when a server credential is created from ruby. */ @@ -82,6 +86,14 @@ static void grpc_rb_server_credentials_mark(void *p) { } } +static const rb_data_type_t grpc_rb_server_credentials_data_type = { + "grpc_server_credentials", + {grpc_rb_server_credentials_mark, grpc_rb_server_credentials_free, + GRPC_RB_MEMSIZE_UNAVAILABLE}, + NULL, NULL, + RUBY_TYPED_FREE_IMMEDIATELY +}; + /* Allocates ServerCredential instances. Provides safe initial defaults for the instance fields. */ @@ -89,8 +101,8 @@ static VALUE grpc_rb_server_credentials_alloc(VALUE cls) { grpc_rb_server_credentials *wrapper = ALLOC(grpc_rb_server_credentials); wrapper->wrapped = NULL; wrapper->mark = Qnil; - return Data_Wrap_Struct(cls, grpc_rb_server_credentials_mark, - grpc_rb_server_credentials_free, wrapper); + return TypedData_Wrap_Struct(cls, &grpc_rb_server_credentials_data_type, + wrapper); } /* Clones ServerCredentials instances. @@ -109,11 +121,13 @@ static VALUE grpc_rb_server_credentials_init_copy(VALUE copy, VALUE orig) { if (TYPE(orig) != T_DATA || RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_server_credentials_free) { rb_raise(rb_eTypeError, "not a %s", - rb_obj_classname(rb_cServerCredentials)); + rb_obj_classname(grpc_rb_cServerCredentials)); } - Data_Get_Struct(orig, grpc_rb_server_credentials, orig_ch); - Data_Get_Struct(copy, grpc_rb_server_credentials, copy_ch); + TypedData_Get_Struct(orig, grpc_rb_server_credentials, + &grpc_rb_server_credentials_data_type, orig_ch); + TypedData_Get_Struct(copy, grpc_rb_server_credentials, + &grpc_rb_server_credentials_data_type, copy_ch); /* use ruby's MEMCPY to make a byte-for-byte copy of the server_credentials wrapper object. */ @@ -149,7 +163,8 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs, grpc_rb_server_credentials *wrapper = NULL; grpc_server_credentials *creds = NULL; grpc_ssl_pem_key_cert_pair key_cert_pair = {NULL, NULL}; - Data_Get_Struct(self, grpc_rb_server_credentials, wrapper); + TypedData_Get_Struct(self, grpc_rb_server_credentials, + &grpc_rb_server_credentials_data_type, wrapper); if (pem_cert_chain == Qnil) { rb_raise(rb_eRuntimeError, "could not create a server credential: nil pem_cert_chain"); @@ -180,21 +195,18 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs, return self; } -/* rb_cServerCredentials is the ruby class that proxies - grpc_server_credentials. */ -VALUE rb_cServerCredentials = Qnil; - void Init_grpc_server_credentials() { - rb_cServerCredentials = - rb_define_class_under(rb_mGrpcCore, "ServerCredentials", rb_cObject); + grpc_rb_cServerCredentials = + rb_define_class_under(grpc_rb_mGrpcCore, "ServerCredentials", rb_cObject); /* Allocates an object managed by the ruby runtime */ - rb_define_alloc_func(rb_cServerCredentials, grpc_rb_server_credentials_alloc); + rb_define_alloc_func(grpc_rb_cServerCredentials, + grpc_rb_server_credentials_alloc); /* Provides a ruby constructor and support for dup/clone. */ - rb_define_method(rb_cServerCredentials, "initialize", + rb_define_method(grpc_rb_cServerCredentials, "initialize", grpc_rb_server_credentials_init, 3); - rb_define_method(rb_cServerCredentials, "initialize_copy", + rb_define_method(grpc_rb_cServerCredentials, "initialize_copy", grpc_rb_server_credentials_init_copy, 1); id_pem_cert_chain = rb_intern("__pem_cert_chain"); @@ -205,6 +217,7 @@ void Init_grpc_server_credentials() { /* Gets the wrapped grpc_server_credentials from the ruby wrapper */ grpc_server_credentials *grpc_rb_get_wrapped_server_credentials(VALUE v) { grpc_rb_server_credentials *wrapper = NULL; - Data_Get_Struct(v, grpc_rb_server_credentials, wrapper); + TypedData_Get_Struct(v, grpc_rb_server_credentials, + &grpc_rb_server_credentials_data_type, wrapper); return wrapper->wrapped; } diff --git a/src/ruby/ext/grpc/rb_server_credentials.h b/src/ruby/ext/grpc/rb_server_credentials.h index ef377195a0..35b395ad5c 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.h +++ b/src/ruby/ext/grpc/rb_server_credentials.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc_security.h> -/* rb_cServerCredentials is the ruby class whose instances proxy - grpc_server_credentials. */ -extern VALUE rb_cServerCredentials; - /* Initializes the ruby ServerCredentials class. */ void Init_grpc_server_credentials(); diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index 45cbacfeb0..19b3e21cb6 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -13,6 +13,9 @@ Gem::Specification.new do |s| s.description = 'Send RPCs from Ruby using GRPC' s.license = 'BSD-3-Clause' + s.required_ruby_version = '>= 2.0.0' + s.requirements << 'libgrpc ~> 0.6.0 needs to be installed' + s.files = `git ls-files`.split("\n") s.test_files = `git ls-files -- spec/*`.split("\n") s.executables = `git ls-files -- bin/*.rb`.split("\n").map do |f| @@ -22,16 +25,16 @@ Gem::Specification.new do |s| s.platform = Gem::Platform::RUBY s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' - s.add_dependency 'googleauth', '~> 0.1' - s.add_dependency 'logging', '~> 1.8' + s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests + s.add_dependency 'logging', '~> 2.0' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests - s.add_dependency 'xray', '~> 1.1' - s.add_development_dependency 'bundler', '~> 1.7' - s.add_development_dependency 'rake', '~> 10.0' - s.add_development_dependency 'rake-compiler', '~> 0' - s.add_development_dependency 'rubocop', '~> 0.28.0' - s.add_development_dependency 'rspec', '~> 3.0' + s.add_development_dependency 'simplecov', '~> 0.9' + s.add_development_dependency 'bundler', '~> 1.9' + s.add_development_dependency 'rake', '~> 10.4' + s.add_development_dependency 'rake-compiler', '~> 0.9' + s.add_development_dependency 'rspec', '~> 3.2' + s.add_development_dependency 'rubocop', '~> 0.30' s.extensions = %w(ext/grpc/extconf.rb) end diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index dd02ef7666..80b5743e91 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -30,8 +30,8 @@ require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' +require 'grpc/notifier' require 'grpc/version' -require 'grpc/core/event' require 'grpc/core/time_consts' require 'grpc/generic/active_call' require 'grpc/generic/client_stub' diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 58944872b5..f1201c1704 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -31,23 +31,20 @@ require 'grpc' # GRPC contains the General RPC module. module GRPC - # OutOfTime is an exception class that indicates that an RPC exceeded its - # deadline. - OutOfTime = Class.new(StandardError) - # BadStatus is an exception class that indicates that an error occurred at # either end of a GRPC connection. When raised, it indicates that a status # error should be returned to the other end of a GRPC connection; when # caught it means that this end received a status error. class BadStatus < StandardError - attr_reader :code, :details + attr_reader :code, :details, :metadata # @param code [Numeric] the status code # @param details [String] the details of the exception - def initialize(code, details = 'unknown cause') + def initialize(code, details = 'unknown cause', **kw) super("#{code}:#{details}") @code = code @details = details + @metadata = kw end # Converts the exception to a GRPC::Status for use in the networking @@ -55,7 +52,11 @@ module GRPC # # @return [Status] with the same code and details def to_status - Status.new(code, details) + Struct::Status.new(code, details, @metadata) end end + + # Cancelled is an exception class that indicates that an rpc was cancelled. + class Cancelled < StandardError + end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 6256330e88..947c39cd22 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,10 +30,23 @@ require 'forwardable' require 'grpc/generic/bidi_call' -def assert_event_type(ev, want) - fail OutOfTime if ev.nil? - got = ev.type - fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want +class Struct + # BatchResult is the struct returned by calls to call#start_batch. + class BatchResult + # check_status returns the status, raising an error if the status + # is non-nil and not OK. + def check_status + return nil if status.nil? + fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED + if status.code != GRPC::Core::StatusCodes::OK + # raise BadStatus, propagating the metadata if present. + md = status.metadata + with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }] + fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys) + end + status + end + end end # GRPC contains the General RPC module. @@ -41,10 +54,12 @@ module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call class ActiveCall - include Core::CompletionType include Core::StatusCodes include Core::TimeConsts + include Core::CallOps + extend Forwardable attr_reader(:deadline) + def_delegators :@call, :cancel, :metadata # client_invoke begins a client invocation. # @@ -61,15 +76,14 @@ module GRPC # @param q [CompletionQueue] the completion queue # @param deadline [Fixnum,TimeSpec] the deadline def self.client_invoke(call, q, _deadline, **kw) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + fail(TypeError, '!Core::Call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') + fail(TypeError, '!Core::CompletionQueue') end - call.add_metadata(kw) if kw.length > 0 - client_metadata_read = Object.new - finished_tag = Object.new - call.invoke(q, client_metadata_read, finished_tag) - [finished_tag, client_metadata_read] + metadata_tag = Object.new + call.run_batch(q, metadata_tag, INFINITE_FUTURE, + SEND_INITIAL_METADATA => kw) + metadata_tag end # Creates an ActiveCall. @@ -91,69 +105,27 @@ module GRPC # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param deadline [Fixnum] the deadline for the call to complete - # @param finished_tag [Object] the object used as the call's finish tag, - # if the call has begun - # @param read_metadata_tag [Object] the object used as the call's finish - # tag, if the call has begun + # @param metadata_tag [Object] the object use obtain metadata for clients # @param started [true|false] indicates if the call has begun - def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil, - read_metadata_tag: nil, started: true) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + def initialize(call, q, marshal, unmarshal, deadline, started: true, + metadata_tag: nil) + fail(TypeError, '!Core::Call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') + fail(TypeError, '!Core::CompletionQueue') end @call = call @cq = q @deadline = deadline - @finished_tag = finished_tag - @read_metadata_tag = read_metadata_tag @marshal = marshal @started = started @unmarshal = unmarshal + @metadata_tag = metadata_tag end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status - end - - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end - - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end - - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end - - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false + # output_metadata are provides access to hash that can be used to + # save metadata to be sent as trailer + def output_metadata + @output_metadata ||= {} end # multi_req_view provides a restricted view of this ActiveCall for use @@ -176,128 +148,94 @@ module GRPC # writes_done indicates that all writes are completed. # - # It blocks until the remote endpoint acknowledges by sending a FINISHED - # event, unless assert_finished is set to false. Any calls to - # #remote_send after this call will fail. + # It blocks until the remote endpoint acknowledges with at status unless + # assert_finished is set to false. Any calls to #remote_send after this + # call will fail. # # @param assert_finished [true, false] when true(default), waits for # FINISHED. def writes_done(assert_finished = true) - @call.writes_done(self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - logger.debug("Writes done: waiting for finish? #{assert_finished}") - ensure - ev.close - end - + ops = { + SEND_CLOSE_FROM_CLIENT => nil + } + ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - fail 'unexpected nil event' if ev.nil? - ev.close - @call.status + batch_result.check_status end - # finished waits until the call is completed. + # finished waits until a client call is completed. # - # It blocks until the remote endpoint acknowledges by sending a FINISHED - # event. + # It blocks until the remote endpoint acknowledges by sending a status. def finished - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - begin - fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, + RECV_STATUS_ON_CLIENT => nil) + unless batch_result.status.nil? if @call.metadata.nil? - @call.metadata = ev.result.metadata + @call.metadata = batch_result.status.metadata else - @call.metadata.merge!(ev.result.metadata) + @call.metadata.merge!(batch_result.status.metadata) end - - if ev.result.code != Core::StatusCodes::OK - fail BadStatus.new(ev.result.code, ev.result.details) - end - res = ev.result - ensure - ev.close end - res + batch_result.check_status end # remote_send sends a request to the remote endpoint. # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. + # It blocks until the remote endpoint accepts the message. # # @param req [Object, String] the object to send or it's marshal form. # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - assert_queue_is_ready - logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") + logger.debug("sending #{req}, marshalled? #{marshalled}") if marshalled payload = req else payload = @marshal.call(req) end - @call.start_write(Core::ByteBuffer.new(payload), self) - - # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return - # until the flow control allows another send on this call. - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, WRITE_ACCEPTED) - ensure - ev.close - end + @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload) end - # send_status sends a status to the remote endpoint + # send_status sends a status to the remote endpoint. # # @param code [int] the status code to send # @param details [String] details # @param assert_finished [true, false] when true(default), waits for # FINISHED. - def send_status(code = OK, details = '', assert_finished = false) - assert_queue_is_ready - @call.start_write_status(code, details, self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - ensure - ev.close - end - logger.debug("Status sent: #{code}:'#{details}'") - return finished if assert_finished + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + def send_status(code = OK, details = '', assert_finished = false, **kw) + ops = { + SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw) + } + ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished + @call.run_batch(@cq, self, INFINITE_FUTURE, ops) nil end # remote_read reads a response from the remote endpoint. # - # It blocks until the remote endpoint sends a READ or FINISHED event. On - # a READ, it returns the response after unmarshalling it. On - # FINISHED, it returns nil if the status is OK, otherwise raising - # BadStatus + # It blocks until the remote endpoint replies with a message or status. + # On receiving a message, it returns the response after unmarshalling it. + # On receiving a status, it returns nil if the status is OK, otherwise + # raising BadStatus def remote_read - if @call.metadata.nil? && !@read_metadata_tag.nil? - ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE) - assert_event_type(ev, CLIENT_METADATA_READ) - @call.metadata = ev.result - @read_metadata_tag = nil + ops = { RECV_MESSAGE => nil } + ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil? + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + unless @metadata_tag.nil? + @call.metadata = batch_result.metadata + @metadata_tag = nil end - - @call.start_read(self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, READ) - logger.debug("received req: #{ev.result.inspect}") - unless ev.result.nil? - logger.debug("received req.to_s: #{ev.result}") - res = @unmarshal.call(ev.result.to_s) - logger.debug("received_req (unmarshalled): #{res.inspect}") - return res - end - ensure - ev.close + logger.debug("received req: #{batch_result}") + unless batch_result.nil? || batch_result.message.nil? + logger.debug("received req.to_s: #{batch_result.message}") + res = @unmarshal.call(batch_result.message) + logger.debug("received_req (unmarshalled): #{res.inspect}") + return res end logger.debug('found nil; the final response has been sent') nil @@ -324,7 +262,6 @@ module GRPC return enum_for(:each_remote_read) unless block_given? loop do resp = remote_read - break if resp.is_a? Struct::Status # is an OK status break if resp.nil? # the last response was received yield resp end @@ -379,6 +316,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # client_streamer sends a stream of requests to a GRPC server, and @@ -402,6 +342,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -428,6 +371,9 @@ module GRPC replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -461,9 +407,11 @@ module GRPC # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, - @finished_tag) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -478,16 +426,16 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, - @finished_tag) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_server(gen_each_reply) end private + # Starts the call if not already started def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags + return if @started + @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @started = true end @@ -505,32 +453,17 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline, :metadata) + SingleReqView = view_class(:cancelled, :deadline, :metadata, + :output_metadata) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read, :metadata) + :each_remote_read, :metadata, :output_metadata) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) - - # confirms that no events are enqueued, and that the queue is not - # shutdown. - def assert_queue_is_ready - ev = nil - begin - ev = @cq.pluck(self, ZERO) - fail "unexpected event #{ev.inspect}" unless ev.nil? - rescue OutOfTime - logging.debug('timed out waiting for next event') - # expected, nothing should be on the queue and the deadline was ZERO, - # except things using another tag - ensure - ev.close unless ev.nil? - end - end + :metadata, :status, :start_call) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index c66deaae60..4ca3004d6f 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -30,18 +30,12 @@ require 'forwardable' require 'grpc/grpc' -def assert_event_type(ev, want) - fail OutOfTime if ev.nil? - got = ev.type - fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want -end - # GRPC contains the General RPC module. module GRPC # The BiDiCall class orchestrates exection of a BiDi stream on a client or # server. class BidiCall - include Core::CompletionType + include Core::CallOps include Core::StatusCodes include Core::TimeConsts @@ -63,8 +57,7 @@ module GRPC # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param deadline [Fixnum] the deadline for the call to complete - # @param finished_tag [Object] the object used as the call's finish tag, - def initialize(call, q, marshal, unmarshal, deadline, finished_tag) + def initialize(call, q, marshal, unmarshal, deadline) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') @@ -72,7 +65,6 @@ module GRPC @call = call @cq = q @deadline = deadline - @finished_tag = finished_tag @marshal = marshal @readq = Queue.new @unmarshal = unmarshal @@ -86,13 +78,11 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop + @enq_th = start_write_loop(requests) + @loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join end # Begins orchestration of the Bidi stream for a server generating replies. @@ -108,10 +98,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join + @enq_th = start_write_loop(replys, is_client: false) + @loop_th = start_read_loop end private @@ -130,10 +118,12 @@ module GRPC logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop + logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @enq_th.join if @enq_th.alive? end # during bidi-streaming, read the requests to send from a separate thread @@ -144,36 +134,23 @@ module GRPC begin count = 0 requests.each do |req| + logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) - @call.start_write(Core::ByteBuffer.new(payload), write_tag) - ev = @cq.pluck(write_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, WRITE_ACCEPTED) - ensure - ev.close - end + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_MESSAGE => payload) end if is_client - @call.writes_done(write_tag) - ev = @cq.pluck(write_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - ensure - ev.close - end - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISHED) - ensure - ev.close - end - logger.debug('bidi-client: finished received') + logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status end rescue StandardError => e - logger.warn('bidi: write_loop failed') + logger.warn('bidi-write_loop: failed') logger.warn(e) + raise e end end end @@ -187,27 +164,22 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("waiting for read #{count}") + logger.debug("bidi-read_loop: #{count}") count += 1 - @call.start_read(read_tag) - ev = @cq.pluck(read_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, READ) - - # handle the next event. - if ev.result.nil? - @readq.push(END_OF_READS) - logger.debug('done reading!') - break - end - - # push the latest read onto the queue and continue reading - logger.debug("received req: #{ev.result}") - res = @unmarshal.call(ev.result.to_s) - @readq.push(res) - ensure - ev.close + # TODO: ensure metadata is read if available, currently it's not + batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, + RECV_MESSAGE => nil) + # handle the next message + if batch_result.message.nil? + @readq.push(END_OF_READS) + logger.debug('bidi-read-loop: done reading!') + break end + + # push the latest read onto the queue and continue reading + logger.debug("received req: #{batch_result.message}") + res = @unmarshal.call(batch_result.message) + @readq.push(res) end rescue StandardError => e diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 01328d4a5b..7b2c04aa22 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -28,16 +28,16 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc/generic/active_call' -require 'xray/thread_dump_signal_handler' # GRPC contains the General RPC module. module GRPC # ClientStub represents an endpoint used to send requests to GRPC servers. class ClientStub include Core::StatusCodes + include Core::TimeConsts - # Default deadline is 5 seconds. - DEFAULT_DEADLINE = 5 + # Default timeout is 5 seconds. + DEFAULT_TIMEOUT = 5 # setup_channel is used by #initialize to constuct a channel from its # arguments. @@ -51,6 +51,14 @@ module GRPC Core::Channel.new(host, kw, creds) end + def self.update_with_jwt_aud_uri(a_hash, host, method) + last_slash_idx, res = method.rindex('/'), a_hash.clone + return res if last_slash_idx.nil? + service_name = method[0..(last_slash_idx - 1)] + res[:jwt_aud_uri] = "https://#{host}#{service_name}" + res + end + # check_update_metadata is used by #initialize verify that it's a Proc. def self.check_update_metadata(update_metadata) return update_metadata if update_metadata.nil? @@ -76,8 +84,8 @@ module GRPC # present the host and arbitrary keyword arg areignored, and the RPC # connection uses this channel. # - # - :deadline - # when present, this is the default deadline used for calls + # - :timeout + # when present, this is the default timeout used for calls # # - :update_metadata # when present, this a func that takes a hash and returns a hash @@ -87,13 +95,13 @@ module GRPC # @param host [String] the host the stub connects to # @param q [Core::CompletionQueue] used to wait for events # @param channel_override [Core::Channel] a pre-created channel - # @param deadline [Number] the default deadline to use in requests + # @param timeout [Number] the default timeout to use in requests # @param creds [Core::Credentials] the channel # @param update_metadata a func that updates metadata as described above # @param kw [KeywordArgs]the channel arguments def initialize(host, q, channel_override: nil, - deadline: DEFAULT_DEADLINE, + timeout: nil, creds: nil, update_metadata: nil, **kw) @@ -103,7 +111,7 @@ module GRPC @update_metadata = ClientStub.check_update_metadata(update_metadata) alt_host = kw[Core::Channel::SSL_TARGET] @host = alt_host.nil? ? host : alt_host - @deadline = deadline + @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout end # request_response sends a request to a GRPC server, and returns the @@ -140,13 +148,14 @@ module GRPC # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] (optional) the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds # @param return_op [true|false] return an Operation if true # @return [Object] the response received from the server - def request_response(method, req, marshal, unmarshal, deadline = nil, + def request_response(method, req, marshal, unmarshal, timeout = nil, return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.request_response(req, **md) unless return_op # return the operation view of the active_call; define #execute as a @@ -197,13 +206,14 @@ module GRPC # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] the max completion time in seconds + # @param timeout [Numeric] the max completion time in seconds # @param return_op [true|false] return an Operation if true # @return [Object|Operation] the response received from the server - def client_streamer(method, requests, marshal, unmarshal, deadline = nil, + def client_streamer(method, requests, marshal, unmarshal, timeout = nil, return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.client_streamer(requests, **md) unless return_op # return the operation view of the active_call; define #execute as a @@ -262,14 +272,15 @@ module GRPC # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] the max completion time in seconds + # @param timeout [Numeric] the max completion time in seconds # @param return_op [true|false]return an Operation if true # @param blk [Block] when provided, is executed for each response # @return [Enumerator|Operation|nil] as discussed above - def server_streamer(method, req, marshal, unmarshal, deadline = nil, + def server_streamer(method, req, marshal, unmarshal, timeout = nil, return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.server_streamer(req, **md, &blk) unless return_op # return the operation view of the active_call; define #execute @@ -367,14 +378,15 @@ module GRPC # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] (optional) the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds # @param blk [Block] when provided, is executed for each response # @param return_op [true|false] return an Operation if true # @return [Enumerator|nil|Operation] as discussed above - def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil, + def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.bidi_streamer(requests, **md, &blk) unless return_op # return the operation view of the active_call; define #execute @@ -390,15 +402,14 @@ module GRPC # Creates a new active stub # - # @param ch [GRPC::Channel] the channel used to create the stub. + # @param method [string] the method being called. # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [TimeConst] - def new_active_call(ch, marshal, unmarshal, deadline = nil) - absolute_deadline = Core::TimeConsts.from_relative_time(deadline) - call = @ch.create_call(ch, @host, absolute_deadline) - ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline, - started: false) + # @param timeout [TimeConst] + def new_active_call(method, marshal, unmarshal, timeout = nil) + deadline = from_relative_time(timeout.nil? ? @timeout : timeout) + call = @ch.create_call(@queue, method, @host, deadline) + ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false) end end end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 2cb3d2eebf..10211ae239 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -80,26 +80,21 @@ module GRPC else # is a bidi_stream active_call.run_server_bidi(mth) end - send_status(active_call, OK, 'OK') - active_call.finished + send_status(active_call, OK, 'OK', **active_call.output_metadata) rescue BadStatus => e - # this is raised by handlers that want GRPC to send an application - # error code and detail message. + # this is raised by handlers that want GRPC to send an application error + # code and detail message and some additional app-specific metadata. logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") - send_status(active_call, e.code, e.details) + send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. logger.warn("failed call: #{active_call}\n#{e}") - rescue OutOfTime + rescue Core::OutOfTime # This is raised when active_call#method.call exceeeds the deadline # event. Send a status of deadline exceeded logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') - rescue Core::EventError => e - # This is raised by GRPC internals but should rarely, if ever happen. - # Log it, but don't notify the other endpoint.. - logger.warn("failed call: #{active_call}\n#{e}") rescue StandardError => e # This will usuaally be an unhandled error in the handling code. # Send back a UNKNOWN status to the client @@ -140,9 +135,9 @@ module GRPC "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end - def send_status(active_client, code, details) + def send_status(active_client, code, details, **kw) details = 'Not sure why' if details.nil? - active_client.send_status(code, details) + active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e logger.warn("Could not send status #{code}:#{details}") logger.warn(e) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 35e84023be..3375fcf20a 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -31,14 +31,142 @@ require 'grpc/grpc' require 'grpc/generic/active_call' require 'grpc/generic/service' require 'thread' -require 'xray/thread_dump_signal_handler' + +# A global that contains signals the gRPC servers should respond to. +$grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + module_function :handle_signals + + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + module_function :trap_signals + + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end + + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 + end + end + + def loop_execute_jobs + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) + end + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer - include Core::CompletionType + include Core::CallOps include Core::TimeConsts extend ::Forwardable @@ -50,6 +178,38 @@ module GRPC # Default max_waiting_requests size is 20 DEFAULT_MAX_WAITING_REQUESTS = 20 + # Default poll period is 1s + DEFAULT_POLL_PERIOD = 1 + + # Signal check period is 0.25s + SIGNAL_CHECK_PERIOD = 0.25 + + # setup_cq is used by #initialize to constuct a Core::CompletionQueue from + # its arguments. + def self.setup_cq(alt_cq) + return Core::CompletionQueue.new if alt_cq.nil? + unless alt_cq.is_a? Core::CompletionQueue + fail(TypeError, '!CompletionQueue') + end + alt_cq + end + + # setup_srv is used by #initialize to constuct a Core::Server from its + # arguments. + def self.setup_srv(alt_srv, cq, **kw) + return Core::Server.new(cq, kw) if alt_srv.nil? + fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server + alt_srv + end + + # setup_connect_md_proc is used by #initialize to validate the + # connect_md_proc. + def self.setup_connect_md_proc(a_proc) + return nil if a_proc.nil? + fail(TypeError, '!Proc') unless a_proc.is_a? Proc + a_proc + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -77,30 +237,21 @@ module GRPC # * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds # with not available to new requests + # + # * connect_md_proc: + # when non-nil is a proc for determining metadata to to send back the client + # on receiving an invocation req. The proc signature is: + # {key: val, ..} func(method_name, {key: val, ...}) def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:INFINITE_FUTURE, + poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, + connect_md_proc:nil, **kw) - if completion_queue_override.nil? - cq = Core::CompletionQueue.new - else - cq = completion_queue_override - unless cq.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - end - @cq = cq - - if server_override.nil? - srv = Core::Server.new(@cq, kw) - else - srv = server_override - fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server - end - @server = srv - + @cq = RpcServer.setup_cq(completion_queue_override) + @server = RpcServer.setup_srv(server_override, @cq, **kw) + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -117,6 +268,13 @@ module GRPC return unless @running @stopped = true @pool.stop + + # TODO: uncomment this: + # + # This segfaults in the c layer, so its commented out for now. Shutdown + # still occurs, but the c layer has to do the cleanup. + # + # @server.close end # determines if the server is currently running @@ -139,7 +297,21 @@ module GRPC running? end - # determines if the server is currently stopped + # Runs the server in its own thread, then waits for signal INT or TERM on + # the current thread to terminate it. + def run_till_terminated + GRPC.trap_signals + t = Thread.new { run } + wait_till_running + loop do + sleep SIGNAL_CHECK_PERIOD + break unless GRPC.handle_signals + end + stop + t.join + end + + # Determines if the server is currently stopped def stopped? @stopped ||= false end @@ -202,154 +374,71 @@ module GRPC end @pool.start @server.start - server_tag = Object.new - until stopped? - @server.request_call(server_tag) - ev = @cq.pluck(server_tag, @poll_period) - next if ev.nil? - if ev.type != SERVER_RPC_NEW - logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}") - ev.close - next - end - c = new_active_server_call(ev.call, ev.result) - unless c.nil? - mth = ev.result.method.to_sym - ev.close - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) - end - end - end + loop_handle_server_calls @running = false end - def new_active_server_call(call, new_server_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately - finished_tag = Object.new - call_queue = Core::CompletionQueue.new - call.metadata = new_server_rpc.metadata # store the metadata - call.server_accept(call_queue, finished_tag) - call.server_end_initial_metadata - - # Send UNAVAILABLE if there are too many unprocessed jobs + # Sends UNAVAILABLE if there are too many unprocessed jobs + def available?(an_rpc) jobs_count, max = @pool.jobs_waiting, @max_waiting_requests logger.info("waiting: #{jobs_count}, max: #{max}") - if @pool.jobs_waiting > @max_waiting_requests - logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::UNAVAILABLE, '') - return nil - end - - # Send NOT_FOUND if the method does not exist - mth = new_server_rpc.method.to_sym - unless rpc_descs.key?(mth) - logger.warn("NOT_FOUND: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::NOT_FOUND, '') - return nil - end - - # Create the ActiveCall - rpc_desc = rpc_descs[mth] - logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") - ActiveCall.new(call, call_queue, - rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), - new_server_rpc.deadline, finished_tag: finished_tag) + return an_rpc if @pool.jobs_waiting <= @max_waiting_requests + logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) + c.send_status(StatusCodes::UNAVAILABLE, '') + nil end - # Pool is a simple thread pool for running server requests. - class Pool - def initialize(size) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - end - - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end - - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end + # Sends NOT_FOUND if the method can't be found + def found?(an_rpc) + mth = an_rpc.method.to_sym + return an_rpc if rpc_descs.key?(mth) + logger.warn("NOT_FOUND: #{an_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) + c.send_status(StatusCodes::NOT_FOUND, '') + nil + end - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end - end - end - - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end + # handles calls to the server + def loop_handle_server_calls + fail 'not running' unless @running + request_call_tag = Object.new + until stopped? + deadline = from_relative_time(@poll_period) + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + c = new_active_server_call(an_rpc) + unless c.nil? + mth = an_rpc.method.to_sym + @pool.schedule(c) do |call| + rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) end - @workers << next_thread end end + end - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true - - # TODO: allow configuration of the keepalive period - keep_alive = 5 - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 - end - - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.warn("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end - end - end + def new_active_server_call(an_rpc) + return nil if an_rpc.nil? || an_rpc.call.nil? - logger.info('stopped, all workers are shutdown') + # allow the metadata to be accessed from the call + handle_call_tag = Object.new + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + connect_md = nil + unless @connect_md_proc.nil? + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) end + an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, + SEND_INITIAL_METADATA => connect_md) + return nil unless available?(an_rpc) + return nil unless found?(an_rpc) + + # Create the ActiveCall + logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") + rpc_desc = rpc_descs[an_rpc.method.to_sym] + ActiveCall.new(an_rpc.call, @cq, + rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), + an_rpc.deadline) end protected @@ -362,11 +451,9 @@ module GRPC @rpc_handlers ||= {} end - private - def assert_valid_service_class(cls) unless cls.include?(GenericService) - fail "#{cls} should 'include GenericService'" + fail "#{cls} must 'include GenericService'" end if cls.rpc_descs.size == 0 fail "#{cls} should specify some rpc descriptions" @@ -376,21 +463,17 @@ module GRPC def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs = rpc_descs - handlers = rpc_handlers + specs, handlers = rpc_descs, rpc_handlers cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym - if specs.key? route - fail "Cannot add rpc #{route} from #{spec}, already registered" + fail "already registered: rpc #{route} from #{spec}" if specs.key? route + specs[route] = spec + if service.is_a?(Class) + handlers[route] = cls.new.method(name.to_s.underscore.to_sym) else - specs[route] = spec - if service.is_a?(Class) - handlers[route] = cls.new.method(name.to_s.underscore.to_sym) - else - handlers[route] = service.method(name.to_s.underscore.to_sym) - end - logger.info("handling #{route} with #{handlers[route]}") + handlers[route] = service.method(name.to_s.underscore.to_sym) end + logger.info("handling #{route} with #{handlers[route]}") end end end diff --git a/src/ruby/lib/grpc/core/event.rb b/src/ruby/lib/grpc/notifier.rb index 194aa8ecac..caa18bbed6 100644 --- a/src/ruby/lib/grpc/core/event.rb +++ b/src/ruby/lib/grpc/notifier.rb @@ -27,17 +27,33 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -require 'grpc' - # GRPC contains the General RPC module. module GRPC - module Core - # Event is a class defined in the c extension - # - # Here, we add an inspect method. - class Event - def inspect - "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>" + # Notifier is useful high-level synchronization primitive. + class Notifier + attr_reader :payload, :notified + alias_method :notified?, :notified + + def initialize + @mutex = Mutex.new + @cvar = ConditionVariable.new + @notified = false + @payload = nil + end + + def wait + @mutex.synchronize do + @cvar.wait(@mutex) until notified? + end + end + + def notify(payload) + @mutex.synchronize do + return Error.new('already notified') if notified? + @payload = payload + @notified = true + @cvar.signal + return nil end end end diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index 513a53724f..072fb9b1aa 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '0.5.0' + VERSION = '0.6.1' end diff --git a/src/ruby/spec/alloc_spec.rb b/src/ruby/spec/alloc_spec.rb deleted file mode 100644 index 88e7e2b3e7..0000000000 --- a/src/ruby/spec/alloc_spec.rb +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'grpc' - -describe 'Wrapped classes where .new cannot create an instance' do - describe GRPC::Core::Event do - it 'should fail .new fail with a runtime error' do - expect { GRPC::Core::Event.new }.to raise_error(TypeError) - end - end - - describe GRPC::Core::Call do - it 'should fail .new fail with a runtime error' do - expect { GRPC::Core::Event.new }.to raise_error(TypeError) - end - end -end diff --git a/src/ruby/spec/byte_buffer_spec.rb b/src/ruby/spec/byte_buffer_spec.rb deleted file mode 100644 index e1833ebb3a..0000000000 --- a/src/ruby/spec/byte_buffer_spec.rb +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'grpc' - -describe GRPC::Core::ByteBuffer do - describe '#new' do - it 'is constructed from a string' do - expect { GRPC::Core::ByteBuffer.new('#new') }.not_to raise_error - end - - it 'can be constructed from the empty string' do - expect { GRPC::Core::ByteBuffer.new('') }.not_to raise_error - end - - it 'cannot be constructed from nil' do - expect { GRPC::Core::ByteBuffer.new(nil) }.to raise_error TypeError - end - - it 'cannot be constructed from non-strings' do - [1, Object.new, :a_symbol].each do |x| - expect { GRPC::Core::ByteBuffer.new(x) }.to raise_error TypeError - end - end - end - - describe '#to_s' do - it 'is the string value the ByteBuffer was constructed with' do - expect(GRPC::Core::ByteBuffer.new('#to_s').to_s).to eq('#to_s') - end - end - - describe '#dup' do - it 'makes an instance whose #to_s is the original string value' do - bb = GRPC::Core::ByteBuffer.new('#dup') - a_copy = bb.dup - expect(a_copy.to_s).to eq('#dup') - expect(a_copy.dup.to_s).to eq('#dup') - end - end -end diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index 2617564571..4977c10a7e 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -66,51 +66,34 @@ describe GRPC::Core::RpcErrors do end end -describe GRPC::Core::Call do +describe GRPC::Core::CallOps do before(:each) do - @tag = Object.new - @client_queue = GRPC::Core::CompletionQueue.new - fake_host = 'localhost:10101' - @ch = GRPC::Core::Channel.new(fake_host, nil) - end - - describe '#start_read' do - xit 'should fail if called immediately' do - blk = proc { make_test_call.start_read(@tag) } - expect(&blk).to raise_error GRPC::Core::CallError - end - end - - describe '#start_write' do - xit 'should fail if called immediately' do - bytes = GRPC::Core::ByteBuffer.new('test string') - blk = proc { make_test_call.start_write(bytes, @tag) } - expect(&blk).to raise_error GRPC::Core::CallError - end + @known_types = { + SEND_INITIAL_METADATA: 0, + SEND_MESSAGE: 1, + SEND_CLOSE_FROM_CLIENT: 2, + SEND_STATUS_FROM_SERVER: 3, + RECV_INITIAL_METADATA: 4, + RECV_MESSAGE: 5, + RECV_STATUS_ON_CLIENT: 6, + RECV_CLOSE_ON_SERVER: 7 + } end - describe '#start_write_status' do - xit 'should fail if called immediately' do - blk = proc { make_test_call.start_write_status(153, 'x', @tag) } - expect(&blk).to raise_error GRPC::Core::CallError - end + it 'should have symbols for all the known operation types' do + m = GRPC::Core::CallOps + syms_and_codes = m.constants.collect { |c| [c, m.const_get(c)] } + expect(Hash[syms_and_codes]).to eq(@known_types) end +end - describe '#writes_done' do - xit 'should fail if called immediately' do - blk = proc { make_test_call.writes_done(Object.new) } - expect(&blk).to raise_error GRPC::Core::CallError - end - end +describe GRPC::Core::Call do + let(:client_queue) { GRPC::Core::CompletionQueue.new } + let(:test_tag) { Object.new } + let(:fake_host) { 'localhost:10101' } - describe '#add_metadata' do - it 'adds metadata to a call without fail' do - call = make_test_call - n = 37 - one_md = proc { |x| [sprintf('key%d', x), sprintf('value%d', x)] } - metadata = Hash[n.times.collect { |i| one_md.call i }] - expect { call.add_metadata(metadata) }.to_not raise_error - end + before(:each) do + @ch = GRPC::Core::Channel.new(fake_host, nil) end describe '#status' do @@ -154,7 +137,7 @@ describe GRPC::Core::Call do end def make_test_call - @ch.create_call('dummy_method', 'dummy_host', deadline) + @ch.create_call(client_queue, 'dummy_method', 'dummy_host', deadline) end def deadline diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index af73294abe..d471ff5db6 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -36,16 +36,13 @@ def load_test_certs end describe GRPC::Core::Channel do - FAKE_HOST = 'localhost:0' + let(:fake_host) { 'localhost:0' } + let(:cq) { GRPC::Core::CompletionQueue.new } def create_test_cert GRPC::Core::Credentials.new(load_test_certs[0]) end - before(:each) do - @cq = GRPC::Core::CompletionQueue.new - end - shared_examples '#new' do it 'take a host name without channel args' do expect { GRPC::Core::Channel.new('dummy_host', nil) }.not_to raise_error @@ -61,7 +58,7 @@ describe GRPC::Core::Channel do it 'does not take a hash with bad values as channel args' do blk = construct_with_args(symbol: Object.new) expect(&blk).to raise_error TypeError - blk = construct_with_args('1' => Hash.new) + blk = construct_with_args('1' => {}) expect(&blk).to raise_error TypeError end @@ -115,25 +112,23 @@ describe GRPC::Core::Channel do describe '#create_call' do it 'creates a call OK' do - host = FAKE_HOST - ch = GRPC::Core::Channel.new(host, nil) + ch = GRPC::Core::Channel.new(fake_host, nil) deadline = Time.now + 5 blk = proc do - ch.create_call('dummy_method', 'dummy_host', deadline) + ch.create_call(cq, 'dummy_method', 'dummy_host', deadline) end expect(&blk).to_not raise_error end it 'raises an error if called on a closed channel' do - host = FAKE_HOST - ch = GRPC::Core::Channel.new(host, nil) + ch = GRPC::Core::Channel.new(fake_host, nil) ch.close deadline = Time.now + 5 blk = proc do - ch.create_call('dummy_method', 'dummy_host', deadline) + ch.create_call(cq, 'dummy_method', 'dummy_host', deadline) end expect(&blk).to raise_error(RuntimeError) end @@ -141,15 +136,13 @@ describe GRPC::Core::Channel do describe '#destroy' do it 'destroys a channel ok' do - host = FAKE_HOST - ch = GRPC::Core::Channel.new(host, nil) + ch = GRPC::Core::Channel.new(fake_host, nil) blk = proc { ch.destroy } expect(&blk).to_not raise_error end it 'can be called more than once without error' do - host = FAKE_HOST - ch = GRPC::Core::Channel.new(host, nil) + ch = GRPC::Core::Channel.new(fake_host, nil) blk = proc { ch.destroy } blk.call expect(&blk).to_not raise_error @@ -164,15 +157,13 @@ describe GRPC::Core::Channel do describe '#close' do it 'closes a channel ok' do - host = FAKE_HOST - ch = GRPC::Core::Channel.new(host, nil) + ch = GRPC::Core::Channel.new(fake_host, nil) blk = proc { ch.close } expect(&blk).to_not raise_error end it 'can be called more than once without error' do - host = FAKE_HOST - ch = GRPC::Core::Channel.new(host, nil) + ch = GRPC::Core::Channel.new(fake_host, nil) blk = proc { ch.close } blk.call expect(&blk).to_not raise_error diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 49a2d3bb4d..68af79f907 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -30,7 +30,6 @@ require 'grpc' require 'spec_helper' -include GRPC::Core::CompletionType include GRPC::Core def load_test_certs @@ -40,6 +39,8 @@ def load_test_certs end shared_context 'setup: tags' do + let(:sent_message) { 'sent message' } + let(:reply_text) { 'the reply' } before(:example) do @server_finished_tag = Object.new @client_finished_tag = Object.new @@ -52,153 +53,136 @@ shared_context 'setup: tags' do Time.now + 2 end - def expect_next_event_on(queue, type, tag) - ev = queue.pluck(tag, deadline) - if type.nil? - expect(ev).to be_nil - else - expect(ev).to_not be_nil - expect(ev.type).to be(type) - end - ev - end - def server_allows_client_to_proceed - @server.request_call(@server_tag) - ev = @server_queue.pluck(@server_tag, deadline) - expect(ev).not_to be_nil - expect(ev.type).to be(SERVER_RPC_NEW) - server_call = ev.call - server_call.server_accept(@server_queue, @server_finished_tag) - server_call.server_end_initial_metadata + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + server_call = recvd_rpc.call + ops = { CallOps::SEND_INITIAL_METADATA => {} } + svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, ops) + expect(svr_batch.send_metadata).to be true server_call end - def server_responds_with(server_call, reply_text) - reply = ByteBuffer.new(reply_text) - server_call.start_read(@server_tag) - ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE) - expect(ev.type).to be(READ) - server_call.start_write(reply, @server_tag) - ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE) - expect(ev).not_to be_nil - expect(ev.type).to be(WRITE_ACCEPTED) - end - - def client_sends(call, sent = 'a message') - req = ByteBuffer.new(sent) - call.start_write(req, @tag) - ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE) - expect(ev).not_to be_nil - expect(ev.type).to be(WRITE_ACCEPTED) - sent - end - def new_client_call - @ch.create_call('/method', 'foo.test.google.fr', deadline) + @ch.create_call(@client_queue, '/method', 'foo.test.google.fr', deadline) end end shared_examples 'basic GRPC message delivery is OK' do + include GRPC::Core include_context 'setup: tags' - it 'servers receive requests from clients and start responding' do - reply = ByteBuffer.new('the server payload') + it 'servers receive requests from clients and can respond' do call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) - - # check the server rpc new was received - # @server.request_call(@server_tag) - # ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag) - - # accept the call - # server_call = ev.call - # server_call.server_accept(@server_queue, @server_finished_tag) - # server_call.server_end_initial_metadata - server_call = server_allows_client_to_proceed - - # client sends a message - msg = client_sends(call) + client_ops = { + CallOps::SEND_INITIAL_METADATA => {}, + CallOps::SEND_MESSAGE => sent_message + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.send_metadata).to be true + expect(batch_result.send_message).to be true # confirm the server can read the inbound message - server_call.start_read(@server_tag) - ev = expect_next_event_on(@server_queue, READ, @server_tag) - expect(ev.result.to_s).to eq(msg) - - # the server response - server_call.start_write(reply, @server_tag) - expect_next_event_on(@server_queue, WRITE_ACCEPTED, @server_tag) + server_call = server_allows_client_to_proceed + server_ops = { + CallOps::RECV_MESSAGE => nil + } + svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, + server_ops) + expect(svr_batch.message).to eq(sent_message) end it 'responses written by servers are received by the client' do call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) - server_call = server_allows_client_to_proceed - client_sends(call) - server_responds_with(server_call, 'server_response') + client_ops = { + CallOps::SEND_INITIAL_METADATA => {}, + CallOps::SEND_MESSAGE => sent_message + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.send_metadata).to be true + expect(batch_result.send_message).to be true - call.start_read(@tag) - ev = expect_next_event_on(@client_queue, READ, @tag) - expect(ev.result.to_s).to eq('server_response') + # confirm the server can read the inbound message + server_call = server_allows_client_to_proceed + server_ops = { + CallOps::RECV_MESSAGE => nil, + CallOps::SEND_MESSAGE => reply_text + } + svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, + server_ops) + expect(svr_batch.message).to eq(sent_message) + expect(svr_batch.send_message).to be true end it 'servers can ignore a client write and send a status' do call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) - - # check the server rpc new was received - @server.request_call(@server_tag) - ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag) - expect(ev.tag).to be(@server_tag) - - # accept the call - need to do this to sent status. - server_call = ev.call - server_call.server_accept(@server_queue, @server_finished_tag) - server_call.server_end_initial_metadata - server_call.start_write_status(StatusCodes::NOT_FOUND, 'not found', - @server_tag) - - # Client sends some data - client_sends(call) - - # client gets an empty response for the read, preceeded by some metadata. - call.start_read(@tag) - expect_next_event_on(@client_queue, CLIENT_METADATA_READ, - @client_metadata_tag) - ev = expect_next_event_on(@client_queue, READ, @tag) - expect(ev.tag).to be(@tag) - expect(ev.result.to_s).to eq('') - - # finally, after client sends writes_done, they get the finished. - call.writes_done(@tag) - expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag) - ev = expect_next_event_on(@client_queue, FINISHED, @client_finished_tag) - expect(ev.result.code).to eq(StatusCodes::NOT_FOUND) + client_ops = { + CallOps::SEND_INITIAL_METADATA => {}, + CallOps::SEND_MESSAGE => sent_message + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.send_metadata).to be true + expect(batch_result.send_message).to be true + + # confirm the server can read the inbound message + the_status = Struct::Status.new(StatusCodes::OK, 'OK') + server_call = server_allows_client_to_proceed + server_ops = { + CallOps::SEND_STATUS_FROM_SERVER => the_status + } + svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, + server_ops) + expect(svr_batch.message).to eq nil + expect(svr_batch.send_status).to be true end it 'completes calls by sending status to client and server' do call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) + client_ops = { + CallOps::SEND_INITIAL_METADATA => {}, + CallOps::SEND_MESSAGE => sent_message + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.send_metadata).to be true + expect(batch_result.send_message).to be true + + # confirm the server can read the inbound message and respond + the_status = Struct::Status.new(StatusCodes::OK, 'OK', {}) server_call = server_allows_client_to_proceed - client_sends(call) - server_responds_with(server_call, 'server_response') - server_call.start_write_status(10_101, 'status code is 10101', @server_tag) - - # first the client says writes are done - call.start_read(@tag) - expect_next_event_on(@client_queue, READ, @tag) - call.writes_done(@tag) - - # but nothing happens until the server sends a status - expect_next_event_on(@server_queue, FINISH_ACCEPTED, @server_tag) - ev = expect_next_event_on(@server_queue, FINISHED, @server_finished_tag) - expect(ev.result).to be_a(Struct::Status) - - # client gets FINISHED - expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag) - ev = expect_next_event_on(@client_queue, FINISHED, @client_finished_tag) - expect(ev.result.details).to eq('status code is 10101') - expect(ev.result.code).to eq(10_101) + server_ops = { + CallOps::RECV_MESSAGE => nil, + CallOps::SEND_MESSAGE => reply_text, + CallOps::SEND_STATUS_FROM_SERVER => the_status + } + svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, + server_ops) + expect(svr_batch.message).to eq sent_message + expect(svr_batch.send_status).to be true + expect(svr_batch.send_message).to be true + + # confirm the client can receive the server response and status. + client_ops = { + CallOps::SEND_CLOSE_FROM_CLIENT => nil, + CallOps::RECV_MESSAGE => nil, + CallOps::RECV_STATUS_ON_CLIENT => nil + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.send_close).to be true + expect(batch_result.message).to eq reply_text + expect(batch_result.status).to eq the_status + + # confirm the server can receive the client close. + server_ops = { + CallOps::RECV_CLOSE_ON_SERVER => nil + } + svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, + server_ops) + expect(svr_batch.send_close).to be true end end @@ -208,11 +192,11 @@ shared_examples 'GRPC metadata delivery works OK' do describe 'from client => server' do before(:example) do n = 7 # arbitrary number of metadata - diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] } + diff_keys_fn = proc { |i| [format('k%d', i), format('v%d', i)] } diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }] - null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] } + null_vals_fn = proc { |i| [format('k%d', i), format('v\0%d', i)] } null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }] - same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] } + same_keys_fn = proc { |i| [format('k%d', i), [format('v%d', i)] * n] } same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }] symbol_key = { a_key: 'a val' } @valid_metadata = [diff_keys, same_keys, null_vals, symbol_key] @@ -224,25 +208,33 @@ shared_examples 'GRPC metadata delivery works OK' do it 'raises an exception if a metadata key is invalid' do @bad_keys.each do |md| call = new_client_call - expect { call.add_metadata(md) }.to raise_error + client_ops = { + CallOps::SEND_INITIAL_METADATA => md + } + blk = proc do + call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + end + expect(&blk).to raise_error end end it 'sends all the metadata pairs when keys and values are valid' do @valid_metadata.each do |md| call = new_client_call - call.add_metadata(md) - - # Client begins a call OK - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) - - # ... server has all metadata available even though the client did not - # send a write - @server.request_call(@server_tag) - ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag) + client_ops = { + CallOps::SEND_INITIAL_METADATA => md + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.send_metadata).to be true + + # confirm the server can receive the client metadata + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + recvd_md = recvd_rpc.metadata replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }] - result = ev.result.metadata - expect(result.merge(replace_symbols)).to eq(result) + expect(recvd_md).to eq(recvd_md.merge(replace_symbols)) end end end @@ -250,11 +242,11 @@ shared_examples 'GRPC metadata delivery works OK' do describe 'from server => client' do before(:example) do n = 7 # arbitrary number of metadata - diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] } + diff_keys_fn = proc { |i| [format('k%d', i), format('v%d', i)] } diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }] - null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] } + null_vals_fn = proc { |i| [format('k%d', i), format('v\0%d', i)] } null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }] - same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] } + same_keys_fn = proc { |i| [format('k%d', i), [format('v%d', i)] * n] } same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }] symbol_key = { a_key: 'a val' } @valid_metadata = [diff_keys, same_keys, null_vals, symbol_key] @@ -266,55 +258,81 @@ shared_examples 'GRPC metadata delivery works OK' do it 'raises an exception if a metadata key is invalid' do @bad_keys.each do |md| call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) + # client signals that it's done sending metadata to allow server to + # respond + client_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + call.run_batch(@client_queue, @client_tag, deadline, client_ops) # server gets the invocation - @server.request_call(@server_tag) - ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag) - expect { ev.call.add_metadata(md) }.to raise_error + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + server_ops = { + CallOps::SEND_INITIAL_METADATA => md + } + blk = proc do + recvd_rpc.call.run_batch(@server_queue, @server_tag, deadline, + server_ops) + end + expect(&blk).to raise_error end end - it 'sends a hash that contains the status when no metadata is added' do + it 'sends an empty hash if no metadata is added' do call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) - - # server gets the invocation - @server.request_call(@server_tag) - ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag) - server_call = ev.call - - # ... server accepts the call without adding metadata - server_call.server_accept(@server_queue, @server_finished_tag) - server_call.server_end_initial_metadata - - # there is the HTTP status metadata, though there should not be any - # TODO: update this with the bug number to be resolved - ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, - @client_metadata_tag) - expect(ev.result).to eq({}) + # client signals that it's done sending metadata to allow server to + # respond + client_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + call.run_batch(@client_queue, @client_tag, deadline, client_ops) + + # server gets the invocation but sends no metadata back + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + server_call = recvd_rpc.call + server_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + server_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + + # client receives nothing as expected + client_ops = { + CallOps::RECV_INITIAL_METADATA => nil + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) + expect(batch_result.metadata).to eq({}) end it 'sends all the pairs when keys and values are valid' do @valid_metadata.each do |md| call = new_client_call - call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag) - - # server gets the invocation - @server.request_call(@server_tag) - ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag) - server_call = ev.call - - # ... server adds metadata and accepts the call - server_call.add_metadata(md) - server_call.server_accept(@server_queue, @server_finished_tag) - server_call.server_end_initial_metadata - - # Now the client can read the metadata - ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ, - @client_metadata_tag) + # client signals that it's done sending metadata to allow server to + # respond + client_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + call.run_batch(@client_queue, @client_tag, deadline, client_ops) + + # server gets the invocation but sends no metadata back + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + server_call = recvd_rpc.call + server_ops = { + CallOps::SEND_INITIAL_METADATA => md + } + server_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + + # client receives nothing as expected + client_ops = { + CallOps::RECV_INITIAL_METADATA => nil + } + batch_result = call.run_batch(@client_queue, @client_tag, deadline, + client_ops) replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }] - expect(ev.result).to eq(replace_symbols) + expect(batch_result.metadata).to eq(replace_symbols) end end end diff --git a/src/ruby/spec/credentials_spec.rb b/src/ruby/spec/credentials_spec.rb index fc97d11a87..8e72e85d54 100644 --- a/src/ruby/spec/credentials_spec.rb +++ b/src/ruby/spec/credentials_spec.rb @@ -61,11 +61,11 @@ describe Credentials do end describe '#compose' do - it 'can be completed OK' do + it 'cannot be completed OK with 2 SSL creds' do certs = load_test_certs cred1 = Credentials.new(*certs) cred2 = Credentials.new(*certs) - expect { cred1.compose(cred2) }.to_not raise_error + expect { cred1.compose(cred2) }.to raise_error end end end diff --git a/src/ruby/spec/event_spec.rb b/src/ruby/spec/event_spec.rb deleted file mode 100644 index 7d92fcd792..0000000000 --- a/src/ruby/spec/event_spec.rb +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'grpc' - -describe GRPC::Core::CompletionType do - before(:each) do - @known_types = { - QUEUE_SHUTDOWN: 0, - OP_COMPLETE: 1, - READ: 2, - WRITE_ACCEPTED: 3, - FINISH_ACCEPTED: 4, - CLIENT_METADATA_READ: 5, - FINISHED: 6, - SERVER_RPC_NEW: 7, - SERVER_SHUTDOWN: 8, - RESERVED: 9 - } - end - - it 'should have all the known types' do - mod = GRPC::Core::CompletionType - blk = proc { Hash[mod.constants.collect { |c| [c, mod.const_get(c)] }] } - expect(blk.call).to eq(@known_types) - end -end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 96e07cacb4..575871afb1 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -34,12 +34,11 @@ include GRPC::Core::StatusCodes describe GRPC::ActiveCall do ActiveCall = GRPC::ActiveCall Call = GRPC::Core::Call - CompletionType = GRPC::Core::CompletionType + CallOps = GRPC::Core::CallOps before(:each) do @pass_through = proc { |x| x } @server_tag = Object.new - @server_done_tag = Object.new @tag = Object.new @client_queue = GRPC::Core::CompletionQueue.new @@ -48,7 +47,7 @@ describe GRPC::ActiveCall do @server = GRPC::Core::Server.new(@server_queue, nil) server_port = @server.add_http2_port(host) @server.start - @ch = GRPC::Core::Channel.new("localhost:#{server_port}", nil) + @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil) end after(:each) do @@ -58,12 +57,10 @@ describe GRPC::ActiveCall do describe 'restricted view methods' do before(:each) do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) end describe '#multi_req_view' do @@ -90,48 +87,45 @@ describe GRPC::ActiveCall do describe '#remote_send' do it 'allows a client to send a payload to the server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' @client_call.remote_send(msg) # check that server rpc new was received - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - expect(ev.type).to be(CompletionType::SERVER_RPC_NEW) - expect(ev.call).to be_a(Call) - expect(ev.tag).to be(@server_tag) + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + recvd_call = recvd_rpc.call # Accept the call, and verify that the server reads the response ok. - ev.call.server_accept(@client_queue, @server_tag) - ev.call.server_end_initial_metadata - server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, + server_ops = { + CallOps::SEND_INITIAL_METADATA => {} + } + recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through, @pass_through, deadline) expect(server_call.remote_read).to eq(msg) end it 'marshals the payload using the marshal func' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + ActiveCall.client_invoke(call, @client_queue, deadline) marshal = proc { |x| 'marshalled:' + x } client_call = ActiveCall.new(call, @client_queue, marshal, - @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + @pass_through, deadline) msg = 'message is a string' client_call.remote_send(msg) # confirm that the message was marshalled - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - ev.call.server_accept(@client_queue, @server_tag) - ev.call.server_end_initial_metadata - server_call = ActiveCall.new(ev.call, @client_queue, @pass_through, + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + recvd_call = recvd_rpc.call + server_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through, @pass_through, deadline) expect(server_call.remote_read).to eq('marshalled:' + msg) end @@ -142,23 +136,22 @@ describe GRPC::ActiveCall do call = make_test_call ActiveCall.client_invoke(call, @client_queue, deadline, k1: 'v1', k2: 'v2') - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - expect(ev).to_not be_nil - expect(ev.result.metadata['k1']).to eq('v1') - expect(ev.result.metadata['k2']).to eq('v2') + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + recvd_call = recvd_rpc.call + expect(recvd_call).to_not be_nil + expect(recvd_rpc.metadata).to_not be_nil + expect(recvd_rpc.metadata['k1']).to eq('v1') + expect(recvd_rpc.metadata['k2']).to eq('v2') end end describe '#remote_read' do it 'reads the response sent by a server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -168,12 +161,10 @@ describe GRPC::ActiveCall do it 'saves no metadata when the server adds no metadata' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -185,12 +176,10 @@ describe GRPC::ActiveCall do it 'saves metadata add by the server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2') @@ -203,12 +192,10 @@ describe GRPC::ActiveCall do it 'get a nil msg before a status when an OK status is sent' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) client_call.writes_done(false) @@ -222,13 +209,11 @@ describe GRPC::ActiveCall do it 'unmarshals the response using the unmarshal func' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) unmarshal = proc { |x| 'unmarshalled:' + x } client_call = ActiveCall.new(call, @client_queue, @pass_through, unmarshal, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) # confirm the client receives the unmarshalled message msg = 'message is a string' @@ -249,13 +234,11 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that can read n responses' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) - msg = 'message is 4a string' + metadata_tag: md_tag) + msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -269,12 +252,10 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that stops after an OK Status' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - read_metadata_tag: meta_tag, - finished_tag: done_tag) + metadata_tag: md_tag) msg = 'message is a string' reply = 'server_response' client_call.remote_send(msg) @@ -294,12 +275,10 @@ describe GRPC::ActiveCall do describe '#writes_done' do it 'finishes ok if the server sends a status response' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - finished_tag: done_tag, - read_metadata_tag: meta_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) expect { client_call.writes_done(false) }.to_not raise_error @@ -312,12 +291,10 @@ describe GRPC::ActiveCall do it 'finishes ok if the server sends an early status response' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - read_metadata_tag: meta_tag, - finished_tag: done_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -330,12 +307,10 @@ describe GRPC::ActiveCall do it 'finishes ok if writes_done is true' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, - deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, - read_metadata_tag: meta_tag, - finished_tag: done_tag) + metadata_tag: md_tag) msg = 'message is a string' client_call.remote_send(msg) server_call = expect_server_to_receive(msg) @@ -353,21 +328,20 @@ describe GRPC::ActiveCall do end def expect_server_to_be_invoked(**kw) - @server.request_call(@server_tag) - ev = @server_queue.next(deadline) - ev.call.add_metadata(kw) - ev.call.server_accept(@client_queue, @server_done_tag) - ev.call.server_end_initial_metadata - ActiveCall.new(ev.call, @client_queue, @pass_through, - @pass_through, deadline, - finished_tag: @server_done_tag) + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + expect(recvd_rpc).to_not eq nil + recvd_call = recvd_rpc.call + recvd_call.run_batch(@server_queue, @server_tag, deadline, + CallOps::SEND_INITIAL_METADATA => kw) + ActiveCall.new(recvd_call, @server_queue, @pass_through, + @pass_through, deadline) end def make_test_call - @ch.create_call('dummy_method', 'dummy_host', deadline) + @ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline) end def deadline - Time.now + 1 # in 1 second; arbitrary + Time.now + 2 # in 2 seconds; arbitrary end end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 0c98fc40d9..98d68ccfbb 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -28,17 +28,13 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' -require 'xray/thread_dump_signal_handler' - -NOOP = proc { |x| x } -FAKE_HOST = 'localhost:0' def wakey_thread(&blk) - awake_mutex, awake_cond = Mutex.new, ConditionVariable.new + n = GRPC::Notifier.new t = Thread.new do - blk.call(awake_mutex, awake_cond) + blk.call(n) end - awake_mutex.synchronize { awake_cond.wait(awake_mutex) } + n.wait t end @@ -50,8 +46,11 @@ end include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts +include GRPC::Core::CallOps describe 'ClientStub' do + let(:noop) { proc { |x| x } } + before(:each) do Thread.abort_on_exception = true @server = nil @@ -66,61 +65,56 @@ describe 'ClientStub' do end describe '#new' do + let(:fake_host) { 'localhost:0' } it 'can be created from a host and args' do - host = FAKE_HOST opts = { a_channel_arg: 'an_arg' } blk = proc do - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).not_to raise_error end it 'can be created with a default deadline' do - host = FAKE_HOST opts = { a_channel_arg: 'an_arg', deadline: 5 } blk = proc do - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).not_to raise_error end it 'can be created with an channel override' do - host = FAKE_HOST opts = { a_channel_arg: 'an_arg', channel_override: @ch } blk = proc do - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).not_to raise_error end it 'cannot be created with a bad channel override' do - host = FAKE_HOST blk = proc do opts = { a_channel_arg: 'an_arg', channel_override: Object.new } - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).to raise_error end it 'cannot be created with bad credentials' do - host = FAKE_HOST blk = proc do opts = { a_channel_arg: 'an_arg', creds: Object.new } - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).to raise_error end it 'can be created with test test credentials' do certs = load_test_certs - host = FAKE_HOST blk = proc do opts = { GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr', a_channel_arg: 'an_arg', creds: GRPC::Core::Credentials.new(certs[0], nil, nil) } - GRPC::ClientStub.new(host, @cq, **opts) + GRPC::ClientStub.new(fake_host, @cq, **opts) end expect(&blk).to_not raise_error end @@ -187,7 +181,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_response(stub) - stub.request_response(@method, @sent_msg, NOOP, NOOP, + stub.request_response(@method, @sent_msg, noop, noop, k1: 'v1', k2: 'v2') end @@ -196,7 +190,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_response(stub) - op = stub.request_response(@method, @sent_msg, NOOP, NOOP, + op = stub.request_response(@method, @sent_msg, noop, noop, return_op: true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) op.execute @@ -259,7 +253,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_response(stub) - stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, + stub.client_streamer(@method, @sent_msgs, noop, noop, k1: 'v1', k2: 'v2') end @@ -268,7 +262,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_response(stub) - op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, + op = stub.client_streamer(@method, @sent_msgs, noop, noop, return_op: true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) op.execute @@ -333,7 +327,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, + e = stub.server_streamer(@method, @sent_msg, noop, noop, k1: 'v1', k2: 'v2') expect(e).to be_a(Enumerator) e @@ -344,7 +338,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_responses(stub) - op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, + op = stub.server_streamer(@method, @sent_msg, noop, noop, return_op: true, k1: 'v1', k2: 'v2') expect(op).to be_a(GRPC::ActiveCall::Operation) e = op.execute @@ -361,34 +355,30 @@ describe 'ClientStub' do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } + server_port = create_test_server + @host = "localhost:#{server_port}" end it 'supports sending all the requests first', bidi: true do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) - stub = GRPC::ClientStub.new(host, @cq) + stub = GRPC::ClientStub.new(@host, @cq) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@replys) th.join end it 'supports client-initiated ping pong', bidi: true do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) - stub = GRPC::ClientStub.new(host, @cq) + stub = GRPC::ClientStub.new(@host, @cq) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end it 'supports a server-initiated ping pong', bidi: true do - server_port = create_test_server - host = "localhost:#{server_port}" th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) - stub = GRPC::ClientStub.new(host, @cq) + stub = GRPC::ClientStub.new(@host, @cq) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join @@ -397,7 +387,7 @@ describe 'ClientStub' do describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) expect(e).to be_a(Enumerator) e end @@ -407,7 +397,7 @@ describe 'ClientStub' do describe 'via a call operation' do def get_responses(stub) - op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, + op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, return_op: true) expect(op).to be_a(GRPC::ActiveCall::Operation) e = op.execute @@ -421,8 +411,8 @@ describe 'ClientStub' do def run_server_streamer(expected_input, replys, status, **kw) wanted_metadata = kw.clone - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end @@ -434,8 +424,8 @@ describe 'ClientStub' do def run_bidi_streamer_handle_inputs_first(expected_inputs, replys, status) - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } replys.each { |r| c.remote_send(r) } c.send_status(status, status == @pass ? 'OK' : 'NOK', true) @@ -443,8 +433,8 @@ describe 'ClientStub' do end def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) @@ -460,8 +450,8 @@ describe 'ClientStub' do def run_client_streamer(expected_inputs, resp, status, **kw) wanted_metadata = kw.clone - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) @@ -473,8 +463,8 @@ describe 'ClientStub' do def run_request_response(expected_input, resp, status, **kw) wanted_metadata = kw.clone - wakey_thread do |mtx, cnd| - c = expect_server_to_be_invoked(mtx, cnd) + wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) @@ -490,24 +480,16 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0') end - def start_test_server(awake_mutex, awake_cond) + def expect_server_to_be_invoked(notifier) @server.start - @server_tag = Object.new - @server.request_call(@server_tag) - awake_mutex.synchronize { awake_cond.signal } - end - - def expect_server_to_be_invoked(awake_mutex, awake_cond) - start_test_server(awake_mutex, awake_cond) - ev = @server_queue.pluck(@server_tag, INFINITE_FUTURE) - fail OutOfTime if ev.nil? - server_call = ev.call - server_call.metadata = ev.result.metadata - finished_tag = Object.new - server_call.server_accept(@server_queue, finished_tag) - server_call.server_end_initial_metadata - GRPC::ActiveCall.new(server_call, @server_queue, NOOP, NOOP, - INFINITE_FUTURE, - finished_tag: finished_tag) + notifier.notify(nil) + server_tag = Object.new + recvd_rpc = @server.request_call(@server_queue, server_tag, + INFINITE_FUTURE) + recvd_call = recvd_rpc.call + recvd_call.metadata = recvd_rpc.metadata + recvd_call.run_batch(@server_queue, server_tag, Time.now + 2, + SEND_INITIAL_METADATA => nil) + GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE) end end diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 39d1e83748..083632a080 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -37,7 +37,6 @@ describe GRPC::RpcDesc do INTERNAL = GRPC::Core::StatusCodes::INTERNAL UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN CallError = GRPC::Core::CallError - EventError = GRPC::Core::EventError before(:each) do @request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode', @@ -53,49 +52,49 @@ describe GRPC::RpcDesc do @ok_response = Object.new end + shared_examples 'it handles errors' do + it 'sends the specified status if BadStatus is raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) + this_desc.run_server_method(@call, method(:bad_status)) + end + + it 'sends status UNKNOWN if other StandardErrors are raised' do + expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, + false, {}) + this_desc.run_server_method(@call, method(:other_error)) + end + + it 'absorbs CallError with no further action' do + expect(@call).to receive(:remote_read).once.and_raise(CallError) + blk = proc do + this_desc.run_server_method(@call, method(:fake_reqresp)) + end + expect(&blk).to_not raise_error + end + end + describe '#run_server_method' do + let(:fake_md) { { k1: 'v1', k2: 'v2' } } describe 'for request responses' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) - end - - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') - @request_response.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason) - @request_response.run_server_method(@call, method(:other_error)) - end - - it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(EventError) - blk = proc do - @request_response.run_server_method(@call, method(:fake_reqresp)) - end - expect(&blk).to_not raise_error end - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @request_response.run_server_method(@call, method(:fake_reqresp)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once - @request_response.run_server_method(@call, method(:fake_reqresp)) + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) + this_desc.run_server_method(@call, method(:fake_reqresp)) end end @@ -103,27 +102,20 @@ describe GRPC::RpcDesc do before(:each) do @call = double('active_call') allow(@call).to receive(:multi_req_view).and_return(@call) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @client_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason) + expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason, + false, {}) @client_streamer.run_server_method(@call, method(:other_error_alt)) end - it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_send).once.and_raise(EventError) - blk = proc do - @client_streamer.run_server_method(@call, method(:fake_clstream)) - end - expect(&blk).to_not raise_error - end - it 'absorbs CallError with no further action' do expect(@call).to receive(:remote_send).once.and_raise(CallError) blk = proc do @@ -134,53 +126,29 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do expect(@call).to receive(:remote_send).once.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @client_streamer.run_server_method(@call, method(:fake_clstream)) end end describe 'for server streaming' do + let(:this_desc) { @request_response } before(:each) do @call = double('active_call') allow(@call).to receive(:single_req_view).and_return(@call) - allow(@call).to receive(:gc) - end - - it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') - @server_streamer.run_server_method(@call, method(:bad_status)) - end - - it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) - expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason) - @server_streamer.run_server_method(@call, method(:other_error)) - end - - it 'absorbs EventError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(EventError) - blk = proc do - @server_streamer.run_server_method(@call, method(:fake_svstream)) - end - expect(&blk).to_not raise_error end - it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) - blk = proc do - @server_streamer.run_server_method(@call, method(:fake_svstream)) - end - expect(&blk).to_not raise_error - end + it_behaves_like 'it handles errors' it 'sends a response and closes the stream if there no errors' do req = Object.new expect(@call).to receive(:remote_read).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @server_streamer.run_server_method(@call, method(:fake_svstream)) end end @@ -191,26 +159,28 @@ describe GRPC::RpcDesc do enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th') allow(enq_th).to receive(:join) allow(rwl_th).to receive(:join) - allow(@call).to receive(:gc) end it 'sends the specified status if BadStatus is raised' do e = GRPC::BadStatus.new(@bs_code, 'NOK') expect(@call).to receive(:run_server_bidi).and_raise(e) - expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK') + expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, + {}) @bidi_streamer.run_server_method(@call, method(:bad_status_alt)) end it 'sends status UNKNOWN if other StandardErrors are raised' do expect(@call).to receive(:run_server_bidi).and_raise(StandardError) - expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason) + expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason, + false, {}) @bidi_streamer.run_server_method(@call, method(:other_error_alt)) end it 'closes the stream if there no errors' do expect(@call).to receive(:run_server_bidi) - expect(@call).to receive(:send_status).once.with(OK, 'OK') - expect(@call).to receive(:finished).once + expect(@call).to receive(:output_metadata).and_return(fake_md) + expect(@call).to receive(:send_status).once.with(OK, 'OK', true, + **fake_md) @bidi_streamer.run_server_method(@call, method(:fake_bidistream)) end end diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index 8383dc1533..aae3a7d7cb 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -28,11 +28,10 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' -require 'xray/thread_dump_signal_handler' -Pool = GRPC::RpcServer::Pool +describe GRPC::Pool do + Pool = GRPC::Pool -describe Pool do describe '#new' do it 'raises if a non-positive size is used' do expect { Pool.new(0) }.to raise_error diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 34e5cdcd04..2cd21a15e3 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -28,7 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc' -require 'xray/thread_dump_signal_handler' def load_test_certs test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') @@ -58,18 +57,20 @@ class NoRpcImplementation rpc :an_rpc, EchoMsg, EchoMsg end -# A test service with an implementation. +# A test service with an echo implementation. class EchoService include GRPC::GenericService rpc :an_rpc, EchoMsg, EchoMsg attr_reader :received_md - def initialize(_default_var = 'ignored') + def initialize(**kw) + @trailing_metadata = kw @received_md = [] end def an_rpc(req, call) logger.info('echo service received a request') + call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req end @@ -77,6 +78,25 @@ end EchoStub = EchoService.rpc_stub_class +# A test service with an implementation that fails with BadStatus +class FailingService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :details, :code, :md + + def initialize(_default_var = 'ignored') + @details = 'app error' + @code = 101 + @md = { failed_method: 'an_rpc' } + end + + def an_rpc(_req, _call) + fail GRPC::BadStatus.new(@code, @details, **@md) + end +end + +FailingStub = FailingService.rpc_stub_class + # A slow test service. class SlowService include GRPC::GenericService @@ -301,21 +321,20 @@ describe GRPC::RpcServer do end describe '#run' do - before(:each) do - @client_opts = { - channel_override: @ch - } - @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc - @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) - server_opts = { - server_override: @server, - completion_queue_override: @server_queue, - poll_period: 1 - } - @srv = RpcServer.new(**server_opts) - end + let(:client_opts) { { channel_override: @ch } } + let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } + let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } + + context 'with no connect_metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end - describe 'when running' do it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -323,8 +342,8 @@ describe GRPC::RpcServer do req = EchoMsg.new blk = proc do cq = GRPC::Core::CompletionQueue.new - stub = GRPC::ClientStub.new(@host, cq, **@client_opts) - stub.request_response('/unknown', req, @marshal, @unmarshal) + stub = GRPC::ClientStub.new(@host, cq, **client_opts) + stub.request_response('/unknown', req, marshal, unmarshal) end expect(&blk).to raise_error GRPC::BadStatus @srv.stop @@ -337,7 +356,7 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } @srv.stop t.join @@ -349,7 +368,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -363,8 +382,8 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) - deadline = service.delay + 0.5 # wait for long enough + stub = SlowStub.new(@host, **client_opts) + deadline = service.delay + 1.0 # wait for long enough expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] expect(service.received_md).to eq(wanted_md) @@ -378,7 +397,7 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) deadline = 0.1 # too short for SlowService to respond blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus @@ -388,19 +407,37 @@ describe GRPC::RpcServer do t.join end + it 'should handle cancellation correctly', server: true do + service = SlowService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = SlowStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + Thread.new do # cancel the call + sleep 0.1 + op.cancel + end + expect { op.execute }.to raise_error GRPC::Cancelled + @srv.stop + t.join + end + it 'should receive updated metadata', server: true do service = EchoService.new @srv.handle(service) t = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - @client_opts[:update_metadata] = proc do |md| + client_opts[:update_metadata] = proc do |md| md[:k1] = 'updated-v1' md end - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) - wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }] + wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2', + 'jwt_aud_uri' => "https://#{@host}/EchoService" }] expect(service.received_md).to eq(wanted_md) @srv.stop t.join @@ -415,7 +452,7 @@ describe GRPC::RpcServer do threads = [] n.times do threads << Thread.new do - stub = EchoStub.new(@host, **@client_opts) + stub = EchoStub.new(@host, **client_opts) q << stub.an_rpc(req) end end @@ -443,7 +480,7 @@ describe GRPC::RpcServer do one_failed_as_unavailable = false n.times do threads << Thread.new do - stub = SlowStub.new(@host, **@client_opts) + stub = SlowStub.new(@host, **client_opts) begin stub.an_rpc(req) rescue GRPC::BadStatus => e @@ -456,5 +493,97 @@ describe GRPC::RpcServer do expect(one_failed_as_unavailable).to be(true) end end + + context 'with connect metadata' do + let(:test_md_proc) do + proc do |mth, md| + res = md.clone + res['method'] = mth + res['connect_k1'] = 'connect_v1' + res + end + end + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1, + connect_md_proc: test_md_proc + } + @srv = RpcServer.new(**server_opts) + end + + it 'should send connect metadata to the client', server: true do + service = EchoService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + wanted_md = { + 'k1' => 'v1', + 'k2' => 'v2', + 'method' => '/EchoService/an_rpc', + 'connect_k1' => 'connect_v1' + } + expect(op.metadata).to eq(wanted_md) + @srv.stop + t.join + end + end + + context 'with trailing metadata' do + before(:each) do + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end + + it 'should be added to BadStatus when requests fail', server: true do + service = FailingService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = FailingStub.new(@host, **client_opts) + blk = proc { stub.an_rpc(req) } + + # confirm it raise the expected error + expect(&blk).to raise_error GRPC::BadStatus + + # call again and confirm exception contained the trailing metadata. + begin + blk.call + rescue GRPC::BadStatus => e + expect(e.code).to eq(service.code) + expect(e.details).to eq(service.details) + expect(e.metadata).to eq(service.md) + end + @srv.stop + t.join + end + + it 'should be received by the client', server: true do + wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } + service = EchoService.new(k1: 'out_v1', k2: 'out_v2') + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = EchoStub.new(@host, **client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + expect(op.metadata).to be nil + expect(op.execute).to be_a(EchoMsg) + expect(op.metadata).to eq(wanted_trailers) + @srv.stop + t.join + end + end end end diff --git a/src/ruby/spec/metadata_spec.rb b/src/ruby/spec/metadata_spec.rb deleted file mode 100644 index 2472866692..0000000000 --- a/src/ruby/spec/metadata_spec.rb +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'grpc' - -describe GRPC::Core::Metadata do - describe '#new' do - it 'should create instances' do - expect { GRPC::Core::Metadata.new('a key', 'a value') }.to_not raise_error - end - end - - describe '#key' do - md = GRPC::Core::Metadata.new('a key', 'a value') - it 'should be the constructor value' do - expect(md.key).to eq('a key') - end - end - - describe '#value' do - md = GRPC::Core::Metadata.new('a key', 'a value') - it 'should be the constuctor value' do - expect(md.value).to eq('a value') - end - end - - describe '#dup' do - it 'should create a copy that returns the correct key' do - md = GRPC::Core::Metadata.new('a key', 'a value') - expect(md.dup.key).to eq('a key') - end - - it 'should create a copy that returns the correct value' do - md = GRPC::Core::Metadata.new('a key', 'a value') - expect(md.dup.value).to eq('a value') - end - end -end diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb index a47e484f97..bb566d1b1f 100644 --- a/src/ruby/spec/server_spec.rb +++ b/src/ruby/spec/server_spec.rb @@ -152,7 +152,7 @@ describe Server do it 'does not take a hash with bad values as channel args' do blk = construct_with_args(symbol: Object.new) expect(&blk).to raise_error TypeError - blk = construct_with_args('1' => Hash.new) + blk = construct_with_args('1' => {}) expect(&blk).to raise_error TypeError end diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb index 837d2fc42a..101165c146 100644 --- a/src/ruby/spec/spec_helper.rb +++ b/src/ruby/spec/spec_helper.rb @@ -35,14 +35,18 @@ $LOAD_PATH.unshift(spec_dir) $LOAD_PATH.unshift(lib_dir) $LOAD_PATH.uniq! -require 'faraday' +# set up coverage +require 'simplecov' +SimpleCov.start do + add_filter 'spec' + add_filter 'bin' + SimpleCov.command_name ENV['COVERAGE_NAME'] +end if ENV['COVERAGE_NAME'] + require 'rspec' require 'logging' require 'rspec/logging_helper' -# Allow Faraday to support test stubs -Faraday::Adapter.load_middleware(:test) - # Configure RSpec to capture log messages for each test. The output from the # logs will be stored in the @log_output variable. It is a StringIO instance. RSpec.configure do |config| |