aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib
diff options
context:
space:
mode:
authorGravatar Shaun McCormick <splittingred@gmail.com>2017-08-11 14:57:14 -0500
committerGravatar Shaun McCormick <splittingred@gmail.com>2017-09-25 09:28:04 -0500
commiteec7a917baf37ed26449b106f99027c97aedaba6 (patch)
tree2fb7503f739eca1aff43579ce2a065ce31f4d17b /src/ruby/lib
parent76c9e0806373181b32f1fba63686a3502acfb9cf (diff)
Add Ruby server interceptors
Diffstat (limited to 'src/ruby/lib')
-rw-r--r--src/ruby/lib/grpc.rb1
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb43
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb29
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb133
-rw-r--r--src/ruby/lib/grpc/generic/interceptor_registry.rb53
-rw-r--r--src/ruby/lib/grpc/generic/interceptors.rb186
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb80
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb18
8 files changed, 464 insertions, 79 deletions
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index 98bfc0a0fa..37b0392072 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -24,6 +24,7 @@ require_relative 'grpc/generic/active_call'
require_relative 'grpc/generic/client_stub'
require_relative 'grpc/generic/service'
require_relative 'grpc/generic/rpc_server'
+require_relative 'grpc/generic/interceptors'
begin
file = File.open(ssl_roots_path)
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 10eb70b4a7..8c3aa284aa 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -154,6 +154,15 @@ module GRPC
Operation.new(self)
end
+ ##
+ # Returns a restricted view of this ActiveCall for use in interceptors
+ #
+ # @return [InterceptableView]
+ #
+ def interceptable
+ InterceptableView.new(self)
+ end
+
def receive_and_check_status
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
set_input_stream_done
@@ -515,15 +524,27 @@ module GRPC
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
- # @param gen_each_reply [Proc] generates the BiDi stream replies
- def run_server_bidi(gen_each_reply)
- bd = BidiCall.new(@call,
- @marshal,
- @unmarshal,
- metadata_received: @metadata_received,
- req_view: MultiReqView.new(self))
-
- bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
+ # @param mth [Proc] generates the BiDi stream replies
+ # @param interception_ctx [InterceptionContext]
+ #
+ def run_server_bidi(mth, interception_ctx)
+ view = multi_req_view
+ bidi_call = BidiCall.new(
+ @call,
+ @marshal,
+ @unmarshal,
+ metadata_received: @metadata_received,
+ req_view: view
+ )
+ requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
+ interception_ctx.intercept!(
+ :bidi_streamer,
+ call: view,
+ method: mth,
+ requests: requests
+ ) do
+ bidi_call.run_on_server(mth, requests)
+ end
end
# Waits till an operation completes
@@ -645,5 +666,9 @@ module GRPC
Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
:metadata, :status, :start_call, :wait, :write_flag,
:write_flag=, :trailing_metadata)
+
+ # InterceptableView further limits access to an ActiveCall's methods
+ # for use in interceptors on the client, exposing only the deadline
+ InterceptableView = view_class(:deadline)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c2239d0178..3bdcc0062e 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -87,23 +87,32 @@ module GRPC
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
- # @param gen_each_reply [Proc] generates the BiDi stream replies.
- # @param set_input_steam_done [Proc] call back to call when
- # the reads have been completely read through.
- def run_on_server(gen_each_reply, set_input_stream_done)
+ # @param [Proc] gen_each_reply generates the BiDi stream replies.
+ # @param [Enumerable] requests The enumerable of requests to run
+ def run_on_server(gen_each_reply, requests)
+ replies = nil
+
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
- replys = gen_each_reply.call(
- read_loop(set_input_stream_done, is_client: false))
+ replies = gen_each_reply.call(requests)
elsif gen_each_reply.arity == 2
- replys = gen_each_reply.call(
- read_loop(set_input_stream_done, is_client: false),
- @req_view)
+ replies = gen_each_reply.call(requests, @req_view)
else
fail 'Illegal arity of reply generator'
end
- write_loop(replys, is_client: false)
+ write_loop(replies, is_client: false)
+ end
+
+ ##
+ # Read the next stream iteration
+ #
+ # @param [Proc] finalize_stream callback to call when the reads have been
+ # completely read through.
+ # @param [Boolean] is_client If this is a client or server request
+ #
+ def read_next_loop(finalize_stream, is_client = false)
+ read_loop(finalize_stream, is_client: is_client)
end
private
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 75a95a4e94..9a50f8a99d 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -89,17 +89,23 @@ module GRPC
# used within a gRPC server.
# @param channel_args [Hash] the channel arguments. Note: this argument is
# ignored if the channel_override argument is provided.
+ # @param interceptors [Array<GRPC::ClientInterceptor>] An array of
+ # GRPC::ClientInterceptor objects that will be used for
+ # intercepting calls before they are executed
+ # Interceptors are an EXPERIMENTAL API.
def initialize(host, creds,
channel_override: nil,
timeout: nil,
propagate_mask: nil,
- channel_args: {})
+ channel_args: {},
+ interceptors: [])
@ch = ClientStub.setup_channel(channel_override, host, creds,
channel_args)
alt_host = channel_args[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
@propagate_mask = propagate_mask
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
+ @interceptors = InterceptorRegistry.new(interceptors)
end
# request_response sends a request to a GRPC server, and returns the
@@ -149,16 +155,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
- return c.request_response(req, metadata: metadata) unless return_op
-
- # return the operation view of the active_call; define #execute as a
- # new method for this instance that invokes #request_response.
- c.merge_metadata_to_send(metadata)
- op = c.operation
- op.define_singleton_method(:execute) do
- c.request_response(req, metadata: metadata)
+ interception_context = @interceptors.build_context
+ intercept_args = {
+ method: method,
+ request: req,
+ call: c.interceptable,
+ metadata: metadata
+ }
+ if return_op
+ # return the operation view of the active_call; define #execute as a
+ # new method for this instance that invokes #request_response.
+ c.merge_metadata_to_send(metadata)
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ interception_context.intercept!(:request_response, intercept_args) do
+ c.request_response(req, metadata: metadata)
+ end
+ end
+ op
+ else
+ interception_context.intercept!(:request_response, intercept_args) do
+ c.request_response(req, metadata: metadata)
+ end
end
- op
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -213,16 +232,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
- return c.client_streamer(requests, metadata: metadata) unless return_op
-
- # return the operation view of the active_call; define #execute as a
- # new method for this instance that invokes #client_streamer.
- c.merge_metadata_to_send(metadata)
- op = c.operation
- op.define_singleton_method(:execute) do
- c.client_streamer(requests)
+ interception_context = @interceptors.build_context
+ intercept_args = {
+ method: method,
+ requests: requests,
+ call: c.interceptable,
+ metadata: metadata
+ }
+ if return_op
+ # return the operation view of the active_call; define #execute as a
+ # new method for this instance that invokes #client_streamer.
+ c.merge_metadata_to_send(metadata)
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ interception_context.intercept!(:client_streamer, intercept_args) do
+ c.client_streamer(requests)
+ end
+ end
+ op
+ else
+ interception_context.intercept!(:client_streamer, intercept_args) do
+ c.client_streamer(requests, metadata: metadata)
+ end
end
- op
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -292,16 +324,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
- return c.server_streamer(req, metadata: metadata, &blk) unless return_op
-
- # return the operation view of the active_call; define #execute
- # as a new method for this instance that invokes #server_streamer
- c.merge_metadata_to_send(metadata)
- op = c.operation
- op.define_singleton_method(:execute) do
- c.server_streamer(req, &blk)
+ interception_context = @interceptors.build_context
+ intercept_args = {
+ method: method,
+ request: req,
+ call: c.interceptable,
+ metadata: metadata
+ }
+ if return_op
+ # return the operation view of the active_call; define #execute
+ # as a new method for this instance that invokes #server_streamer
+ c.merge_metadata_to_send(metadata)
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ interception_context.intercept!(:server_streamer, intercept_args) do
+ c.server_streamer(req, &blk)
+ end
+ end
+ op
+ else
+ interception_context.intercept!(:server_streamer, intercept_args) do
+ c.server_streamer(req, metadata: metadata, &blk)
+ end
end
- op
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -405,17 +450,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
- return c.bidi_streamer(requests, metadata: metadata,
- &blk) unless return_op
-
- # return the operation view of the active_call; define #execute
- # as a new method for this instance that invokes #bidi_streamer
- c.merge_metadata_to_send(metadata)
- op = c.operation
- op.define_singleton_method(:execute) do
- c.bidi_streamer(requests, &blk)
+ interception_context = @interceptors.build_context
+ intercept_args = {
+ method: method,
+ requests: requests,
+ call: c.interceptable,
+ metadata: metadata
+ }
+ if return_op
+ # return the operation view of the active_call; define #execute
+ # as a new method for this instance that invokes #bidi_streamer
+ c.merge_metadata_to_send(metadata)
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ interception_context.intercept!(:bidi_streamer, intercept_args) do
+ c.bidi_streamer(requests, &blk)
+ end
+ end
+ op
+ else
+ interception_context.intercept!(:bidi_streamer, intercept_args) do
+ c.bidi_streamer(requests, metadata: metadata, &blk)
+ end
end
- op
end
private
diff --git a/src/ruby/lib/grpc/generic/interceptor_registry.rb b/src/ruby/lib/grpc/generic/interceptor_registry.rb
new file mode 100644
index 0000000000..b241eb9a86
--- /dev/null
+++ b/src/ruby/lib/grpc/generic/interceptor_registry.rb
@@ -0,0 +1,53 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# GRPC contains the General RPC module.
+module GRPC
+ ##
+ # Represents a registry of added interceptors available for enumeration.
+ # The registry can be used for both server and client interceptors.
+ # This class is internal to gRPC and not meant for public usage.
+ #
+ class InterceptorRegistry
+ ##
+ # An error raised when an interceptor is attempted to be added
+ # that does not extend GRPC::Interceptor
+ #
+ class DescendantError < StandardError; end
+
+ ##
+ # Initialize the registry with an empty interceptor list
+ # This is an EXPERIMENTAL API.
+ #
+ def initialize(interceptors = [])
+ @interceptors = []
+ interceptors.each do |i|
+ base = GRPC::Interceptor
+ unless i.class.ancestors.include?(base)
+ fail DescendantError, "Interceptors must descend from #{base}"
+ end
+ @interceptors << i
+ end
+ end
+
+ ##
+ # Builds an interception context from this registry
+ #
+ # @return [InterceptionContext]
+ #
+ def build_context
+ InterceptionContext.new(@interceptors)
+ end
+ end
+end
diff --git a/src/ruby/lib/grpc/generic/interceptors.rb b/src/ruby/lib/grpc/generic/interceptors.rb
new file mode 100644
index 0000000000..73faec4b9c
--- /dev/null
+++ b/src/ruby/lib/grpc/generic/interceptors.rb
@@ -0,0 +1,186 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require_relative 'interceptor_registry'
+
+# GRPC contains the General RPC module.
+module GRPC
+ ##
+ # Base class for interception in GRPC
+ #
+ class Interceptor
+ ##
+ # @param [Hash] options A hash of options that will be used
+ # by the interceptor. This is an EXPERIMENTAL API.
+ #
+ def initialize(options = {})
+ @options = options || {}
+ end
+ end
+
+ ##
+ # ClientInterceptor allows for wrapping outbound gRPC client stub requests.
+ # This is an EXPERIMENTAL API.
+ #
+ class ClientInterceptor < Interceptor
+ ##
+ # Intercept a unary request response call
+ #
+ # @param [Object] request
+ # @param [GRPC::ActiveCall] call
+ # @param [Method] method
+ # @param [Hash] metadata
+ #
+ def request_response(request:, call:, method:, metadata:)
+ GRPC.logger.debug "Intercepting request response method #{method}" \
+ " for request #{request} with call #{call} and metadata: #{metadata}"
+ yield
+ end
+
+ ##
+ # Intercept a client streaming call
+ #
+ # @param [Enumerable] requests
+ # @param [GRPC::ActiveCall] call
+ # @param [Method] method
+ # @param [Hash] metadata
+ #
+ def client_streamer(requests:, call:, method:, metadata:)
+ GRPC.logger.debug "Intercepting client streamer method #{method}" \
+ " for requests #{requests} with call #{call} and metadata: #{metadata}"
+ yield
+ end
+
+ ##
+ # Intercept a server streaming call
+ #
+ # @param [Object] request
+ # @param [GRPC::ActiveCall] call
+ # @param [Method] method
+ # @param [Hash] metadata
+ #
+ def server_streamer(request:, call:, method:, metadata:)
+ GRPC.logger.debug "Intercepting server streamer method #{method}" \
+ " for request #{request} with call #{call} and metadata: #{metadata}"
+ yield
+ end
+
+ ##
+ # Intercept a BiDi streaming call
+ #
+ # @param [Enumerable] requests
+ # @param [GRPC::ActiveCall] call
+ # @param [Method] method
+ # @param [Hash] metadata
+ #
+ def bidi_streamer(requests:, call:, method:, metadata:)
+ GRPC.logger.debug "Intercepting bidi streamer method #{method}" \
+ " for requests #{requests} with call #{call} and metadata: #{metadata}"
+ yield
+ end
+ end
+
+ ##
+ # ServerInterceptor allows for wrapping gRPC server execution handling.
+ # This is an EXPERIMENTAL API.
+ #
+ class ServerInterceptor < Interceptor
+ ##
+ # Intercept a unary request response call.
+ #
+ # @param [Object] request
+ # @param [GRPC::ActiveCall::SingleReqView] call
+ # @param [Method] method
+ #
+ def request_response(request:, call:, method:)
+ GRPC.logger.debug "Intercepting request response method #{method}" \
+ " for request #{request} with call #{call}"
+ yield
+ end
+
+ ##
+ # Intercept a client streaming call
+ #
+ # @param [GRPC::ActiveCall::MultiReqView] call
+ # @param [Method] method
+ #
+ def client_streamer(call:, method:)
+ GRPC.logger.debug "Intercepting client streamer method #{method}" \
+ " with call #{call}"
+ yield
+ end
+
+ ##
+ # Intercept a server streaming call
+ #
+ # @param [Object] request
+ # @param [GRPC::ActiveCall::SingleReqView] call
+ # @param [Method] method
+ #
+ def server_streamer(request:, call:, method:)
+ GRPC.logger.debug "Intercepting server streamer method #{method}" \
+ " for request #{request} with call #{call}"
+ yield
+ end
+
+ ##
+ # Intercept a BiDi streaming call
+ #
+ # @param [Enumerable<Object>] requests
+ # @param [GRPC::ActiveCall::MultiReqView] call
+ # @param [Method] method
+ #
+ def bidi_streamer(requests:, call:, method:)
+ GRPC.logger.debug "Intercepting bidi streamer method #{method}" \
+ " for requests #{requests} with call #{call}"
+ yield
+ end
+ end
+
+ ##
+ # Represents the context in which an interceptor runs. Used to provide an
+ # injectable mechanism for handling interception. This is an EXPERIMENTAL API.
+ #
+ class InterceptionContext
+ ##
+ # @param [Array<GRPC::Interceptor>]
+ #
+ def initialize(interceptors = [])
+ @interceptors = interceptors.dup
+ end
+
+ ##
+ # Intercept the call and fire out to interceptors in a FIFO execution.
+ # This is an EXPERIMENTAL API.
+ #
+ # @param [Symbol] type The request type
+ # @param [Hash] args The arguments for the call
+ #
+ def intercept!(type, args = {})
+ return yield if @interceptors.none?
+
+ i = @interceptors.pop
+ return yield unless i
+
+ i.send(type, args) do
+ if @interceptors.any?
+ intercept!(type, args) do
+ yield
+ end
+ else
+ yield
+ end
+ end
+ end
+ end
+end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 6fb6c412fb..5fd1805aab 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -47,43 +47,85 @@ module GRPC
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
- def handle_request_response(active_call, mth)
+ def handle_request_response(active_call, mth, inter_ctx)
req = active_call.read_unary_request
- resp = mth.call(req, active_call.single_req_view)
- active_call.server_unary_response(
- resp, trailing_metadata: active_call.output_metadata)
+ call = active_call.single_req_view
+
+ inter_ctx.intercept!(
+ :request_response,
+ method: mth,
+ call: call,
+ request: req
+ ) do
+ resp = mth.call(req, call)
+ active_call.server_unary_response(
+ resp,
+ trailing_metadata: active_call.output_metadata
+ )
+ end
end
- def handle_client_streamer(active_call, mth)
- resp = mth.call(active_call.multi_req_view)
- active_call.server_unary_response(
- resp, trailing_metadata: active_call.output_metadata)
+ def handle_client_streamer(active_call, mth, inter_ctx)
+ call = active_call.multi_req_view
+
+ inter_ctx.intercept!(
+ :client_streamer,
+ method: mth,
+ call: call
+ ) do
+ resp = mth.call(call)
+ active_call.server_unary_response(
+ resp,
+ trailing_metadata: active_call.output_metadata
+ )
+ end
end
- def handle_server_streamer(active_call, mth)
+ def handle_server_streamer(active_call, mth, inter_ctx)
req = active_call.read_unary_request
- replys = mth.call(req, active_call.single_req_view)
- replys.each { |r| active_call.remote_send(r) }
- send_status(active_call, OK, 'OK', active_call.output_metadata)
+ call = active_call.single_req_view
+
+ inter_ctx.intercept!(
+ :server_streamer,
+ method: mth,
+ call: call,
+ request: req
+ ) do
+ replies = mth.call(req, call)
+ replies.each { |r| active_call.remote_send(r) }
+ send_status(active_call, OK, 'OK', active_call.output_metadata)
+ end
end
- def handle_bidi_streamer(active_call, mth)
- active_call.run_server_bidi(mth)
+ ##
+ # @param [GRPC::ActiveCall] active_call
+ # @param [Method] mth
+ # @param [Array<GRPC::InterceptionContext>] inter_ctx
+ #
+ def handle_bidi_streamer(active_call, mth, inter_ctx)
+ active_call.run_server_bidi(mth, inter_ctx)
send_status(active_call, OK, 'OK', active_call.output_metadata)
end
- def run_server_method(active_call, mth)
+ ##
+ # @param [GRPC::ActiveCall] active_call The current active call object
+ # for the request
+ # @param [Method] mth The current RPC method being called
+ # @param [GRPC::InterceptionContext] inter_ctx The interception context
+ # being executed
+ #
+ def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
if request_response?
- handle_request_response(active_call, mth)
+ handle_request_response(active_call, mth, inter_ctx)
elsif client_streamer?
- handle_client_streamer(active_call, mth)
+ handle_client_streamer(active_call, mth, inter_ctx)
elsif server_streamer?
- handle_server_streamer(active_call, mth)
+ handle_server_streamer(active_call, mth, inter_ctx)
else # is a bidi_stream
- handle_bidi_streamer(active_call, mth)
+ handle_bidi_streamer(active_call, mth, inter_ctx)
end
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 33b3cea1fc..d5fc11dc1c 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -196,11 +196,18 @@ module GRPC
#
# * server_args:
# A server arguments hash to be passed down to the underlying core server
+ #
+ # * interceptors:
+ # Am array of GRPC::ServerInterceptor objects that will be used for
+ # intercepting server handlers to provide extra functionality.
+ # Interceptors are an EXPERIMENTAL API.
+ #
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
connect_md_proc:nil,
- server_args:{})
+ server_args:{},
+ interceptors:[])
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@@ -212,6 +219,7 @@ module GRPC
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
@server = Core::Server.new(server_args)
+ @interceptors = InterceptorRegistry.new(interceptors)
end
# stops a running server
@@ -374,7 +382,11 @@ module GRPC
@pool.schedule(active_call) do |ac|
c, mth = ac
begin
- rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
+ rpc_descs[mth].run_server_method(
+ c,
+ rpc_handlers[mth],
+ @interceptors.build_context
+ )
rescue StandardError
c.send_status(GRPC::Core::StatusCodes::INTERNAL,
'Server handler failed')
@@ -382,7 +394,7 @@ module GRPC
end
end
rescue Core::CallError, RuntimeError => e
- # these might happen for various reasonse. The correct behaviour of
+ # these might happen for various reasons. The correct behavior of
# the server is to log them and continue, if it's not shutting down.
if running_state == :running
GRPC.logger.warn("server call failed: #{e}")