aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rwxr-xr-xsrc/ruby/.rspec1
-rw-r--r--src/ruby/.rubocop_todo.yml32
-rw-r--r--src/ruby/CHANGELOG.md11
-rwxr-xr-xsrc/ruby/Rakefile1
-rwxr-xr-xsrc/ruby/bin/apis/pubsub_demo.rb9
-rwxr-xr-xsrc/ruby/bin/interop/interop_client.rb46
-rwxr-xr-xsrc/ruby/bin/interop/interop_server.rb2
-rwxr-xr-xsrc/ruby/bin/math_server.rb2
-rwxr-xr-xsrc/ruby/bin/noproto_server.rb2
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c204
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.h16
-rw-r--r--src/ruby/ext/grpc/rb_call.c821
-rw-r--r--src/ruby/ext/grpc/rb_call.h8
-rw-r--r--src/ruby/ext/grpc/rb_channel.c92
-rw-r--r--src/ruby/ext/grpc/rb_channel.h3
-rw-r--r--src/ruby/ext/grpc/rb_channel_args.c17
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c73
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h10
-rw-r--r--src/ruby/ext/grpc/rb_credentials.c68
-rw-r--r--src/ruby/ext/grpc/rb_credentials.h4
-rw-r--r--src/ruby/ext/grpc/rb_event.c361
-rw-r--r--src/ruby/ext/grpc/rb_event.h53
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c153
-rw-r--r--src/ruby/ext/grpc/rb_grpc.h30
-rw-r--r--src/ruby/ext/grpc/rb_metadata.c215
-rw-r--r--src/ruby/ext/grpc/rb_metadata.h53
-rw-r--r--src/ruby/ext/grpc/rb_server.c145
-rw-r--r--src/ruby/ext/grpc/rb_server.h4
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.c45
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.h4
-rwxr-xr-xsrc/ruby/grpc.gemspec19
-rw-r--r--src/ruby/lib/grpc.rb2
-rw-r--r--src/ruby/lib/grpc/errors.rb15
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb275
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb92
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb73
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb19
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb423
-rw-r--r--src/ruby/lib/grpc/notifier.rb (renamed from src/ruby/lib/grpc/core/event.rb)34
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/spec/alloc_spec.rb44
-rw-r--r--src/ruby/spec/byte_buffer_spec.rb67
-rw-r--r--src/ruby/spec/call_spec.rb63
-rw-r--r--src/ruby/spec/channel_spec.rb31
-rw-r--r--src/ruby/spec/client_server_spec.rb368
-rw-r--r--src/ruby/spec/credentials_spec.rb4
-rw-r--r--src/ruby/spec/event_spec.rb53
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb144
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb110
-rw-r--r--src/ruby/spec/generic/rpc_desc_spec.rb130
-rw-r--r--src/ruby/spec/generic/rpc_server_pool_spec.rb5
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb187
-rw-r--r--src/ruby/spec/metadata_spec.rb64
-rw-r--r--src/ruby/spec/server_spec.rb2
-rw-r--r--src/ruby/spec/spec_helper.rb12
55 files changed, 2070 insertions, 2653 deletions
diff --git a/src/ruby/.rspec b/src/ruby/.rspec
index 60a4aad5a2..dd579f7a13 100755
--- a/src/ruby/.rspec
+++ b/src/ruby/.rspec
@@ -1 +1,2 @@
-I.
+--require spec_helper
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index d5bb55e5a8..ed4a4438b3 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -1,42 +1,30 @@
# This configuration was generated by `rubocop --auto-gen-config`
-# on 2015-01-16 02:30:04 -0800 using RuboCop version 0.28.0.
+# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.
-# Offense count: 3
-# Lint/UselessAssignment:
-# Enabled: false
-
-# Offense count: 33
+# Offense count: 30
Metrics/AbcSize:
- Max: 39
+ Max: 40
# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
- Max: 231
-
-# Offense count: 2
-Metrics/CyclomaticComplexity:
- Max: 8
+ Max: 184
-# Offense count: 36
+# Offense count: 35
# Configuration parameters: CountComments.
Metrics/MethodLength:
- Max: 37
+ Max: 36
-# Offense count: 8
+# Offense count: 7
# Configuration parameters: CountKeywordArgs.
Metrics/ParameterLists:
Max: 8
-# Offense count: 2
-Metrics/PerceivedComplexity:
- Max: 10
-
-# Offense count: 7
+# Offense count: 9
# Configuration parameters: AllowedVariables.
Style/GlobalVars:
Enabled: false
@@ -50,3 +38,7 @@ Style/Next:
# Configuration parameters: Methods.
Style/SingleLineBlockParams:
Enabled: false
+
+# Offense count: 1
+Style/StructInheritance:
+ Enabled: false
diff --git a/src/ruby/CHANGELOG.md b/src/ruby/CHANGELOG.md
new file mode 100644
index 0000000000..8ec6e3cfdb
--- /dev/null
+++ b/src/ruby/CHANGELOG.md
@@ -0,0 +1,11 @@
+## 0.6.1 (2015-04-14)
+
+### Changes
+
+* Begins this ChangeLog ([@tbetbetbe][])
+* Updates to version 0.4 of googleauth. ([@tbetbetbe][])
+* Switch the extension to use the call API. ([@tbetbetbe][])
+* Refactor the C extension to avoid identifiers used by ruby ([@yugui][])
+
+[@tbetbetbe]: https://github.com/tbetbetbe
+[@yugui]: https://github.com/yugui
diff --git a/src/ruby/Rakefile b/src/ruby/Rakefile
index afb354e922..02af9a84b8 100755
--- a/src/ruby/Rakefile
+++ b/src/ruby/Rakefile
@@ -26,6 +26,7 @@ namespace :suite do
SPEC_SUITES.each do |suite|
desc "Run all specs in the #{suite[:title]} spec suite"
RSpec::Core::RakeTask.new(suite[:id]) do |t|
+ ENV['COVERAGE_NAME'] = suite[:id].to_s
spec_files = []
suite[:files].each { |f| spec_files += Dir[f] } if suite[:files]
diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb
index 9bb324ff64..6d69b0f21e 100755
--- a/src/ruby/bin/apis/pubsub_demo.rb
+++ b/src/ruby/bin/apis/pubsub_demo.rb
@@ -71,7 +71,7 @@ end
# Builds the metadata authentication update proc.
def auth_proc(opts)
- auth_creds = Google::Auth.get_application_default(opts.oauth_scope)
+ auth_creds = Google::Auth.get_application_default
return auth_creds.updater_proc
end
@@ -213,17 +213,14 @@ class NamedActions
end
# Args is used to hold the command line info.
-Args = Struct.new(:host, :oauth_scope, :port, :action, :project_id, :topic_name,
+Args = Struct.new(:host, :port, :action, :project_id, :topic_name,
:sub_name)
# validates the the command line options, returning them as an Arg.
def parse_args
args = Args.new('pubsub-staging.googleapis.com',
- 'https://www.googleapis.com/auth/pubsub',
443, 'list_some_topics', 'stoked-keyword-656')
OptionParser.new do |opts|
- opts.on('--oauth_scope scope',
- 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v }
opts.on('--server_host SERVER_HOST', 'server hostname') do |v|
args.host = v
end
@@ -250,7 +247,7 @@ def parse_args
end
def _check_args(args)
- %w(host port action oauth_scope).each do |a|
+ %w(host port action).each do |a|
if args[a].nil?
raise OptionParser::MissingArgument.new("please specify --#{a}")
end
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index b2a8711c79..a388924722 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -110,6 +110,11 @@ def create_stub(opts)
end
end
+ if opts.test_case == 'jwt_token_creds' # don't use a scope
+ auth_creds = Google::Auth.get_application_default
+ stub_opts[:update_metadata] = auth_creds.updater_proc
+ end
+
logger.info("... connecting securely to #{address}")
Grpc::Testing::TestService::Stub.new(address, **stub_opts)
else
@@ -131,12 +136,14 @@ class PingPongPlayer
include Grpc::Testing::PayloadType
attr_accessor :assertions # required by Minitest::Assertions
attr_accessor :queue
+ attr_accessor :canceller_op
# reqs is the enumerator over the requests
def initialize(msg_sizes)
@queue = Queue.new
@msg_sizes = msg_sizes
@assertions = 0 # required by Minitest::Assertions
+ @canceller_op = nil # used to cancel after the first response
end
def each_item
@@ -150,12 +157,15 @@ class PingPongPlayer
response_parameters: [p_cls.new(size: resp_size)])
yield req
resp = @queue.pop
- assert_equal(:COMPRESSABLE, resp.payload.type,
- 'payload type is wrong')
+ assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong')
assert_equal(resp_size, resp.payload.body.length,
- 'payload body #{i} has the wrong length')
+ "payload body #{count} has the wrong length")
p "OK: ping_pong #{count}"
count += 1
+ unless @canceller_op.nil?
+ canceller_op.cancel
+ break
+ end
end
end
end
@@ -201,6 +211,15 @@ class NamedTests
p 'OK: service_account_creds'
end
+ def jwt_token_creds
+ json_key = File.read(ENV[AUTH_ENV])
+ wanted_email = MultiJson.load(json_key)['client_email']
+ resp = perform_large_unary(fill_username: true)
+ assert_equal(wanted_email, resp.username,
+ 'service_account_creds: incorrect username')
+ p 'OK: jwt_token_creds'
+ end
+
def compute_engine_creds
resp = perform_large_unary(fill_username: true,
fill_oauth_scope: true)
@@ -246,6 +265,27 @@ class NamedTests
p 'OK: ping_pong'
end
+ def cancel_after_begin
+ msg_sizes = [27_182, 8, 1828, 45_904]
+ reqs = msg_sizes.map do |x|
+ req = Payload.new(body: nulls(x))
+ StreamingInputCallRequest.new(payload: req)
+ end
+ op = @stub.streaming_input_call(reqs, return_op: true)
+ op.cancel
+ assert_raises(GRPC::Cancelled) { op.execute }
+ p 'OK: cancel_after_begin'
+ end
+
+ def cancel_after_first_response
+ msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
+ ppp = PingPongPlayer.new(msg_sizes)
+ op = @stub.full_duplex_call(ppp.each_item, return_op: true)
+ ppp.canceller_op = op # causes ppp to cancel after the 1st message
+ assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
+ p 'OK: cancel_after_first_response'
+ end
+
def all
all_methods = NamedTests.instance_methods(false).map(&:to_s)
all_methods.each do |m|
diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb
index 0819ba9bbc..72570d92f3 100755
--- a/src/ruby/bin/interop/interop_server.rb
+++ b/src/ruby/bin/interop/interop_server.rb
@@ -185,7 +185,7 @@ def main
logger.info("... running insecurely on #{host}")
end
s.handle(TestTarget)
- s.run
+ s.run_till_terminated
end
main
diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb
index 5cc7613489..1bfe253b85 100755
--- a/src/ruby/bin/math_server.rb
+++ b/src/ruby/bin/math_server.rb
@@ -183,7 +183,7 @@ def main
end
s.handle(Calculator)
- s.run
+ s.run_till_terminated
end
main
diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb
index 9979cb7ebb..f71daeadb3 100755
--- a/src/ruby/bin/noproto_server.rb
+++ b/src/ruby/bin/noproto_server.rb
@@ -105,7 +105,7 @@ def main
end
s.handle(NoProto)
- s.run
+ s.run_till_terminated
end
main
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index ff5a114de5..e3a5277f54 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -39,203 +39,29 @@
#include <grpc/support/slice.h>
#include "rb_grpc.h"
-/* grpc_rb_byte_buffer wraps a grpc_byte_buffer. It provides a peer ruby
- * object, 'mark' to minimize copying when a byte_buffer is created from
- * ruby. */
-typedef struct grpc_rb_byte_buffer {
- /* Holder of ruby objects involved in constructing the status */
- VALUE mark;
- /* The actual status */
- grpc_byte_buffer *wrapped;
-} grpc_rb_byte_buffer;
-
-/* Destroys ByteBuffer instances. */
-static void grpc_rb_byte_buffer_free(void *p) {
- grpc_rb_byte_buffer *bb = NULL;
- if (p == NULL) {
- return;
- };
- bb = (grpc_rb_byte_buffer *)p;
-
- /* Deletes the wrapped object if the mark object is Qnil, which indicates
- * that no other object is the actual owner. */
- if (bb->wrapped != NULL && bb->mark == Qnil) {
- grpc_byte_buffer_destroy(bb->wrapped);
- }
-
- xfree(p);
-}
-
-/* Protects the mark object from GC */
-static void grpc_rb_byte_buffer_mark(void *p) {
- grpc_rb_byte_buffer *bb = NULL;
- if (p == NULL) {
- return;
- }
- bb = (grpc_rb_byte_buffer *)p;
-
- /* If it's not already cleaned up, mark the mark object */
- if (bb->mark != Qnil && BUILTIN_TYPE(bb->mark) != T_NONE) {
- rb_gc_mark(bb->mark);
- }
+grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) {
+ gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
+ grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+ return buffer;
}
-/* id_source is the name of the hidden ivar the preserves the original
- * byte_buffer source string */
-static ID id_source;
-
-/* Allocates ByteBuffer instances.
-
- Provides safe default values for the byte_buffer fields. */
-static VALUE grpc_rb_byte_buffer_alloc(VALUE cls) {
- grpc_rb_byte_buffer *wrapper = ALLOC(grpc_rb_byte_buffer);
- wrapper->wrapped = NULL;
- wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_byte_buffer_mark,
- grpc_rb_byte_buffer_free, wrapper);
-}
-
-/* Clones ByteBuffer instances.
-
- Gives ByteBuffer a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_byte_buffer_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_byte_buffer *orig_bb = NULL;
- grpc_rb_byte_buffer *copy_bb = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a metadata object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_byte_buffer_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cByteBuffer));
- }
-
- Data_Get_Struct(orig, grpc_rb_byte_buffer, orig_bb);
- Data_Get_Struct(copy, grpc_rb_byte_buffer, copy_bb);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the metadata wrapper
- * object. */
- MEMCPY(copy_bb, orig_bb, grpc_rb_byte_buffer, 1);
- return copy;
-}
-
-/* id_empty is used to return the empty string from to_s when necessary. */
-static ID id_empty;
-
-static VALUE grpc_rb_byte_buffer_to_s(VALUE self) {
- grpc_rb_byte_buffer *wrapper = NULL;
- grpc_byte_buffer *bb = NULL;
- grpc_byte_buffer_reader *reader = NULL;
- char *output = NULL;
+VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
size_t length = 0;
+ char *string = NULL;
size_t offset = 0;
- VALUE output_obj = Qnil;
+ grpc_byte_buffer_reader *reader = NULL;
gpr_slice next;
+ if (buffer == NULL) {
+ return Qnil;
- Data_Get_Struct(self, grpc_rb_byte_buffer, wrapper);
- output_obj = rb_ivar_get(wrapper->mark, id_source);
- if (output_obj != Qnil) {
- /* From ruby, ByteBuffers are immutable so if a source is set, return that
- * as the to_s value */
- return output_obj;
- }
-
- /* Read the bytes. */
- bb = wrapper->wrapped;
- if (bb == NULL) {
- return rb_id2str(id_empty);
- }
- length = grpc_byte_buffer_length(bb);
- if (length == 0) {
- return rb_id2str(id_empty);
}
- reader = grpc_byte_buffer_reader_create(bb);
- output = xmalloc(length);
+ length = grpc_byte_buffer_length(buffer);
+ string = xmalloc(length + 1);
+ reader = grpc_byte_buffer_reader_create(buffer);
while (grpc_byte_buffer_reader_next(reader, &next) != 0) {
- memcpy(output + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next));
+ memcpy(string + offset, GPR_SLICE_START_PTR(next), GPR_SLICE_LENGTH(next));
offset += GPR_SLICE_LENGTH(next);
}
- output_obj = rb_str_new(output, length);
-
- /* Save a references to the computed string in the mark object so that the
- * calling to_s does not do any allocations. */
- wrapper->mark = rb_class_new_instance(0, NULL, rb_cObject);
- rb_ivar_set(wrapper->mark, id_source, output_obj);
-
- return output_obj;
-}
-
-/* Initializes ByteBuffer instances. */
-static VALUE grpc_rb_byte_buffer_init(VALUE self, VALUE src) {
- gpr_slice a_slice;
- grpc_rb_byte_buffer *wrapper = NULL;
- grpc_byte_buffer *byte_buffer = NULL;
-
- if (TYPE(src) != T_STRING) {
- rb_raise(rb_eTypeError, "bad byte_buffer arg: got <%s>, want <String>",
- rb_obj_classname(src));
- return Qnil;
- }
- Data_Get_Struct(self, grpc_rb_byte_buffer, wrapper);
- a_slice = gpr_slice_malloc(RSTRING_LEN(src));
- memcpy(GPR_SLICE_START_PTR(a_slice), RSTRING_PTR(src), RSTRING_LEN(src));
- byte_buffer = grpc_byte_buffer_create(&a_slice, 1);
- gpr_slice_unref(a_slice);
-
- if (byte_buffer == NULL) {
- rb_raise(rb_eArgError, "could not create a byte_buffer, not sure why");
- return Qnil;
- }
- wrapper->wrapped = byte_buffer;
-
- /* Save a references to the original string in the mark object so that the
- * pointers used there is valid for the lifetime of the object. */
- wrapper->mark = rb_class_new_instance(0, NULL, rb_cObject);
- rb_ivar_set(wrapper->mark, id_source, src);
-
- return self;
-}
-
-/* rb_cByteBuffer is the ruby class that proxies grpc_byte_buffer. */
-VALUE rb_cByteBuffer = Qnil;
-
-void Init_grpc_byte_buffer() {
- rb_cByteBuffer =
- rb_define_class_under(rb_mGrpcCore, "ByteBuffer", rb_cObject);
-
- /* Allocates an object managed by the ruby runtime */
- rb_define_alloc_func(rb_cByteBuffer, grpc_rb_byte_buffer_alloc);
-
- /* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(rb_cByteBuffer, "initialize", grpc_rb_byte_buffer_init, 1);
- rb_define_method(rb_cByteBuffer, "initialize_copy",
- grpc_rb_byte_buffer_init_copy, 1);
-
- /* Provides a to_s method that returns the buffer value */
- rb_define_method(rb_cByteBuffer, "to_s", grpc_rb_byte_buffer_to_s, 0);
-
- id_source = rb_intern("__source");
- id_empty = rb_intern("");
-}
-
-VALUE grpc_rb_byte_buffer_create_with_mark(VALUE mark, grpc_byte_buffer *bb) {
- grpc_rb_byte_buffer *byte_buffer = NULL;
- if (bb == NULL) {
- return Qnil;
- }
- byte_buffer = ALLOC(grpc_rb_byte_buffer);
- byte_buffer->wrapped = bb;
- byte_buffer->mark = mark;
- return Data_Wrap_Struct(rb_cByteBuffer, grpc_rb_byte_buffer_mark,
- grpc_rb_byte_buffer_free, byte_buffer);
-}
-
-/* Gets the wrapped byte_buffer from the ruby wrapper */
-grpc_byte_buffer *grpc_rb_get_wrapped_byte_buffer(VALUE v) {
- grpc_rb_byte_buffer *wrapper = NULL;
- Data_Get_Struct(v, grpc_rb_byte_buffer, wrapper);
- return wrapper->wrapped;
+ return rb_str_new(string, length);
}
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.h b/src/ruby/ext/grpc/rb_byte_buffer.h
index 6ef72f3e75..96b9009dae 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.h
+++ b/src/ruby/ext/grpc/rb_byte_buffer.h
@@ -37,18 +37,10 @@
#include <grpc/grpc.h>
#include <ruby.h>
-/* rb_cByteBuffer is the ByteBuffer class whose instances proxy
- grpc_byte_buffer. */
-extern VALUE rb_cByteBuffer;
+/* Converts a char* with a length to a grpc_byte_buffer */
+grpc_byte_buffer *grpc_rb_s_to_byte_buffer(char *string, size_t length);
-/* Initializes the ByteBuffer class. */
-void Init_grpc_byte_buffer();
-
-/* grpc_rb_byte_buffer_create_with_mark creates a grpc_rb_byte_buffer with a
- * ruby mark object that will be kept alive while the byte_buffer is alive. */
-VALUE grpc_rb_byte_buffer_create_with_mark(VALUE mark, grpc_byte_buffer* bb);
-
-/* Gets the wrapped byte_buffer from its ruby object. */
-grpc_byte_buffer* grpc_rb_get_wrapped_byte_buffer(VALUE v);
+/* Converts a grpc_byte_buffer to a ruby string */
+VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer);
#endif /* GRPC_RB_BYTE_BUFFER_H_ */
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index b5a256d5a6..e76bb930ee 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -36,11 +36,31 @@
#include <ruby.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
#include "rb_byte_buffer.h"
#include "rb_completion_queue.h"
-#include "rb_metadata.h"
#include "rb_grpc.h"
+/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
+static VALUE grpc_rb_cCall;
+
+/* grpc_rb_eCallError is the ruby class of the exception thrown during call
+ operations; */
+VALUE grpc_rb_eCallError = Qnil;
+
+/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
+ a timeout. */
+static VALUE grpc_rb_eOutOfTime = Qnil;
+
+/* grpc_rb_sBatchResult is struct class used to hold the results of a batch
+ * call. */
+static VALUE grpc_rb_sBatchResult;
+
+/* grpc_rb_cMdAry is the MetadataArray class whose instances proxy
+ * grpc_metadata_array. */
+static VALUE grpc_rb_cMdAry;
+
/* id_cq is the name of the hidden ivar that preserves a reference to a
* completion queue */
static ID id_cq;
@@ -62,13 +82,22 @@ static ID id_metadata;
* received by the call and subsequently saved on it. */
static ID id_status;
+/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
+static VALUE sym_send_message;
+static VALUE sym_send_metadata;
+static VALUE sym_send_close;
+static VALUE sym_send_status;
+static VALUE sym_message;
+static VALUE sym_status;
+static VALUE sym_cancelled;
+
/* hash_all_calls is a hash of Call address -> reference count that is used to
* track the creation and destruction of rb_call instances.
*/
static VALUE hash_all_calls;
/* Destroys a Call. */
-void grpc_rb_call_destroy(void *p) {
+static void grpc_rb_call_destroy(void *p) {
grpc_call *call = NULL;
VALUE ref_count = Qnil;
if (p == NULL) {
@@ -88,6 +117,38 @@ void grpc_rb_call_destroy(void *p) {
}
}
+static size_t md_ary_datasize(const void *p) {
+ const grpc_metadata_array *const ary = (grpc_metadata_array *)p;
+ size_t i, datasize = sizeof(grpc_metadata_array);
+ for (i = 0; i < ary->count; ++i) {
+ const grpc_metadata *const md = &ary->metadata[i];
+ datasize += strlen(md->key);
+ datasize += md->value_length;
+ }
+ datasize += ary->capacity * sizeof(grpc_metadata);
+ return datasize;
+}
+
+static const rb_data_type_t grpc_rb_md_ary_data_type = {
+ "grpc_metadata_array",
+ {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize},
+ NULL,
+ NULL,
+ 0};
+
+/* Describes grpc_call struct for RTypedData */
+static const rb_data_type_t grpc_call_data_type = {
+ "grpc_call",
+ {GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL,
+ NULL,
+ /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
+ * grpc_rb_call_destroy
+ * touches a hash object.
+ * TODO(yugui) Directly use st_table and call the free function earlier?
+ */
+ 0};
+
/* Error code details is a hash containing text strings describing errors */
VALUE rb_error_code_details;
@@ -101,93 +162,15 @@ const char *grpc_call_error_detail_of(grpc_call_error err) {
return detail;
}
-/* grpc_rb_call_add_metadata_hash_cb is the hash iteration callback used by
- grpc_rb_call_add_metadata.
-*/
-int grpc_rb_call_add_metadata_hash_cb(VALUE key, VALUE val, VALUE call_obj) {
- grpc_call *call = NULL;
- grpc_metadata *md = NULL;
- VALUE md_obj = Qnil;
- VALUE md_obj_args[2];
- VALUE flags = rb_ivar_get(call_obj, id_flags);
- grpc_call_error err;
- int array_length;
- int i;
-
- /* Construct a metadata object from key and value and add it */
- Data_Get_Struct(call_obj, grpc_call, call);
- md_obj_args[0] = key;
-
- if (TYPE(val) == T_ARRAY) {
- /* If the value is an array, add each value in the array separately */
- array_length = RARRAY_LEN(val);
- for (i = 0; i < array_length; i++) {
- md_obj_args[1] = rb_ary_entry(val, i);
- md_obj = rb_class_new_instance(2, md_obj_args, rb_cMetadata);
- md = grpc_rb_get_wrapped_metadata(md_obj);
- err = grpc_call_add_metadata_old(call, md, NUM2UINT(flags));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "add metadata failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
- return ST_STOP;
- }
- }
- } else {
- md_obj_args[1] = val;
- md_obj = rb_class_new_instance(2, md_obj_args, rb_cMetadata);
- md = grpc_rb_get_wrapped_metadata(md_obj);
- err = grpc_call_add_metadata_old(call, md, NUM2UINT(flags));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "add metadata failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
- return ST_STOP;
- }
- }
-
- return ST_CONTINUE;
-}
-
-/*
- call-seq:
- call.add_metadata(completion_queue, hash_elements, flags=nil)
-
- Add metadata elements to the call from a ruby hash, to be sent upon
- invocation. flags is a bit-field combination of the write flags defined
- above. REQUIRES: grpc_call_invoke/grpc_call_accept have not been
- called on this call. Produces no events. */
-
-static VALUE grpc_rb_call_add_metadata(int argc, VALUE *argv, VALUE self) {
- VALUE metadata;
- VALUE flags = Qnil;
- ID id_size = rb_intern("size");
-
- /* "11" == 1 mandatory args, 1 (flags) is optional */
- rb_scan_args(argc, argv, "11", &metadata, &flags);
- if (NIL_P(flags)) {
- flags = UINT2NUM(0); /* Default to no flags */
- }
- if (TYPE(metadata) != T_HASH) {
- rb_raise(rb_eTypeError, "add metadata failed: metadata should be a hash");
- return Qnil;
- }
- if (NUM2UINT(rb_funcall(metadata, id_size, 0)) == 0) {
- return Qnil;
- }
- rb_ivar_set(self, id_flags, flags);
- rb_ivar_set(self, id_input_md, metadata);
- rb_hash_foreach(metadata, grpc_rb_call_add_metadata_hash_cb, self);
- return Qnil;
-}
-
/* Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread. */
static VALUE grpc_rb_call_cancel(VALUE self) {
grpc_call *call = NULL;
grpc_call_error err;
- Data_Get_Struct(self, grpc_call, call);
+ TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
err = grpc_call_cancel(call);
if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "cancel failed: %s (code=%d)",
+ rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
}
@@ -196,77 +179,20 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
/*
call-seq:
- call.invoke(completion_queue, tag, flags=nil)
-
- Invoke the RPC. Starts sending metadata and request headers on the wire.
- flags is a bit-field combination of the write flags defined above.
- REQUIRES: Can be called at most once per call.
- Can only be called on the client.
- Produces a GRPC_INVOKE_ACCEPTED event on completion. */
-static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) {
- VALUE cqueue = Qnil;
- VALUE metadata_read_tag = Qnil;
- VALUE finished_tag = Qnil;
- VALUE flags = Qnil;
- grpc_call *call = NULL;
- grpc_completion_queue *cq = NULL;
- grpc_call_error err;
+ status = call.status
- /* "31" == 3 mandatory args, 1 (flags) is optional */
- rb_scan_args(argc, argv, "31", &cqueue, &metadata_read_tag, &finished_tag,
- &flags);
- if (NIL_P(flags)) {
- flags = UINT2NUM(0); /* Default to no flags */
- }
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_invoke_old(call, cq, ROBJECT(metadata_read_tag),
- ROBJECT(finished_tag), NUM2UINT(flags));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "invoke failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
- }
-
- /* Add the completion queue as an instance attribute, prevents it from being
- * GCed until this call object is GCed */
- rb_ivar_set(self, id_cq, cqueue);
-
- return Qnil;
-}
-
-/* Initiate a read on a call. Output event contains a byte buffer with the
- result of the read.
- REQUIRES: No other reads are pending on the call. It is only safe to start
- the next read after the corresponding read event is received. */
-static VALUE grpc_rb_call_start_read(VALUE self, VALUE tag) {
- grpc_call *call = NULL;
- grpc_call_error err;
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_start_read_old(call, ROBJECT(tag));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "start read failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
- }
-
- return Qnil;
-}
-
-/*
- call-seq:
- status = call.status
-
- Gets the status object saved the call. */
+ Gets the status object saved the call. */
static VALUE grpc_rb_call_get_status(VALUE self) {
return rb_ivar_get(self, id_status);
}
/*
call-seq:
- call.status = status
+ call.status = status
- Saves a status object on the call. */
+ Saves a status object on the call. */
static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
- if (!NIL_P(status) && rb_obj_class(status) != rb_sStatus) {
+ if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) {
rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
rb_obj_classname(status));
return Qnil;
@@ -277,18 +203,18 @@ static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
/*
call-seq:
- metadata = call.metadata
+ metadata = call.metadata
- Gets the metadata object saved the call. */
+ Gets the metadata object saved the call. */
static VALUE grpc_rb_call_get_metadata(VALUE self) {
return rb_ivar_get(self, id_metadata);
}
/*
call-seq:
- call.metadata = metadata
+ call.metadata = metadata
- Saves the metadata hash on the call. */
+ Saves the metadata hash on the call. */
static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
@@ -299,176 +225,425 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
return rb_ivar_set(self, id_metadata, metadata);
}
-/*
- call-seq:
- call.start_write(byte_buffer, tag, flags=nil)
-
- Queue a byte buffer for writing.
- flags is a bit-field combination of the write flags defined above.
- A write with byte_buffer null is allowed, and will not send any bytes on the
- wire. If this is performed without GRPC_WRITE_BUFFER_HINT flag it provides
- a mechanism to flush any previously buffered writes to outgoing flow control.
- REQUIRES: No other writes are pending on the call. It is only safe to
- start the next write after the corresponding write_accepted event
- is received.
- GRPC_INVOKE_ACCEPTED must have been received by the application
- prior to calling this on the client. On the server,
- grpc_call_accept must have been called successfully.
- Produces a GRPC_WRITE_ACCEPTED event. */
-static VALUE grpc_rb_call_start_write(int argc, VALUE *argv, VALUE self) {
- VALUE byte_buffer = Qnil;
- VALUE tag = Qnil;
- VALUE flags = Qnil;
- grpc_call *call = NULL;
- grpc_byte_buffer *bfr = NULL;
- grpc_call_error err;
+/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
+ to fill grpc_metadata_array.
+
+ it's capacity should have been computed via a prior call to
+ grpc_rb_md_ary_fill_hash_cb
+*/
+static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
+ grpc_metadata_array *md_ary = NULL;
+ int array_length;
+ int i;
+
+ /* Construct a metadata object from key and value and add it */
+ TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
+ &grpc_rb_md_ary_data_type, md_ary);
- /* "21" == 2 mandatory args, 1 (flags) is optional */
- rb_scan_args(argc, argv, "21", &byte_buffer, &tag, &flags);
- if (NIL_P(flags)) {
- flags = UINT2NUM(0); /* Default to no flags */
+ if (TYPE(val) == T_ARRAY) {
+ /* If the value is an array, add capacity for each value in the array */
+ array_length = RARRAY_LEN(val);
+ for (i = 0; i < array_length; i++) {
+ if (TYPE(key) == T_SYMBOL) {
+ md_ary->metadata[md_ary->count].key = (char *)rb_id2name(SYM2ID(key));
+ } else { /* StringValueCStr does all other type exclusions for us */
+ md_ary->metadata[md_ary->count].key = StringValueCStr(key);
+ }
+ md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i));
+ md_ary->metadata[md_ary->count].value_length =
+ RSTRING_LEN(rb_ary_entry(val, i));
+ md_ary->count += 1;
+ }
+ } else {
+ if (TYPE(key) == T_SYMBOL) {
+ md_ary->metadata[md_ary->count].key = (char *)rb_id2name(SYM2ID(key));
+ } else { /* StringValueCStr does all other type exclusions for us */
+ md_ary->metadata[md_ary->count].key = StringValueCStr(key);
+ }
+ md_ary->metadata[md_ary->count].value = RSTRING_PTR(val);
+ md_ary->metadata[md_ary->count].value_length = RSTRING_LEN(val);
+ md_ary->count += 1;
}
- bfr = grpc_rb_get_wrapped_byte_buffer(byte_buffer);
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_start_write_old(call, bfr, ROBJECT(tag), NUM2UINT(flags));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "start write failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
+
+ return ST_CONTINUE;
+}
+
+/* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
+ to pre-compute the capacity a grpc_metadata_array.
+*/
+static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
+ VALUE md_ary_obj) {
+ grpc_metadata_array *md_ary = NULL;
+
+ /* Construct a metadata object from key and value and add it */
+ TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
+ &grpc_rb_md_ary_data_type, md_ary);
+
+ if (TYPE(val) == T_ARRAY) {
+ /* If the value is an array, add capacity for each value in the array */
+ md_ary->capacity += RARRAY_LEN(val);
+ } else {
+ md_ary->capacity += 1;
}
+ return ST_CONTINUE;
+}
- return Qnil;
+/* grpc_rb_md_ary_convert converts a ruby metadata hash into
+ a grpc_metadata_array.
+*/
+static void grpc_rb_md_ary_convert(VALUE md_ary_hash,
+ grpc_metadata_array *md_ary) {
+ VALUE md_ary_obj = Qnil;
+ if (md_ary_hash == Qnil) {
+ return; /* Do nothing if the expected has value is nil */
+ }
+ if (TYPE(md_ary_hash) != T_HASH) {
+ rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
+ rb_obj_classname(md_ary_hash));
+ return;
+ }
+
+ /* Initialize the array, compute it's capacity, then fill it. */
+ grpc_metadata_array_init(md_ary);
+ md_ary_obj =
+ TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
+ rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
+ md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
+ rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
}
-/* Queue a status for writing.
-
- call-seq:
- tag = Object.new
- call.write_status(200, "OK", tag)
-
- REQUIRES: No other writes are pending on the call. It is only safe to
- start the next write after the corresponding write_accepted event
- is received.
- GRPC_INVOKE_ACCEPTED must have been received by the application
- prior to calling this.
- Only callable on the server.
- Produces a GRPC_FINISHED event when the status is sent and the stream is
- fully closed */
-static VALUE grpc_rb_call_start_write_status(VALUE self, VALUE code,
- VALUE status, VALUE tag) {
- grpc_call *call = NULL;
- grpc_call_error err;
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_start_write_status_old(call, NUM2UINT(code),
- StringValueCStr(status), ROBJECT(tag));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "start write status: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
+/* Converts a metadata array to a hash. */
+VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
+ VALUE key = Qnil;
+ VALUE new_ary = Qnil;
+ VALUE value = Qnil;
+ VALUE result = rb_hash_new();
+ size_t i;
+
+ for (i = 0; i < md_ary->count; i++) {
+ key = rb_str_new2(md_ary->metadata[i].key);
+ value = rb_hash_aref(result, key);
+ if (value == Qnil) {
+ value = rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length);
+ rb_hash_aset(result, key, value);
+ } else if (TYPE(value) == T_ARRAY) {
+ /* Add the string to the returned array */
+ rb_ary_push(value, rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length));
+ } else {
+ /* Add the current value with this key and the new one to an array */
+ new_ary = rb_ary_new();
+ rb_ary_push(new_ary, value);
+ rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length));
+ rb_hash_aset(result, key, new_ary);
+ }
}
+ return result;
+}
- return Qnil;
+/* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
+ each key of an ops hash is valid.
+*/
+static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
+ VALUE ops_ary) {
+ /* Update the capacity; the value is an array, add capacity for each value in
+ * the array */
+ if (TYPE(key) != T_FIXNUM) {
+ rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
+ rb_obj_classname(key));
+ return ST_STOP;
+ }
+ switch (NUM2INT(key)) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ case GRPC_OP_SEND_MESSAGE:
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ case GRPC_OP_RECV_MESSAGE:
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ rb_ary_push(ops_ary, key);
+ return ST_CONTINUE;
+ default:
+ rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
+ };
+ return ST_STOP;
}
-/* No more messages to send.
- REQUIRES: No other writes are pending on the call. */
-static VALUE grpc_rb_call_writes_done(VALUE self, VALUE tag) {
- grpc_call *call = NULL;
- grpc_call_error err;
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_writes_done_old(call, ROBJECT(tag));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "writes done: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
+/* grpc_rb_op_update_status_from_server adds the values in a ruby status
+ struct to the 'send_status_from_server' portion of an op.
+*/
+static void grpc_rb_op_update_status_from_server(grpc_op *op,
+ grpc_metadata_array *md_ary,
+ VALUE status) {
+ VALUE code = rb_struct_aref(status, sym_code);
+ VALUE details = rb_struct_aref(status, sym_details);
+ VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
+
+ /* TODO: add check to ensure status is the correct struct type */
+ if (TYPE(code) != T_FIXNUM) {
+ rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
+ rb_obj_classname(code));
+ return;
}
+ if (TYPE(details) != T_STRING) {
+ rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
+ rb_obj_classname(code));
+ return;
+ }
+ op->data.send_status_from_server.status = NUM2INT(code);
+ op->data.send_status_from_server.status_details = StringValueCStr(details);
+ grpc_rb_md_ary_convert(metadata_hash, md_ary);
+ op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
+ op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
+}
- return Qnil;
+/* run_batch_stack holds various values used by the
+ * grpc_rb_call_run_batch function */
+typedef struct run_batch_stack {
+ /* The batch ops */
+ grpc_op ops[8]; /* 8 is the maximum number of operations */
+ size_t op_num; /* tracks the last added operation */
+
+ /* Data being sent */
+ grpc_metadata_array send_metadata;
+ grpc_metadata_array send_trailing_metadata;
+
+ /* Data being received */
+ grpc_byte_buffer *recv_message;
+ grpc_metadata_array recv_metadata;
+ grpc_metadata_array recv_trailing_metadata;
+ int recv_cancelled;
+ grpc_status_code recv_status;
+ char *recv_status_details;
+ size_t recv_status_details_capacity;
+} run_batch_stack;
+
+/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
+ * initialized */
+static void grpc_run_batch_stack_init(run_batch_stack *st) {
+ MEMZERO(st, run_batch_stack, 1);
+ grpc_metadata_array_init(&st->send_metadata);
+ grpc_metadata_array_init(&st->send_trailing_metadata);
+ grpc_metadata_array_init(&st->recv_metadata);
+ grpc_metadata_array_init(&st->recv_trailing_metadata);
+ st->op_num = 0;
}
-/* call-seq:
- call.server_end_initial_metadata(flag)
-
- Only to be called on servers, before sending messages.
- flags is a bit-field combination of the write flags defined above.
-
- REQUIRES: Can be called at most once per call.
- Can only be called on the server, must be called after
- grpc_call_server_accept
- Produces no events */
-static VALUE grpc_rb_call_server_end_initial_metadata(int argc, VALUE *argv,
- VALUE self) {
- VALUE flags = Qnil;
- grpc_call *call = NULL;
- grpc_call_error err;
+/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
+ * cleaned up */
+static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
+ grpc_metadata_array_destroy(&st->send_metadata);
+ grpc_metadata_array_destroy(&st->send_trailing_metadata);
+ grpc_metadata_array_destroy(&st->recv_metadata);
+ grpc_metadata_array_destroy(&st->recv_trailing_metadata);
+ if (st->recv_status_details != NULL) {
+ gpr_free(st->recv_status_details);
+ }
+}
- /* "01" == 1 (flags) is optional */
- rb_scan_args(argc, argv, "01", &flags);
- if (NIL_P(flags)) {
- flags = UINT2NUM(0); /* Default to no flags */
+/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
+ * ops_hash */
+static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
+ VALUE this_op = Qnil;
+ VALUE this_value = Qnil;
+ VALUE ops_ary = rb_ary_new();
+ size_t i = 0;
+
+ /* Create a ruby array with just the operation keys */
+ rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
+
+ /* Fill the ops array */
+ for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
+ this_op = rb_ary_entry(ops_ary, i);
+ this_value = rb_hash_aref(ops_hash, this_op);
+ switch (NUM2INT(this_op)) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ /* N.B. later there is no need to explicitly delete the metadata keys
+ * and values, they are references to data in ruby objects. */
+ grpc_rb_md_ary_convert(this_value, &st->send_metadata);
+ st->ops[st->op_num].data.send_initial_metadata.count =
+ st->send_metadata.count;
+ st->ops[st->op_num].data.send_initial_metadata.metadata =
+ st->send_metadata.metadata;
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
+ RSTRING_PTR(this_value), RSTRING_LEN(this_value));
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* N.B. later there is no need to explicitly delete the metadata keys
+ * and values, they are references to data in ruby objects. */
+ grpc_rb_op_update_status_from_server(
+ &st->ops[st->op_num], &st->send_trailing_metadata, this_value);
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ st->ops[st->op_num].data.recv_message = &st->recv_message;
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
+ &st->recv_trailing_metadata;
+ st->ops[st->op_num].data.recv_status_on_client.status =
+ &st->recv_status;
+ st->ops[st->op_num].data.recv_status_on_client.status_details =
+ &st->recv_status_details;
+ st->ops[st->op_num].data.recv_status_on_client.status_details_capacity =
+ &st->recv_status_details_capacity;
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ st->ops[st->op_num].data.recv_close_on_server.cancelled =
+ &st->recv_cancelled;
+ break;
+ default:
+ grpc_run_batch_stack_cleanup(st);
+ rb_raise(rb_eTypeError, "invalid operation : bad value %d",
+ NUM2INT(this_op));
+ };
+ st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
+ st->op_num++;
}
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_server_end_initial_metadata_old(call, NUM2UINT(flags));
- if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "end_initial_metadata failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
+}
+
+/* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
+ after the results have run */
+static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
+ size_t i = 0;
+ VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
+ Qnil, Qnil, Qnil, Qnil, NULL);
+ for (i = 0; i < st->op_num; i++) {
+ switch (st->ops[i].op) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ rb_struct_aset(result, sym_send_metadata, Qtrue);
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ rb_struct_aset(result, sym_send_message, Qtrue);
+ grpc_byte_buffer_destroy(st->ops[i].data.send_message);
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ rb_struct_aset(result, sym_send_close, Qtrue);
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ rb_struct_aset(result, sym_send_status, Qtrue);
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ rb_struct_aset(result, sym_metadata,
+ grpc_rb_md_ary_to_h(&st->recv_metadata));
+ case GRPC_OP_RECV_MESSAGE:
+ rb_struct_aset(result, sym_message,
+ grpc_rb_byte_buffer_to_s(st->recv_message));
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ rb_struct_aset(
+ result, sym_status,
+ rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status),
+ (st->recv_status_details == NULL
+ ? Qnil
+ : rb_str_new2(st->recv_status_details)),
+ grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
+ NULL));
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ rb_struct_aset(result, sym_send_close, Qtrue);
+ break;
+ default:
+ break;
+ }
}
- return Qnil;
+ return result;
}
/* call-seq:
- call.server_accept(completion_queue, finished_tag)
-
- Accept an incoming RPC, binding a completion queue to it.
- To be called before sending or receiving messages.
-
- REQUIRES: Can be called at most once per call.
- Can only be called on the server.
- Produces a GRPC_FINISHED event with finished_tag when the call has been
- completed (there may be other events for the call pending at this
- time) */
-static VALUE grpc_rb_call_server_accept(VALUE self, VALUE cqueue,
- VALUE finished_tag) {
+ cq = CompletionQueue.new
+ ops = {
+ GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
+ GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
+ ...
+ }
+ tag = Object.new
+ timeout = 10
+ call.start_batch(cqueue, tag, timeout, ops)
+
+ Start a batch of operations defined in the array ops; when complete, post a
+ completion of type 'tag' to the completion queue bound to the call.
+
+ Also waits for the batch to complete, until timeout is reached.
+ The order of ops specified in the batch has no significance.
+ Only one operation of each type can be active at once in any given
+ batch */
+static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
+ VALUE timeout, VALUE ops_hash) {
+ run_batch_stack st;
grpc_call *call = NULL;
- grpc_completion_queue *cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ grpc_event *ev = NULL;
grpc_call_error err;
- Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_server_accept_old(call, cq, ROBJECT(finished_tag));
+ VALUE result = Qnil;
+ TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+
+ /* Validate the ops args, adding them to a ruby array */
+ if (TYPE(ops_hash) != T_HASH) {
+ rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
+ return Qnil;
+ }
+ grpc_run_batch_stack_init(&st);
+ grpc_run_batch_stack_fill_ops(&st, ops_hash);
+
+ /* call grpc_call_start_batch, then wait for it to complete using
+ * pluck_event */
+ err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag));
if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "server_accept failed: %s (code=%d)",
+ grpc_run_batch_stack_cleanup(&st);
+ rb_raise(grpc_rb_eCallError,
+ "grpc_call_start_batch failed with %s (code=%d)",
grpc_call_error_detail_of(err), err);
+ return Qnil;
+ }
+ ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
+ if (ev == NULL) {
+ grpc_run_batch_stack_cleanup(&st);
+ rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
+ return Qnil;
+ }
+ if (ev->data.op_complete != GRPC_OP_OK) {
+ grpc_run_batch_stack_cleanup(&st);
+ rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)",
+ ev->data.op_complete);
+ return Qnil;
}
- /* Add the completion queue as an instance attribute, prevents it from being
- * GCed until this call object is GCed */
- rb_ivar_set(self, id_cq, cqueue);
- return Qnil;
+ /* Build and return the BatchResult struct result */
+ result = grpc_run_batch_stack_build_result(&st);
+ grpc_run_batch_stack_cleanup(&st);
+ return result;
}
-/* rb_cCall is the ruby class that proxies grpc_call. */
-VALUE rb_cCall = Qnil;
-
-/* rb_eCallError is the ruby class of the exception thrown during call
- operations; */
-VALUE rb_eCallError = Qnil;
-
-void Init_grpc_error_codes() {
+static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
- VALUE rb_RpcErrors = rb_define_module_under(rb_mGrpcCore, "RpcErrors");
- rb_define_const(rb_RpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
- rb_define_const(rb_RpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
- rb_define_const(rb_RpcErrors, "NOT_ON_SERVER",
+ VALUE grpc_rb_mRpcErrors =
+ rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
+ rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
+ rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
+ rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER",
UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
- rb_define_const(rb_RpcErrors, "NOT_ON_CLIENT",
+ rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT",
UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
- rb_define_const(rb_RpcErrors, "ALREADY_ACCEPTED",
+ rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED",
UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
- rb_define_const(rb_RpcErrors, "ALREADY_INVOKED",
+ rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED",
UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
- rb_define_const(rb_RpcErrors, "NOT_INVOKED",
+ rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED",
UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED));
- rb_define_const(rb_RpcErrors, "ALREADY_FINISHED",
+ rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED",
UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED));
- rb_define_const(rb_RpcErrors, "TOO_MANY_OPERATIONS",
+ rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS",
UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
- rb_define_const(rb_RpcErrors, "INVALID_FLAGS",
+ rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
/* Add the detail strings to a Hash */
@@ -496,37 +671,54 @@ void Init_grpc_error_codes() {
rb_str_new2("outstanding read or write present"));
rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS),
rb_str_new2("a bad flag was given"));
- rb_define_const(rb_RpcErrors, "ErrorMessages", rb_error_code_details);
+ rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details);
rb_obj_freeze(rb_error_code_details);
}
+static void Init_grpc_op_codes() {
+ /* Constants representing operation type codes in grpc.h */
+ VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
+ rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
+ UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
+ rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
+ UINT2NUM(GRPC_OP_SEND_MESSAGE));
+ rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT",
+ UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
+ rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER",
+ UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER));
+ rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA",
+ UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA));
+ rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE",
+ UINT2NUM(GRPC_OP_RECV_MESSAGE));
+ rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT",
+ UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT));
+ rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER",
+ UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
+}
+
void Init_grpc_call() {
/* CallError inherits from Exception to signal that it is non-recoverable */
- rb_eCallError =
- rb_define_class_under(rb_mGrpcCore, "CallError", rb_eException);
- rb_cCall = rb_define_class_under(rb_mGrpcCore, "Call", rb_cObject);
+ grpc_rb_eCallError =
+ rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException);
+ grpc_rb_eOutOfTime =
+ rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
+ grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
+ grpc_rb_cMdAry =
+ rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
/* Prevent allocation or inialization of the Call class */
- rb_define_alloc_func(rb_cCall, grpc_rb_cannot_alloc);
- rb_define_method(rb_cCall, "initialize", grpc_rb_cannot_init, 0);
- rb_define_method(rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy, 1);
+ rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
+ rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
+ rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
+ 1);
/* Add ruby analogues of the Call methods. */
- rb_define_method(rb_cCall, "server_accept", grpc_rb_call_server_accept, 2);
- rb_define_method(rb_cCall, "server_end_initial_metadata",
- grpc_rb_call_server_end_initial_metadata, -1);
- rb_define_method(rb_cCall, "add_metadata", grpc_rb_call_add_metadata, -1);
- rb_define_method(rb_cCall, "cancel", grpc_rb_call_cancel, 0);
- rb_define_method(rb_cCall, "invoke", grpc_rb_call_invoke, -1);
- rb_define_method(rb_cCall, "start_read", grpc_rb_call_start_read, 1);
- rb_define_method(rb_cCall, "start_write", grpc_rb_call_start_write, -1);
- rb_define_method(rb_cCall, "start_write_status",
- grpc_rb_call_start_write_status, 3);
- rb_define_method(rb_cCall, "writes_done", grpc_rb_call_writes_done, 1);
- rb_define_method(rb_cCall, "status", grpc_rb_call_get_status, 0);
- rb_define_method(rb_cCall, "status=", grpc_rb_call_set_status, 1);
- rb_define_method(rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
- rb_define_method(rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
+ rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
+ rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
+ rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
+ rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
+ rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
+ rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
@@ -537,18 +729,33 @@ void Init_grpc_call() {
id_flags = rb_intern("__flags");
id_input_md = rb_intern("__input_md");
+ /* Ids used in constructing the batch result. */
+ sym_send_message = ID2SYM(rb_intern("send_message"));
+ sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
+ sym_send_close = ID2SYM(rb_intern("send_close"));
+ sym_send_status = ID2SYM(rb_intern("send_status"));
+ sym_message = ID2SYM(rb_intern("message"));
+ sym_status = ID2SYM(rb_intern("status"));
+ sym_cancelled = ID2SYM(rb_intern("cancelled"));
+
+ /* The Struct used to return the run_batch result. */
+ grpc_rb_sBatchResult = rb_struct_define(
+ "BatchResult", "send_message", "send_metadata", "send_close",
+ "send_status", "message", "metadata", "status", "cancelled", NULL);
+
/* The hash for reference counting calls, to ensure they can't be destroyed
* more than once */
hash_all_calls = rb_hash_new();
- rb_define_const(rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls);
+ rb_define_const(grpc_rb_cCall, "INTERNAL_ALL_CALLs", hash_all_calls);
Init_grpc_error_codes();
+ Init_grpc_op_codes();
}
/* Gets the call from the ruby object */
grpc_call *grpc_rb_get_wrapped_call(VALUE v) {
grpc_call *c = NULL;
- Data_Get_Struct(v, grpc_call, c);
+ TypedData_Get_Struct(v, grpc_call, &grpc_call_data_type, c);
return c;
}
@@ -565,5 +772,5 @@ VALUE grpc_rb_wrap_call(grpc_call *c) {
rb_hash_aset(hash_all_calls, OFFT2NUM((VALUE)c),
UINT2NUM(NUM2UINT(obj) + 1));
}
- return Data_Wrap_Struct(rb_cCall, GC_NOT_MARKED, grpc_rb_call_destroy, c);
+ return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c);
}
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index bb51759a46..003ce0429e 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -46,12 +46,12 @@ VALUE grpc_rb_wrap_call(grpc_call* c);
/* Provides the details of an call error */
const char* grpc_call_error_detail_of(grpc_call_error err);
-/* rb_cCall is the Call class whose instances proxy grpc_call. */
-extern VALUE rb_cCall;
+/* Converts a metadata array to a hash. */
+VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary);
-/* rb_cCallError is the ruby class of the exception thrown during call
+/* grpc_rb_eCallError is the ruby class of the exception thrown during call
operations. */
-extern VALUE rb_eCallError;
+extern VALUE grpc_rb_eCallError;
/* Initializes the Call class. */
void Init_grpc_call();
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 2a48f46ce2..214675af92 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -49,12 +49,20 @@
static ID id_channel;
/* id_target is the name of the hidden ivar that preserves a reference to the
- * target string used to create the call, preserved so that is does not get
+ * target string used to create the call, preserved so that it does not get
* GCed before the channel */
static ID id_target;
+/* id_cqueue is the name of the hidden ivar that preserves a reference to the
+ * completion queue used to create the call, preserved so that it does not get
+ * GCed before the channel */
+static ID id_cqueue;
+
+/* grpc_rb_cChannel is the ruby class that proxies grpc_channel. */
+static VALUE grpc_rb_cChannel = Qnil;
+
/* Used during the conversion of a hash to channel args during channel setup */
-static VALUE rb_cChannelArgs;
+static VALUE grpc_rb_cChannelArgs;
/* grpc_rb_channel wraps a grpc_channel. It provides a peer ruby object,
* 'mark' to minimize copying when a channel is created from ruby. */
@@ -97,13 +105,19 @@ static void grpc_rb_channel_mark(void *p) {
}
}
+static rb_data_type_t grpc_channel_data_type = {
+ "grpc_channel",
+ {grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL, NULL,
+ RUBY_TYPED_FREE_IMMEDIATELY
+};
+
/* Allocates grpc_rb_channel instances. */
static VALUE grpc_rb_channel_alloc(VALUE cls) {
grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_channel_mark, grpc_rb_channel_free,
- wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
}
/*
@@ -127,7 +141,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
/* "21" == 2 mandatory args, 1 (credentials) is optional */
rb_scan_args(argc, argv, "21", &target, &channel_args, &credentials);
- Data_Get_Struct(self, grpc_rb_channel, wrapper);
+ TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (credentials == Qnil) {
@@ -142,6 +156,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "could not create an rpc channel to target:%s",
target_chars);
+ return Qnil;
}
rb_ivar_set(self, id_target, target);
wrapper->wrapped = ch;
@@ -163,11 +178,12 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) {
/* Raise an error if orig is not a channel object or a subclass. */
if (TYPE(orig) != T_DATA ||
RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_channel_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cChannel));
+ rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cChannel));
+ return Qnil;
}
- Data_Get_Struct(orig, grpc_rb_channel, orig_ch);
- Data_Get_Struct(copy, grpc_rb_channel, copy_ch);
+ TypedData_Get_Struct(orig, grpc_rb_channel, &grpc_channel_data_type, orig_ch);
+ TypedData_Get_Struct(copy, grpc_rb_channel, &grpc_channel_data_type, copy_ch);
/* use ruby's MEMCPY to make a byte-for-byte copy of the channel wrapper
* object. */
@@ -177,34 +193,42 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) {
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE method, VALUE host,
- VALUE deadline) {
+static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
+ VALUE host, VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
grpc_call *call = NULL;
+ grpc_channel *ch = NULL;
+ grpc_completion_queue *cq = NULL;
char *method_chars = StringValueCStr(method);
char *host_chars = StringValueCStr(host);
- Data_Get_Struct(self, grpc_rb_channel, wrapper);
+ cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
+ return Qnil;
}
call =
- grpc_channel_create_call_old(ch, method_chars, host_chars,
- grpc_rb_time_timeval(deadline,
- /* absolute time */ 0));
+ grpc_channel_create_call(ch, cq, method_chars, host_chars,
+ grpc_rb_time_timeval(deadline,
+ /* absolute time */ 0));
if (call == NULL) {
rb_raise(rb_eRuntimeError, "cannot create call with method %s",
method_chars);
+ return Qnil;
}
res = grpc_rb_wrap_call(call);
- /* Make this channel an instance attribute of the call so that is is not GCed
+ /* Make this channel an instance attribute of the call so that it is not GCed
* before the call. */
rb_ivar_set(res, id_channel, self);
+
+ /* Make the completion queue an instance attribute of the call so that it is
+ * not GCed before the call. */
+ rb_ivar_set(res, id_cqueue, cqueue);
return res;
}
@@ -213,7 +237,7 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
- Data_Get_Struct(self, grpc_rb_channel, wrapper);
+ TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch != NULL) {
grpc_channel_destroy(ch);
@@ -224,41 +248,41 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
return Qnil;
}
-/* rb_cChannel is the ruby class that proxies grpc_channel. */
-VALUE rb_cChannel = Qnil;
-
void Init_grpc_channel() {
- rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
- rb_cChannel = rb_define_class_under(rb_mGrpcCore, "Channel", rb_cObject);
+ grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
+ grpc_rb_cChannel =
+ rb_define_class_under(grpc_rb_mGrpcCore, "Channel", rb_cObject);
/* Allocates an object managed by the ruby runtime */
- rb_define_alloc_func(rb_cChannel, grpc_rb_channel_alloc);
+ rb_define_alloc_func(grpc_rb_cChannel, grpc_rb_channel_alloc);
/* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(rb_cChannel, "initialize", grpc_rb_channel_init, -1);
- rb_define_method(rb_cChannel, "initialize_copy", grpc_rb_channel_init_copy,
- 1);
+ rb_define_method(grpc_rb_cChannel, "initialize", grpc_rb_channel_init, -1);
+ rb_define_method(grpc_rb_cChannel, "initialize_copy",
+ grpc_rb_channel_init_copy, 1);
/* Add ruby analogues of the Channel methods. */
- rb_define_method(rb_cChannel, "create_call", grpc_rb_channel_create_call, 3);
- rb_define_method(rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
- rb_define_alias(rb_cChannel, "close", "destroy");
+ rb_define_method(grpc_rb_cChannel, "create_call",
+ grpc_rb_channel_create_call, 4);
+ rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
+ rb_define_alias(grpc_rb_cChannel, "close", "destroy");
id_channel = rb_intern("__channel");
+ id_cqueue = rb_intern("__cqueue");
id_target = rb_intern("__target");
- rb_define_const(rb_cChannel, "SSL_TARGET",
+ rb_define_const(grpc_rb_cChannel, "SSL_TARGET",
ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
- rb_define_const(rb_cChannel, "ENABLE_CENSUS",
+ rb_define_const(grpc_rb_cChannel, "ENABLE_CENSUS",
ID2SYM(rb_intern(GRPC_ARG_ENABLE_CENSUS)));
- rb_define_const(rb_cChannel, "MAX_CONCURRENT_STREAMS",
+ rb_define_const(grpc_rb_cChannel, "MAX_CONCURRENT_STREAMS",
ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS)));
- rb_define_const(rb_cChannel, "MAX_MESSAGE_LENGTH",
+ rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH",
ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH)));
}
/* Gets the wrapped channel from the ruby wrapper */
grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) {
grpc_rb_channel *wrapper = NULL;
- Data_Get_Struct(v, grpc_rb_channel, wrapper);
+ TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
return wrapper->wrapped;
}
diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h
index a582869cda..6e3c087689 100644
--- a/src/ruby/ext/grpc/rb_channel.h
+++ b/src/ruby/ext/grpc/rb_channel.h
@@ -37,9 +37,6 @@
#include <ruby.h>
#include <grpc/grpc.h>
-/* rb_cChannel is the Channel class whose instances proxy grpc_channel. */
-extern VALUE rb_cChannel;
-
/* Initializes the Channel class. */
void Init_grpc_channel();
diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c
index 532ee5e785..acd545f5d2 100644
--- a/src/ruby/ext/grpc/rb_channel_args.c
+++ b/src/ruby/ext/grpc/rb_channel_args.c
@@ -38,6 +38,13 @@
#include "rb_grpc.h"
+static rb_data_type_t grpc_rb_channel_args_data_type = {
+ "grpc_channel_args",
+ {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL, NULL,
+ RUBY_TYPED_FREE_IMMEDIATELY
+};
+
/* A callback the processes the hash key values in channel_args hash */
static int grpc_rb_channel_create_in_process_add_args_hash_cb(VALUE key,
VALUE val,
@@ -60,7 +67,8 @@ static int grpc_rb_channel_create_in_process_add_args_hash_cb(VALUE key,
return ST_STOP;
}
- Data_Get_Struct(args_obj, grpc_channel_args, args);
+ TypedData_Get_Struct(args_obj, grpc_channel_args,
+ &grpc_rb_channel_args_data_type, args);
if (args->num_args <= 0) {
rb_raise(rb_eRuntimeError, "hash_cb bug: num_args is %lu for key:%s",
args->num_args, StringValueCStr(key));
@@ -109,7 +117,7 @@ typedef struct channel_convert_params {
static VALUE grpc_rb_hash_convert_to_channel_args0(VALUE as_value) {
ID id_size = rb_intern("size");
- VALUE rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
+ VALUE grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
channel_convert_params* params = (channel_convert_params*)as_value;
size_t num_args = 0;
@@ -126,8 +134,9 @@ static VALUE grpc_rb_hash_convert_to_channel_args0(VALUE as_value) {
MEMZERO(params->dst->args, grpc_arg, num_args);
rb_hash_foreach(params->src_hash,
grpc_rb_channel_create_in_process_add_args_hash_cb,
- Data_Wrap_Struct(rb_cChannelArgs, GC_NOT_MARKED,
- GC_DONT_FREE, params->dst));
+ TypedData_Wrap_Struct(grpc_rb_cChannelArgs,
+ &grpc_rb_channel_args_data_type,
+ params->dst));
/* reset num_args as grpc_rb_channel_create_in_process_add_args_hash_cb
* decrements it during has processing */
params->dst->num_args = num_args;
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 3fdbdd837a..3cf6c313ee 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -33,12 +33,16 @@
#include "rb_completion_queue.h"
-#include <ruby.h>
+#include <ruby/ruby.h>
+#include <ruby/thread.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>
#include "rb_grpc.h"
-#include "rb_event.h"
+
+/* grpc_rb_cCompletionQueue is the ruby class that proxies
+ * grpc_completion_queue. */
+static VALUE grpc_rb_cCompletionQueue = Qnil;
/* Used to allow grpc_completion_queue_next call to release the GIL */
typedef struct next_call_stack {
@@ -49,14 +53,16 @@ typedef struct next_call_stack {
} next_call_stack;
/* Calls grpc_completion_queue_next without holding the ruby GIL */
-static void *grpc_rb_completion_queue_next_no_gil(next_call_stack *next_call) {
+static void *grpc_rb_completion_queue_next_no_gil(void *param) {
+ next_call_stack *const next_call = (next_call_stack*)param;
next_call->event =
grpc_completion_queue_next(next_call->cq, next_call->timeout);
return NULL;
}
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
-static void *grpc_rb_completion_queue_pluck_no_gil(next_call_stack *next_call) {
+static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
+ next_call_stack *const next_call = (next_call_stack*)param;
next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
next_call->timeout);
return NULL;
@@ -113,21 +119,31 @@ static void grpc_rb_completion_queue_destroy(void *p) {
grpc_completion_queue_destroy(cq);
}
+static rb_data_type_t grpc_rb_completion_queue_data_type = {
+ "grpc_completion_queue",
+ {GRPC_RB_GC_NOT_MARKED, grpc_rb_completion_queue_destroy,
+ GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL, NULL,
+ /* cannot immediately free because grpc_rb_completion_queue_shutdown_drain
+ * calls rb_thread_call_without_gvl. */
+ 0
+};
+
/* Allocates a completion queue. */
static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
grpc_completion_queue *cq = grpc_completion_queue_create();
if (cq == NULL) {
rb_raise(rb_eArgError, "could not create a completion queue: not sure why");
}
- return Data_Wrap_Struct(cls, GC_NOT_MARKED, grpc_rb_completion_queue_destroy,
- cq);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
/* Blocks until the next event is available, and returns the event. */
static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
- Data_Get_Struct(self, grpc_completion_queue, next_call.cq);
+ TypedData_Get_Struct(self, grpc_completion_queue,
+ &grpc_rb_completion_queue_data_type, next_call.cq);
next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
next_call.event = NULL;
rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil,
@@ -140,46 +156,57 @@ static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) {
/* Blocks until the next event for given tag is available, and returns the
* event. */
-static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag,
- VALUE timeout) {
+VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag,
+ VALUE timeout) {
+ grpc_event *ev = grpc_rb_completion_queue_pluck_event(self, tag, timeout);
+ if (ev == NULL) {
+ return Qnil;
+ }
+ return grpc_rb_new_event(ev);
+}
+
+/* Blocks until the next event for given tag is available, and returns the
+ * event. */
+grpc_event* grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
+ VALUE timeout) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
- Data_Get_Struct(self, grpc_completion_queue, next_call.cq);
+ TypedData_Get_Struct(self, grpc_completion_queue,
+ &grpc_rb_completion_queue_data_type, next_call.cq);
next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
next_call.tag = ROBJECT(tag);
next_call.event = NULL;
rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
(void *)&next_call, NULL, NULL);
if (next_call.event == NULL) {
- return Qnil;
+ return NULL;
}
- return grpc_rb_new_event(next_call.event);
+ return next_call.event;
}
-/* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */
-VALUE rb_cCompletionQueue = Qnil;
-
void Init_grpc_completion_queue() {
- rb_cCompletionQueue =
- rb_define_class_under(rb_mGrpcCore, "CompletionQueue", rb_cObject);
+ grpc_rb_cCompletionQueue =
+ rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject);
/* constructor: uses an alloc func without an initializer. Using a simple
alloc func works here as the grpc header does not specify any args for
this func, so no separate initialization step is necessary. */
- rb_define_alloc_func(rb_cCompletionQueue, grpc_rb_completion_queue_alloc);
+ rb_define_alloc_func(grpc_rb_cCompletionQueue,
+ grpc_rb_completion_queue_alloc);
/* Add the next method that waits for the next event. */
- rb_define_method(rb_cCompletionQueue, "next", grpc_rb_completion_queue_next,
- 1);
+ rb_define_method(grpc_rb_cCompletionQueue, "next",
+ grpc_rb_completion_queue_next, 1);
/* Add the pluck method that waits for the next event of given tag */
- rb_define_method(rb_cCompletionQueue, "pluck", grpc_rb_completion_queue_pluck,
- 2);
+ rb_define_method(grpc_rb_cCompletionQueue, "pluck",
+ grpc_rb_completion_queue_pluck, 2);
}
/* Gets the wrapped completion queue from the ruby wrapper */
grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v) {
grpc_completion_queue *cq = NULL;
- Data_Get_Struct(v, grpc_completion_queue, cq);
+ TypedData_Get_Struct(v, grpc_completion_queue,
+ &grpc_rb_completion_queue_data_type, cq);
return cq;
}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 38025ea2d2..4d0f49ac47 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -40,9 +40,13 @@
/* Gets the wrapped completion queue from the ruby wrapper */
grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
-/* rb_cCompletionQueue is the CompletionQueue class whose instances proxy
- grpc_completion_queue. */
-extern VALUE rb_cCompletionQueue;
+/**
+ * Makes the implementation of CompletionQueue#pluck available in other files
+ *
+ * This avoids having code that holds the GIL repeated at multiple sites.
+ */
+grpc_event* grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
+ VALUE timeout);
/* Initializes the CompletionQueue class. */
void Init_grpc_completion_queue();
diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c
index 4ee5d6b51c..1ec88914e4 100644
--- a/src/ruby/ext/grpc/rb_credentials.c
+++ b/src/ruby/ext/grpc/rb_credentials.c
@@ -40,6 +40,9 @@
#include "rb_grpc.h"
+/* grpc_rb_cCredentials is the ruby class that proxies grpc_credentials. */
+static VALUE grpc_rb_cCredentials = Qnil;
+
/* grpc_rb_credentials wraps a grpc_credentials. It provides a
* peer ruby object, 'mark' to minimize copying when a credential is
* created from ruby. */
@@ -83,14 +86,21 @@ static void grpc_rb_credentials_mark(void *p) {
}
}
+static rb_data_type_t grpc_rb_credentials_data_type = {
+ "grpc_credentials",
+ {grpc_rb_credentials_mark, grpc_rb_credentials_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL,
+ NULL,
+ RUBY_TYPED_FREE_IMMEDIATELY};
+
/* Allocates Credential instances.
Provides safe initial defaults for the instance fields. */
static VALUE grpc_rb_credentials_alloc(VALUE cls) {
grpc_rb_credentials *wrapper = ALLOC(grpc_rb_credentials);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_credentials_mark,
- grpc_rb_credentials_free, wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_credentials_data_type, wrapper);
}
/* Clones Credentials instances.
@@ -107,11 +117,13 @@ static VALUE grpc_rb_credentials_init_copy(VALUE copy, VALUE orig) {
/* Raise an error if orig is not a credentials object or a subclass. */
if (TYPE(orig) != T_DATA ||
RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_credentials_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cCredentials));
+ rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cCredentials));
}
- Data_Get_Struct(orig, grpc_rb_credentials, orig_cred);
- Data_Get_Struct(copy, grpc_rb_credentials, copy_cred);
+ TypedData_Get_Struct(orig, grpc_rb_credentials,
+ &grpc_rb_credentials_data_type, orig_cred);
+ TypedData_Get_Struct(copy, grpc_rb_credentials,
+ &grpc_rb_credentials_data_type, copy_cred);
/* use ruby's MEMCPY to make a byte-for-byte copy of the credentials
* wrapper object. */
@@ -133,8 +145,7 @@ static VALUE grpc_rb_default_credentials_create(VALUE cls) {
}
wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_credentials_mark,
- grpc_rb_credentials_free, wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_credentials_data_type, wrapper);
}
/*
@@ -151,8 +162,7 @@ static VALUE grpc_rb_compute_engine_credentials_create(VALUE cls) {
}
wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_credentials_mark,
- grpc_rb_credentials_free, wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_credentials_data_type, wrapper);
}
/*
@@ -166,8 +176,10 @@ static VALUE grpc_rb_composite_credentials_create(VALUE self, VALUE other) {
grpc_rb_credentials *other_wrapper = NULL;
grpc_rb_credentials *wrapper = NULL;
- Data_Get_Struct(self, grpc_rb_credentials, self_wrapper);
- Data_Get_Struct(other, grpc_rb_credentials, other_wrapper);
+ TypedData_Get_Struct(self, grpc_rb_credentials,
+ &grpc_rb_credentials_data_type, self_wrapper);
+ TypedData_Get_Struct(other, grpc_rb_credentials,
+ &grpc_rb_credentials_data_type, other_wrapper);
wrapper = ALLOC(grpc_rb_credentials);
wrapper->wrapped = grpc_composite_credentials_create(self_wrapper->wrapped,
other_wrapper->wrapped);
@@ -178,8 +190,8 @@ static VALUE grpc_rb_composite_credentials_create(VALUE self, VALUE other) {
}
wrapper->mark = Qnil;
- return Data_Wrap_Struct(rb_cCredentials, grpc_rb_credentials_mark,
- grpc_rb_credentials_free, wrapper);
+ return TypedData_Wrap_Struct(grpc_rb_cCredentials,
+ &grpc_rb_credentials_data_type, wrapper);
}
/* The attribute used on the mark object to hold the pem_root_certs. */
@@ -214,7 +226,8 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) {
rb_scan_args(argc, argv, "12", &pem_root_certs, &pem_private_key,
&pem_cert_chain);
- Data_Get_Struct(self, grpc_rb_credentials, wrapper);
+ TypedData_Get_Struct(self, grpc_rb_credentials,
+ &grpc_rb_credentials_data_type, wrapper);
if (pem_root_certs == Qnil) {
rb_raise(rb_eRuntimeError,
"could not create a credential: nil pem_root_certs");
@@ -225,8 +238,8 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) {
} else {
key_cert_pair.private_key = RSTRING_PTR(pem_private_key);
key_cert_pair.cert_chain = RSTRING_PTR(pem_cert_chain);
- creds = grpc_ssl_credentials_create(
- RSTRING_PTR(pem_root_certs), &key_cert_pair);
+ creds = grpc_ssl_credentials_create(RSTRING_PTR(pem_root_certs),
+ &key_cert_pair);
}
if (creds == NULL) {
rb_raise(rb_eRuntimeError, "could not create a credentials, not sure why");
@@ -242,30 +255,28 @@ static VALUE grpc_rb_credentials_init(int argc, VALUE *argv, VALUE self) {
return self;
}
-/* rb_cCredentials is the ruby class that proxies grpc_credentials. */
-VALUE rb_cCredentials = Qnil;
-
void Init_grpc_credentials() {
- rb_cCredentials =
- rb_define_class_under(rb_mGrpcCore, "Credentials", rb_cObject);
+ grpc_rb_cCredentials =
+ rb_define_class_under(grpc_rb_mGrpcCore, "Credentials", rb_cObject);
/* Allocates an object managed by the ruby runtime */
- rb_define_alloc_func(rb_cCredentials, grpc_rb_credentials_alloc);
+ rb_define_alloc_func(grpc_rb_cCredentials, grpc_rb_credentials_alloc);
/* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(rb_cCredentials, "initialize", grpc_rb_credentials_init, -1);
- rb_define_method(rb_cCredentials, "initialize_copy",
+ rb_define_method(grpc_rb_cCredentials, "initialize", grpc_rb_credentials_init,
+ -1);
+ rb_define_method(grpc_rb_cCredentials, "initialize_copy",
grpc_rb_credentials_init_copy, 1);
/* Provide static funcs that create new special instances. */
- rb_define_singleton_method(rb_cCredentials, "default",
+ rb_define_singleton_method(grpc_rb_cCredentials, "default",
grpc_rb_default_credentials_create, 0);
- rb_define_singleton_method(rb_cCredentials, "compute_engine",
+ rb_define_singleton_method(grpc_rb_cCredentials, "compute_engine",
grpc_rb_compute_engine_credentials_create, 0);
/* Provide other methods. */
- rb_define_method(rb_cCredentials, "compose",
+ rb_define_method(grpc_rb_cCredentials, "compose",
grpc_rb_composite_credentials_create, 1);
id_pem_cert_chain = rb_intern("__pem_cert_chain");
@@ -276,6 +287,7 @@ void Init_grpc_credentials() {
/* Gets the wrapped grpc_credentials from the ruby wrapper */
grpc_credentials *grpc_rb_get_wrapped_credentials(VALUE v) {
grpc_rb_credentials *wrapper = NULL;
- Data_Get_Struct(v, grpc_rb_credentials, wrapper);
+ TypedData_Get_Struct(v, grpc_rb_credentials, &grpc_rb_credentials_data_type,
+ wrapper);
return wrapper->wrapped;
}
diff --git a/src/ruby/ext/grpc/rb_credentials.h b/src/ruby/ext/grpc/rb_credentials.h
index 3b24397173..e7c43c9c78 100644
--- a/src/ruby/ext/grpc/rb_credentials.h
+++ b/src/ruby/ext/grpc/rb_credentials.h
@@ -37,10 +37,6 @@
#include <ruby.h>
#include <grpc/grpc_security.h>
-/* rb_cCredentials is the ruby class whose instances proxy
- grpc_credentials. */
-extern VALUE rb_cCredentials;
-
/* Initializes the ruby Credentials class. */
void Init_grpc_credentials();
diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c
deleted file mode 100644
index 2e64af4c84..0000000000
--- a/src/ruby/ext/grpc/rb_event.c
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "rb_event.h"
-
-#include <ruby.h>
-
-#include <grpc/grpc.h>
-#include "rb_grpc.h"
-#include "rb_byte_buffer.h"
-#include "rb_call.h"
-#include "rb_metadata.h"
-
-/* grpc_rb_event wraps a grpc_event. It provides a peer ruby object,
- * 'mark' to minimize copying when an event is created from ruby. */
-typedef struct grpc_rb_event {
- /* Holder of ruby objects involved in constructing the channel */
- VALUE mark;
- /* The actual event */
- grpc_event *wrapped;
-} grpc_rb_event;
-
-/* rb_mCompletionType is a ruby module that holds the completion type values */
-VALUE rb_mCompletionType = Qnil;
-
-/* Destroys Event instances. */
-static void grpc_rb_event_free(void *p) {
- grpc_rb_event *ev = NULL;
- if (p == NULL) {
- return;
- };
- ev = (grpc_rb_event *)p;
-
- /* Deletes the wrapped object if the mark object is Qnil, which indicates
- * that no other object is the actual owner. */
- if (ev->wrapped != NULL && ev->mark == Qnil) {
- grpc_event_finish(ev->wrapped);
- rb_warning("event gc: destroyed the c event");
- } else {
- rb_warning("event gc: did not destroy the c event");
- }
-
- xfree(p);
-}
-
-/* Protects the mark object from GC */
-static void grpc_rb_event_mark(void *p) {
- grpc_rb_event *event = NULL;
- if (p == NULL) {
- return;
- }
- event = (grpc_rb_event *)p;
- if (event->mark != Qnil) {
- rb_gc_mark(event->mark);
- }
-}
-
-static VALUE grpc_rb_event_result(VALUE self);
-
-/* Obtains the type of an event. */
-static VALUE grpc_rb_event_type(VALUE self) {
- grpc_event *event = NULL;
- grpc_rb_event *wrapper = NULL;
- Data_Get_Struct(self, grpc_rb_event, wrapper);
- if (wrapper->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "finished!");
- return Qnil;
- }
-
- event = wrapper->wrapped;
- switch (event->type) {
- case GRPC_QUEUE_SHUTDOWN:
- return rb_const_get(rb_mCompletionType, rb_intern("QUEUE_SHUTDOWN"));
-
- case GRPC_READ:
- return rb_const_get(rb_mCompletionType, rb_intern("READ"));
-
- case GRPC_WRITE_ACCEPTED:
- grpc_rb_event_result(self); /* validates the result */
- return rb_const_get(rb_mCompletionType, rb_intern("WRITE_ACCEPTED"));
-
- case GRPC_FINISH_ACCEPTED:
- grpc_rb_event_result(self); /* validates the result */
- return rb_const_get(rb_mCompletionType, rb_intern("FINISH_ACCEPTED"));
-
- case GRPC_CLIENT_METADATA_READ:
- return rb_const_get(rb_mCompletionType,
- rb_intern("CLIENT_METADATA_READ"));
-
- case GRPC_FINISHED:
- return rb_const_get(rb_mCompletionType, rb_intern("FINISHED"));
-
- case GRPC_SERVER_RPC_NEW:
- return rb_const_get(rb_mCompletionType, rb_intern("SERVER_RPC_NEW"));
-
- default:
- rb_raise(rb_eRuntimeError, "unrecognized event code for an rpc event:%d",
- event->type);
- }
- return Qnil; /* should not be reached */
-}
-
-/* Obtains the tag associated with an event. */
-static VALUE grpc_rb_event_tag(VALUE self) {
- grpc_event *event = NULL;
- grpc_rb_event *wrapper = NULL;
- Data_Get_Struct(self, grpc_rb_event, wrapper);
- if (wrapper->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "finished!");
- return Qnil;
- }
-
- event = wrapper->wrapped;
- if (event->tag == NULL) {
- return Qnil;
- }
- return (VALUE)event->tag;
-}
-
-/* Obtains the call associated with an event. */
-static VALUE grpc_rb_event_call(VALUE self) {
- grpc_event *event = NULL;
- grpc_rb_event *wrapper = NULL;
- Data_Get_Struct(self, grpc_rb_event, wrapper);
- if (wrapper->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "finished!");
- return Qnil;
- }
-
- event = wrapper->wrapped;
- if (event->call != NULL) {
- return grpc_rb_wrap_call(event->call);
- }
- return Qnil;
-}
-
-/* Obtains the metadata associated with an event. */
-static VALUE grpc_rb_event_metadata(VALUE self) {
- grpc_event *event = NULL;
- grpc_rb_event *wrapper = NULL;
- grpc_metadata *metadata = NULL;
- VALUE key = Qnil;
- VALUE new_ary = Qnil;
- VALUE result = Qnil;
- VALUE value = Qnil;
- size_t count = 0;
- size_t i = 0;
- Data_Get_Struct(self, grpc_rb_event, wrapper);
- if (wrapper->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "finished!");
- return Qnil;
- }
-
- /* Figure out which metadata to read. */
- event = wrapper->wrapped;
- switch (event->type) {
- case GRPC_CLIENT_METADATA_READ:
- count = event->data.client_metadata_read.count;
- metadata = event->data.client_metadata_read.elements;
- break;
-
- case GRPC_FINISHED:
- count = event->data.finished.metadata_count;
- metadata = event->data.finished.metadata_elements;
- break;
-
- case GRPC_SERVER_RPC_NEW:
- count = event->data.server_rpc_new.metadata_count;
- metadata = event->data.server_rpc_new.metadata_elements;
- break;
-
- default:
- rb_raise(rb_eRuntimeError,
- "bug: bad event type metadata. got %d; want %d|%d:%d",
- event->type, GRPC_CLIENT_METADATA_READ, GRPC_FINISHED,
- GRPC_SERVER_RPC_NEW);
- return Qnil;
- }
-
- result = rb_hash_new();
- for (i = 0; i < count; i++) {
- key = rb_str_new2(metadata[i].key);
- value = rb_hash_aref(result, key);
- if (value == Qnil) {
- value = rb_str_new(metadata[i].value, metadata[i].value_length);
- rb_hash_aset(result, key, value);
- } else if (TYPE(value) == T_ARRAY) {
- /* Add the string to the returned array */
- rb_ary_push(value,
- rb_str_new(metadata[i].value, metadata[i].value_length));
- } else {
- /* Add the current value with this key and the new one to an array */
- new_ary = rb_ary_new();
- rb_ary_push(new_ary, value);
- rb_ary_push(new_ary,
- rb_str_new(metadata[i].value, metadata[i].value_length));
- rb_hash_aset(result, key, new_ary);
- }
- }
- return result;
-}
-
-/* Obtains the data associated with an event. */
-static VALUE grpc_rb_event_result(VALUE self) {
- grpc_event *event = NULL;
- grpc_rb_event *wrapper = NULL;
- Data_Get_Struct(self, grpc_rb_event, wrapper);
- if (wrapper->wrapped == NULL) {
- rb_raise(rb_eRuntimeError, "finished!");
- return Qnil;
- }
- event = wrapper->wrapped;
-
- switch (event->type) {
- case GRPC_QUEUE_SHUTDOWN:
- return Qnil;
-
- case GRPC_READ:
- return grpc_rb_byte_buffer_create_with_mark(self, event->data.read);
-
- case GRPC_FINISH_ACCEPTED:
- if (event->data.finish_accepted == GRPC_OP_OK) {
- return Qnil;
- }
- rb_raise(rb_eEventError, "finish failed, not sure why (code=%d)",
- event->data.finish_accepted);
- break;
-
- case GRPC_WRITE_ACCEPTED:
- if (event->data.write_accepted == GRPC_OP_OK) {
- return Qnil;
- }
- rb_raise(rb_eEventError, "write failed, not sure why (code=%d)",
- event->data.write_accepted);
- break;
-
- case GRPC_CLIENT_METADATA_READ:
- return grpc_rb_event_metadata(self);
-
- case GRPC_FINISHED:
- return rb_struct_new(rb_sStatus, UINT2NUM(event->data.finished.status),
- (event->data.finished.details == NULL
- ? Qnil
- : rb_str_new2(event->data.finished.details)),
- grpc_rb_event_metadata(self), NULL);
- break;
-
- case GRPC_SERVER_RPC_NEW:
- return rb_struct_new(
- rb_sNewServerRpc, rb_str_new2(event->data.server_rpc_new.method),
- rb_str_new2(event->data.server_rpc_new.host),
- Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE,
- (void *)&event->data.server_rpc_new.deadline),
- grpc_rb_event_metadata(self), NULL);
-
- default:
- rb_raise(rb_eRuntimeError, "unrecognized event code for an rpc event:%d",
- event->type);
- }
-
- return Qfalse;
-}
-
-static VALUE grpc_rb_event_finish(VALUE self) {
- grpc_event *event = NULL;
- grpc_rb_event *wrapper = NULL;
- Data_Get_Struct(self, grpc_rb_event, wrapper);
- if (wrapper->wrapped == NULL) { /* already closed */
- return Qnil;
- }
- event = wrapper->wrapped;
- grpc_event_finish(event);
- wrapper->wrapped = NULL;
- wrapper->mark = Qnil;
- return Qnil;
-}
-
-/* rb_cEvent is the Event class whose instances proxy grpc_event */
-VALUE rb_cEvent = Qnil;
-
-/* rb_eEventError is the ruby class of the exception thrown on failures during
- rpc event processing. */
-VALUE rb_eEventError = Qnil;
-
-void Init_grpc_event() {
- rb_eEventError =
- rb_define_class_under(rb_mGrpcCore, "EventError", rb_eStandardError);
- rb_cEvent = rb_define_class_under(rb_mGrpcCore, "Event", rb_cObject);
-
- /* Prevent allocation or inialization from ruby. */
- rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc);
- rb_define_method(rb_cEvent, "initialize", grpc_rb_cannot_init, 0);
- rb_define_method(rb_cEvent, "initialize_copy", grpc_rb_cannot_init_copy, 1);
-
- /* Accessors for the data available in an event. */
- rb_define_method(rb_cEvent, "call", grpc_rb_event_call, 0);
- rb_define_method(rb_cEvent, "result", grpc_rb_event_result, 0);
- rb_define_method(rb_cEvent, "tag", grpc_rb_event_tag, 0);
- rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0);
- rb_define_method(rb_cEvent, "finish", grpc_rb_event_finish, 0);
- rb_define_alias(rb_cEvent, "close", "finish");
-
- /* Constants representing the completion types */
- rb_mCompletionType =
- rb_define_module_under(rb_mGrpcCore, "CompletionType");
- rb_define_const(rb_mCompletionType, "QUEUE_SHUTDOWN",
- INT2NUM(GRPC_QUEUE_SHUTDOWN));
- rb_define_const(rb_mCompletionType, "OP_COMPLETE", INT2NUM(GRPC_OP_COMPLETE));
- rb_define_const(rb_mCompletionType, "READ", INT2NUM(GRPC_READ));
- rb_define_const(rb_mCompletionType, "WRITE_ACCEPTED",
- INT2NUM(GRPC_WRITE_ACCEPTED));
- rb_define_const(rb_mCompletionType, "FINISH_ACCEPTED",
- INT2NUM(GRPC_FINISH_ACCEPTED));
- rb_define_const(rb_mCompletionType, "CLIENT_METADATA_READ",
- INT2NUM(GRPC_CLIENT_METADATA_READ));
- rb_define_const(rb_mCompletionType, "FINISHED", INT2NUM(GRPC_FINISHED));
- rb_define_const(rb_mCompletionType, "SERVER_RPC_NEW",
- INT2NUM(GRPC_SERVER_RPC_NEW));
- rb_define_const(rb_mCompletionType, "SERVER_SHUTDOWN",
- INT2NUM(GRPC_SERVER_SHUTDOWN));
- rb_define_const(rb_mCompletionType, "RESERVED",
- INT2NUM(GRPC_COMPLETION_DO_NOT_USE));
-}
-
-VALUE grpc_rb_new_event(grpc_event *ev) {
- grpc_rb_event *wrapper = ALLOC(grpc_rb_event);
- wrapper->wrapped = ev;
- wrapper->mark = Qnil;
- return Data_Wrap_Struct(rb_cEvent, grpc_rb_event_mark, grpc_rb_event_free,
- wrapper);
-}
diff --git a/src/ruby/ext/grpc/rb_event.h b/src/ruby/ext/grpc/rb_event.h
deleted file mode 100644
index 3105934b11..0000000000
--- a/src/ruby/ext/grpc/rb_event.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_RB_EVENT_H_
-#define GRPC_RB_EVENT_H_
-
-#include <ruby.h>
-#include <grpc/grpc.h>
-
-/* rb_cEvent is the Event class whose instances proxy grpc_event. */
-extern VALUE rb_cEvent;
-
-/* rb_cEventError is the ruby class that acts the exception thrown during rpc
- event processing. */
-extern VALUE rb_eEventError;
-
-/* Used to create new ruby event objects */
-VALUE grpc_rb_new_event(grpc_event *ev);
-
-/* Initializes the Event and EventError classes. */
-void Init_grpc_event();
-
-#endif /* GRPC_RB_EVENT_H_ */
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 400efd0dfa..699548b940 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -34,26 +34,27 @@
#include "rb_grpc.h"
#include <math.h>
-#include <ruby.h>
+#include <ruby/ruby.h>
+#include <ruby/vm.h>
#include <sys/time.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>
-#include "rb_byte_buffer.h"
#include "rb_call.h"
#include "rb_channel.h"
#include "rb_completion_queue.h"
-#include "rb_event.h"
-#include "rb_metadata.h"
#include "rb_server.h"
#include "rb_credentials.h"
#include "rb_server_credentials.h"
-/* Define common vars and funcs declared in rb.h */
-const RUBY_DATA_FUNC GC_NOT_MARKED = NULL;
-const RUBY_DATA_FUNC GC_DONT_FREE = NULL;
+static VALUE grpc_rb_cTimeVal = Qnil;
-VALUE rb_cTimeVal = Qnil;
+static rb_data_type_t grpc_rb_timespec_data_type = {
+ "gpr_timespec",
+ {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL,
+ NULL,
+ RUBY_TYPED_FREE_IMMEDIATELY};
/* Alloc func that blocks allocation of a given object by raising an
* exception. */
@@ -99,8 +100,9 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) {
switch (TYPE(time)) {
case T_DATA:
- if (CLASS_OF(time) == rb_cTimeVal) {
- Data_Get_Struct(time, gpr_timespec, time_const);
+ if (CLASS_OF(time) == grpc_rb_cTimeVal) {
+ TypedData_Get_Struct(time, gpr_timespec, &grpc_rb_timespec_data_type,
+ time_const);
t = *time_const;
} else if (CLASS_OF(time) == rb_cTime) {
t.tv_sec = NUM2INT(rb_funcall(time, id_tv_sec, 0));
@@ -153,37 +155,43 @@ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval) {
return t;
}
-void Init_grpc_status_codes() {
+static void Init_grpc_status_codes() {
/* Constants representing the status codes or grpc_status_code in status.h */
- VALUE rb_mStatusCodes =
- rb_define_module_under(rb_mGrpcCore, "StatusCodes");
- rb_define_const(rb_mStatusCodes, "OK", INT2NUM(GRPC_STATUS_OK));
- rb_define_const(rb_mStatusCodes, "CANCELLED", INT2NUM(GRPC_STATUS_CANCELLED));
- rb_define_const(rb_mStatusCodes, "UNKNOWN", INT2NUM(GRPC_STATUS_UNKNOWN));
- rb_define_const(rb_mStatusCodes, "INVALID_ARGUMENT",
+ VALUE grpc_rb_mStatusCodes =
+ rb_define_module_under(grpc_rb_mGrpcCore, "StatusCodes");
+ rb_define_const(grpc_rb_mStatusCodes, "OK", INT2NUM(GRPC_STATUS_OK));
+ rb_define_const(grpc_rb_mStatusCodes, "CANCELLED",
+ INT2NUM(GRPC_STATUS_CANCELLED));
+ rb_define_const(grpc_rb_mStatusCodes, "UNKNOWN",
+ INT2NUM(GRPC_STATUS_UNKNOWN));
+ rb_define_const(grpc_rb_mStatusCodes, "INVALID_ARGUMENT",
INT2NUM(GRPC_STATUS_INVALID_ARGUMENT));
- rb_define_const(rb_mStatusCodes, "DEADLINE_EXCEEDED",
+ rb_define_const(grpc_rb_mStatusCodes, "DEADLINE_EXCEEDED",
INT2NUM(GRPC_STATUS_DEADLINE_EXCEEDED));
- rb_define_const(rb_mStatusCodes, "NOT_FOUND", INT2NUM(GRPC_STATUS_NOT_FOUND));
- rb_define_const(rb_mStatusCodes, "ALREADY_EXISTS",
+ rb_define_const(grpc_rb_mStatusCodes, "NOT_FOUND",
+ INT2NUM(GRPC_STATUS_NOT_FOUND));
+ rb_define_const(grpc_rb_mStatusCodes, "ALREADY_EXISTS",
INT2NUM(GRPC_STATUS_ALREADY_EXISTS));
- rb_define_const(rb_mStatusCodes, "PERMISSION_DENIED",
+ rb_define_const(grpc_rb_mStatusCodes, "PERMISSION_DENIED",
INT2NUM(GRPC_STATUS_PERMISSION_DENIED));
- rb_define_const(rb_mStatusCodes, "UNAUTHENTICATED",
+ rb_define_const(grpc_rb_mStatusCodes, "UNAUTHENTICATED",
INT2NUM(GRPC_STATUS_UNAUTHENTICATED));
- rb_define_const(rb_mStatusCodes, "RESOURCE_EXHAUSTED",
+ rb_define_const(grpc_rb_mStatusCodes, "RESOURCE_EXHAUSTED",
INT2NUM(GRPC_STATUS_RESOURCE_EXHAUSTED));
- rb_define_const(rb_mStatusCodes, "FAILED_PRECONDITION",
+ rb_define_const(grpc_rb_mStatusCodes, "FAILED_PRECONDITION",
INT2NUM(GRPC_STATUS_FAILED_PRECONDITION));
- rb_define_const(rb_mStatusCodes, "ABORTED", INT2NUM(GRPC_STATUS_ABORTED));
- rb_define_const(rb_mStatusCodes, "OUT_OF_RANGE",
+ rb_define_const(grpc_rb_mStatusCodes, "ABORTED",
+ INT2NUM(GRPC_STATUS_ABORTED));
+ rb_define_const(grpc_rb_mStatusCodes, "OUT_OF_RANGE",
INT2NUM(GRPC_STATUS_OUT_OF_RANGE));
- rb_define_const(rb_mStatusCodes, "UNIMPLEMENTED",
+ rb_define_const(grpc_rb_mStatusCodes, "UNIMPLEMENTED",
INT2NUM(GRPC_STATUS_UNIMPLEMENTED));
- rb_define_const(rb_mStatusCodes, "INTERNAL", INT2NUM(GRPC_STATUS_INTERNAL));
- rb_define_const(rb_mStatusCodes, "UNAVAILABLE",
+ rb_define_const(grpc_rb_mStatusCodes, "INTERNAL",
+ INT2NUM(GRPC_STATUS_INTERNAL));
+ rb_define_const(grpc_rb_mStatusCodes, "UNAVAILABLE",
INT2NUM(GRPC_STATUS_UNAVAILABLE));
- rb_define_const(rb_mStatusCodes, "DATA_LOSS", INT2NUM(GRPC_STATUS_DATA_LOSS));
+ rb_define_const(grpc_rb_mStatusCodes, "DATA_LOSS",
+ INT2NUM(GRPC_STATUS_DATA_LOSS));
}
/* id_at is the constructor method of the ruby standard Time class. */
@@ -195,42 +203,46 @@ static ID id_inspect;
/* id_to_s is the to_s method found on various ruby objects. */
static ID id_to_s;
-/* Converts `a wrapped time constant to a standard time. */
-VALUE grpc_rb_time_val_to_time(VALUE self) {
+/* Converts a wrapped time constant to a standard time. */
+static VALUE grpc_rb_time_val_to_time(VALUE self) {
gpr_timespec *time_const = NULL;
- Data_Get_Struct(self, gpr_timespec, time_const);
+ TypedData_Get_Struct(self, gpr_timespec, &grpc_rb_timespec_data_type,
+ time_const);
return rb_funcall(rb_cTime, id_at, 2, INT2NUM(time_const->tv_sec),
INT2NUM(time_const->tv_nsec));
}
/* Invokes inspect on the ctime version of the time val. */
-VALUE grpc_rb_time_val_inspect(VALUE self) {
+static VALUE grpc_rb_time_val_inspect(VALUE self) {
return rb_funcall(grpc_rb_time_val_to_time(self), id_inspect, 0);
}
/* Invokes to_s on the ctime version of the time val. */
-VALUE grpc_rb_time_val_to_s(VALUE self) {
+static VALUE grpc_rb_time_val_to_s(VALUE self) {
return rb_funcall(grpc_rb_time_val_to_time(self), id_to_s, 0);
}
/* Adds a module with constants that map to gpr's static timeval structs. */
-void Init_grpc_time_consts() {
- VALUE rb_mTimeConsts =
- rb_define_module_under(rb_mGrpcCore, "TimeConsts");
- rb_cTimeVal =
- rb_define_class_under(rb_mGrpcCore, "TimeSpec", rb_cObject);
- rb_define_const(rb_mTimeConsts, "ZERO",
- Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE,
- (void *)&gpr_time_0));
- rb_define_const(rb_mTimeConsts, "INFINITE_FUTURE",
- Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE,
- (void *)&gpr_inf_future));
- rb_define_const(rb_mTimeConsts, "INFINITE_PAST",
- Data_Wrap_Struct(rb_cTimeVal, GC_NOT_MARKED, GC_DONT_FREE,
- (void *)&gpr_inf_past));
- rb_define_method(rb_cTimeVal, "to_time", grpc_rb_time_val_to_time, 0);
- rb_define_method(rb_cTimeVal, "inspect", grpc_rb_time_val_inspect, 0);
- rb_define_method(rb_cTimeVal, "to_s", grpc_rb_time_val_to_s, 0);
+static void Init_grpc_time_consts() {
+ VALUE grpc_rb_mTimeConsts =
+ rb_define_module_under(grpc_rb_mGrpcCore, "TimeConsts");
+ grpc_rb_cTimeVal =
+ rb_define_class_under(grpc_rb_mGrpcCore, "TimeSpec", rb_cObject);
+ rb_define_const(
+ grpc_rb_mTimeConsts, "ZERO",
+ TypedData_Wrap_Struct(grpc_rb_cTimeVal, &grpc_rb_timespec_data_type,
+ (void *)&gpr_time_0));
+ rb_define_const(
+ grpc_rb_mTimeConsts, "INFINITE_FUTURE",
+ TypedData_Wrap_Struct(grpc_rb_cTimeVal, &grpc_rb_timespec_data_type,
+ (void *)&gpr_inf_future));
+ rb_define_const(
+ grpc_rb_mTimeConsts, "INFINITE_PAST",
+ TypedData_Wrap_Struct(grpc_rb_cTimeVal, &grpc_rb_timespec_data_type,
+ (void *)&gpr_inf_past));
+ rb_define_method(grpc_rb_cTimeVal, "to_time", grpc_rb_time_val_to_time, 0);
+ rb_define_method(grpc_rb_cTimeVal, "inspect", grpc_rb_time_val_inspect, 0);
+ rb_define_method(grpc_rb_cTimeVal, "to_s", grpc_rb_time_val_to_s, 0);
id_at = rb_intern("at");
id_inspect = rb_intern("inspect");
id_to_s = rb_intern("to_s");
@@ -238,35 +250,42 @@ void Init_grpc_time_consts() {
id_tv_nsec = rb_intern("tv_nsec");
}
-void grpc_rb_shutdown(void *vm) { grpc_shutdown(); }
+static void grpc_rb_shutdown(ruby_vm_t *vm) { grpc_shutdown(); }
/* Initialize the GRPC module structs */
-/* rb_sNewServerRpc is the struct that holds new server rpc details. */
-VALUE rb_sNewServerRpc = Qnil;
-/* rb_sStatus is the struct that holds status details. */
-VALUE rb_sStatus = Qnil;
+/* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */
+VALUE grpc_rb_sNewServerRpc = Qnil;
+/* grpc_rb_sStatus is the struct that holds status details. */
+VALUE grpc_rb_sStatus = Qnil;
/* Initialize the GRPC module. */
-VALUE rb_mGRPC = Qnil;
-VALUE rb_mGrpcCore = Qnil;
+VALUE grpc_rb_mGRPC = Qnil;
+VALUE grpc_rb_mGrpcCore = Qnil;
+
+/* cached Symbols for members in Status struct */
+VALUE sym_code = Qundef;
+VALUE sym_details = Qundef;
+VALUE sym_metadata = Qundef;
void Init_grpc() {
grpc_init();
ruby_vm_at_exit(grpc_rb_shutdown);
- rb_mGRPC = rb_define_module("GRPC");
- rb_mGrpcCore = rb_define_module_under(rb_mGRPC, "Core");
- rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
- "deadline", "metadata", NULL);
- rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
+ grpc_rb_mGRPC = rb_define_module("GRPC");
+ grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
+ grpc_rb_sNewServerRpc =
+ rb_struct_define("NewServerRpc", "method", "host",
+ "deadline", "metadata", "call", NULL);
+ grpc_rb_sStatus =
+ rb_struct_define("Status", "code", "details", "metadata", NULL);
+ sym_code = ID2SYM(rb_intern("code"));
+ sym_details = ID2SYM(rb_intern("details"));
+ sym_metadata = ID2SYM(rb_intern("metadata"));
- Init_grpc_byte_buffer();
- Init_grpc_event();
Init_grpc_channel();
Init_grpc_completion_queue();
Init_grpc_call();
Init_grpc_credentials();
- Init_grpc_metadata();
Init_grpc_server();
Init_grpc_server_credentials();
Init_grpc_status_codes();
diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h
index 851f5ee69f..a502273de1 100644
--- a/src/ruby/ext/grpc/rb_grpc.h
+++ b/src/ruby/ext/grpc/rb_grpc.h
@@ -38,26 +38,36 @@
#include <ruby.h>
#include <grpc/support/time.h>
-/* rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */
-extern VALUE rb_mGrpcCore;
+/* grpc_rb_mGrpcCore is the module containing the ruby wrapper GRPC classes. */
+extern VALUE grpc_rb_mGrpcCore;
-/* Class used to wrap timeval structs. */
-extern VALUE rb_cTimeVal;
+/* grpc_rb_sNewServerRpc is the struct that holds new server rpc details. */
+extern VALUE grpc_rb_sNewServerRpc;
-/* rb_sNewServerRpc is the struct that holds new server rpc details. */
-extern VALUE rb_sNewServerRpc;
+/* grpc_rb_sStruct is the struct that holds status details. */
+extern VALUE grpc_rb_sStatus;
-/* rb_sStruct is the struct that holds status details. */
-extern VALUE rb_sStatus;
+/* sym_code is the symbol for the code attribute of grpc_rb_sStatus. */
+extern VALUE sym_code;
+
+/* sym_details is the symbol for the details attribute of grpc_rb_sStatus. */
+extern VALUE sym_details;
+
+/* sym_metadata is the symbol for the metadata attribute of grpc_rb_sStatus. */
+extern VALUE sym_metadata;
/* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the
wrapped struct does not need to participate in ruby gc. */
-extern const RUBY_DATA_FUNC GC_NOT_MARKED;
+#define GRPC_RB_GC_NOT_MARKED (RUBY_DATA_FUNC)(NULL)
/* GC_DONT_FREED is used in calls to Data_Wrap_Struct to indicate that the
wrapped struct should not be freed the wrapped ruby object is released by
the garbage collector. */
-extern const RUBY_DATA_FUNC GC_DONT_FREE;
+#define GRPC_RB_GC_DONT_FREE (RUBY_DATA_FUNC)(NULL)
+
+/* GRPC_RB_MEMSIZE_UNAVAILABLE is used in rb_data_type_t to indicate that the
+ * number of bytes used by the wrapped struct is not available. */
+#define GRPC_RB_MEMSIZE_UNAVAILABLE (size_t (*)(const void*))(NULL)
/* A ruby object alloc func that fails by raising an exception. */
VALUE grpc_rb_cannot_alloc(VALUE cls);
diff --git a/src/ruby/ext/grpc/rb_metadata.c b/src/ruby/ext/grpc/rb_metadata.c
deleted file mode 100644
index 7622a8c57e..0000000000
--- a/src/ruby/ext/grpc/rb_metadata.c
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "rb_metadata.h"
-
-#include <ruby.h>
-#include <string.h>
-
-#include <grpc/grpc.h>
-#include "rb_grpc.h"
-
-/* grpc_rb_metadata wraps a grpc_metadata. It provides a peer ruby object,
- * 'mark' to minimize copying when a metadata is created from ruby. */
-typedef struct grpc_rb_metadata {
- /* Holder of ruby objects involved in constructing the metadata */
- VALUE mark;
- /* The actual metadata */
- grpc_metadata *wrapped;
-} grpc_rb_metadata;
-
-/* Destroys Metadata instances. */
-static void grpc_rb_metadata_free(void *p) {
- if (p == NULL) {
- return;
- };
-
- /* Because metadata is only created during a call to grpc_call_add_metadata,
- * and the call takes ownership of the metadata, this does not free the
- * wrapped struct, only the wrapper */
- xfree(p);
-}
-
-/* Protects the mark object from GC */
-static void grpc_rb_metadata_mark(void *p) {
- grpc_rb_metadata *md = NULL;
- if (p == NULL) {
- return;
- }
-
- md = (grpc_rb_metadata *)p;
- /* If it's not already cleaned up, mark the mark object */
- if (md->mark != Qnil && BUILTIN_TYPE(md->mark) != T_NONE) {
- rb_gc_mark(md->mark);
- }
-}
-
-/* Allocates Metadata instances.
-
- Provides safe default values for the Metadata fields. */
-static VALUE grpc_rb_metadata_alloc(VALUE cls) {
- grpc_rb_metadata *wrapper = ALLOC(grpc_rb_metadata);
- wrapper->wrapped = NULL;
- wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_metadata_mark, grpc_rb_metadata_free,
- wrapper);
-}
-
-/* id_key and id_value are the names of the hidden ivars that preserve the
- * original byte_buffer source string */
-static ID id_key;
-static ID id_value;
-
-/* Initializes Metadata instances. */
-static VALUE grpc_rb_metadata_init(VALUE self, VALUE key, VALUE value) {
- grpc_rb_metadata *wrapper = NULL;
- grpc_metadata *md = ALLOC(grpc_metadata);
-
- /* Use direct pointers to the strings wrapped by the ruby object to avoid
- * copying */
- Data_Get_Struct(self, grpc_rb_metadata, wrapper);
- wrapper->wrapped = md;
- if (TYPE(key) == T_SYMBOL) {
- md->key = (char *)rb_id2name(SYM2ID(key));
- } else { /* StringValueCStr does all other type exclusions for us */
- md->key = StringValueCStr(key);
- }
- md->value = RSTRING_PTR(value);
- md->value_length = RSTRING_LEN(value);
-
- /* Save references to the original values on the mark object so that the
- * pointers used there are valid for the lifetime of the object. */
- wrapper->mark = rb_class_new_instance(0, NULL, rb_cObject);
- rb_ivar_set(wrapper->mark, id_key, key);
- rb_ivar_set(wrapper->mark, id_value, value);
-
- return self;
-}
-
-/* Clones Metadata instances.
-
- Gives Metadata a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_metadata_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_metadata *orig_md = NULL;
- grpc_rb_metadata *copy_md = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a metadata object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_metadata_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cMetadata));
- }
-
- Data_Get_Struct(orig, grpc_rb_metadata, orig_md);
- Data_Get_Struct(copy, grpc_rb_metadata, copy_md);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the metadata wrapper
- * object. */
- MEMCPY(copy_md, orig_md, grpc_rb_metadata, 1);
- return copy;
-}
-
-/* Gets the key from a metadata instance. */
-static VALUE grpc_rb_metadata_key(VALUE self) {
- VALUE key = Qnil;
- grpc_rb_metadata *wrapper = NULL;
- grpc_metadata *md = NULL;
-
- Data_Get_Struct(self, grpc_rb_metadata, wrapper);
- if (wrapper->mark != Qnil) {
- key = rb_ivar_get(wrapper->mark, id_key);
- if (key != Qnil) {
- return key;
- }
- }
-
- md = wrapper->wrapped;
- if (md == NULL || md->key == NULL) {
- return Qnil;
- }
- return rb_str_new2(md->key);
-}
-
-/* Gets the value from a metadata instance. */
-static VALUE grpc_rb_metadata_value(VALUE self) {
- VALUE val = Qnil;
- grpc_rb_metadata *wrapper = NULL;
- grpc_metadata *md = NULL;
-
- Data_Get_Struct(self, grpc_rb_metadata, wrapper);
- if (wrapper->mark != Qnil) {
- val = rb_ivar_get(wrapper->mark, id_value);
- if (val != Qnil) {
- return val;
- }
- }
-
- md = wrapper->wrapped;
- if (md == NULL || md->value == NULL) {
- return Qnil;
- }
- return rb_str_new2(md->value);
-}
-
-/* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */
-VALUE rb_cMetadata = Qnil;
-void Init_grpc_metadata() {
- rb_cMetadata =
- rb_define_class_under(rb_mGrpcCore, "Metadata", rb_cObject);
-
- /* Allocates an object managed by the ruby runtime */
- rb_define_alloc_func(rb_cMetadata, grpc_rb_metadata_alloc);
-
- /* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(rb_cMetadata, "initialize", grpc_rb_metadata_init, 2);
- rb_define_method(rb_cMetadata, "initialize_copy", grpc_rb_metadata_init_copy,
- 1);
-
- /* Provides accessors for the code and details. */
- rb_define_method(rb_cMetadata, "key", grpc_rb_metadata_key, 0);
- rb_define_method(rb_cMetadata, "value", grpc_rb_metadata_value, 0);
-
- id_key = rb_intern("__key");
- id_value = rb_intern("__value");
-}
-
-/* Gets the wrapped metadata from the ruby wrapper */
-grpc_metadata *grpc_rb_get_wrapped_metadata(VALUE v) {
- grpc_rb_metadata *wrapper = NULL;
- Data_Get_Struct(v, grpc_rb_metadata, wrapper);
- return wrapper->wrapped;
-}
diff --git a/src/ruby/ext/grpc/rb_metadata.h b/src/ruby/ext/grpc/rb_metadata.h
deleted file mode 100644
index 251072f658..0000000000
--- a/src/ruby/ext/grpc/rb_metadata.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_RB_METADATA_H_
-#define GRPC_RB_METADATA_H_
-
-#include <grpc/grpc.h>
-#include <ruby.h>
-
-/* rb_cMetadata is the Metadata class whose instances proxy grpc_metadata. */
-extern VALUE rb_cMetadata;
-
-/* grpc_rb_metadata_create_with_mark creates a grpc_rb_metadata with a ruby mark
- * object that will be kept alive while the metadata is alive. */
-extern VALUE grpc_rb_metadata_create_with_mark(VALUE mark, grpc_metadata* md);
-
-/* Gets the wrapped metadata from the ruby wrapper */
-grpc_metadata* grpc_rb_get_wrapped_metadata(VALUE v);
-
-/* Initializes the Metadata class. */
-void Init_grpc_metadata();
-
-#endif /* GRPC_RB_METADATA_H_ */
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index c54f02e87a..bc0878af05 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -43,8 +43,11 @@
#include "rb_server_credentials.h"
#include "rb_grpc.h"
-/* rb_cServer is the ruby class that proxies grpc_server. */
-VALUE rb_cServer = Qnil;
+/* grpc_rb_cServer is the ruby class that proxies grpc_server. */
+static VALUE grpc_rb_cServer = Qnil;
+
+/* id_at is the constructor method of the ruby standard Time class. */
+static ID id_at;
/* grpc_rb_server wraps a grpc_server. It provides a peer ruby object,
'mark' to minimize copying when a server is created from ruby. */
@@ -85,13 +88,23 @@ static void grpc_rb_server_mark(void *p) {
}
}
+static const rb_data_type_t grpc_rb_server_data_type = {
+ "grpc_server",
+ {grpc_rb_server_mark, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL,
+ NULL,
+ /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block
+ * and we might want to unlock GVL
+ * TODO(yugui) Unlock GVL?
+ */
+ 0};
+
/* Allocates grpc_rb_server instances. */
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server *wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_server_mark, grpc_rb_server_free,
- wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}
/*
@@ -107,7 +120,8 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
grpc_channel_args args;
MEMZERO(&args, grpc_channel_args, 1);
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
- Data_Get_Struct(self, grpc_rb_server, wrapper);
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
+ wrapper);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
srv = grpc_server_create(cq, &args);
@@ -140,11 +154,13 @@ static VALUE grpc_rb_server_init_copy(VALUE copy, VALUE orig) {
/* Raise an error if orig is not a server object or a subclass. */
if (TYPE(orig) != T_DATA ||
RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_server_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(rb_cServer));
+ rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cServer));
}
- Data_Get_Struct(orig, grpc_rb_server, orig_srv);
- Data_Get_Struct(copy, grpc_rb_server, copy_srv);
+ TypedData_Get_Struct(orig, grpc_rb_server, &grpc_rb_server_data_type,
+ orig_srv);
+ TypedData_Get_Struct(copy, grpc_rb_server, &grpc_rb_server_data_type,
+ copy_srv);
/* use ruby's MEMCPY to make a byte-for-byte copy of the server wrapper
object. */
@@ -152,25 +168,97 @@ static VALUE grpc_rb_server_init_copy(VALUE copy, VALUE orig) {
return copy;
}
-static VALUE grpc_rb_server_request_call(VALUE self, VALUE tag_new) {
- grpc_call_error err;
+/* request_call_stack holds various values used by the
+ * grpc_rb_server_request_call function */
+typedef struct request_call_stack {
+ grpc_call_details details;
+ grpc_metadata_array md_ary;
+} request_call_stack;
+
+/* grpc_request_call_stack_init ensures the request_call_stack is properly
+ * initialized */
+static void grpc_request_call_stack_init(request_call_stack* st) {
+ MEMZERO(st, request_call_stack, 1);
+ grpc_metadata_array_init(&st->md_ary);
+ grpc_call_details_init(&st->details);
+ st->details.method = NULL;
+ st->details.host = NULL;
+}
+
+/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
+ * cleaned up */
+static void grpc_request_call_stack_cleanup(request_call_stack* st) {
+ grpc_metadata_array_destroy(&st->md_ary);
+ grpc_call_details_destroy(&st->details);
+}
+
+/* call-seq:
+ cq = CompletionQueue.new
+ tag = Object.new
+ timeout = 10
+ server.request_call(cqueue, tag, timeout)
+
+ Requests notification of a new call on a server. */
+static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
+ VALUE tag_new, VALUE timeout) {
grpc_rb_server *s = NULL;
- Data_Get_Struct(self, grpc_rb_server, s);
+ grpc_call *call = NULL;
+ grpc_event *ev = NULL;
+ grpc_call_error err;
+ request_call_stack st;
+ VALUE result;
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
+ return Qnil;
} else {
- err = grpc_server_request_call_old(s->wrapped, ROBJECT(tag_new));
+ grpc_request_call_stack_init(&st);
+ /* call grpc_server_request_call, then wait for it to complete using
+ * pluck_event */
+ err = grpc_server_request_call(
+ s->wrapped, &call, &st.details, &st.md_ary,
+ grpc_rb_get_wrapped_completion_queue(cqueue),
+ ROBJECT(tag_new));
if (err != GRPC_CALL_OK) {
- rb_raise(rb_eCallError, "server request failed: %s (code=%d)",
+ grpc_request_call_stack_cleanup(&st);
+ rb_raise(grpc_rb_eCallError,
+ "grpc_server_request_call failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
+ return Qnil;
}
+ ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout);
+ if (ev == NULL) {
+ grpc_request_call_stack_cleanup(&st);
+ return Qnil;
+ }
+ if (ev->data.op_complete != GRPC_OP_OK) {
+ grpc_request_call_stack_cleanup(&st);
+ grpc_event_finish(ev);
+ rb_raise(grpc_rb_eCallError, "request_call completion failed: (code=%d)",
+ ev->data.op_complete);
+ return Qnil;
+ }
+
+ /* build the NewServerRpc struct result */
+ result = rb_struct_new(
+ grpc_rb_sNewServerRpc,
+ rb_str_new2(st.details.method),
+ rb_str_new2(st.details.host),
+ rb_funcall(rb_cTime, id_at, 2, INT2NUM(st.details.deadline.tv_sec),
+ INT2NUM(st.details.deadline.tv_nsec)),
+ grpc_rb_md_ary_to_h(&st.md_ary),
+ grpc_rb_wrap_call(call),
+ NULL);
+ grpc_event_finish(ev);
+ grpc_request_call_stack_cleanup(&st);
+ return result;
}
return Qnil;
}
static VALUE grpc_rb_server_start(VALUE self) {
grpc_rb_server *s = NULL;
- Data_Get_Struct(self, grpc_rb_server, s);
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
} else {
@@ -181,7 +269,7 @@ static VALUE grpc_rb_server_start(VALUE self) {
static VALUE grpc_rb_server_destroy(VALUE self) {
grpc_rb_server *s = NULL;
- Data_Get_Struct(self, grpc_rb_server, s);
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped != NULL) {
grpc_server_shutdown(s->wrapped);
grpc_server_destroy(s->wrapped);
@@ -213,7 +301,7 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
/* "11" == 1 mandatory args, 1 (rb_creds) is optional */
rb_scan_args(argc, argv, "11", &port, &rb_creds);
- Data_Get_Struct(self, grpc_rb_server, s);
+ TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
@@ -239,27 +327,32 @@ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
}
void Init_grpc_server() {
- rb_cServer = rb_define_class_under(rb_mGrpcCore, "Server", rb_cObject);
+ grpc_rb_cServer =
+ rb_define_class_under(grpc_rb_mGrpcCore, "Server", rb_cObject);
/* Allocates an object managed by the ruby runtime */
- rb_define_alloc_func(rb_cServer, grpc_rb_server_alloc);
+ rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
/* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(rb_cServer, "initialize", grpc_rb_server_init, 2);
- rb_define_method(rb_cServer, "initialize_copy", grpc_rb_server_init_copy, 1);
+ rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 2);
+ rb_define_method(grpc_rb_cServer, "initialize_copy",
+ grpc_rb_server_init_copy, 1);
/* Add the server methods. */
- rb_define_method(rb_cServer, "request_call", grpc_rb_server_request_call, 1);
- rb_define_method(rb_cServer, "start", grpc_rb_server_start, 0);
- rb_define_method(rb_cServer, "destroy", grpc_rb_server_destroy, 0);
- rb_define_alias(rb_cServer, "close", "destroy");
- rb_define_method(rb_cServer, "add_http2_port", grpc_rb_server_add_http2_port,
+ rb_define_method(grpc_rb_cServer, "request_call",
+ grpc_rb_server_request_call, 3);
+ rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
+ rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
+ rb_define_alias(grpc_rb_cServer, "close", "destroy");
+ rb_define_method(grpc_rb_cServer, "add_http2_port",
+ grpc_rb_server_add_http2_port,
-1);
+ id_at = rb_intern("at");
}
/* Gets the wrapped server from the ruby wrapper */
grpc_server *grpc_rb_get_wrapped_server(VALUE v) {
grpc_rb_server *wrapper = NULL;
- Data_Get_Struct(v, grpc_rb_server, wrapper);
+ TypedData_Get_Struct(v, grpc_rb_server, &grpc_rb_server_data_type, wrapper);
return wrapper->wrapped;
}
diff --git a/src/ruby/ext/grpc/rb_server.h b/src/ruby/ext/grpc/rb_server.h
index 2726b9a50a..5e4b711d35 100644
--- a/src/ruby/ext/grpc/rb_server.h
+++ b/src/ruby/ext/grpc/rb_server.h
@@ -37,10 +37,6 @@
#include <ruby.h>
#include <grpc/grpc.h>
-/* rb_cServer is the Server class whose instances proxy
- grpc_byte_buffer. */
-extern VALUE rb_cServer;
-
/* Initializes the Server class. */
void Init_grpc_server();
diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c
index fb02987870..a86389445f 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.c
+++ b/src/ruby/ext/grpc/rb_server_credentials.c
@@ -40,6 +40,10 @@
#include "rb_grpc.h"
+/* grpc_rb_cServerCredentials is the ruby class that proxies
+ grpc_server_credentials. */
+static VALUE grpc_rb_cServerCredentials = Qnil;
+
/* grpc_rb_server_credentials wraps a grpc_server_credentials. It provides a
peer ruby object, 'mark' to minimize copying when a server credential is
created from ruby. */
@@ -82,6 +86,14 @@ static void grpc_rb_server_credentials_mark(void *p) {
}
}
+static const rb_data_type_t grpc_rb_server_credentials_data_type = {
+ "grpc_server_credentials",
+ {grpc_rb_server_credentials_mark, grpc_rb_server_credentials_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE},
+ NULL, NULL,
+ RUBY_TYPED_FREE_IMMEDIATELY
+};
+
/* Allocates ServerCredential instances.
Provides safe initial defaults for the instance fields. */
@@ -89,8 +101,8 @@ static VALUE grpc_rb_server_credentials_alloc(VALUE cls) {
grpc_rb_server_credentials *wrapper = ALLOC(grpc_rb_server_credentials);
wrapper->wrapped = NULL;
wrapper->mark = Qnil;
- return Data_Wrap_Struct(cls, grpc_rb_server_credentials_mark,
- grpc_rb_server_credentials_free, wrapper);
+ return TypedData_Wrap_Struct(cls, &grpc_rb_server_credentials_data_type,
+ wrapper);
}
/* Clones ServerCredentials instances.
@@ -109,11 +121,13 @@ static VALUE grpc_rb_server_credentials_init_copy(VALUE copy, VALUE orig) {
if (TYPE(orig) != T_DATA ||
RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_server_credentials_free) {
rb_raise(rb_eTypeError, "not a %s",
- rb_obj_classname(rb_cServerCredentials));
+ rb_obj_classname(grpc_rb_cServerCredentials));
}
- Data_Get_Struct(orig, grpc_rb_server_credentials, orig_ch);
- Data_Get_Struct(copy, grpc_rb_server_credentials, copy_ch);
+ TypedData_Get_Struct(orig, grpc_rb_server_credentials,
+ &grpc_rb_server_credentials_data_type, orig_ch);
+ TypedData_Get_Struct(copy, grpc_rb_server_credentials,
+ &grpc_rb_server_credentials_data_type, copy_ch);
/* use ruby's MEMCPY to make a byte-for-byte copy of the server_credentials
wrapper object. */
@@ -149,7 +163,8 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs,
grpc_rb_server_credentials *wrapper = NULL;
grpc_server_credentials *creds = NULL;
grpc_ssl_pem_key_cert_pair key_cert_pair = {NULL, NULL};
- Data_Get_Struct(self, grpc_rb_server_credentials, wrapper);
+ TypedData_Get_Struct(self, grpc_rb_server_credentials,
+ &grpc_rb_server_credentials_data_type, wrapper);
if (pem_cert_chain == Qnil) {
rb_raise(rb_eRuntimeError,
"could not create a server credential: nil pem_cert_chain");
@@ -180,21 +195,18 @@ static VALUE grpc_rb_server_credentials_init(VALUE self, VALUE pem_root_certs,
return self;
}
-/* rb_cServerCredentials is the ruby class that proxies
- grpc_server_credentials. */
-VALUE rb_cServerCredentials = Qnil;
-
void Init_grpc_server_credentials() {
- rb_cServerCredentials =
- rb_define_class_under(rb_mGrpcCore, "ServerCredentials", rb_cObject);
+ grpc_rb_cServerCredentials =
+ rb_define_class_under(grpc_rb_mGrpcCore, "ServerCredentials", rb_cObject);
/* Allocates an object managed by the ruby runtime */
- rb_define_alloc_func(rb_cServerCredentials, grpc_rb_server_credentials_alloc);
+ rb_define_alloc_func(grpc_rb_cServerCredentials,
+ grpc_rb_server_credentials_alloc);
/* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(rb_cServerCredentials, "initialize",
+ rb_define_method(grpc_rb_cServerCredentials, "initialize",
grpc_rb_server_credentials_init, 3);
- rb_define_method(rb_cServerCredentials, "initialize_copy",
+ rb_define_method(grpc_rb_cServerCredentials, "initialize_copy",
grpc_rb_server_credentials_init_copy, 1);
id_pem_cert_chain = rb_intern("__pem_cert_chain");
@@ -205,6 +217,7 @@ void Init_grpc_server_credentials() {
/* Gets the wrapped grpc_server_credentials from the ruby wrapper */
grpc_server_credentials *grpc_rb_get_wrapped_server_credentials(VALUE v) {
grpc_rb_server_credentials *wrapper = NULL;
- Data_Get_Struct(v, grpc_rb_server_credentials, wrapper);
+ TypedData_Get_Struct(v, grpc_rb_server_credentials,
+ &grpc_rb_server_credentials_data_type, wrapper);
return wrapper->wrapped;
}
diff --git a/src/ruby/ext/grpc/rb_server_credentials.h b/src/ruby/ext/grpc/rb_server_credentials.h
index ef377195a0..35b395ad5c 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.h
+++ b/src/ruby/ext/grpc/rb_server_credentials.h
@@ -37,10 +37,6 @@
#include <ruby.h>
#include <grpc/grpc_security.h>
-/* rb_cServerCredentials is the ruby class whose instances proxy
- grpc_server_credentials. */
-extern VALUE rb_cServerCredentials;
-
/* Initializes the ruby ServerCredentials class. */
void Init_grpc_server_credentials();
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index 45cbacfeb0..19b3e21cb6 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -13,6 +13,9 @@ Gem::Specification.new do |s|
s.description = 'Send RPCs from Ruby using GRPC'
s.license = 'BSD-3-Clause'
+ s.required_ruby_version = '>= 2.0.0'
+ s.requirements << 'libgrpc ~> 0.6.0 needs to be installed'
+
s.files = `git ls-files`.split("\n")
s.test_files = `git ls-files -- spec/*`.split("\n")
s.executables = `git ls-files -- bin/*.rb`.split("\n").map do |f|
@@ -22,16 +25,16 @@ Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
- s.add_dependency 'googleauth', '~> 0.1'
- s.add_dependency 'logging', '~> 1.8'
+ s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests
+ s.add_dependency 'logging', '~> 2.0'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
- s.add_dependency 'xray', '~> 1.1'
- s.add_development_dependency 'bundler', '~> 1.7'
- s.add_development_dependency 'rake', '~> 10.0'
- s.add_development_dependency 'rake-compiler', '~> 0'
- s.add_development_dependency 'rubocop', '~> 0.28.0'
- s.add_development_dependency 'rspec', '~> 3.0'
+ s.add_development_dependency 'simplecov', '~> 0.9'
+ s.add_development_dependency 'bundler', '~> 1.9'
+ s.add_development_dependency 'rake', '~> 10.4'
+ s.add_development_dependency 'rake-compiler', '~> 0.9'
+ s.add_development_dependency 'rspec', '~> 3.2'
+ s.add_development_dependency 'rubocop', '~> 0.30'
s.extensions = %w(ext/grpc/extconf.rb)
end
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index dd02ef7666..80b5743e91 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -30,8 +30,8 @@
require 'grpc/errors'
require 'grpc/grpc'
require 'grpc/logconfig'
+require 'grpc/notifier'
require 'grpc/version'
-require 'grpc/core/event'
require 'grpc/core/time_consts'
require 'grpc/generic/active_call'
require 'grpc/generic/client_stub'
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index 58944872b5..f1201c1704 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -31,23 +31,20 @@ require 'grpc'
# GRPC contains the General RPC module.
module GRPC
- # OutOfTime is an exception class that indicates that an RPC exceeded its
- # deadline.
- OutOfTime = Class.new(StandardError)
-
# BadStatus is an exception class that indicates that an error occurred at
# either end of a GRPC connection. When raised, it indicates that a status
# error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error.
class BadStatus < StandardError
- attr_reader :code, :details
+ attr_reader :code, :details, :metadata
# @param code [Numeric] the status code
# @param details [String] the details of the exception
- def initialize(code, details = 'unknown cause')
+ def initialize(code, details = 'unknown cause', **kw)
super("#{code}:#{details}")
@code = code
@details = details
+ @metadata = kw
end
# Converts the exception to a GRPC::Status for use in the networking
@@ -55,7 +52,11 @@ module GRPC
#
# @return [Status] with the same code and details
def to_status
- Status.new(code, details)
+ Struct::Status.new(code, details, @metadata)
end
end
+
+ # Cancelled is an exception class that indicates that an rpc was cancelled.
+ class Cancelled < StandardError
+ end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 6256330e88..947c39cd22 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,10 +30,23 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
-def assert_event_type(ev, want)
- fail OutOfTime if ev.nil?
- got = ev.type
- fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
+class Struct
+ # BatchResult is the struct returned by calls to call#start_batch.
+ class BatchResult
+ # check_status returns the status, raising an error if the status
+ # is non-nil and not OK.
+ def check_status
+ return nil if status.nil?
+ fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+ if status.code != GRPC::Core::StatusCodes::OK
+ # raise BadStatus, propagating the metadata if present.
+ md = status.metadata
+ with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
+ fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys)
+ end
+ status
+ end
+ end
end
# GRPC contains the General RPC module.
@@ -41,10 +54,12 @@ module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
- include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
+ include Core::CallOps
+ extend Forwardable
attr_reader(:deadline)
+ def_delegators :@call, :cancel, :metadata
# client_invoke begins a client invocation.
#
@@ -61,15 +76,14 @@ module GRPC
# @param q [CompletionQueue] the completion queue
# @param deadline [Fixnum,TimeSpec] the deadline
def self.client_invoke(call, q, _deadline, **kw)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
+ fail(TypeError, '!Core::CompletionQueue')
end
- call.add_metadata(kw) if kw.length > 0
- client_metadata_read = Object.new
- finished_tag = Object.new
- call.invoke(q, client_metadata_read, finished_tag)
- [finished_tag, client_metadata_read]
+ metadata_tag = Object.new
+ call.run_batch(q, metadata_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => kw)
+ metadata_tag
end
# Creates an ActiveCall.
@@ -91,69 +105,27 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- # if the call has begun
- # @param read_metadata_tag [Object] the object used as the call's finish
- # tag, if the call has begun
+ # @param metadata_tag [Object] the object use obtain metadata for clients
# @param started [true|false] indicates if the call has begun
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
- read_metadata_tag: nil, started: true)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ def initialize(call, q, marshal, unmarshal, deadline, started: true,
+ metadata_tag: nil)
+ fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
+ fail(TypeError, '!Core::CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
- @finished_tag = finished_tag
- @read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
+ @metadata_tag = metadata_tag
end
- # Obtains the status of the call.
- #
- # this value is nil until the call completes
- # @return this call's status
- def status
- @call.status
- end
-
- # Obtains the metadata of the call.
- #
- # At the start of the call this will be nil. During the call this gets
- # some values as soon as the other end of the connection acknowledges the
- # request.
- #
- # @return this calls's metadata
- def metadata
- @call.metadata
- end
-
- # Cancels the call.
- #
- # Cancels the call. The call does not return any result, but once this it
- # has been called, the call should eventually terminate. Due to potential
- # races between the execution of the cancel and the in-flight request, the
- # result of the call after calling #cancel is indeterminate:
- #
- # - the call may terminate with a BadStatus exception, with code=CANCELLED
- # - the call may terminate with OK Status, and return a response
- # - the call may terminate with a different BadStatus exception if that
- # was happening
- def cancel
- @call.cancel
- end
-
- # indicates if the call is shutdown
- def shutdown
- @shutdown ||= false
- end
-
- # indicates if the call is cancelled.
- def cancelled
- @cancelled ||= false
+ # output_metadata are provides access to hash that can be used to
+ # save metadata to be sent as trailer
+ def output_metadata
+ @output_metadata ||= {}
end
# multi_req_view provides a restricted view of this ActiveCall for use
@@ -176,128 +148,94 @@ module GRPC
# writes_done indicates that all writes are completed.
#
- # It blocks until the remote endpoint acknowledges by sending a FINISHED
- # event, unless assert_finished is set to false. Any calls to
- # #remote_send after this call will fail.
+ # It blocks until the remote endpoint acknowledges with at status unless
+ # assert_finished is set to false. Any calls to #remote_send after this
+ # call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
- @call.writes_done(self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- logger.debug("Writes done: waiting for finish? #{assert_finished}")
- ensure
- ev.close
- end
-
+ ops = {
+ SEND_CLOSE_FROM_CLIENT => nil
+ }
+ ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- fail 'unexpected nil event' if ev.nil?
- ev.close
- @call.status
+ batch_result.check_status
end
- # finished waits until the call is completed.
+ # finished waits until a client call is completed.
#
- # It blocks until the remote endpoint acknowledges by sending a FINISHED
- # event.
+ # It blocks until the remote endpoint acknowledges by sending a status.
def finished
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
+ RECV_STATUS_ON_CLIENT => nil)
+ unless batch_result.status.nil?
if @call.metadata.nil?
- @call.metadata = ev.result.metadata
+ @call.metadata = batch_result.status.metadata
else
- @call.metadata.merge!(ev.result.metadata)
+ @call.metadata.merge!(batch_result.status.metadata)
end
-
- if ev.result.code != Core::StatusCodes::OK
- fail BadStatus.new(ev.result.code, ev.result.details)
- end
- res = ev.result
- ensure
- ev.close
end
- res
+ batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
- # It blocks until the remote endpoint acknowledges by sending a
- # WRITE_ACCEPTED. req can be marshalled already.
+ # It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
- assert_queue_is_ready
- logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
+ logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
payload = @marshal.call(req)
end
- @call.start_write(Core::ByteBuffer.new(payload), self)
-
- # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
- # until the flow control allows another send on this call.
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
- end
+ @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
end
- # send_status sends a status to the remote endpoint
+ # send_status sends a status to the remote endpoint.
#
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
- def send_status(code = OK, details = '', assert_finished = false)
- assert_queue_is_ready
- @call.start_write_status(code, details, self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
- end
- logger.debug("Status sent: #{code}:'#{details}'")
- return finished if assert_finished
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ def send_status(code = OK, details = '', assert_finished = false, **kw)
+ ops = {
+ SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw)
+ }
+ ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
+ @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
nil
end
# remote_read reads a response from the remote endpoint.
#
- # It blocks until the remote endpoint sends a READ or FINISHED event. On
- # a READ, it returns the response after unmarshalling it. On
- # FINISHED, it returns nil if the status is OK, otherwise raising
- # BadStatus
+ # It blocks until the remote endpoint replies with a message or status.
+ # On receiving a message, it returns the response after unmarshalling it.
+ # On receiving a status, it returns nil if the status is OK, otherwise
+ # raising BadStatus
def remote_read
- if @call.metadata.nil? && !@read_metadata_tag.nil?
- ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
- assert_event_type(ev, CLIENT_METADATA_READ)
- @call.metadata = ev.result
- @read_metadata_tag = nil
+ ops = { RECV_MESSAGE => nil }
+ ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ unless @metadata_tag.nil?
+ @call.metadata = batch_result.metadata
+ @metadata_tag = nil
end
-
- @call.start_read(self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
- logger.debug("received req: #{ev.result.inspect}")
- unless ev.result.nil?
- logger.debug("received req.to_s: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- logger.debug("received_req (unmarshalled): #{res.inspect}")
- return res
- end
- ensure
- ev.close
+ logger.debug("received req: #{batch_result}")
+ unless batch_result.nil? || batch_result.message.nil?
+ logger.debug("received req.to_s: #{batch_result.message}")
+ res = @unmarshal.call(batch_result.message)
+ logger.debug("received_req (unmarshalled): #{res.inspect}")
+ return res
end
logger.debug('found nil; the final response has been sent')
nil
@@ -324,7 +262,6 @@ module GRPC
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
- break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
@@ -379,6 +316,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -402,6 +342,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -428,6 +371,9 @@ module GRPC
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -461,9 +407,11 @@ module GRPC
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
- @finished_tag)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -478,16 +426,16 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
- @finished_tag)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_server(gen_each_reply)
end
private
+ # Starts the call if not already started
def start_call(**kw)
- tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
- @finished_tag, @read_metadata_tag = tags
+ return if @started
+ @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@started = true
end
@@ -505,32 +453,17 @@ module GRPC
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
- SingleReqView = view_class(:cancelled, :deadline, :metadata)
+ SingleReqView = view_class(:cancelled, :deadline, :metadata,
+ :output_metadata)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
- :each_remote_read, :metadata)
+ :each_remote_read, :metadata, :output_metadata)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status)
-
- # confirms that no events are enqueued, and that the queue is not
- # shutdown.
- def assert_queue_is_ready
- ev = nil
- begin
- ev = @cq.pluck(self, ZERO)
- fail "unexpected event #{ev.inspect}" unless ev.nil?
- rescue OutOfTime
- logging.debug('timed out waiting for next event')
- # expected, nothing should be on the queue and the deadline was ZERO,
- # except things using another tag
- ensure
- ev.close unless ev.nil?
- end
- end
+ :metadata, :status, :start_call)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c66deaae60..4ca3004d6f 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -30,18 +30,12 @@
require 'forwardable'
require 'grpc/grpc'
-def assert_event_type(ev, want)
- fail OutOfTime if ev.nil?
- got = ev.type
- fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
-end
-
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
- include Core::CompletionType
+ include Core::CallOps
include Core::StatusCodes
include Core::TimeConsts
@@ -63,8 +57,7 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
+ def initialize(call, q, marshal, unmarshal, deadline)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
@@ -72,7 +65,6 @@ module GRPC
@call = call
@cq = q
@deadline = deadline
- @finished_tag = finished_tag
@marshal = marshal
@readq = Queue.new
@unmarshal = unmarshal
@@ -86,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
+ @enq_th = start_write_loop(requests)
+ @loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -108,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
+ @enq_th = start_write_loop(replys, is_client: false)
+ @loop_th = start_read_loop
end
private
@@ -130,10 +118,12 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
+ logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @enq_th.join if @enq_th.alive?
end
# during bidi-streaming, read the requests to send from a separate thread
@@ -144,36 +134,23 @@ module GRPC
begin
count = 0
requests.each do |req|
+ logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
- @call.start_write(Core::ByteBuffer.new(payload), write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
- end
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_MESSAGE => payload)
end
if is_client
- @call.writes_done(write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
- end
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISHED)
- ensure
- ev.close
- end
- logger.debug('bidi-client: finished received')
+ logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi: write_loop failed')
+ logger.warn('bidi-write_loop: failed')
logger.warn(e)
+ raise e
end
end
end
@@ -187,27 +164,22 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("waiting for read #{count}")
+ logger.debug("bidi-read_loop: #{count}")
count += 1
- @call.start_read(read_tag)
- ev = @cq.pluck(read_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
-
- # handle the next event.
- if ev.result.nil?
- @readq.push(END_OF_READS)
- logger.debug('done reading!')
- break
- end
-
- # push the latest read onto the queue and continue reading
- logger.debug("received req: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- @readq.push(res)
- ensure
- ev.close
+ # TODO: ensure metadata is read if available, currently it's not
+ batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
+ RECV_MESSAGE => nil)
+ # handle the next message
+ if batch_result.message.nil?
+ @readq.push(END_OF_READS)
+ logger.debug('bidi-read-loop: done reading!')
+ break
end
+
+ # push the latest read onto the queue and continue reading
+ logger.debug("received req: #{batch_result.message}")
+ res = @unmarshal.call(batch_result.message)
+ @readq.push(res)
end
rescue StandardError => e
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 01328d4a5b..7b2c04aa22 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -28,16 +28,16 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc/generic/active_call'
-require 'xray/thread_dump_signal_handler'
# GRPC contains the General RPC module.
module GRPC
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
+ include Core::TimeConsts
- # Default deadline is 5 seconds.
- DEFAULT_DEADLINE = 5
+ # Default timeout is 5 seconds.
+ DEFAULT_TIMEOUT = 5
# setup_channel is used by #initialize to constuct a channel from its
# arguments.
@@ -51,6 +51,14 @@ module GRPC
Core::Channel.new(host, kw, creds)
end
+ def self.update_with_jwt_aud_uri(a_hash, host, method)
+ last_slash_idx, res = method.rindex('/'), a_hash.clone
+ return res if last_slash_idx.nil?
+ service_name = method[0..(last_slash_idx - 1)]
+ res[:jwt_aud_uri] = "https://#{host}#{service_name}"
+ res
+ end
+
# check_update_metadata is used by #initialize verify that it's a Proc.
def self.check_update_metadata(update_metadata)
return update_metadata if update_metadata.nil?
@@ -76,8 +84,8 @@ module GRPC
# present the host and arbitrary keyword arg areignored, and the RPC
# connection uses this channel.
#
- # - :deadline
- # when present, this is the default deadline used for calls
+ # - :timeout
+ # when present, this is the default timeout used for calls
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
@@ -87,13 +95,13 @@ module GRPC
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
- # @param deadline [Number] the default deadline to use in requests
+ # @param timeout [Number] the default timeout to use in requests
# @param creds [Core::Credentials] the channel
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
channel_override: nil,
- deadline: DEFAULT_DEADLINE,
+ timeout: nil,
creds: nil,
update_metadata: nil,
**kw)
@@ -103,7 +111,7 @@ module GRPC
@update_metadata = ClientStub.check_update_metadata(update_metadata)
alt_host = kw[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
- @deadline = deadline
+ @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
end
# request_response sends a request to a GRPC server, and returns the
@@ -140,13 +148,14 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] (optional) the max completion time in seconds
+ # @param timeout [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
- def request_response(method, req, marshal, unmarshal, deadline = nil,
+ def request_response(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.request_response(req, **md) unless return_op
# return the operation view of the active_call; define #execute as a
@@ -197,13 +206,14 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] the max completion time in seconds
+ # @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
- def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ def client_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.client_streamer(requests, **md) unless return_op
# return the operation view of the active_call; define #execute as a
@@ -262,14 +272,15 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] the max completion time in seconds
+ # @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
- def server_streamer(method, req, marshal, unmarshal, deadline = nil,
+ def server_streamer(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.server_streamer(req, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
@@ -367,14 +378,15 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] (optional) the max completion time in seconds
+ # @param timeout [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
- def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.bidi_streamer(requests, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
@@ -390,15 +402,14 @@ module GRPC
# Creates a new active stub
#
- # @param ch [GRPC::Channel] the channel used to create the stub.
+ # @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [TimeConst]
- def new_active_call(ch, marshal, unmarshal, deadline = nil)
- absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
- call = @ch.create_call(ch, @host, absolute_deadline)
- ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
- started: false)
+ # @param timeout [TimeConst]
+ def new_active_call(method, marshal, unmarshal, timeout = nil)
+ deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
+ call = @ch.create_call(@queue, method, @host, deadline)
+ ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 2cb3d2eebf..10211ae239 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -80,26 +80,21 @@ module GRPC
else # is a bidi_stream
active_call.run_server_bidi(mth)
end
- send_status(active_call, OK, 'OK')
- active_call.finished
+ send_status(active_call, OK, 'OK', **active_call.output_metadata)
rescue BadStatus => e
- # this is raised by handlers that want GRPC to send an application
- # error code and detail message.
+ # this is raised by handlers that want GRPC to send an application error
+ # code and detail message and some additional app-specific metadata.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
- send_status(active_call, e.code, e.details)
+ send_status(active_call, e.code, e.details, **e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
- rescue OutOfTime
+ rescue Core::OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
- rescue Core::EventError => e
- # This is raised by GRPC internals but should rarely, if ever happen.
- # Log it, but don't notify the other endpoint..
- logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
@@ -140,9 +135,9 @@ module GRPC
"##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
end
- def send_status(active_client, code, details)
+ def send_status(active_client, code, details, **kw)
details = 'Not sure why' if details.nil?
- active_client.send_status(code, details)
+ active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 35e84023be..3375fcf20a 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -31,14 +31,142 @@ require 'grpc/grpc'
require 'grpc/generic/active_call'
require 'grpc/generic/service'
require 'thread'
-require 'xray/thread_dump_signal_handler'
+
+# A global that contains signals the gRPC servers should respond to.
+$grpc_signals = []
# GRPC contains the General RPC module.
module GRPC
+ # Handles the signals in $grpc_signals.
+ #
+ # @return false if the server should exit, true if not.
+ def handle_signals
+ loop do
+ sig = $grpc_signals.shift
+ case sig
+ when 'INT'
+ return false
+ when 'TERM'
+ return false
+ end
+ end
+ true
+ end
+ module_function :handle_signals
+
+ # Sets up a signal handler that adds signals to the signal handling global.
+ #
+ # Signal handlers should do as little as humanly possible.
+ # Here, they just add themselves to $grpc_signals
+ #
+ # RpcServer (and later other parts of gRPC) monitors the signals
+ # $grpc_signals in its own non-signal context.
+ def trap_signals
+ %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
+ end
+ module_function :trap_signals
+
+ # Pool is a simple thread pool.
+ class Pool
+ # Default keep alive period is 1s
+ DEFAULT_KEEP_ALIVE = 1
+
+ def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
+ fail 'pool size must be positive' unless size > 0
+ @jobs = Queue.new
+ @size = size
+ @stopped = false
+ @stop_mutex = Mutex.new
+ @stop_cond = ConditionVariable.new
+ @workers = []
+ @keep_alive = keep_alive
+ end
+
+ # Returns the number of jobs waiting
+ def jobs_waiting
+ @jobs.size
+ end
+
+ # Runs the given block on the queue with the provided args.
+ #
+ # @param args the args passed blk when it is called
+ # @param blk the block to call
+ def schedule(*args, &blk)
+ fail 'already stopped' if @stopped
+ return if blk.nil?
+ logger.info('schedule another job')
+ @jobs << [blk, args]
+ end
+
+ # Starts running the jobs in the thread pool.
+ def start
+ fail 'already stopped' if @stopped
+ until @workers.size == @size.to_i
+ next_thread = Thread.new do
+ catch(:exit) do # allows { throw :exit } to kill a thread
+ loop_execute_jobs
+ end
+ remove_current_thread
+ end
+ @workers << next_thread
+ end
+ end
+
+ # Stops the jobs in the pool
+ def stop
+ logger.info('stopping, will wait for all the workers to exit')
+ @workers.size.times { schedule { throw :exit } }
+ @stopped = true
+ @stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
+ end
+ forcibly_stop_workers
+ logger.info('stopped, all workers are shutdown')
+ end
+
+ protected
+
+ # Forcibly shutdown any threads that are still alive.
+ def forcibly_stop_workers
+ return unless @workers.size > 0
+ logger.info("forcibly terminating #{@workers.size} worker(s)")
+ @workers.each do |t|
+ next unless t.alive?
+ begin
+ t.exit
+ rescue StandardError => e
+ logger.warn('error while terminating a worker')
+ logger.warn(e)
+ end
+ end
+ end
+
+ # removes the threads from workers, and signal when all the
+ # threads are complete.
+ def remove_current_thread
+ @stop_mutex.synchronize do
+ @workers.delete(Thread.current)
+ @stop_cond.signal if @workers.size == 0
+ end
+ end
+
+ def loop_execute_jobs
+ loop do
+ begin
+ blk, args = @jobs.pop
+ blk.call(*args)
+ rescue StandardError => e
+ logger.warn('Error in worker thread')
+ logger.warn(e)
+ end
+ end
+ end
+ end
+
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
- include Core::CompletionType
+ include Core::CallOps
include Core::TimeConsts
extend ::Forwardable
@@ -50,6 +178,38 @@ module GRPC
# Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20
+ # Default poll period is 1s
+ DEFAULT_POLL_PERIOD = 1
+
+ # Signal check period is 0.25s
+ SIGNAL_CHECK_PERIOD = 0.25
+
+ # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
+ # its arguments.
+ def self.setup_cq(alt_cq)
+ return Core::CompletionQueue.new if alt_cq.nil?
+ unless alt_cq.is_a? Core::CompletionQueue
+ fail(TypeError, '!CompletionQueue')
+ end
+ alt_cq
+ end
+
+ # setup_srv is used by #initialize to constuct a Core::Server from its
+ # arguments.
+ def self.setup_srv(alt_srv, cq, **kw)
+ return Core::Server.new(cq, kw) if alt_srv.nil?
+ fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
+ alt_srv
+ end
+
+ # setup_connect_md_proc is used by #initialize to validate the
+ # connect_md_proc.
+ def self.setup_connect_md_proc(a_proc)
+ return nil if a_proc.nil?
+ fail(TypeError, '!Proc') unless a_proc.is_a? Proc
+ a_proc
+ end
+
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
@@ -77,30 +237,21 @@ module GRPC
# * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests
+ #
+ # * connect_md_proc:
+ # when non-nil is a proc for determining metadata to to send back the client
+ # on receiving an invocation req. The proc signature is:
+ # {key: val, ..} func(method_name, {key: val, ...})
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
- poll_period:INFINITE_FUTURE,
+ poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil,
server_override:nil,
+ connect_md_proc:nil,
**kw)
- if completion_queue_override.nil?
- cq = Core::CompletionQueue.new
- else
- cq = completion_queue_override
- unless cq.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
- end
- @cq = cq
-
- if server_override.nil?
- srv = Core::Server.new(@cq, kw)
- else
- srv = server_override
- fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
- end
- @server = srv
-
+ @cq = RpcServer.setup_cq(completion_queue_override)
+ @server = RpcServer.setup_srv(server_override, @cq, **kw)
+ @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@pool_size = pool_size
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@@ -117,6 +268,13 @@ module GRPC
return unless @running
@stopped = true
@pool.stop
+
+ # TODO: uncomment this:
+ #
+ # This segfaults in the c layer, so its commented out for now. Shutdown
+ # still occurs, but the c layer has to do the cleanup.
+ #
+ # @server.close
end
# determines if the server is currently running
@@ -139,7 +297,21 @@ module GRPC
running?
end
- # determines if the server is currently stopped
+ # Runs the server in its own thread, then waits for signal INT or TERM on
+ # the current thread to terminate it.
+ def run_till_terminated
+ GRPC.trap_signals
+ t = Thread.new { run }
+ wait_till_running
+ loop do
+ sleep SIGNAL_CHECK_PERIOD
+ break unless GRPC.handle_signals
+ end
+ stop
+ t.join
+ end
+
+ # Determines if the server is currently stopped
def stopped?
@stopped ||= false
end
@@ -202,154 +374,71 @@ module GRPC
end
@pool.start
@server.start
- server_tag = Object.new
- until stopped?
- @server.request_call(server_tag)
- ev = @cq.pluck(server_tag, @poll_period)
- next if ev.nil?
- if ev.type != SERVER_RPC_NEW
- logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
- ev.close
- next
- end
- c = new_active_server_call(ev.call, ev.result)
- unless c.nil?
- mth = ev.result.method.to_sym
- ev.close
- @pool.schedule(c) do |call|
- rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
- end
- end
- end
+ loop_handle_server_calls
@running = false
end
- def new_active_server_call(call, new_server_rpc)
- # Accept the call. This is necessary even if a status is to be sent
- # back immediately
- finished_tag = Object.new
- call_queue = Core::CompletionQueue.new
- call.metadata = new_server_rpc.metadata # store the metadata
- call.server_accept(call_queue, finished_tag)
- call.server_end_initial_metadata
-
- # Send UNAVAILABLE if there are too many unprocessed jobs
+ # Sends UNAVAILABLE if there are too many unprocessed jobs
+ def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
- if @pool.jobs_waiting > @max_waiting_requests
- logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::UNAVAILABLE, '')
- return nil
- end
-
- # Send NOT_FOUND if the method does not exist
- mth = new_server_rpc.method.to_sym
- unless rpc_descs.key?(mth)
- logger.warn("NOT_FOUND: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::NOT_FOUND, '')
- return nil
- end
-
- # Create the ActiveCall
- rpc_desc = rpc_descs[mth]
- logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
- ActiveCall.new(call, call_queue,
- rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
- new_server_rpc.deadline, finished_tag: finished_tag)
+ return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
+ logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::UNAVAILABLE, '')
+ nil
end
- # Pool is a simple thread pool for running server requests.
- class Pool
- def initialize(size)
- fail 'pool size must be positive' unless size > 0
- @jobs = Queue.new
- @size = size
- @stopped = false
- @stop_mutex = Mutex.new
- @stop_cond = ConditionVariable.new
- @workers = []
- end
-
- # Returns the number of jobs waiting
- def jobs_waiting
- @jobs.size
- end
-
- # Runs the given block on the queue with the provided args.
- #
- # @param args the args passed blk when it is called
- # @param blk the block to call
- def schedule(*args, &blk)
- fail 'already stopped' if @stopped
- return if blk.nil?
- logger.info('schedule another job')
- @jobs << [blk, args]
- end
+ # Sends NOT_FOUND if the method can't be found
+ def found?(an_rpc)
+ mth = an_rpc.method.to_sym
+ return an_rpc if rpc_descs.key?(mth)
+ logger.warn("NOT_FOUND: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::NOT_FOUND, '')
+ nil
+ end
- # Starts running the jobs in the thread pool.
- def start
- fail 'already stopped' if @stopped
- until @workers.size == @size.to_i
- next_thread = Thread.new do
- catch(:exit) do # allows { throw :exit } to kill a thread
- loop do
- begin
- blk, args = @jobs.pop
- blk.call(*args)
- rescue StandardError => e
- logger.warn('Error in worker thread')
- logger.warn(e)
- end
- end
- end
-
- # removes the threads from workers, and signal when all the
- # threads are complete.
- @stop_mutex.synchronize do
- @workers.delete(Thread.current)
- @stop_cond.signal if @workers.size == 0
- end
+ # handles calls to the server
+ def loop_handle_server_calls
+ fail 'not running' unless @running
+ request_call_tag = Object.new
+ until stopped?
+ deadline = from_relative_time(@poll_period)
+ an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ c = new_active_server_call(an_rpc)
+ unless c.nil?
+ mth = an_rpc.method.to_sym
+ @pool.schedule(c) do |call|
+ rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
- @workers << next_thread
end
end
+ end
- # Stops the jobs in the pool
- def stop
- logger.info('stopping, will wait for all the workers to exit')
- @workers.size.times { schedule { throw :exit } }
- @stopped = true
-
- # TODO: allow configuration of the keepalive period
- keep_alive = 5
- @stop_mutex.synchronize do
- @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
- end
-
- # Forcibly shutdown any threads that are still alive.
- if @workers.size > 0
- logger.warn("forcibly terminating #{@workers.size} worker(s)")
- @workers.each do |t|
- next unless t.alive?
- begin
- t.exit
- rescue StandardError => e
- logger.warn('error while terminating a worker')
- logger.warn(e)
- end
- end
- end
+ def new_active_server_call(an_rpc)
+ return nil if an_rpc.nil? || an_rpc.call.nil?
- logger.info('stopped, all workers are shutdown')
+ # allow the metadata to be accessed from the call
+ handle_call_tag = Object.new
+ an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
+ connect_md = nil
+ unless @connect_md_proc.nil?
+ connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
+ an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => connect_md)
+ return nil unless available?(an_rpc)
+ return nil unless found?(an_rpc)
+
+ # Create the ActiveCall
+ logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
+ rpc_desc = rpc_descs[an_rpc.method.to_sym]
+ ActiveCall.new(an_rpc.call, @cq,
+ rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
+ an_rpc.deadline)
end
protected
@@ -362,11 +451,9 @@ module GRPC
@rpc_handlers ||= {}
end
- private
-
def assert_valid_service_class(cls)
unless cls.include?(GenericService)
- fail "#{cls} should 'include GenericService'"
+ fail "#{cls} must 'include GenericService'"
end
if cls.rpc_descs.size == 0
fail "#{cls} should specify some rpc descriptions"
@@ -376,21 +463,17 @@ module GRPC
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
- specs = rpc_descs
- handlers = rpc_handlers
+ specs, handlers = rpc_descs, rpc_handlers
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
- if specs.key? route
- fail "Cannot add rpc #{route} from #{spec}, already registered"
+ fail "already registered: rpc #{route} from #{spec}" if specs.key? route
+ specs[route] = spec
+ if service.is_a?(Class)
+ handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
- specs[route] = spec
- if service.is_a?(Class)
- handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
- else
- handlers[route] = service.method(name.to_s.underscore.to_sym)
- end
- logger.info("handling #{route} with #{handlers[route]}")
+ handlers[route] = service.method(name.to_s.underscore.to_sym)
end
+ logger.info("handling #{route} with #{handlers[route]}")
end
end
end
diff --git a/src/ruby/lib/grpc/core/event.rb b/src/ruby/lib/grpc/notifier.rb
index 194aa8ecac..caa18bbed6 100644
--- a/src/ruby/lib/grpc/core/event.rb
+++ b/src/ruby/lib/grpc/notifier.rb
@@ -27,17 +27,33 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-require 'grpc'
-
# GRPC contains the General RPC module.
module GRPC
- module Core
- # Event is a class defined in the c extension
- #
- # Here, we add an inspect method.
- class Event
- def inspect
- "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
+ # Notifier is useful high-level synchronization primitive.
+ class Notifier
+ attr_reader :payload, :notified
+ alias_method :notified?, :notified
+
+ def initialize
+ @mutex = Mutex.new
+ @cvar = ConditionVariable.new
+ @notified = false
+ @payload = nil
+ end
+
+ def wait
+ @mutex.synchronize do
+ @cvar.wait(@mutex) until notified?
+ end
+ end
+
+ def notify(payload)
+ @mutex.synchronize do
+ return Error.new('already notified') if notified?
+ @payload = payload
+ @notified = true
+ @cvar.signal
+ return nil
end
end
end
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 513a53724f..072fb9b1aa 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '0.5.0'
+ VERSION = '0.6.1'
end
diff --git a/src/ruby/spec/alloc_spec.rb b/src/ruby/spec/alloc_spec.rb
deleted file mode 100644
index 88e7e2b3e7..0000000000
--- a/src/ruby/spec/alloc_spec.rb
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe 'Wrapped classes where .new cannot create an instance' do
- describe GRPC::Core::Event do
- it 'should fail .new fail with a runtime error' do
- expect { GRPC::Core::Event.new }.to raise_error(TypeError)
- end
- end
-
- describe GRPC::Core::Call do
- it 'should fail .new fail with a runtime error' do
- expect { GRPC::Core::Event.new }.to raise_error(TypeError)
- end
- end
-end
diff --git a/src/ruby/spec/byte_buffer_spec.rb b/src/ruby/spec/byte_buffer_spec.rb
deleted file mode 100644
index e1833ebb3a..0000000000
--- a/src/ruby/spec/byte_buffer_spec.rb
+++ /dev/null
@@ -1,67 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe GRPC::Core::ByteBuffer do
- describe '#new' do
- it 'is constructed from a string' do
- expect { GRPC::Core::ByteBuffer.new('#new') }.not_to raise_error
- end
-
- it 'can be constructed from the empty string' do
- expect { GRPC::Core::ByteBuffer.new('') }.not_to raise_error
- end
-
- it 'cannot be constructed from nil' do
- expect { GRPC::Core::ByteBuffer.new(nil) }.to raise_error TypeError
- end
-
- it 'cannot be constructed from non-strings' do
- [1, Object.new, :a_symbol].each do |x|
- expect { GRPC::Core::ByteBuffer.new(x) }.to raise_error TypeError
- end
- end
- end
-
- describe '#to_s' do
- it 'is the string value the ByteBuffer was constructed with' do
- expect(GRPC::Core::ByteBuffer.new('#to_s').to_s).to eq('#to_s')
- end
- end
-
- describe '#dup' do
- it 'makes an instance whose #to_s is the original string value' do
- bb = GRPC::Core::ByteBuffer.new('#dup')
- a_copy = bb.dup
- expect(a_copy.to_s).to eq('#dup')
- expect(a_copy.dup.to_s).to eq('#dup')
- end
- end
-end
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index 2617564571..4977c10a7e 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -66,51 +66,34 @@ describe GRPC::Core::RpcErrors do
end
end
-describe GRPC::Core::Call do
+describe GRPC::Core::CallOps do
before(:each) do
- @tag = Object.new
- @client_queue = GRPC::Core::CompletionQueue.new
- fake_host = 'localhost:10101'
- @ch = GRPC::Core::Channel.new(fake_host, nil)
- end
-
- describe '#start_read' do
- xit 'should fail if called immediately' do
- blk = proc { make_test_call.start_read(@tag) }
- expect(&blk).to raise_error GRPC::Core::CallError
- end
- end
-
- describe '#start_write' do
- xit 'should fail if called immediately' do
- bytes = GRPC::Core::ByteBuffer.new('test string')
- blk = proc { make_test_call.start_write(bytes, @tag) }
- expect(&blk).to raise_error GRPC::Core::CallError
- end
+ @known_types = {
+ SEND_INITIAL_METADATA: 0,
+ SEND_MESSAGE: 1,
+ SEND_CLOSE_FROM_CLIENT: 2,
+ SEND_STATUS_FROM_SERVER: 3,
+ RECV_INITIAL_METADATA: 4,
+ RECV_MESSAGE: 5,
+ RECV_STATUS_ON_CLIENT: 6,
+ RECV_CLOSE_ON_SERVER: 7
+ }
end
- describe '#start_write_status' do
- xit 'should fail if called immediately' do
- blk = proc { make_test_call.start_write_status(153, 'x', @tag) }
- expect(&blk).to raise_error GRPC::Core::CallError
- end
+ it 'should have symbols for all the known operation types' do
+ m = GRPC::Core::CallOps
+ syms_and_codes = m.constants.collect { |c| [c, m.const_get(c)] }
+ expect(Hash[syms_and_codes]).to eq(@known_types)
end
+end
- describe '#writes_done' do
- xit 'should fail if called immediately' do
- blk = proc { make_test_call.writes_done(Object.new) }
- expect(&blk).to raise_error GRPC::Core::CallError
- end
- end
+describe GRPC::Core::Call do
+ let(:client_queue) { GRPC::Core::CompletionQueue.new }
+ let(:test_tag) { Object.new }
+ let(:fake_host) { 'localhost:10101' }
- describe '#add_metadata' do
- it 'adds metadata to a call without fail' do
- call = make_test_call
- n = 37
- one_md = proc { |x| [sprintf('key%d', x), sprintf('value%d', x)] }
- metadata = Hash[n.times.collect { |i| one_md.call i }]
- expect { call.add_metadata(metadata) }.to_not raise_error
- end
+ before(:each) do
+ @ch = GRPC::Core::Channel.new(fake_host, nil)
end
describe '#status' do
@@ -154,7 +137,7 @@ describe GRPC::Core::Call do
end
def make_test_call
- @ch.create_call('dummy_method', 'dummy_host', deadline)
+ @ch.create_call(client_queue, 'dummy_method', 'dummy_host', deadline)
end
def deadline
diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb
index af73294abe..d471ff5db6 100644
--- a/src/ruby/spec/channel_spec.rb
+++ b/src/ruby/spec/channel_spec.rb
@@ -36,16 +36,13 @@ def load_test_certs
end
describe GRPC::Core::Channel do
- FAKE_HOST = 'localhost:0'
+ let(:fake_host) { 'localhost:0' }
+ let(:cq) { GRPC::Core::CompletionQueue.new }
def create_test_cert
GRPC::Core::Credentials.new(load_test_certs[0])
end
- before(:each) do
- @cq = GRPC::Core::CompletionQueue.new
- end
-
shared_examples '#new' do
it 'take a host name without channel args' do
expect { GRPC::Core::Channel.new('dummy_host', nil) }.not_to raise_error
@@ -61,7 +58,7 @@ describe GRPC::Core::Channel do
it 'does not take a hash with bad values as channel args' do
blk = construct_with_args(symbol: Object.new)
expect(&blk).to raise_error TypeError
- blk = construct_with_args('1' => Hash.new)
+ blk = construct_with_args('1' => {})
expect(&blk).to raise_error TypeError
end
@@ -115,25 +112,23 @@ describe GRPC::Core::Channel do
describe '#create_call' do
it 'creates a call OK' do
- host = FAKE_HOST
- ch = GRPC::Core::Channel.new(host, nil)
+ ch = GRPC::Core::Channel.new(fake_host, nil)
deadline = Time.now + 5
blk = proc do
- ch.create_call('dummy_method', 'dummy_host', deadline)
+ ch.create_call(cq, 'dummy_method', 'dummy_host', deadline)
end
expect(&blk).to_not raise_error
end
it 'raises an error if called on a closed channel' do
- host = FAKE_HOST
- ch = GRPC::Core::Channel.new(host, nil)
+ ch = GRPC::Core::Channel.new(fake_host, nil)
ch.close
deadline = Time.now + 5
blk = proc do
- ch.create_call('dummy_method', 'dummy_host', deadline)
+ ch.create_call(cq, 'dummy_method', 'dummy_host', deadline)
end
expect(&blk).to raise_error(RuntimeError)
end
@@ -141,15 +136,13 @@ describe GRPC::Core::Channel do
describe '#destroy' do
it 'destroys a channel ok' do
- host = FAKE_HOST
- ch = GRPC::Core::Channel.new(host, nil)
+ ch = GRPC::Core::Channel.new(fake_host, nil)
blk = proc { ch.destroy }
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
- host = FAKE_HOST
- ch = GRPC::Core::Channel.new(host, nil)
+ ch = GRPC::Core::Channel.new(fake_host, nil)
blk = proc { ch.destroy }
blk.call
expect(&blk).to_not raise_error
@@ -164,15 +157,13 @@ describe GRPC::Core::Channel do
describe '#close' do
it 'closes a channel ok' do
- host = FAKE_HOST
- ch = GRPC::Core::Channel.new(host, nil)
+ ch = GRPC::Core::Channel.new(fake_host, nil)
blk = proc { ch.close }
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
- host = FAKE_HOST
- ch = GRPC::Core::Channel.new(host, nil)
+ ch = GRPC::Core::Channel.new(fake_host, nil)
blk = proc { ch.close }
blk.call
expect(&blk).to_not raise_error
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index 49a2d3bb4d..68af79f907 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -30,7 +30,6 @@
require 'grpc'
require 'spec_helper'
-include GRPC::Core::CompletionType
include GRPC::Core
def load_test_certs
@@ -40,6 +39,8 @@ def load_test_certs
end
shared_context 'setup: tags' do
+ let(:sent_message) { 'sent message' }
+ let(:reply_text) { 'the reply' }
before(:example) do
@server_finished_tag = Object.new
@client_finished_tag = Object.new
@@ -52,153 +53,136 @@ shared_context 'setup: tags' do
Time.now + 2
end
- def expect_next_event_on(queue, type, tag)
- ev = queue.pluck(tag, deadline)
- if type.nil?
- expect(ev).to be_nil
- else
- expect(ev).to_not be_nil
- expect(ev.type).to be(type)
- end
- ev
- end
-
def server_allows_client_to_proceed
- @server.request_call(@server_tag)
- ev = @server_queue.pluck(@server_tag, deadline)
- expect(ev).not_to be_nil
- expect(ev.type).to be(SERVER_RPC_NEW)
- server_call = ev.call
- server_call.server_accept(@server_queue, @server_finished_tag)
- server_call.server_end_initial_metadata
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ server_call = recvd_rpc.call
+ ops = { CallOps::SEND_INITIAL_METADATA => {} }
+ svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, ops)
+ expect(svr_batch.send_metadata).to be true
server_call
end
- def server_responds_with(server_call, reply_text)
- reply = ByteBuffer.new(reply_text)
- server_call.start_read(@server_tag)
- ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE)
- expect(ev.type).to be(READ)
- server_call.start_write(reply, @server_tag)
- ev = @server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE)
- expect(ev).not_to be_nil
- expect(ev.type).to be(WRITE_ACCEPTED)
- end
-
- def client_sends(call, sent = 'a message')
- req = ByteBuffer.new(sent)
- call.start_write(req, @tag)
- ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE)
- expect(ev).not_to be_nil
- expect(ev.type).to be(WRITE_ACCEPTED)
- sent
- end
-
def new_client_call
- @ch.create_call('/method', 'foo.test.google.fr', deadline)
+ @ch.create_call(@client_queue, '/method', 'foo.test.google.fr', deadline)
end
end
shared_examples 'basic GRPC message delivery is OK' do
+ include GRPC::Core
include_context 'setup: tags'
- it 'servers receive requests from clients and start responding' do
- reply = ByteBuffer.new('the server payload')
+ it 'servers receive requests from clients and can respond' do
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
-
- # check the server rpc new was received
- # @server.request_call(@server_tag)
- # ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag)
-
- # accept the call
- # server_call = ev.call
- # server_call.server_accept(@server_queue, @server_finished_tag)
- # server_call.server_end_initial_metadata
- server_call = server_allows_client_to_proceed
-
- # client sends a message
- msg = client_sends(call)
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => {},
+ CallOps::SEND_MESSAGE => sent_message
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.send_metadata).to be true
+ expect(batch_result.send_message).to be true
# confirm the server can read the inbound message
- server_call.start_read(@server_tag)
- ev = expect_next_event_on(@server_queue, READ, @server_tag)
- expect(ev.result.to_s).to eq(msg)
-
- # the server response
- server_call.start_write(reply, @server_tag)
- expect_next_event_on(@server_queue, WRITE_ACCEPTED, @server_tag)
+ server_call = server_allows_client_to_proceed
+ server_ops = {
+ CallOps::RECV_MESSAGE => nil
+ }
+ svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
+ server_ops)
+ expect(svr_batch.message).to eq(sent_message)
end
it 'responses written by servers are received by the client' do
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
- server_call = server_allows_client_to_proceed
- client_sends(call)
- server_responds_with(server_call, 'server_response')
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => {},
+ CallOps::SEND_MESSAGE => sent_message
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.send_metadata).to be true
+ expect(batch_result.send_message).to be true
- call.start_read(@tag)
- ev = expect_next_event_on(@client_queue, READ, @tag)
- expect(ev.result.to_s).to eq('server_response')
+ # confirm the server can read the inbound message
+ server_call = server_allows_client_to_proceed
+ server_ops = {
+ CallOps::RECV_MESSAGE => nil,
+ CallOps::SEND_MESSAGE => reply_text
+ }
+ svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
+ server_ops)
+ expect(svr_batch.message).to eq(sent_message)
+ expect(svr_batch.send_message).to be true
end
it 'servers can ignore a client write and send a status' do
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
-
- # check the server rpc new was received
- @server.request_call(@server_tag)
- ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag)
- expect(ev.tag).to be(@server_tag)
-
- # accept the call - need to do this to sent status.
- server_call = ev.call
- server_call.server_accept(@server_queue, @server_finished_tag)
- server_call.server_end_initial_metadata
- server_call.start_write_status(StatusCodes::NOT_FOUND, 'not found',
- @server_tag)
-
- # Client sends some data
- client_sends(call)
-
- # client gets an empty response for the read, preceeded by some metadata.
- call.start_read(@tag)
- expect_next_event_on(@client_queue, CLIENT_METADATA_READ,
- @client_metadata_tag)
- ev = expect_next_event_on(@client_queue, READ, @tag)
- expect(ev.tag).to be(@tag)
- expect(ev.result.to_s).to eq('')
-
- # finally, after client sends writes_done, they get the finished.
- call.writes_done(@tag)
- expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag)
- ev = expect_next_event_on(@client_queue, FINISHED, @client_finished_tag)
- expect(ev.result.code).to eq(StatusCodes::NOT_FOUND)
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => {},
+ CallOps::SEND_MESSAGE => sent_message
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.send_metadata).to be true
+ expect(batch_result.send_message).to be true
+
+ # confirm the server can read the inbound message
+ the_status = Struct::Status.new(StatusCodes::OK, 'OK')
+ server_call = server_allows_client_to_proceed
+ server_ops = {
+ CallOps::SEND_STATUS_FROM_SERVER => the_status
+ }
+ svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
+ server_ops)
+ expect(svr_batch.message).to eq nil
+ expect(svr_batch.send_status).to be true
end
it 'completes calls by sending status to client and server' do
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => {},
+ CallOps::SEND_MESSAGE => sent_message
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.send_metadata).to be true
+ expect(batch_result.send_message).to be true
+
+ # confirm the server can read the inbound message and respond
+ the_status = Struct::Status.new(StatusCodes::OK, 'OK', {})
server_call = server_allows_client_to_proceed
- client_sends(call)
- server_responds_with(server_call, 'server_response')
- server_call.start_write_status(10_101, 'status code is 10101', @server_tag)
-
- # first the client says writes are done
- call.start_read(@tag)
- expect_next_event_on(@client_queue, READ, @tag)
- call.writes_done(@tag)
-
- # but nothing happens until the server sends a status
- expect_next_event_on(@server_queue, FINISH_ACCEPTED, @server_tag)
- ev = expect_next_event_on(@server_queue, FINISHED, @server_finished_tag)
- expect(ev.result).to be_a(Struct::Status)
-
- # client gets FINISHED
- expect_next_event_on(@client_queue, FINISH_ACCEPTED, @tag)
- ev = expect_next_event_on(@client_queue, FINISHED, @client_finished_tag)
- expect(ev.result.details).to eq('status code is 10101')
- expect(ev.result.code).to eq(10_101)
+ server_ops = {
+ CallOps::RECV_MESSAGE => nil,
+ CallOps::SEND_MESSAGE => reply_text,
+ CallOps::SEND_STATUS_FROM_SERVER => the_status
+ }
+ svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
+ server_ops)
+ expect(svr_batch.message).to eq sent_message
+ expect(svr_batch.send_status).to be true
+ expect(svr_batch.send_message).to be true
+
+ # confirm the client can receive the server response and status.
+ client_ops = {
+ CallOps::SEND_CLOSE_FROM_CLIENT => nil,
+ CallOps::RECV_MESSAGE => nil,
+ CallOps::RECV_STATUS_ON_CLIENT => nil
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.send_close).to be true
+ expect(batch_result.message).to eq reply_text
+ expect(batch_result.status).to eq the_status
+
+ # confirm the server can receive the client close.
+ server_ops = {
+ CallOps::RECV_CLOSE_ON_SERVER => nil
+ }
+ svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
+ server_ops)
+ expect(svr_batch.send_close).to be true
end
end
@@ -208,11 +192,11 @@ shared_examples 'GRPC metadata delivery works OK' do
describe 'from client => server' do
before(:example) do
n = 7 # arbitrary number of metadata
- diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] }
+ diff_keys_fn = proc { |i| [format('k%d', i), format('v%d', i)] }
diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }]
- null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] }
+ null_vals_fn = proc { |i| [format('k%d', i), format('v\0%d', i)] }
null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }]
- same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] }
+ same_keys_fn = proc { |i| [format('k%d', i), [format('v%d', i)] * n] }
same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }]
symbol_key = { a_key: 'a val' }
@valid_metadata = [diff_keys, same_keys, null_vals, symbol_key]
@@ -224,25 +208,33 @@ shared_examples 'GRPC metadata delivery works OK' do
it 'raises an exception if a metadata key is invalid' do
@bad_keys.each do |md|
call = new_client_call
- expect { call.add_metadata(md) }.to raise_error
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => md
+ }
+ blk = proc do
+ call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ end
+ expect(&blk).to raise_error
end
end
it 'sends all the metadata pairs when keys and values are valid' do
@valid_metadata.each do |md|
call = new_client_call
- call.add_metadata(md)
-
- # Client begins a call OK
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
-
- # ... server has all metadata available even though the client did not
- # send a write
- @server.request_call(@server_tag)
- ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag)
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => md
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.send_metadata).to be true
+
+ # confirm the server can receive the client metadata
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ recvd_md = recvd_rpc.metadata
replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
- result = ev.result.metadata
- expect(result.merge(replace_symbols)).to eq(result)
+ expect(recvd_md).to eq(recvd_md.merge(replace_symbols))
end
end
end
@@ -250,11 +242,11 @@ shared_examples 'GRPC metadata delivery works OK' do
describe 'from server => client' do
before(:example) do
n = 7 # arbitrary number of metadata
- diff_keys_fn = proc { |i| [sprintf('k%d', i), sprintf('v%d', i)] }
+ diff_keys_fn = proc { |i| [format('k%d', i), format('v%d', i)] }
diff_keys = Hash[n.times.collect { |x| diff_keys_fn.call x }]
- null_vals_fn = proc { |i| [sprintf('k%d', i), sprintf('v\0%d', i)] }
+ null_vals_fn = proc { |i| [format('k%d', i), format('v\0%d', i)] }
null_vals = Hash[n.times.collect { |x| null_vals_fn.call x }]
- same_keys_fn = proc { |i| [sprintf('k%d', i), [sprintf('v%d', i)] * n] }
+ same_keys_fn = proc { |i| [format('k%d', i), [format('v%d', i)] * n] }
same_keys = Hash[n.times.collect { |x| same_keys_fn.call x }]
symbol_key = { a_key: 'a val' }
@valid_metadata = [diff_keys, same_keys, null_vals, symbol_key]
@@ -266,55 +258,81 @@ shared_examples 'GRPC metadata delivery works OK' do
it 'raises an exception if a metadata key is invalid' do
@bad_keys.each do |md|
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
+ # client signals that it's done sending metadata to allow server to
+ # respond
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ call.run_batch(@client_queue, @client_tag, deadline, client_ops)
# server gets the invocation
- @server.request_call(@server_tag)
- ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag)
- expect { ev.call.add_metadata(md) }.to raise_error
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => md
+ }
+ blk = proc do
+ recvd_rpc.call.run_batch(@server_queue, @server_tag, deadline,
+ server_ops)
+ end
+ expect(&blk).to raise_error
end
end
- it 'sends a hash that contains the status when no metadata is added' do
+ it 'sends an empty hash if no metadata is added' do
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
-
- # server gets the invocation
- @server.request_call(@server_tag)
- ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag)
- server_call = ev.call
-
- # ... server accepts the call without adding metadata
- server_call.server_accept(@server_queue, @server_finished_tag)
- server_call.server_end_initial_metadata
-
- # there is the HTTP status metadata, though there should not be any
- # TODO: update this with the bug number to be resolved
- ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ,
- @client_metadata_tag)
- expect(ev.result).to eq({})
+ # client signals that it's done sending metadata to allow server to
+ # respond
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+
+ # server gets the invocation but sends no metadata back
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ server_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ server_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+
+ # client receives nothing as expected
+ client_ops = {
+ CallOps::RECV_INITIAL_METADATA => nil
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
+ expect(batch_result.metadata).to eq({})
end
it 'sends all the pairs when keys and values are valid' do
@valid_metadata.each do |md|
call = new_client_call
- call.invoke(@client_queue, @client_metadata_tag, @client_finished_tag)
-
- # server gets the invocation
- @server.request_call(@server_tag)
- ev = expect_next_event_on(@server_queue, SERVER_RPC_NEW, @server_tag)
- server_call = ev.call
-
- # ... server adds metadata and accepts the call
- server_call.add_metadata(md)
- server_call.server_accept(@server_queue, @server_finished_tag)
- server_call.server_end_initial_metadata
-
- # Now the client can read the metadata
- ev = expect_next_event_on(@client_queue, CLIENT_METADATA_READ,
- @client_metadata_tag)
+ # client signals that it's done sending metadata to allow server to
+ # respond
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+
+ # server gets the invocation but sends no metadata back
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ server_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => md
+ }
+ server_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+
+ # client receives nothing as expected
+ client_ops = {
+ CallOps::RECV_INITIAL_METADATA => nil
+ }
+ batch_result = call.run_batch(@client_queue, @client_tag, deadline,
+ client_ops)
replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
- expect(ev.result).to eq(replace_symbols)
+ expect(batch_result.metadata).to eq(replace_symbols)
end
end
end
diff --git a/src/ruby/spec/credentials_spec.rb b/src/ruby/spec/credentials_spec.rb
index fc97d11a87..8e72e85d54 100644
--- a/src/ruby/spec/credentials_spec.rb
+++ b/src/ruby/spec/credentials_spec.rb
@@ -61,11 +61,11 @@ describe Credentials do
end
describe '#compose' do
- it 'can be completed OK' do
+ it 'cannot be completed OK with 2 SSL creds' do
certs = load_test_certs
cred1 = Credentials.new(*certs)
cred2 = Credentials.new(*certs)
- expect { cred1.compose(cred2) }.to_not raise_error
+ expect { cred1.compose(cred2) }.to raise_error
end
end
end
diff --git a/src/ruby/spec/event_spec.rb b/src/ruby/spec/event_spec.rb
deleted file mode 100644
index 7d92fcd792..0000000000
--- a/src/ruby/spec/event_spec.rb
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe GRPC::Core::CompletionType do
- before(:each) do
- @known_types = {
- QUEUE_SHUTDOWN: 0,
- OP_COMPLETE: 1,
- READ: 2,
- WRITE_ACCEPTED: 3,
- FINISH_ACCEPTED: 4,
- CLIENT_METADATA_READ: 5,
- FINISHED: 6,
- SERVER_RPC_NEW: 7,
- SERVER_SHUTDOWN: 8,
- RESERVED: 9
- }
- end
-
- it 'should have all the known types' do
- mod = GRPC::Core::CompletionType
- blk = proc { Hash[mod.constants.collect { |c| [c, mod.const_get(c)] }] }
- expect(blk.call).to eq(@known_types)
- end
-end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 96e07cacb4..575871afb1 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -34,12 +34,11 @@ include GRPC::Core::StatusCodes
describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
- CompletionType = GRPC::Core::CompletionType
+ CallOps = GRPC::Core::CallOps
before(:each) do
@pass_through = proc { |x| x }
@server_tag = Object.new
- @server_done_tag = Object.new
@tag = Object.new
@client_queue = GRPC::Core::CompletionQueue.new
@@ -48,7 +47,7 @@ describe GRPC::ActiveCall do
@server = GRPC::Core::Server.new(@server_queue, nil)
server_port = @server.add_http2_port(host)
@server.start
- @ch = GRPC::Core::Channel.new("localhost:#{server_port}", nil)
+ @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil)
end
after(:each) do
@@ -58,12 +57,10 @@ describe GRPC::ActiveCall do
describe 'restricted view methods' do
before(:each) do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
end
describe '#multi_req_view' do
@@ -90,48 +87,45 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
@client_call.remote_send(msg)
# check that server rpc new was received
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- expect(ev.type).to be(CompletionType::SERVER_RPC_NEW)
- expect(ev.call).to be_a(Call)
- expect(ev.tag).to be(@server_tag)
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ recvd_call = recvd_rpc.call
# Accept the call, and verify that the server reads the response ok.
- ev.call.server_accept(@client_queue, @server_tag)
- ev.call.server_end_initial_metadata
- server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => {}
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq(msg)
end
it 'marshals the payload using the marshal func' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ ActiveCall.client_invoke(call, @client_queue, deadline)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
- @pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
# confirm that the message was marshalled
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- ev.call.server_accept(@client_queue, @server_tag)
- ev.call.server_end_initial_metadata
- server_call = ActiveCall.new(ev.call, @client_queue, @pass_through,
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
@@ -142,23 +136,22 @@ describe GRPC::ActiveCall do
call = make_test_call
ActiveCall.client_invoke(call, @client_queue, deadline,
k1: 'v1', k2: 'v2')
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- expect(ev).to_not be_nil
- expect(ev.result.metadata['k1']).to eq('v1')
- expect(ev.result.metadata['k2']).to eq('v2')
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ expect(recvd_call).to_not be_nil
+ expect(recvd_rpc.metadata).to_not be_nil
+ expect(recvd_rpc.metadata['k1']).to eq('v1')
+ expect(recvd_rpc.metadata['k2']).to eq('v2')
end
end
describe '#remote_read' do
it 'reads the response sent by a server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -168,12 +161,10 @@ describe GRPC::ActiveCall do
it 'saves no metadata when the server adds no metadata' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -185,12 +176,10 @@ describe GRPC::ActiveCall do
it 'saves metadata add by the server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
@@ -203,12 +192,10 @@ describe GRPC::ActiveCall do
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
client_call.writes_done(false)
@@ -222,13 +209,11 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
# confirm the client receives the unmarshalled message
msg = 'message is a string'
@@ -249,13 +234,11 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that can read n responses' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
- msg = 'message is 4a string'
+ metadata_tag: md_tag)
+ msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -269,12 +252,10 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- read_metadata_tag: meta_tag,
- finished_tag: done_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
@@ -294,12 +275,10 @@ describe GRPC::ActiveCall do
describe '#writes_done' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- finished_tag: done_tag,
- read_metadata_tag: meta_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
expect { client_call.writes_done(false) }.to_not raise_error
@@ -312,12 +291,10 @@ describe GRPC::ActiveCall do
it 'finishes ok if the server sends an early status response' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- read_metadata_tag: meta_tag,
- finished_tag: done_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -330,12 +307,10 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
- deadline)
+ md_tag = ActiveCall.client_invoke(call, @client_queue, deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
- read_metadata_tag: meta_tag,
- finished_tag: done_tag)
+ metadata_tag: md_tag)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -353,21 +328,20 @@ describe GRPC::ActiveCall do
end
def expect_server_to_be_invoked(**kw)
- @server.request_call(@server_tag)
- ev = @server_queue.next(deadline)
- ev.call.add_metadata(kw)
- ev.call.server_accept(@client_queue, @server_done_tag)
- ev.call.server_end_initial_metadata
- ActiveCall.new(ev.call, @client_queue, @pass_through,
- @pass_through, deadline,
- finished_tag: @server_done_tag)
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ expect(recvd_rpc).to_not eq nil
+ recvd_call = recvd_rpc.call
+ recvd_call.run_batch(@server_queue, @server_tag, deadline,
+ CallOps::SEND_INITIAL_METADATA => kw)
+ ActiveCall.new(recvd_call, @server_queue, @pass_through,
+ @pass_through, deadline)
end
def make_test_call
- @ch.create_call('dummy_method', 'dummy_host', deadline)
+ @ch.create_call(@client_queue, '/method', 'a.dummy.host', deadline)
end
def deadline
- Time.now + 1 # in 1 second; arbitrary
+ Time.now + 2 # in 2 seconds; arbitrary
end
end
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 0c98fc40d9..98d68ccfbb 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -28,17 +28,13 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
-require 'xray/thread_dump_signal_handler'
-
-NOOP = proc { |x| x }
-FAKE_HOST = 'localhost:0'
def wakey_thread(&blk)
- awake_mutex, awake_cond = Mutex.new, ConditionVariable.new
+ n = GRPC::Notifier.new
t = Thread.new do
- blk.call(awake_mutex, awake_cond)
+ blk.call(n)
end
- awake_mutex.synchronize { awake_cond.wait(awake_mutex) }
+ n.wait
t
end
@@ -50,8 +46,11 @@ end
include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
+include GRPC::Core::CallOps
describe 'ClientStub' do
+ let(:noop) { proc { |x| x } }
+
before(:each) do
Thread.abort_on_exception = true
@server = nil
@@ -66,61 +65,56 @@ describe 'ClientStub' do
end
describe '#new' do
+ let(:fake_host) { 'localhost:0' }
it 'can be created from a host and args' do
- host = FAKE_HOST
opts = { a_channel_arg: 'an_arg' }
blk = proc do
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with a default deadline' do
- host = FAKE_HOST
opts = { a_channel_arg: 'an_arg', deadline: 5 }
blk = proc do
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'can be created with an channel override' do
- host = FAKE_HOST
opts = { a_channel_arg: 'an_arg', channel_override: @ch }
blk = proc do
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).not_to raise_error
end
it 'cannot be created with a bad channel override' do
- host = FAKE_HOST
blk = proc do
opts = { a_channel_arg: 'an_arg', channel_override: Object.new }
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to raise_error
end
it 'cannot be created with bad credentials' do
- host = FAKE_HOST
blk = proc do
opts = { a_channel_arg: 'an_arg', creds: Object.new }
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to raise_error
end
it 'can be created with test test credentials' do
certs = load_test_certs
- host = FAKE_HOST
blk = proc do
opts = {
GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
a_channel_arg: 'an_arg',
creds: GRPC::Core::Credentials.new(certs[0], nil, nil)
}
- GRPC::ClientStub.new(host, @cq, **opts)
+ GRPC::ClientStub.new(fake_host, @cq, **opts)
end
expect(&blk).to_not raise_error
end
@@ -187,7 +181,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
- stub.request_response(@method, @sent_msg, NOOP, NOOP,
+ stub.request_response(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
end
@@ -196,7 +190,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
- op = stub.request_response(@method, @sent_msg, NOOP, NOOP,
+ op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
@@ -259,7 +253,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_response(stub)
- stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
+ stub.client_streamer(@method, @sent_msgs, noop, noop,
k1: 'v1', k2: 'v2')
end
@@ -268,7 +262,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_response(stub)
- op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP,
+ op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
@@ -333,7 +327,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
- e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
+ e = stub.server_streamer(@method, @sent_msg, noop, noop,
k1: 'v1', k2: 'v2')
expect(e).to be_a(Enumerator)
e
@@ -344,7 +338,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
- op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP,
+ op = stub.server_streamer(@method, @sent_msg, noop, noop,
return_op: true, k1: 'v1', k2: 'v2')
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
@@ -361,34 +355,30 @@ describe 'ClientStub' do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+ server_port = create_test_server
+ @host = "localhost:#{server_port}"
end
it 'supports sending all the requests first', bidi: true do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
- stub = GRPC::ClientStub.new(host, @cq)
+ stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
it 'supports client-initiated ping pong', bidi: true do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
- stub = GRPC::ClientStub.new(host, @cq)
+ stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
it 'supports a server-initiated ping pong', bidi: true do
- server_port = create_test_server
- host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
- stub = GRPC::ClientStub.new(host, @cq)
+ stub = GRPC::ClientStub.new(@host, @cq)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
@@ -397,7 +387,7 @@ describe 'ClientStub' do
describe 'without a call operation' do
def get_responses(stub)
- e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP)
+ e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
expect(e).to be_a(Enumerator)
e
end
@@ -407,7 +397,7 @@ describe 'ClientStub' do
describe 'via a call operation' do
def get_responses(stub)
- op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP,
+ op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
return_op: true)
expect(op).to be_a(GRPC::ActiveCall::Operation)
e = op.execute
@@ -421,8 +411,8 @@ describe 'ClientStub' do
def run_server_streamer(expected_input, replys, status, **kw)
wanted_metadata = kw.clone
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
@@ -434,8 +424,8 @@ describe 'ClientStub' do
def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
status)
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
@@ -443,8 +433,8 @@ describe 'ClientStub' do
end
def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expected_inputs.each do |i|
if client_starts
expect(c.remote_read).to eq(i)
@@ -460,8 +450,8 @@ describe 'ClientStub' do
def run_client_streamer(expected_inputs, resp, status, **kw)
wanted_metadata = kw.clone
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
@@ -473,8 +463,8 @@ describe 'ClientStub' do
def run_request_response(expected_input, resp, status, **kw)
wanted_metadata = kw.clone
- wakey_thread do |mtx, cnd|
- c = expect_server_to_be_invoked(mtx, cnd)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
@@ -490,24 +480,16 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0')
end
- def start_test_server(awake_mutex, awake_cond)
+ def expect_server_to_be_invoked(notifier)
@server.start
- @server_tag = Object.new
- @server.request_call(@server_tag)
- awake_mutex.synchronize { awake_cond.signal }
- end
-
- def expect_server_to_be_invoked(awake_mutex, awake_cond)
- start_test_server(awake_mutex, awake_cond)
- ev = @server_queue.pluck(@server_tag, INFINITE_FUTURE)
- fail OutOfTime if ev.nil?
- server_call = ev.call
- server_call.metadata = ev.result.metadata
- finished_tag = Object.new
- server_call.server_accept(@server_queue, finished_tag)
- server_call.server_end_initial_metadata
- GRPC::ActiveCall.new(server_call, @server_queue, NOOP, NOOP,
- INFINITE_FUTURE,
- finished_tag: finished_tag)
+ notifier.notify(nil)
+ server_tag = Object.new
+ recvd_rpc = @server.request_call(@server_queue, server_tag,
+ INFINITE_FUTURE)
+ recvd_call = recvd_rpc.call
+ recvd_call.metadata = recvd_rpc.metadata
+ recvd_call.run_batch(@server_queue, server_tag, Time.now + 2,
+ SEND_INITIAL_METADATA => nil)
+ GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE)
end
end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 39d1e83748..083632a080 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -37,7 +37,6 @@ describe GRPC::RpcDesc do
INTERNAL = GRPC::Core::StatusCodes::INTERNAL
UNKNOWN = GRPC::Core::StatusCodes::UNKNOWN
CallError = GRPC::Core::CallError
- EventError = GRPC::Core::EventError
before(:each) do
@request_response = RpcDesc.new('rr', Object.new, Object.new, 'encode',
@@ -53,49 +52,49 @@ describe GRPC::RpcDesc do
@ok_response = Object.new
end
+ shared_examples 'it handles errors' do
+ it 'sends the specified status if BadStatus is raised' do
+ expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
+ {})
+ this_desc.run_server_method(@call, method(:bad_status))
+ end
+
+ it 'sends status UNKNOWN if other StandardErrors are raised' do
+ expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+ false, {})
+ this_desc.run_server_method(@call, method(:other_error))
+ end
+
+ it 'absorbs CallError with no further action' do
+ expect(@call).to receive(:remote_read).once.and_raise(CallError)
+ blk = proc do
+ this_desc.run_server_method(@call, method(:fake_reqresp))
+ end
+ expect(&blk).to_not raise_error
+ end
+ end
+
describe '#run_server_method' do
+ let(:fake_md) { { k1: 'v1', k2: 'v2' } }
describe 'for request responses' do
+ let(:this_desc) { @request_response }
before(:each) do
@call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call)
- allow(@call).to receive(:gc)
- end
-
- it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
- @request_response.run_server_method(@call, method(:bad_status))
- end
-
- it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
- @request_response.run_server_method(@call, method(:other_error))
- end
-
- it 'absorbs EventError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(EventError)
- blk = proc do
- @request_response.run_server_method(@call, method(:fake_reqresp))
- end
- expect(&blk).to_not raise_error
end
- it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
- blk = proc do
- @request_response.run_server_method(@call, method(:fake_reqresp))
- end
- expect(&blk).to_not raise_error
- end
+ it_behaves_like 'it handles errors'
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
- @request_response.run_server_method(@call, method(:fake_reqresp))
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
+ this_desc.run_server_method(@call, method(:fake_reqresp))
end
end
@@ -103,27 +102,20 @@ describe GRPC::RpcDesc do
before(:each) do
@call = double('active_call')
allow(@call).to receive(:multi_req_view).and_return(@call)
- allow(@call).to receive(:gc)
end
it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+ expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
+ {})
@client_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
+ expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason,
+ false, {})
@client_streamer.run_server_method(@call, method(:other_error_alt))
end
- it 'absorbs EventError with no further action' do
- expect(@call).to receive(:remote_send).once.and_raise(EventError)
- blk = proc do
- @client_streamer.run_server_method(@call, method(:fake_clstream))
- end
- expect(&blk).to_not raise_error
- end
-
it 'absorbs CallError with no further action' do
expect(@call).to receive(:remote_send).once.and_raise(CallError)
blk = proc do
@@ -134,53 +126,29 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
describe 'for server streaming' do
+ let(:this_desc) { @request_response }
before(:each) do
@call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call)
- allow(@call).to receive(:gc)
- end
-
- it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
- @server_streamer.run_server_method(@call, method(:bad_status))
- end
-
- it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
- expect(@call).to receive(:send_status) .once.with(UNKNOWN, @no_reason)
- @server_streamer.run_server_method(@call, method(:other_error))
- end
-
- it 'absorbs EventError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(EventError)
- blk = proc do
- @server_streamer.run_server_method(@call, method(:fake_svstream))
- end
- expect(&blk).to_not raise_error
end
- it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
- blk = proc do
- @server_streamer.run_server_method(@call, method(:fake_svstream))
- end
- expect(&blk).to_not raise_error
- end
+ it_behaves_like 'it handles errors'
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
end
@@ -191,26 +159,28 @@ describe GRPC::RpcDesc do
enq_th, rwl_th = double('enqueue_th'), ('read_write_loop_th')
allow(enq_th).to receive(:join)
allow(rwl_th).to receive(:join)
- allow(@call).to receive(:gc)
end
it 'sends the specified status if BadStatus is raised' do
e = GRPC::BadStatus.new(@bs_code, 'NOK')
expect(@call).to receive(:run_server_bidi).and_raise(e)
- expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK')
+ expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
+ {})
@bidi_streamer.run_server_method(@call, method(:bad_status_alt))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
expect(@call).to receive(:run_server_bidi).and_raise(StandardError)
- expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason)
+ expect(@call).to receive(:send_status).once.with(UNKNOWN, @no_reason,
+ false, {})
@bidi_streamer.run_server_method(@call, method(:other_error_alt))
end
it 'closes the stream if there no errors' do
expect(@call).to receive(:run_server_bidi)
- expect(@call).to receive(:send_status).once.with(OK, 'OK')
- expect(@call).to receive(:finished).once
+ expect(@call).to receive(:output_metadata).and_return(fake_md)
+ expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
+ **fake_md)
@bidi_streamer.run_server_method(@call, method(:fake_bidistream))
end
end
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
index 8383dc1533..aae3a7d7cb 100644
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb
@@ -28,11 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
-require 'xray/thread_dump_signal_handler'
-Pool = GRPC::RpcServer::Pool
+describe GRPC::Pool do
+ Pool = GRPC::Pool
-describe Pool do
describe '#new' do
it 'raises if a non-positive size is used' do
expect { Pool.new(0) }.to raise_error
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 34e5cdcd04..2cd21a15e3 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -28,7 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
-require 'xray/thread_dump_signal_handler'
def load_test_certs
test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
@@ -58,18 +57,20 @@ class NoRpcImplementation
rpc :an_rpc, EchoMsg, EchoMsg
end
-# A test service with an implementation.
+# A test service with an echo implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
attr_reader :received_md
- def initialize(_default_var = 'ignored')
+ def initialize(**kw)
+ @trailing_metadata = kw
@received_md = []
end
def an_rpc(req, call)
logger.info('echo service received a request')
+ call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
end
@@ -77,6 +78,25 @@ end
EchoStub = EchoService.rpc_stub_class
+# A test service with an implementation that fails with BadStatus
+class FailingService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ attr_reader :details, :code, :md
+
+ def initialize(_default_var = 'ignored')
+ @details = 'app error'
+ @code = 101
+ @md = { failed_method: 'an_rpc' }
+ end
+
+ def an_rpc(_req, _call)
+ fail GRPC::BadStatus.new(@code, @details, **@md)
+ end
+end
+
+FailingStub = FailingService.rpc_stub_class
+
# A slow test service.
class SlowService
include GRPC::GenericService
@@ -301,21 +321,20 @@ describe GRPC::RpcServer do
end
describe '#run' do
- before(:each) do
- @client_opts = {
- channel_override: @ch
- }
- @marshal = EchoService.rpc_descs[:an_rpc].marshal_proc
- @unmarshal = EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output)
- server_opts = {
- server_override: @server,
- completion_queue_override: @server_queue,
- poll_period: 1
- }
- @srv = RpcServer.new(**server_opts)
- end
+ let(:client_opts) { { channel_override: @ch } }
+ let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
+ let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
+
+ context 'with no connect_metadata' do
+ before(:each) do
+ server_opts = {
+ server_override: @server,
+ completion_queue_override: @server_queue,
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
- describe 'when running' do
it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -323,8 +342,8 @@ describe GRPC::RpcServer do
req = EchoMsg.new
blk = proc do
cq = GRPC::Core::CompletionQueue.new
- stub = GRPC::ClientStub.new(@host, cq, **@client_opts)
- stub.request_response('/unknown', req, @marshal, @unmarshal)
+ stub = GRPC::ClientStub.new(@host, cq, **client_opts)
+ stub.request_response('/unknown', req, marshal, unmarshal)
end
expect(&blk).to raise_error GRPC::BadStatus
@srv.stop
@@ -337,7 +356,7 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
n = 5 # arbitrary
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
@srv.stop
t.join
@@ -349,7 +368,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md)
@@ -363,8 +382,8 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = SlowStub.new(@host, **@client_opts)
- deadline = service.delay + 0.5 # wait for long enough
+ stub = SlowStub.new(@host, **client_opts)
+ deadline = service.delay + 1.0 # wait for long enough
expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
expect(service.received_md).to eq(wanted_md)
@@ -378,7 +397,7 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = SlowStub.new(@host, **@client_opts)
+ stub = SlowStub.new(@host, **client_opts)
deadline = 0.1 # too short for SlowService to respond
blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') }
expect(&blk).to raise_error GRPC::BadStatus
@@ -388,19 +407,37 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should handle cancellation correctly', server: true do
+ service = SlowService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = SlowStub.new(@host, **client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ Thread.new do # cancel the call
+ sleep 0.1
+ op.cancel
+ end
+ expect { op.execute }.to raise_error GRPC::Cancelled
+ @srv.stop
+ t.join
+ end
+
it 'should receive updated metadata', server: true do
service = EchoService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- @client_opts[:update_metadata] = proc do |md|
+ client_opts[:update_metadata] = proc do |md|
md[:k1] = 'updated-v1'
md
end
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
- wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }]
+ wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2',
+ 'jwt_aud_uri' => "https://#{@host}/EchoService" }]
expect(service.received_md).to eq(wanted_md)
@srv.stop
t.join
@@ -415,7 +452,7 @@ describe GRPC::RpcServer do
threads = []
n.times do
threads << Thread.new do
- stub = EchoStub.new(@host, **@client_opts)
+ stub = EchoStub.new(@host, **client_opts)
q << stub.an_rpc(req)
end
end
@@ -443,7 +480,7 @@ describe GRPC::RpcServer do
one_failed_as_unavailable = false
n.times do
threads << Thread.new do
- stub = SlowStub.new(@host, **@client_opts)
+ stub = SlowStub.new(@host, **client_opts)
begin
stub.an_rpc(req)
rescue GRPC::BadStatus => e
@@ -456,5 +493,97 @@ describe GRPC::RpcServer do
expect(one_failed_as_unavailable).to be(true)
end
end
+
+ context 'with connect metadata' do
+ let(:test_md_proc) do
+ proc do |mth, md|
+ res = md.clone
+ res['method'] = mth
+ res['connect_k1'] = 'connect_v1'
+ res
+ end
+ end
+ before(:each) do
+ server_opts = {
+ server_override: @server,
+ completion_queue_override: @server_queue,
+ poll_period: 1,
+ connect_md_proc: test_md_proc
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
+
+ it 'should send connect metadata to the client', server: true do
+ service = EchoService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = EchoStub.new(@host, **client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ expect(op.metadata).to be nil
+ expect(op.execute).to be_a(EchoMsg)
+ wanted_md = {
+ 'k1' => 'v1',
+ 'k2' => 'v2',
+ 'method' => '/EchoService/an_rpc',
+ 'connect_k1' => 'connect_v1'
+ }
+ expect(op.metadata).to eq(wanted_md)
+ @srv.stop
+ t.join
+ end
+ end
+
+ context 'with trailing metadata' do
+ before(:each) do
+ server_opts = {
+ server_override: @server,
+ completion_queue_override: @server_queue,
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ end
+
+ it 'should be added to BadStatus when requests fail', server: true do
+ service = FailingService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = FailingStub.new(@host, **client_opts)
+ blk = proc { stub.an_rpc(req) }
+
+ # confirm it raise the expected error
+ expect(&blk).to raise_error GRPC::BadStatus
+
+ # call again and confirm exception contained the trailing metadata.
+ begin
+ blk.call
+ rescue GRPC::BadStatus => e
+ expect(e.code).to eq(service.code)
+ expect(e.details).to eq(service.details)
+ expect(e.metadata).to eq(service.md)
+ end
+ @srv.stop
+ t.join
+ end
+
+ it 'should be received by the client', server: true do
+ wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
+ service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = EchoStub.new(@host, **client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ expect(op.metadata).to be nil
+ expect(op.execute).to be_a(EchoMsg)
+ expect(op.metadata).to eq(wanted_trailers)
+ @srv.stop
+ t.join
+ end
+ end
end
end
diff --git a/src/ruby/spec/metadata_spec.rb b/src/ruby/spec/metadata_spec.rb
deleted file mode 100644
index 2472866692..0000000000
--- a/src/ruby/spec/metadata_spec.rb
+++ /dev/null
@@ -1,64 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe GRPC::Core::Metadata do
- describe '#new' do
- it 'should create instances' do
- expect { GRPC::Core::Metadata.new('a key', 'a value') }.to_not raise_error
- end
- end
-
- describe '#key' do
- md = GRPC::Core::Metadata.new('a key', 'a value')
- it 'should be the constructor value' do
- expect(md.key).to eq('a key')
- end
- end
-
- describe '#value' do
- md = GRPC::Core::Metadata.new('a key', 'a value')
- it 'should be the constuctor value' do
- expect(md.value).to eq('a value')
- end
- end
-
- describe '#dup' do
- it 'should create a copy that returns the correct key' do
- md = GRPC::Core::Metadata.new('a key', 'a value')
- expect(md.dup.key).to eq('a key')
- end
-
- it 'should create a copy that returns the correct value' do
- md = GRPC::Core::Metadata.new('a key', 'a value')
- expect(md.dup.value).to eq('a value')
- end
- end
-end
diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb
index a47e484f97..bb566d1b1f 100644
--- a/src/ruby/spec/server_spec.rb
+++ b/src/ruby/spec/server_spec.rb
@@ -152,7 +152,7 @@ describe Server do
it 'does not take a hash with bad values as channel args' do
blk = construct_with_args(symbol: Object.new)
expect(&blk).to raise_error TypeError
- blk = construct_with_args('1' => Hash.new)
+ blk = construct_with_args('1' => {})
expect(&blk).to raise_error TypeError
end
diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb
index 837d2fc42a..101165c146 100644
--- a/src/ruby/spec/spec_helper.rb
+++ b/src/ruby/spec/spec_helper.rb
@@ -35,14 +35,18 @@ $LOAD_PATH.unshift(spec_dir)
$LOAD_PATH.unshift(lib_dir)
$LOAD_PATH.uniq!
-require 'faraday'
+# set up coverage
+require 'simplecov'
+SimpleCov.start do
+ add_filter 'spec'
+ add_filter 'bin'
+ SimpleCov.command_name ENV['COVERAGE_NAME']
+end if ENV['COVERAGE_NAME']
+
require 'rspec'
require 'logging'
require 'rspec/logging_helper'
-# Allow Faraday to support test stubs
-Faraday::Adapter.load_middleware(:test)
-
# Configure RSpec to capture log messages for each test. The output from the
# logs will be stored in the @log_output variable. It is a StringIO instance.
RSpec.configure do |config|