From fde3dbfb87853299b3e8b459180b935a013e3388 Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Tue, 11 Aug 2015 16:35:41 -0700 Subject: Allow reserved metadata to be propagated between calls --- src/ruby/ext/grpc/rb_channel.c | 25 +++++++++---- src/ruby/lib/grpc/generic/client_stub.rb | 61 ++++++++++++++++++++++++------- src/ruby/spec/call_spec.rb | 2 +- src/ruby/spec/channel_spec.rb | 4 +- src/ruby/spec/client_server_spec.rb | 2 +- src/ruby/spec/generic/active_call_spec.rb | 2 +- 6 files changed, 71 insertions(+), 25 deletions(-) (limited to 'src') diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index ac591f1563..8bf2bf2e17 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -195,18 +195,28 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) { /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ -static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method, - VALUE host, VALUE deadline) { +static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, + VALUE parent, VALUE mask, + VALUE method, VALUE host, + VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; + grpc_call *parent_call = NULL; grpc_channel *ch = NULL; grpc_completion_queue *cq = NULL; + int flags = GRPC_PROPAGATE_DEFAULTS; char *method_chars = StringValueCStr(method); char *host_chars = NULL; if (host != Qnil) { host_chars = StringValueCStr(host); } + if (mask != Qnil) { + flags = NUM2UINT(mask); + } + if (parent != Qnil) { + parent_call = grpc_rb_get_wrapped_call(parent); + } cq = grpc_rb_get_wrapped_completion_queue(cqueue); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -216,10 +226,10 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method, return Qnil; } - call = grpc_channel_create_call(ch, NULL, GRPC_PROPAGATE_DEFAULTS, cq, - method_chars, host_chars, - grpc_rb_time_timeval(deadline, - /* absolute time */ 0)); + call = grpc_channel_create_call(ch, parent_call, flags, cq, method_chars, + host_chars, grpc_rb_time_timeval( + deadline, + /* absolute time */ 0)); if (call == NULL) { rb_raise(rb_eRuntimeError, "cannot create call with method %s", method_chars); @@ -237,6 +247,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method, return res; } + /* Closes the channel, calling it's destroy method */ static VALUE grpc_rb_channel_destroy(VALUE self) { grpc_rb_channel *wrapper = NULL; @@ -283,7 +294,7 @@ void Init_grpc_channel() { /* Add ruby analogues of the Channel methods. */ rb_define_method(grpc_rb_cChannel, "create_call", - grpc_rb_channel_create_call, 4); + grpc_rb_channel_create_call, 6); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_alias(grpc_rb_cChannel, "close", "destroy"); diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index a2f1ec612d..cce718537c 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -32,6 +32,8 @@ require 'grpc/version' # GRPC contains the General RPC module. module GRPC + # rubocop:disable Metrics/ParameterLists + # ClientStub represents an endpoint used to send requests to GRPC servers. class ClientStub include Core::StatusCodes @@ -68,6 +70,12 @@ module GRPC update_metadata end + # Allows users of the stub to modify the propagate mask. + # + # This is an advanced feature for use when making calls to another gRPC + # server whilst running in the handler of an existing one. + attr_writer :propagate_mask + # Creates a new ClientStub. # # Minimally, a stub is created with the just the host of the gRPC service @@ -91,8 +99,8 @@ module GRPC # # - :update_metadata # when present, this a func that takes a hash and returns a hash - # it can be used to update metadata, i.e, remove, change or update - # amend metadata values. + # it can be used to update metadata, i.e, remove, or amend + # metadata values. # # @param host [String] the host the stub connects to # @param q [Core::CompletionQueue] used to wait for events @@ -105,6 +113,7 @@ module GRPC channel_override: nil, timeout: nil, creds: nil, + propagate_mask: nil, update_metadata: nil, **kw) fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue) @@ -113,6 +122,7 @@ module GRPC @update_metadata = ClientStub.check_update_metadata(update_metadata) alt_host = kw[Core::Channel::SSL_TARGET] @host = alt_host.nil? ? host : alt_host + @propagate_mask = propagate_mask @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout end @@ -151,11 +161,15 @@ module GRPC # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] (optional) the max completion time in seconds + # @param parent [Core::Call] a prior call whose reserved metadata + # will be propagated by this one. # @param return_op [true|false] return an Operation if true # @return [Object] the response received from the server def request_response(method, req, marshal, unmarshal, timeout = nil, - return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, timeout) + return_op: false, + parent: parent, + **kw) + c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.request_response(req, **md) unless return_op @@ -210,10 +224,14 @@ module GRPC # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] the max completion time in seconds # @param return_op [true|false] return an Operation if true + # @param parent [Core::Call] a prior call whose reserved metadata + # will be propagated by this one. # @return [Object|Operation] the response received from the server def client_streamer(method, requests, marshal, unmarshal, timeout = nil, - return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, timeout) + return_op: false, + parent: nil, + **kw) + c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.client_streamer(requests, **md) unless return_op @@ -276,11 +294,16 @@ module GRPC # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] the max completion time in seconds # @param return_op [true|false]return an Operation if true + # @param parent [Core::Call] a prior call whose reserved metadata + # will be propagated by this one. # @param blk [Block] when provided, is executed for each response # @return [Enumerator|Operation|nil] as discussed above def server_streamer(method, req, marshal, unmarshal, timeout = nil, - return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, timeout) + return_op: false, + parent: nil, + **kw, + &blk) + c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.server_streamer(req, **md, &blk) unless return_op @@ -381,12 +404,17 @@ module GRPC # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] (optional) the max completion time in seconds - # @param blk [Block] when provided, is executed for each response + # @param parent [Core::Call] a prior call whose reserved metadata + # will be propagated by this one. # @param return_op [true|false] return an Operation if true + # @param blk [Block] when provided, is executed for each response # @return [Enumerator|nil|Operation] as discussed above def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, - return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, timeout) + return_op: false, + parent: nil, + **kw, + &blk) + c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.bidi_streamer(requests, **md, &blk) unless return_op @@ -407,10 +435,17 @@ module GRPC # @param method [string] the method being called. # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses + # @param parent [Grpc::Call] a parent call, available when calls are + # made from server # @param timeout [TimeConst] - def new_active_call(method, marshal, unmarshal, timeout = nil) + def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil) deadline = from_relative_time(timeout.nil? ? @timeout : timeout) - call = @ch.create_call(@queue, method, nil, deadline) + call = @ch.create_call(@queue, + parent, # parent call + @propagate_mask, # propagation options + method, + nil, # host use nil, + deadline) ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false) end end diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index 36a442faed..3c5d33ffcd 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -137,7 +137,7 @@ describe GRPC::Core::Call do end def make_test_call - @ch.create_call(client_queue, 'dummy_method', nil, deadline) + @ch.create_call(client_queue, nil, nil, 'dummy_method', nil, deadline) end def deadline diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 9081f0e20c..25cefcdfb7 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -117,7 +117,7 @@ describe GRPC::Core::Channel do deadline = Time.now + 5 blk = proc do - ch.create_call(cq, 'dummy_method', nil, deadline) + ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline) end expect(&blk).to_not raise_error end @@ -128,7 +128,7 @@ describe GRPC::Core::Channel do deadline = Time.now + 5 blk = proc do - ch.create_call(cq, 'dummy_method', nil, deadline) + ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline) end expect(&blk).to raise_error(RuntimeError) end diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 57c9a8de9b..2e673ff413 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -61,7 +61,7 @@ shared_context 'setup: tags' do end def new_client_call - @ch.create_call(@client_queue, '/method', nil, deadline) + @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline) end end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 424b2dbdeb..0bf65ba2e9 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -338,7 +338,7 @@ describe GRPC::ActiveCall do end def make_test_call - @ch.create_call(@client_queue, '/method', nil, deadline) + @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline) end def deadline -- cgit v1.2.3