diff options
author | Yuki Yugui Sonoda <yugui@yugui.jp> | 2015-04-17 08:06:01 +0900 |
---|---|---|
committer | Yuki Yugui Sonoda <yugui@yugui.jp> | 2015-04-17 08:06:01 +0900 |
commit | e8696fb2864a01bb3fd568323466485d2b96b12d (patch) | |
tree | 18731c7688f54f9b788decbd99d810f20d26768c /src/ruby | |
parent | 22887917f54dfa3f1b42f2bc4627fb8dbd968cfb (diff) | |
parent | d35b7107f8c54196ba8ddd55a0760e5f559e2014 (diff) |
Merge branch 'master' into fix/header
Conflicts:
src/ruby/ext/grpc/rb_grpc.c
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/.rubocop_todo.yml | 4 | ||||
-rwxr-xr-x | src/ruby/bin/interop/interop_client.rb | 32 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 40 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.h | 7 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 5 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.h | 3 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.c | 8 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_completion_queue.h | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_credentials.c | 6 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_credentials.h | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc.c | 19 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc.h | 9 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server.c | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server.h | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server_credentials.c | 8 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_server_credentials.h | 4 | ||||
-rwxr-xr-x | src/ruby/grpc.gemspec | 12 | ||||
-rw-r--r-- | src/ruby/lib/grpc/errors.rb | 4 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 94 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 32 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 17 |
21 files changed, 163 insertions, 155 deletions
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index d9fe0a5835..b4d66c517c 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-14 09:35:44 -0700 using RuboCop version 0.29.1. +# on 2015-04-15 18:43:23 -0700 using RuboCop version 0.30.0. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -9,7 +9,7 @@ Metrics/AbcSize: Max: 36 -# Offense count: 2 +# Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: Max: 183 diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index af7a1d5b15..6f1fe2614f 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -136,12 +136,14 @@ class PingPongPlayer include Grpc::Testing::PayloadType attr_accessor :assertions # required by Minitest::Assertions attr_accessor :queue + attr_accessor :canceller_op # reqs is the enumerator over the requests def initialize(msg_sizes) @queue = Queue.new @msg_sizes = msg_sizes @assertions = 0 # required by Minitest::Assertions + @canceller_op = nil # used to cancel after the first response end def each_item @@ -155,12 +157,15 @@ class PingPongPlayer response_parameters: [p_cls.new(size: resp_size)]) yield req resp = @queue.pop - assert_equal(:COMPRESSABLE, resp.payload.type, - 'payload type is wrong') + assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong') assert_equal(resp_size, resp.payload.body.length, - 'payload body #{i} has the wrong length') + "payload body #{count} has the wrong length") p "OK: ping_pong #{count}" count += 1 + unless @canceller_op.nil? + canceller_op.cancel + break + end end end end @@ -260,6 +265,27 @@ class NamedTests p 'OK: ping_pong' end + def cancel_after_begin + msg_sizes = [27_182, 8, 1828, 45_904] + reqs = msg_sizes.map do |x| + req = Payload.new(body: nulls(x)) + StreamingInputCallRequest.new(payload: req) + end + op = @stub.streaming_input_call(reqs, return_op: true) + op.cancel + assert_raises(GRPC::Cancelled) { op.execute } + p 'OK: cancel_after_begin' + end + + def cancel_after_first + msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] + ppp = PingPongPlayer.new(msg_sizes) + op = @stub.full_duplex_call(ppp.each_item, return_op: true) + ppp.canceller_op = op # causes ppp to cancel after the 1st message + assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } + p 'OK: cancel_after_first' + end + def all all_methods = NamedTests.instance_methods(false).map(&:to_s) all_methods.each do |m| diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 1b1958fd26..b0963411d1 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -42,6 +42,17 @@ #include "rb_completion_queue.h" #include "rb_grpc.h" +/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */ +static VALUE grpc_rb_cCall; + +/* grpc_rb_eCallError is the ruby class of the exception thrown during call + operations; */ +VALUE grpc_rb_eCallError = Qnil; + +/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate + a timeout. */ +static VALUE grpc_rb_eOutOfTime = Qnil; + /* grpc_rb_sBatchResult is struct class used to hold the results of a batch * call. */ static VALUE grpc_rb_sBatchResult; @@ -86,7 +97,7 @@ static VALUE sym_cancelled; static VALUE hash_all_calls; /* Destroys a Call. */ -void grpc_rb_call_destroy(void *p) { +static void grpc_rb_call_destroy(void *p) { grpc_call *call = NULL; VALUE ref_count = Qnil; if (p == NULL) { @@ -188,7 +199,7 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) { it's capacity should have been computed via a prior call to grpc_rb_md_ary_fill_hash_cb */ -int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { +static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { grpc_metadata_array *md_ary = NULL; int array_length; int i; @@ -227,7 +238,8 @@ int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used to pre-compute the capacity a grpc_metadata_array. */ -int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { +static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, + VALUE md_ary_obj) { grpc_metadata_array *md_ary = NULL; /* Construct a metadata object from key and value and add it */ @@ -245,7 +257,7 @@ int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { /* grpc_rb_md_ary_convert converts a ruby metadata hash into a grpc_metadata_array. */ -void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { +static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { VALUE md_ary_obj = Qnil; if (md_ary_hash == Qnil) { return; /* Do nothing if the expected has value is nil */ @@ -301,7 +313,8 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) { /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks each key of an ops hash is valid. */ -int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) { +static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, + VALUE ops_ary) { /* Update the capacity; the value is an array, add capacity for each value in * the array */ if (TYPE(key) != T_FIXNUM) { @@ -330,7 +343,7 @@ int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, VALUE ops_ary) { /* grpc_rb_op_update_status_from_server adds the values in a ruby status struct to the 'send_status_from_server' portion of an op. */ -void grpc_rb_op_update_status_from_server(grpc_op *op, +static void grpc_rb_op_update_status_from_server(grpc_op *op, grpc_metadata_array* md_ary, VALUE status) { VALUE code = rb_struct_aref(status, sym_code); @@ -582,18 +595,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, return result; } -/* grpc_rb_cCall is the ruby class that proxies grpc_call. */ -VALUE grpc_rb_cCall = Qnil; - -/* grpc_rb_eCallError is the ruby class of the exception thrown during call - operations; */ -VALUE grpc_rb_eCallError = Qnil; - -/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate - a timeout. */ -VALUE grpc_rb_eOutOfTime = Qnil; - -void Init_grpc_error_codes() { +static void Init_grpc_error_codes() { /* Constants representing the error codes of grpc_call_error in grpc.h */ VALUE grpc_rb_mRpcErrors = rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors"); @@ -645,7 +647,7 @@ void Init_grpc_error_codes() { rb_obj_freeze(rb_error_code_details); } -void Init_grpc_op_codes() { +static void Init_grpc_op_codes() { /* Constants representing operation type codes in grpc.h */ VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps"); diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h index e20a34c74e..003ce0429e 100644 --- a/src/ruby/ext/grpc/rb_call.h +++ b/src/ruby/ext/grpc/rb_call.h @@ -49,17 +49,10 @@ const char* grpc_call_error_detail_of(grpc_call_error err); /* Converts a metadata array to a hash. */ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary); -/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */ -extern VALUE grpc_rb_cCall; - /* grpc_rb_eCallError is the ruby class of the exception thrown during call operations. */ extern VALUE grpc_rb_eCallError; -/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate - a timeout. */ -extern VALUE grpc_rb_eOutOfTime; - /* Initializes the Call class. */ void Init_grpc_call(); diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 3480280a03..9bd7c2edf9 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -58,6 +58,8 @@ static ID id_target; * GCed before the channel */ static ID id_cqueue; +/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ +static VALUE grpc_rb_cChannel = Qnil; /* Used during the conversion of a hash to channel args during channel setup */ static VALUE grpc_rb_cChannelArgs; @@ -240,9 +242,6 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { return Qnil; } -/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */ -VALUE grpc_rb_cChannel = Qnil; - void Init_grpc_channel() { grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); grpc_rb_cChannel = diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h index 5c57b31fb2..6e3c087689 100644 --- a/src/ruby/ext/grpc/rb_channel.h +++ b/src/ruby/ext/grpc/rb_channel.h @@ -37,9 +37,6 @@ #include <ruby.h> #include <grpc/grpc.h> -/* grpc_rb_cChannel is the Channel class whose instances proxy grpc_channel. */ -extern VALUE grpc_rb_cChannel; - /* Initializes the Channel class. */ void Init_grpc_channel(); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index bc5ca109c3..369adeb915 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -40,6 +40,10 @@ #include <grpc/support/time.h> #include "rb_grpc.h" +/* grpc_rb_cCompletionQueue is the ruby class that proxies + * grpc_completion_queue. */ +static VALUE grpc_rb_cCompletionQueue = Qnil; + /* Used to allow grpc_completion_queue_next call to release the GIL */ typedef struct next_call_stack { grpc_completion_queue *cq; @@ -169,10 +173,6 @@ grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, return next_call.event; } -/* grpc_rb_cCompletionQueue is the ruby class that proxies - * grpc_completion_queue. */ -VALUE grpc_rb_cCompletionQueue = Qnil; - void Init_grpc_completion_queue() { grpc_rb_cCompletionQueue = rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject); diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 1bfb80e499..4d0f49ac47 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -48,10 +48,6 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag, VALUE timeout); -/* grpc_rb_cCompletionQueue is the CompletionQueue class whose instances proxy - grpc_completion_queue. */ -extern VALUE grpc_rb_cCompletionQueue; - /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c index 1504a4884e..122cffc92f 100644 --- a/src/ruby/ext/grpc/rb_credentials.c +++ b/src/ruby/ext/grpc/rb_credentials.c @@ -40,6 +40,9 @@ #include "rb_grpc.h" +/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */ +static VALUE grpc_rb_cCredentials = Qnil; + /* grpc_rb_credentials wraps a grpc_credentials. It provides a * peer ruby object, 'mark' to minimize copying when a credential is * created from ruby. */ @@ -242,9 +245,6 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) { return self; } -/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */ -VALUE grpc_rb_cCredentials = Qnil; - void Init_grpc_credentials() { grpc_rb_cCredentials = rb_define_class_under(grpc_rb_mGrpcCore, "Credentials", rb_cObject); diff --git a/src/ruby/ext/grpc/rb_credentials.h b/src/ruby/ext/grpc/rb_credentials.h index dc0a3d01e8..e7c43c9c78 100644 --- a/src/ruby/ext/grpc/rb_credentials.h +++ b/src/ruby/ext/grpc/rb_credentials.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc_security.h> -/* grpc_rb_cCredentials is the ruby class whose instances proxy - grpc_credentials. */ -extern VALUE grpc_rb_cCredentials; - /* Initializes the ruby Credentials class. */ void Init_grpc_credentials(); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 2392393ffe..050e889d55 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -51,7 +51,7 @@ const RUBY_DATA_FUNC GC_NOT_MARKED = NULL; const RUBY_DATA_FUNC GC_DONT_FREE = NULL; -VALUE grpc_rb_cTimeVal = Qnil; +static VALUE grpc_rb_cTimeVal = Qnil; /* Alloc func that blocks allocation of a given object by raising an * exception. */ @@ -151,7 +151,7 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) { return t; } -void Init_grpc_status_codes() { +static void Init_grpc_status_codes() { /* Constants representing the status codes or grpc_status_code in status.h */ VALUE grpc_rb_mStatusCodes = rb_define_module_under(grpc_rb_mGrpcCore, "StatusCodes"); @@ -200,7 +200,7 @@ static ID id_inspect; static ID id_to_s; /* Converts a wrapped time constant to a standard time. */ -VALUE grpc_rb_time_val_to_time(VALUE self) { +static VALUE grpc_rb_time_val_to_time(VALUE self) { gpr_timespec *time_const = NULL; Data_Get_Struct(self, gpr_timespec, time_const); return rb_funcall(rb_cTime, id_at, 2, INT2NUM(time_const->tv_sec), @@ -208,17 +208,17 @@ VALUE grpc_rb_time_val_to_time(VALUE self) { } /* Invokes inspect on the ctime version of the time val. */ -VALUE grpc_rb_time_val_inspect(VALUE self) { +static VALUE grpc_rb_time_val_inspect(VALUE self) { return rb_funcall(grpc_rb_time_val_to_time(self), id_inspect, 0); } /* Invokes to_s on the ctime version of the time val. */ -VALUE grpc_rb_time_val_to_s(VALUE self) { +static VALUE grpc_rb_time_val_to_s(VALUE self) { return rb_funcall(grpc_rb_time_val_to_time(self), id_to_s, 0); } /* Adds a module with constants that map to gpr's static timeval structs. */ -void Init_grpc_time_consts() { +static void Init_grpc_time_consts() { VALUE grpc_rb_mTimeConsts = rb_define_module_under(grpc_rb_mGrpcCore, "TimeConsts"); grpc_rb_cTimeVal = @@ -245,7 +245,7 @@ void Init_grpc_time_consts() { id_tv_nsec = rb_intern("tv_nsec"); } -void grpc_rb_shutdown(ruby_vm_t *vm) { grpc_shutdown(); } +static void grpc_rb_shutdown(ruby_vm_t *vm) { grpc_shutdown(); } /* Initialize the GRPC module structs */ @@ -258,6 +258,11 @@ VALUE grpc_rb_sStatus = Qnil; VALUE grpc_rb_mGRPC = Qnil; VALUE grpc_rb_mGrpcCore = Qnil; +/* cached Symbols for members in Status struct */ +VALUE sym_code = Qundef; +VALUE sym_details = Qundef; +VALUE sym_metadata = Qundef; + void Init_grpc() { grpc_init(); ruby_vm_at_exit(grpc_rb_shutdown); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 3a93029556..1d411baf5b 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -41,9 +41,6 @@ /* grpc_rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */ extern VALUE grpc_rb_mGrpcCore; -/* Class used to wrap timeval structs. */ -extern VALUE grpc_rb_cTimeVal; - /* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */ extern VALUE grpc_rb_sNewServerRpc; @@ -51,13 +48,13 @@ extern VALUE grpc_rb_sNewServerRpc; extern VALUE grpc_rb_sStatus; /* sym_code is the symbol for the code attribute of grpc_rb_sStatus. */ -VALUE sym_code; +extern VALUE sym_code; /* sym_details is the symbol for the details attribute of grpc_rb_sStatus. */ -VALUE sym_details; +extern VALUE sym_details; /* sym_metadata is the symbol for the metadata attribute of grpc_rb_sStatus. */ -VALUE sym_metadata; +extern VALUE sym_metadata; /* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the wrapped struct does not need to participate in ruby gc. */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 33d9d69500..80f7760ebb 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -44,7 +44,7 @@ #include "rb_grpc.h" /* grpc_rb_cServer is the ruby class that proxies grpc_server. */ -VALUE grpc_rb_cServer = Qnil; +static VALUE grpc_rb_cServer = Qnil; /* id_at is the constructor method of the ruby standard Time class. */ static ID id_at; diff --git a/src/ruby/ext/grpc/rb_server.h b/src/ruby/ext/grpc/rb_server.h index 22e88a7d46..5e4b711d35 100644 --- a/src/ruby/ext/grpc/rb_server.h +++ b/src/ruby/ext/grpc/rb_server.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc.h> -/* grpc_rb_cServer is the Server class whose instances proxy - grpc_byte_buffer. */ -extern VALUE grpc_rb_cServer; - /* Initializes the Server class. */ void Init_grpc_server(); diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c index 8b813eaca1..5109b96b5f 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.c +++ b/src/ruby/ext/grpc/rb_server_credentials.c @@ -40,6 +40,10 @@ #include "rb_grpc.h" +/* grpc_rb_cServerCredentials is the ruby class that proxies + grpc_server_credentials. */ +static VALUE grpc_rb_cServerCredentials = Qnil; + /* grpc_rb_server_credentials wraps a grpc_server_credentials. It provides a peer ruby object, 'mark' to minimize copying when a server credential is created from ruby. */ @@ -180,10 +184,6 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs, return self; } -/* grpc_rb_cServerCredentials is the ruby class that proxies - grpc_server_credentials. */ -VALUE grpc_rb_cServerCredentials = Qnil; - void Init_grpc_server_credentials() { grpc_rb_cServerCredentials = rb_define_class_under(grpc_rb_mGrpcCore, "ServerCredentials", rb_cObject); diff --git a/src/ruby/ext/grpc/rb_server_credentials.h b/src/ruby/ext/grpc/rb_server_credentials.h index f79a869358..35b395ad5c 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.h +++ b/src/ruby/ext/grpc/rb_server_credentials.h @@ -37,10 +37,6 @@ #include <ruby.h> #include <grpc/grpc_security.h> -/* grpc_rb_cServerCredentials is the ruby class whose instances proxy - grpc_server_credentials. */ -extern VALUE grpc_rb_cServerCredentials; - /* Initializes the ruby ServerCredentials class. */ void Init_grpc_server_credentials(); diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index a50d0351da..12d4ab17f2 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -22,16 +22,16 @@ Gem::Specification.new do |s| s.platform = Gem::Platform::RUBY s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' - s.add_dependency 'googleauth', '~> 0.4' + s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests s.add_dependency 'logging', '~> 1.8' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_dependency 'xray', '~> 1.1' - s.add_development_dependency 'bundler', '~> 1.7' - s.add_development_dependency 'rake', '~> 10.0' - s.add_development_dependency 'rake-compiler', '~> 0' - s.add_development_dependency 'rubocop', '~> 0.28.0' - s.add_development_dependency 'rspec', '~> 3.0' + s.add_development_dependency 'bundler', '~> 1.9' + s.add_development_dependency 'rake', '~> 10.4' + s.add_development_dependency 'rake-compiler', '~> 0.9' + s.add_development_dependency 'rubocop', '~> 0.30' + s.add_development_dependency 'rspec', '~> 3.2' s.extensions = %w(ext/grpc/extconf.rb) end diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index b23793730f..35e9c02a94 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -54,4 +54,8 @@ module GRPC Status.new(code, details) end end + + # Cancelled is an exception class that indicates that an rpc was cancelled. + class Cancelled < StandardError + end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 489349c2c9..8d63de4145 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,6 +30,22 @@ require 'forwardable' require 'grpc/generic/bidi_call' +class Struct + # BatchResult is the struct returned by calls to call#start_batch. + class BatchResult + # check_status returns the status, raising an error if the status + # is non-nil and not OK. + def check_status + return nil if status.nil? + fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED + if status.code != GRPC::Core::StatusCodes::OK + fail GRPC::BadStatus.new(status.code, status.details) + end + status + end + end +end + # GRPC contains the General RPC module. module GRPC # The ActiveCall class provides simple methods for sending marshallable @@ -38,7 +54,9 @@ module GRPC include Core::StatusCodes include Core::TimeConsts include Core::CallOps + extend Forwardable attr_reader(:deadline) + def_delegators :@call, :cancel, :metadata # client_invoke begins a client invocation. # @@ -101,50 +119,6 @@ module GRPC @metadata_tag = metadata_tag end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status - end - - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end - - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end - - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end - - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false - end - # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -176,9 +150,9 @@ module GRPC SEND_CLOSE_FROM_CLIENT => nil } ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished - @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished - @call.status + batch_result.check_status end # finished waits until a client call is completed. @@ -192,17 +166,12 @@ module GRPC elsif !batch_result.metadata.nil? @call.metadata.merge!(batch_result.metadata) end - if batch_result.status.code != Core::StatusCodes::OK - fail BadStatus.new(batch_result.status.code, - batch_result.status.details) - end - batch_result + batch_result.check_status end # remote_send sends a request to the remote endpoint. # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. + # It blocks until the remote endpoint accepts the message. # # @param req [Object, String] the object to send or it's marshal form. # @param marshalled [false, true] indicates if the object is already @@ -332,6 +301,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # client_streamer sends a stream of requests to a GRPC server, and @@ -355,6 +327,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -381,6 +356,9 @@ module GRPC replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -416,6 +394,9 @@ module GRPC start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -436,9 +417,10 @@ module GRPC private + # Starts the call if not already started def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags + return if @started + @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @started = true end @@ -466,6 +448,6 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) + :metadata, :status, :start_call) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 1c1b3b0db7..b813ab5b54 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -78,13 +78,11 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop + @enq_th = start_write_loop(requests) + @loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join end # Begins orchestration of the Bidi stream for a server generating replies. @@ -100,10 +98,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join + @enq_th = start_write_loop(replys, is_client: false) + @loop_th = start_read_loop end private @@ -122,10 +118,13 @@ module GRPC logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop + logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @loop_th.join + @enq_th.join end # during bidi-streaming, read the requests to send from a separate thread @@ -136,20 +135,23 @@ module GRPC begin count = 0 requests.each do |req| + logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end if is_client - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) + logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status end rescue StandardError => e - logger.warn('bidi: write_loop failed') + logger.warn('bidi-write_loop: failed') logger.warn(e) + raise e end end end @@ -163,7 +165,7 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("waiting for read #{count}") + logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, @@ -171,7 +173,7 @@ module GRPC # handle the next message if batch_result.message.nil? @readq.push(END_OF_READS) - logger.debug('done reading!') + logger.debug('bidi-read-loop: done reading!') break end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 245999ea03..1323bacfa6 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -388,6 +388,23 @@ describe GRPC::RpcServer do t.join end + it 'should handle cancellation correctly', server: true do + service = SlowService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = SlowStub.new(@host, **@client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + Thread.new do # cancel the call + sleep 0.1 + op.cancel + end + expect { op.execute }.to raise_error GRPC::Cancelled + @srv.stop + t.join + end + it 'should receive updated metadata', server: true do service = EchoService.new @srv.handle(service) |