aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-04-16 17:43:59 -0700
committerGravatar Tim Emiola <temiola@google.com>2015-04-17 12:40:59 -0700
commit3fd2be2e32ab9edf2030f06c707e0c5cca140755 (patch)
treefdf3bb38ae4ec596db812993145878d1d838c1bf /src/ruby/lib
parentf9e77b3972cc84a62c76e85d6112a91db6424e7d (diff)
Adds a hook for returning the client connect metadata
Diffstat (limited to 'src/ruby/lib')
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb59
1 files changed, 38 insertions, 21 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 8d91c31a65..aa6c7e0989 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -38,6 +38,23 @@ $grpc_signals = []
# GRPC contains the General RPC module.
module GRPC
+ # Handles the signals in $grpc_signals.
+ #
+ # @return false if the server should exit, true if not.
+ def handle_signals
+ loop do
+ sig = $grpc_signals.shift
+ case sig
+ when 'INT'
+ return false
+ when 'TERM'
+ return false
+ end
+ end
+ true
+ end
+ module_function :handle_signals
+
# Pool is a simple thread pool.
class Pool
# Default keep alive period is 1s
@@ -185,6 +202,14 @@ module GRPC
alt_srv
end
+ # setup_connect_md_proc is used by #initialize to validate the
+ # connect_md_proc.
+ def self.setup_connect_md_proc(a_proc)
+ return nil if a_proc.nil?
+ fail(TypeError, '!Proc') unless a_proc.is_a? Proc
+ a_proc
+ end
+
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
@@ -212,14 +237,21 @@ module GRPC
# * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests
+ #
+ # * connect_md_proc:
+ # when non-nil is a proc for determining metadata to to send back the client
+ # on receiving an invocation req. The proc signature is:
+ # {key: val, ..} func(method_name, {key: val, ...})
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil,
server_override:nil,
+ connect_md_proc:nil,
**kw)
@cq = RpcServer.setup_cq(completion_queue_override)
@server = RpcServer.setup_srv(server_override, @cq, **kw)
+ @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@pool_size = pool_size
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@@ -279,22 +311,6 @@ module GRPC
t.join
end
- # Handles the signals in $grpc_signals.
- #
- # @return false if the server should exit, true if not.
- def handle_signals
- loop do
- sig = $grpc_signals.shift
- case sig
- when 'INT'
- return false
- when 'TERM'
- return false
- end
- end
- true
- end
-
# Determines if the server is currently stopped
def stopped?
@stopped ||= false
@@ -403,16 +419,17 @@ module GRPC
end
def new_active_server_call(an_rpc)
- # Accept the call. This is necessary even if a status is to be sent
- # back immediately
return nil if an_rpc.nil? || an_rpc.call.nil?
# allow the metadata to be accessed from the call
handle_call_tag = Object.new
- an_rpc.call.metadata = an_rpc.metadata
- # TODO: add a hook to send md
+ an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
+ connect_md = nil
+ unless @connect_md_proc.nil?
+ connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
+ end
an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
- SEND_INITIAL_METADATA => nil)
+ SEND_INITIAL_METADATA => connect_md)
return nil unless available?(an_rpc)
return nil unless found?(an_rpc)