diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2015-04-20 13:16:00 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2015-04-20 13:16:00 -0700 |
commit | 291800b78f73ad974a4e472555ac4845a2b32aa0 (patch) | |
tree | 423fe665a68350b46f8872a89628678702af922a /src/ruby/lib | |
parent | 3bb40596c5303d7a7055504e20a53c465c0ac8e4 (diff) | |
parent | a80aa7d86a1aa1ae64780c23341fecfb06fac640 (diff) |
Merge pull request #1309 from tbetbetbe/grpc_ruby_rpc_server_md
Grpc ruby add support for returning metadata to the rpc server
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc/errors.rb | 7 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 34 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 12 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 334 |
4 files changed, 216 insertions, 171 deletions
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 35e9c02a94..f1201c1704 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -36,14 +36,15 @@ module GRPC # error should be returned to the other end of a GRPC connection; when # caught it means that this end received a status error. class BadStatus < StandardError - attr_reader :code, :details + attr_reader :code, :details, :metadata # @param code [Numeric] the status code # @param details [String] the details of the exception - def initialize(code, details = 'unknown cause') + def initialize(code, details = 'unknown cause', **kw) super("#{code}:#{details}") @code = code @details = details + @metadata = kw end # Converts the exception to a GRPC::Status for use in the networking @@ -51,7 +52,7 @@ module GRPC # # @return [Status] with the same code and details def to_status - Status.new(code, details) + Struct::Status.new(code, details, @metadata) end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 8d63de4145..43ba549905 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -39,7 +39,10 @@ class Struct return nil if status.nil? fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED if status.code != GRPC::Core::StatusCodes::OK - fail GRPC::BadStatus.new(status.code, status.details) + # raise BadStatus, propagating the metadata if present. + md = status.metadata + with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }] + fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys) end status end @@ -119,6 +122,12 @@ module GRPC @metadata_tag = metadata_tag end + # output_metadata are provides access to hash that can be used to + # save metadata to be sent as trailer + def output_metadata + @output_metadata ||= {} + end + # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -161,10 +170,12 @@ module GRPC def finished batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, RECV_STATUS_ON_CLIENT => nil) - if @call.metadata.nil? - @call.metadata = batch_result.metadata - elsif !batch_result.metadata.nil? - @call.metadata.merge!(batch_result.metadata) + unless batch_result.status.nil? + if @call.metadata.nil? + @call.metadata = batch_result.status.metadata + else + @call.metadata.merge!(batch_result.status.metadata) + end end batch_result.check_status end @@ -192,9 +203,13 @@ module GRPC # @param details [String] details # @param assert_finished [true, false] when true(default), waits for # FINISHED. - def send_status(code = OK, details = '', assert_finished = false) + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + def send_status(code = OK, details = '', assert_finished = false, **kw) ops = { - SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details) + SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw) } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(@cq, self, INFINITE_FUTURE, ops) @@ -438,12 +453,13 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline, :metadata) + SingleReqView = view_class(:cancelled, :deadline, :metadata, + :output_metadata) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read, :metadata) + :each_remote_read, :metadata, :output_metadata) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 3e48b8e51d..10211ae239 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -80,12 +80,12 @@ module GRPC else # is a bidi_stream active_call.run_server_bidi(mth) end - send_status(active_call, OK, 'OK') + send_status(active_call, OK, 'OK', **active_call.output_metadata) rescue BadStatus => e - # this is raised by handlers that want GRPC to send an application - # error code and detail message. + # this is raised by handlers that want GRPC to send an application error + # code and detail message and some additional app-specific metadata. logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") - send_status(active_call, e.code, e.details) + send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. @@ -135,9 +135,9 @@ module GRPC "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end - def send_status(active_client, code, details) + def send_status(active_client, code, details, **kw) details = 'Not sure why' if details.nil? - active_client.send_status(code, details, code == OK) + active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e logger.warn("Could not send status #{code}:#{details}") logger.warn(e) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index c7c8267fa3..88c24aa92b 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -37,6 +37,120 @@ $grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + module_function :handle_signals + + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end + + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 + end + end + + def loop_execute_jobs + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) + end + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer @@ -69,6 +183,32 @@ module GRPC %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } end + # setup_cq is used by #initialize to constuct a Core::CompletionQueue from + # its arguments. + def self.setup_cq(alt_cq) + return Core::CompletionQueue.new if alt_cq.nil? + unless alt_cq.is_a? Core::CompletionQueue + fail(TypeError, '!CompletionQueue') + end + alt_cq + end + + # setup_srv is used by #initialize to constuct a Core::Server from its + # arguments. + def self.setup_srv(alt_srv, cq, **kw) + return Core::Server.new(cq, kw) if alt_srv.nil? + fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server + alt_srv + end + + # setup_connect_md_proc is used by #initialize to validate the + # connect_md_proc. + def self.setup_connect_md_proc(a_proc) + return nil if a_proc.nil? + fail(TypeError, '!Proc') unless a_proc.is_a? Proc + a_proc + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -96,30 +236,21 @@ module GRPC # * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds # with not available to new requests + # + # * connect_md_proc: + # when non-nil is a proc for determining metadata to to send back the client + # on receiving an invocation req. The proc signature is: + # {key: val, ..} func(method_name, {key: val, ...}) def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, + connect_md_proc:nil, **kw) - if completion_queue_override.nil? - cq = Core::CompletionQueue.new - else - cq = completion_queue_override - unless cq.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - end - @cq = cq - - if server_override.nil? - srv = Core::Server.new(@cq, kw) - else - srv = server_override - fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server - end - @server = srv - + @cq = RpcServer.setup_cq(completion_queue_override) + @server = RpcServer.setup_srv(server_override, @cq, **kw) + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -179,22 +310,6 @@ module GRPC t.join end - # Handles the signals in $grpc_signals. - # - # @return false if the server should exit, true if not. - def handle_signals - loop do - sig = $grpc_signals.shift - case sig - when 'INT' - return false - when 'TERM' - return false - end - end - true - end - # Determines if the server is currently stopped def stopped? @stopped ||= false @@ -258,19 +373,7 @@ module GRPC end @pool.start @server.start - request_call_tag = Object.new - until stopped? - deadline = from_relative_time(@poll_period) - an_rpc = @server.request_call(@cq, request_call_tag, deadline) - next if an_rpc.nil? - c = new_active_server_call(an_rpc) - unless c.nil? - mth = an_rpc.method.to_sym - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) - end - end - end + loop_handle_server_calls @running = false end @@ -297,17 +400,35 @@ module GRPC nil end + # handles calls to the server + def loop_handle_server_calls + fail 'not running' unless @running + request_call_tag = Object.new + until stopped? + deadline = from_relative_time(@poll_period) + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + c = new_active_server_call(an_rpc) + unless c.nil? + mth = an_rpc.method.to_sym + @pool.schedule(c) do |call| + rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) + end + end + end + end + def new_active_server_call(an_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately return nil if an_rpc.nil? || an_rpc.call.nil? # allow the metadata to be accessed from the call handle_call_tag = Object.new - an_rpc.call.metadata = an_rpc.metadata - # TODO: add a hook to send md + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + connect_md = nil + unless @connect_md_proc.nil? + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) + end an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, - SEND_INITIAL_METADATA => nil) + SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless found?(an_rpc) @@ -319,93 +440,6 @@ module GRPC an_rpc.deadline) end - # Pool is a simple thread pool for running server requests. - class Pool - # Default keep alive period is 1s - DEFAULT_KEEP_ALIVE = 1 - - def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - @keep_alive = keep_alive - end - - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end - - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end - - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end - end - end - - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end - end - @workers << next_thread - end - end - - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true - - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 - end - - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.info("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end - end - end - logger.info('stopped, all workers are shutdown') - end - end - protected def rpc_descs @@ -416,11 +450,9 @@ module GRPC @rpc_handlers ||= {} end - private - def assert_valid_service_class(cls) unless cls.include?(GenericService) - fail "#{cls} should 'include GenericService'" + fail "#{cls} must 'include GenericService'" end if cls.rpc_descs.size == 0 fail "#{cls} should specify some rpc descriptions" @@ -430,21 +462,17 @@ module GRPC def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs = rpc_descs - handlers = rpc_handlers + specs, handlers = rpc_descs, rpc_handlers cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym - if specs.key? route - fail "Cannot add rpc #{route} from #{spec}, already registered" + fail "already registered: rpc #{route} from #{spec}" if specs.key? route + specs[route] = spec + if service.is_a?(Class) + handlers[route] = cls.new.method(name.to_s.underscore.to_sym) else - specs[route] = spec - if service.is_a?(Class) - handlers[route] = cls.new.method(name.to_s.underscore.to_sym) - else - handlers[route] = service.method(name.to_s.underscore.to_sym) - end - logger.info("handling #{route} with #{handlers[route]}") + handlers[route] = service.method(name.to_s.underscore.to_sym) end + logger.info("handling #{route} with #{handlers[route]}") end end end |