diff options
author | Shaun McCormick <splittingred@gmail.com> | 2017-08-11 14:57:14 -0500 |
---|---|---|
committer | Shaun McCormick <splittingred@gmail.com> | 2017-09-25 09:28:04 -0500 |
commit | eec7a917baf37ed26449b106f99027c97aedaba6 (patch) | |
tree | 2fb7503f739eca1aff43579ce2a065ce31f4d17b /src/ruby/lib | |
parent | 76c9e0806373181b32f1fba63686a3502acfb9cf (diff) |
Add Ruby server interceptors
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc.rb | 1 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 43 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 29 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/client_stub.rb | 133 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/interceptor_registry.rb | 53 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/interceptors.rb | 186 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 80 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 18 |
8 files changed, 464 insertions, 79 deletions
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 98bfc0a0fa..37b0392072 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -24,6 +24,7 @@ require_relative 'grpc/generic/active_call' require_relative 'grpc/generic/client_stub' require_relative 'grpc/generic/service' require_relative 'grpc/generic/rpc_server' +require_relative 'grpc/generic/interceptors' begin file = File.open(ssl_roots_path) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 10eb70b4a7..8c3aa284aa 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -154,6 +154,15 @@ module GRPC Operation.new(self) end + ## + # Returns a restricted view of this ActiveCall for use in interceptors + # + # @return [InterceptableView] + # + def interceptable + InterceptableView.new(self) + end + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) set_input_stream_done @@ -515,15 +524,27 @@ module GRPC # This does not mean that must necessarily be one. E.g, the replies # produced by gen_each_reply could ignore the received_msgs # - # @param gen_each_reply [Proc] generates the BiDi stream replies - def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, - @marshal, - @unmarshal, - metadata_received: @metadata_received, - req_view: MultiReqView.new(self)) - - bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) + # @param mth [Proc] generates the BiDi stream replies + # @param interception_ctx [InterceptionContext] + # + def run_server_bidi(mth, interception_ctx) + view = multi_req_view + bidi_call = BidiCall.new( + @call, + @marshal, + @unmarshal, + metadata_received: @metadata_received, + req_view: view + ) + requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false) + interception_ctx.intercept!( + :bidi_streamer, + call: view, + method: mth, + requests: requests + ) do + bidi_call.run_on_server(mth, requests) + end end # Waits till an operation completes @@ -645,5 +666,9 @@ module GRPC Operation = view_class(:cancel, :cancelled?, :deadline, :execute, :metadata, :status, :start_call, :wait, :write_flag, :write_flag=, :trailing_metadata) + + # InterceptableView further limits access to an ActiveCall's methods + # for use in interceptors on the client, exposing only the deadline + InterceptableView = view_class(:deadline) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index c2239d0178..3bdcc0062e 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -87,23 +87,32 @@ module GRPC # This does not mean that must necessarily be one. E.g, the replies # produced by gen_each_reply could ignore the received_msgs # - # @param gen_each_reply [Proc] generates the BiDi stream replies. - # @param set_input_steam_done [Proc] call back to call when - # the reads have been completely read through. - def run_on_server(gen_each_reply, set_input_stream_done) + # @param [Proc] gen_each_reply generates the BiDi stream replies. + # @param [Enumerable] requests The enumerable of requests to run + def run_on_server(gen_each_reply, requests) + replies = nil + # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call( - read_loop(set_input_stream_done, is_client: false)) + replies = gen_each_reply.call(requests) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call( - read_loop(set_input_stream_done, is_client: false), - @req_view) + replies = gen_each_reply.call(requests, @req_view) else fail 'Illegal arity of reply generator' end - write_loop(replys, is_client: false) + write_loop(replies, is_client: false) + end + + ## + # Read the next stream iteration + # + # @param [Proc] finalize_stream callback to call when the reads have been + # completely read through. + # @param [Boolean] is_client If this is a client or server request + # + def read_next_loop(finalize_stream, is_client = false) + read_loop(finalize_stream, is_client: is_client) end private diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 75a95a4e94..9a50f8a99d 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -89,17 +89,23 @@ module GRPC # used within a gRPC server. # @param channel_args [Hash] the channel arguments. Note: this argument is # ignored if the channel_override argument is provided. + # @param interceptors [Array<GRPC::ClientInterceptor>] An array of + # GRPC::ClientInterceptor objects that will be used for + # intercepting calls before they are executed + # Interceptors are an EXPERIMENTAL API. def initialize(host, creds, channel_override: nil, timeout: nil, propagate_mask: nil, - channel_args: {}) + channel_args: {}, + interceptors: []) @ch = ClientStub.setup_channel(channel_override, host, creds, channel_args) alt_host = channel_args[Core::Channel::SSL_TARGET] @host = alt_host.nil? ? host : alt_host @propagate_mask = propagate_mask @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout + @interceptors = InterceptorRegistry.new(interceptors) end # request_response sends a request to a GRPC server, and returns the @@ -149,16 +155,29 @@ module GRPC deadline: deadline, parent: parent, credentials: credentials) - return c.request_response(req, metadata: metadata) unless return_op - - # return the operation view of the active_call; define #execute as a - # new method for this instance that invokes #request_response. - c.merge_metadata_to_send(metadata) - op = c.operation - op.define_singleton_method(:execute) do - c.request_response(req, metadata: metadata) + interception_context = @interceptors.build_context + intercept_args = { + method: method, + request: req, + call: c.interceptable, + metadata: metadata + } + if return_op + # return the operation view of the active_call; define #execute as a + # new method for this instance that invokes #request_response. + c.merge_metadata_to_send(metadata) + op = c.operation + op.define_singleton_method(:execute) do + interception_context.intercept!(:request_response, intercept_args) do + c.request_response(req, metadata: metadata) + end + end + op + else + interception_context.intercept!(:request_response, intercept_args) do + c.request_response(req, metadata: metadata) + end end - op end # client_streamer sends a stream of requests to a GRPC server, and @@ -213,16 +232,29 @@ module GRPC deadline: deadline, parent: parent, credentials: credentials) - return c.client_streamer(requests, metadata: metadata) unless return_op - - # return the operation view of the active_call; define #execute as a - # new method for this instance that invokes #client_streamer. - c.merge_metadata_to_send(metadata) - op = c.operation - op.define_singleton_method(:execute) do - c.client_streamer(requests) + interception_context = @interceptors.build_context + intercept_args = { + method: method, + requests: requests, + call: c.interceptable, + metadata: metadata + } + if return_op + # return the operation view of the active_call; define #execute as a + # new method for this instance that invokes #client_streamer. + c.merge_metadata_to_send(metadata) + op = c.operation + op.define_singleton_method(:execute) do + interception_context.intercept!(:client_streamer, intercept_args) do + c.client_streamer(requests) + end + end + op + else + interception_context.intercept!(:client_streamer, intercept_args) do + c.client_streamer(requests, metadata: metadata) + end end - op end # server_streamer sends one request to the GRPC server, which yields a @@ -292,16 +324,29 @@ module GRPC deadline: deadline, parent: parent, credentials: credentials) - return c.server_streamer(req, metadata: metadata, &blk) unless return_op - - # return the operation view of the active_call; define #execute - # as a new method for this instance that invokes #server_streamer - c.merge_metadata_to_send(metadata) - op = c.operation - op.define_singleton_method(:execute) do - c.server_streamer(req, &blk) + interception_context = @interceptors.build_context + intercept_args = { + method: method, + request: req, + call: c.interceptable, + metadata: metadata + } + if return_op + # return the operation view of the active_call; define #execute + # as a new method for this instance that invokes #server_streamer + c.merge_metadata_to_send(metadata) + op = c.operation + op.define_singleton_method(:execute) do + interception_context.intercept!(:server_streamer, intercept_args) do + c.server_streamer(req, &blk) + end + end + op + else + interception_context.intercept!(:server_streamer, intercept_args) do + c.server_streamer(req, metadata: metadata, &blk) + end end - op end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -405,17 +450,29 @@ module GRPC deadline: deadline, parent: parent, credentials: credentials) - return c.bidi_streamer(requests, metadata: metadata, - &blk) unless return_op - - # return the operation view of the active_call; define #execute - # as a new method for this instance that invokes #bidi_streamer - c.merge_metadata_to_send(metadata) - op = c.operation - op.define_singleton_method(:execute) do - c.bidi_streamer(requests, &blk) + interception_context = @interceptors.build_context + intercept_args = { + method: method, + requests: requests, + call: c.interceptable, + metadata: metadata + } + if return_op + # return the operation view of the active_call; define #execute + # as a new method for this instance that invokes #bidi_streamer + c.merge_metadata_to_send(metadata) + op = c.operation + op.define_singleton_method(:execute) do + interception_context.intercept!(:bidi_streamer, intercept_args) do + c.bidi_streamer(requests, &blk) + end + end + op + else + interception_context.intercept!(:bidi_streamer, intercept_args) do + c.bidi_streamer(requests, metadata: metadata, &blk) + end end - op end private diff --git a/src/ruby/lib/grpc/generic/interceptor_registry.rb b/src/ruby/lib/grpc/generic/interceptor_registry.rb new file mode 100644 index 0000000000..b241eb9a86 --- /dev/null +++ b/src/ruby/lib/grpc/generic/interceptor_registry.rb @@ -0,0 +1,53 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# GRPC contains the General RPC module. +module GRPC + ## + # Represents a registry of added interceptors available for enumeration. + # The registry can be used for both server and client interceptors. + # This class is internal to gRPC and not meant for public usage. + # + class InterceptorRegistry + ## + # An error raised when an interceptor is attempted to be added + # that does not extend GRPC::Interceptor + # + class DescendantError < StandardError; end + + ## + # Initialize the registry with an empty interceptor list + # This is an EXPERIMENTAL API. + # + def initialize(interceptors = []) + @interceptors = [] + interceptors.each do |i| + base = GRPC::Interceptor + unless i.class.ancestors.include?(base) + fail DescendantError, "Interceptors must descend from #{base}" + end + @interceptors << i + end + end + + ## + # Builds an interception context from this registry + # + # @return [InterceptionContext] + # + def build_context + InterceptionContext.new(@interceptors) + end + end +end diff --git a/src/ruby/lib/grpc/generic/interceptors.rb b/src/ruby/lib/grpc/generic/interceptors.rb new file mode 100644 index 0000000000..73faec4b9c --- /dev/null +++ b/src/ruby/lib/grpc/generic/interceptors.rb @@ -0,0 +1,186 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +require_relative 'interceptor_registry' + +# GRPC contains the General RPC module. +module GRPC + ## + # Base class for interception in GRPC + # + class Interceptor + ## + # @param [Hash] options A hash of options that will be used + # by the interceptor. This is an EXPERIMENTAL API. + # + def initialize(options = {}) + @options = options || {} + end + end + + ## + # ClientInterceptor allows for wrapping outbound gRPC client stub requests. + # This is an EXPERIMENTAL API. + # + class ClientInterceptor < Interceptor + ## + # Intercept a unary request response call + # + # @param [Object] request + # @param [GRPC::ActiveCall] call + # @param [Method] method + # @param [Hash] metadata + # + def request_response(request:, call:, method:, metadata:) + GRPC.logger.debug "Intercepting request response method #{method}" \ + " for request #{request} with call #{call} and metadata: #{metadata}" + yield + end + + ## + # Intercept a client streaming call + # + # @param [Enumerable] requests + # @param [GRPC::ActiveCall] call + # @param [Method] method + # @param [Hash] metadata + # + def client_streamer(requests:, call:, method:, metadata:) + GRPC.logger.debug "Intercepting client streamer method #{method}" \ + " for requests #{requests} with call #{call} and metadata: #{metadata}" + yield + end + + ## + # Intercept a server streaming call + # + # @param [Object] request + # @param [GRPC::ActiveCall] call + # @param [Method] method + # @param [Hash] metadata + # + def server_streamer(request:, call:, method:, metadata:) + GRPC.logger.debug "Intercepting server streamer method #{method}" \ + " for request #{request} with call #{call} and metadata: #{metadata}" + yield + end + + ## + # Intercept a BiDi streaming call + # + # @param [Enumerable] requests + # @param [GRPC::ActiveCall] call + # @param [Method] method + # @param [Hash] metadata + # + def bidi_streamer(requests:, call:, method:, metadata:) + GRPC.logger.debug "Intercepting bidi streamer method #{method}" \ + " for requests #{requests} with call #{call} and metadata: #{metadata}" + yield + end + end + + ## + # ServerInterceptor allows for wrapping gRPC server execution handling. + # This is an EXPERIMENTAL API. + # + class ServerInterceptor < Interceptor + ## + # Intercept a unary request response call. + # + # @param [Object] request + # @param [GRPC::ActiveCall::SingleReqView] call + # @param [Method] method + # + def request_response(request:, call:, method:) + GRPC.logger.debug "Intercepting request response method #{method}" \ + " for request #{request} with call #{call}" + yield + end + + ## + # Intercept a client streaming call + # + # @param [GRPC::ActiveCall::MultiReqView] call + # @param [Method] method + # + def client_streamer(call:, method:) + GRPC.logger.debug "Intercepting client streamer method #{method}" \ + " with call #{call}" + yield + end + + ## + # Intercept a server streaming call + # + # @param [Object] request + # @param [GRPC::ActiveCall::SingleReqView] call + # @param [Method] method + # + def server_streamer(request:, call:, method:) + GRPC.logger.debug "Intercepting server streamer method #{method}" \ + " for request #{request} with call #{call}" + yield + end + + ## + # Intercept a BiDi streaming call + # + # @param [Enumerable<Object>] requests + # @param [GRPC::ActiveCall::MultiReqView] call + # @param [Method] method + # + def bidi_streamer(requests:, call:, method:) + GRPC.logger.debug "Intercepting bidi streamer method #{method}" \ + " for requests #{requests} with call #{call}" + yield + end + end + + ## + # Represents the context in which an interceptor runs. Used to provide an + # injectable mechanism for handling interception. This is an EXPERIMENTAL API. + # + class InterceptionContext + ## + # @param [Array<GRPC::Interceptor>] + # + def initialize(interceptors = []) + @interceptors = interceptors.dup + end + + ## + # Intercept the call and fire out to interceptors in a FIFO execution. + # This is an EXPERIMENTAL API. + # + # @param [Symbol] type The request type + # @param [Hash] args The arguments for the call + # + def intercept!(type, args = {}) + return yield if @interceptors.none? + + i = @interceptors.pop + return yield unless i + + i.send(type, args) do + if @interceptors.any? + intercept!(type, args) do + yield + end + else + yield + end + end + end + end +end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 6fb6c412fb..5fd1805aab 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -47,43 +47,85 @@ module GRPC proc { |o| unmarshal_class.method(unmarshal_method).call(o) } end - def handle_request_response(active_call, mth) + def handle_request_response(active_call, mth, inter_ctx) req = active_call.read_unary_request - resp = mth.call(req, active_call.single_req_view) - active_call.server_unary_response( - resp, trailing_metadata: active_call.output_metadata) + call = active_call.single_req_view + + inter_ctx.intercept!( + :request_response, + method: mth, + call: call, + request: req + ) do + resp = mth.call(req, call) + active_call.server_unary_response( + resp, + trailing_metadata: active_call.output_metadata + ) + end end - def handle_client_streamer(active_call, mth) - resp = mth.call(active_call.multi_req_view) - active_call.server_unary_response( - resp, trailing_metadata: active_call.output_metadata) + def handle_client_streamer(active_call, mth, inter_ctx) + call = active_call.multi_req_view + + inter_ctx.intercept!( + :client_streamer, + method: mth, + call: call + ) do + resp = mth.call(call) + active_call.server_unary_response( + resp, + trailing_metadata: active_call.output_metadata + ) + end end - def handle_server_streamer(active_call, mth) + def handle_server_streamer(active_call, mth, inter_ctx) req = active_call.read_unary_request - replys = mth.call(req, active_call.single_req_view) - replys.each { |r| active_call.remote_send(r) } - send_status(active_call, OK, 'OK', active_call.output_metadata) + call = active_call.single_req_view + + inter_ctx.intercept!( + :server_streamer, + method: mth, + call: call, + request: req + ) do + replies = mth.call(req, call) + replies.each { |r| active_call.remote_send(r) } + send_status(active_call, OK, 'OK', active_call.output_metadata) + end end - def handle_bidi_streamer(active_call, mth) - active_call.run_server_bidi(mth) + ## + # @param [GRPC::ActiveCall] active_call + # @param [Method] mth + # @param [Array<GRPC::InterceptionContext>] inter_ctx + # + def handle_bidi_streamer(active_call, mth, inter_ctx) + active_call.run_server_bidi(mth, inter_ctx) send_status(active_call, OK, 'OK', active_call.output_metadata) end - def run_server_method(active_call, mth) + ## + # @param [GRPC::ActiveCall] active_call The current active call object + # for the request + # @param [Method] mth The current RPC method being called + # @param [GRPC::InterceptionContext] inter_ctx The interception context + # being executed + # + def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new) # While a server method is running, it might be cancelled, its deadline # might be reached, the handler could throw an unknown error, or a # well-behaved handler could throw a StatusError. if request_response? - handle_request_response(active_call, mth) + handle_request_response(active_call, mth, inter_ctx) elsif client_streamer? - handle_client_streamer(active_call, mth) + handle_client_streamer(active_call, mth, inter_ctx) elsif server_streamer? - handle_server_streamer(active_call, mth) + handle_server_streamer(active_call, mth, inter_ctx) else # is a bidi_stream - handle_bidi_streamer(active_call, mth) + handle_bidi_streamer(active_call, mth, inter_ctx) end rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 33b3cea1fc..d5fc11dc1c 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -196,11 +196,18 @@ module GRPC # # * server_args: # A server arguments hash to be passed down to the underlying core server + # + # * interceptors: + # Am array of GRPC::ServerInterceptor objects that will be used for + # intercepting server handlers to provide extra functionality. + # Interceptors are an EXPERIMENTAL API. + # def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, connect_md_proc:nil, - server_args:{}) + server_args:{}, + interceptors:[]) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -212,6 +219,7 @@ module GRPC # :stopped. State transitions can only proceed in that order. @running_state = :not_started @server = Core::Server.new(server_args) + @interceptors = InterceptorRegistry.new(interceptors) end # stops a running server @@ -374,7 +382,11 @@ module GRPC @pool.schedule(active_call) do |ac| c, mth = ac begin - rpc_descs[mth].run_server_method(c, rpc_handlers[mth]) + rpc_descs[mth].run_server_method( + c, + rpc_handlers[mth], + @interceptors.build_context + ) rescue StandardError c.send_status(GRPC::Core::StatusCodes::INTERNAL, 'Server handler failed') @@ -382,7 +394,7 @@ module GRPC end end rescue Core::CallError, RuntimeError => e - # these might happen for various reasonse. The correct behaviour of + # these might happen for various reasons. The correct behavior of # the server is to log them and continue, if it's not shutting down. if running_state == :running GRPC.logger.warn("server call failed: #{e}") |