aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Michael Lumish <mlumish@google.com>2015-08-12 19:36:01 -0400
committerGravatar Michael Lumish <mlumish@google.com>2015-08-12 19:36:01 -0400
commit62fee02cefaab3d8e266ed9513542c502ccb7a40 (patch)
treea25b964e43f82729300d2ee414c53f2574a11819
parent30bc5d3fa96e87cc41036fe71749450375937dd6 (diff)
parent9fbfb5b0169a10e24fcee5659794b19bdfec3d7e (diff)
Merge pull request #2892 from tbetbetbe/grpc-ruby-enable-propagation-between-calls
Grpc ruby enable propagation between calls
-rw-r--r--src/ruby/ext/grpc/rb_channel.c42
-rwxr-xr-xsrc/ruby/grpc.gemspec2
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb61
-rw-r--r--src/ruby/spec/call_spec.rb2
-rw-r--r--src/ruby/spec/channel_spec.rb4
-rw-r--r--src/ruby/spec/client_server_spec.rb2
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb2
7 files changed, 89 insertions, 26 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index ac591f1563..c973a1db6c 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -195,18 +195,28 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) {
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
- VALUE host, VALUE deadline) {
+static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
+ VALUE parent, VALUE mask,
+ VALUE method, VALUE host,
+ VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
+ grpc_call *parent_call = NULL;
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
+ int flags = GRPC_PROPAGATE_DEFAULTS;
char *method_chars = StringValueCStr(method);
char *host_chars = NULL;
if (host != Qnil) {
host_chars = StringValueCStr(host);
}
+ if (mask != Qnil) {
+ flags = NUM2UINT(mask);
+ }
+ if (parent != Qnil) {
+ parent_call = grpc_rb_get_wrapped_call(parent);
+ }
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -216,10 +226,10 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
return Qnil;
}
- call = grpc_channel_create_call(ch, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
- method_chars, host_chars,
- grpc_rb_time_timeval(deadline,
- /* absolute time */ 0));
+ call = grpc_channel_create_call(ch, parent_call, flags, cq, method_chars,
+ host_chars, grpc_rb_time_timeval(
+ deadline,
+ /* absolute time */ 0));
if (call == NULL) {
rb_raise(rb_eRuntimeError, "cannot create call with method %s",
method_chars);
@@ -237,6 +247,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
return res;
}
+
/* Closes the channel, calling it's destroy method */
static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
@@ -268,6 +279,22 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
return res;
}
+static void Init_grpc_propagate_masks() {
+ /* Constants representing call propagation masks in grpc.h */
+ VALUE grpc_rb_mPropagateMasks = rb_define_module_under(
+ grpc_rb_mGrpcCore, "PropagateMasks");
+ rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
+ UINT2NUM(GRPC_PROPAGATE_DEADLINE));
+ rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
+ UINT2NUM(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT));
+ rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_TRACING_CONTEXT",
+ UINT2NUM(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT));
+ rb_define_const(grpc_rb_mPropagateMasks, "CANCELLATION",
+ UINT2NUM(GRPC_PROPAGATE_CANCELLATION));
+ rb_define_const(grpc_rb_mPropagateMasks, "DEFAULTS",
+ UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
+}
+
void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel =
@@ -283,7 +310,7 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "create_call",
- grpc_rb_channel_create_call, 4);
+ grpc_rb_channel_create_call, 6);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
@@ -299,6 +326,7 @@ void Init_grpc_channel() {
ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS)));
rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH",
ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH)));
+ Init_grpc_propagate_masks();
}
/* Gets the wrapped channel from the ruby wrapper */
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index 45f31329e9..eb748458b9 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -22,7 +22,7 @@ Gem::Specification.new do |s|
s.files += Dir.glob('bin/**/*')
s.test_files = Dir.glob('spec/**/*')
%w(math noproto).each do |b|
- s.executables += [ "#{b}_client.rb", "#{b}_server.rb" ]
+ s.executables += ["#{b}_client.rb", "#{b}_server.rb"]
end
s.require_paths = %w( bin lib )
s.platform = Gem::Platform::RUBY
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index a2f1ec612d..cce718537c 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -32,6 +32,8 @@ require 'grpc/version'
# GRPC contains the General RPC module.
module GRPC
+ # rubocop:disable Metrics/ParameterLists
+
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
@@ -68,6 +70,12 @@ module GRPC
update_metadata
end
+ # Allows users of the stub to modify the propagate mask.
+ #
+ # This is an advanced feature for use when making calls to another gRPC
+ # server whilst running in the handler of an existing one.
+ attr_writer :propagate_mask
+
# Creates a new ClientStub.
#
# Minimally, a stub is created with the just the host of the gRPC service
@@ -91,8 +99,8 @@ module GRPC
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
- # it can be used to update metadata, i.e, remove, change or update
- # amend metadata values.
+ # it can be used to update metadata, i.e, remove, or amend
+ # metadata values.
#
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
@@ -105,6 +113,7 @@ module GRPC
channel_override: nil,
timeout: nil,
creds: nil,
+ propagate_mask: nil,
update_metadata: nil,
**kw)
fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
@@ -113,6 +122,7 @@ module GRPC
@update_metadata = ClientStub.check_update_metadata(update_metadata)
alt_host = kw[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
+ @propagate_mask = propagate_mask
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
end
@@ -151,11 +161,15 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
+ # @param parent [Core::Call] a prior call whose reserved metadata
+ # will be propagated by this one.
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, timeout = nil,
- return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, timeout)
+ return_op: false,
+ parent: parent,
+ **kw)
+ c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.request_response(req, **md) unless return_op
@@ -210,10 +224,14 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
+ # @param parent [Core::Call] a prior call whose reserved metadata
+ # will be propagated by this one.
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, timeout = nil,
- return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, timeout)
+ return_op: false,
+ parent: nil,
+ **kw)
+ c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.client_streamer(requests, **md) unless return_op
@@ -276,11 +294,16 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
+ # @param parent [Core::Call] a prior call whose reserved metadata
+ # will be propagated by this one.
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, timeout = nil,
- return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, timeout)
+ return_op: false,
+ parent: nil,
+ **kw,
+ &blk)
+ c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.server_streamer(req, **md, &blk) unless return_op
@@ -381,12 +404,17 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
- # @param blk [Block] when provided, is executed for each response
+ # @param parent [Core::Call] a prior call whose reserved metadata
+ # will be propagated by this one.
# @param return_op [true|false] return an Operation if true
+ # @param blk [Block] when provided, is executed for each response
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil,
- return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, timeout)
+ return_op: false,
+ parent: nil,
+ **kw,
+ &blk)
+ c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.bidi_streamer(requests, **md, &blk) unless return_op
@@ -407,10 +435,17 @@ module GRPC
# @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param parent [Grpc::Call] a parent call, available when calls are
+ # made from server
# @param timeout [TimeConst]
- def new_active_call(method, marshal, unmarshal, timeout = nil)
+ def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil)
deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
- call = @ch.create_call(@queue, method, nil, deadline)
+ call = @ch.create_call(@queue,
+ parent, # parent call
+ @propagate_mask, # propagation options
+ method,
+ nil, # host use nil,
+ deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end
end
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index 36a442faed..3c5d33ffcd 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -137,7 +137,7 @@ describe GRPC::Core::Call do
end
def make_test_call
- @ch.create_call(client_queue, 'dummy_method', nil, deadline)
+ @ch.create_call(client_queue, nil, nil, 'dummy_method', nil, deadline)
end
def deadline
diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb
index 9081f0e20c..25cefcdfb7 100644
--- a/src/ruby/spec/channel_spec.rb
+++ b/src/ruby/spec/channel_spec.rb
@@ -117,7 +117,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
- ch.create_call(cq, 'dummy_method', nil, deadline)
+ ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
end
expect(&blk).to_not raise_error
end
@@ -128,7 +128,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
- ch.create_call(cq, 'dummy_method', nil, deadline)
+ ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
end
expect(&blk).to raise_error(RuntimeError)
end
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index 57c9a8de9b..2e673ff413 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -61,7 +61,7 @@ shared_context 'setup: tags' do
end
def new_client_call
- @ch.create_call(@client_queue, '/method', nil, deadline)
+ @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
end
end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 424b2dbdeb..0bf65ba2e9 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -338,7 +338,7 @@ describe GRPC::ActiveCall do
end
def make_test_call
- @ch.create_call(@client_queue, '/method', nil, deadline)
+ @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
end
def deadline