# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'thread'

# GRPC contains the General RPC module.
module GRPC
  # Pool is a simple thread pool.
  #
  # Each worker thread owns a private queue. An idle worker parks its queue
  # in @ready_workers; #schedule pops one of those queues and pushes a single
  # job onto it, so a worker never has more than one job pending.
  class Pool
    # Default keep alive period is 1s
    DEFAULT_KEEP_ALIVE = 1

    # @param size [Integer] number of worker threads; must be positive
    # @param keep_alive [Numeric] seconds #stop waits for workers to exit
    #   before terminating them forcibly
    # @raise [RuntimeError] if size is not positive
    def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
      fail 'pool size must be positive' unless size > 0
      @jobs = Queue.new
      @size = size
      @stopped = false
      @stop_mutex = Mutex.new # needs to be held when accessing @stopped
      @stop_cond = ConditionVariable.new
      @workers = []
      @keep_alive = keep_alive

      # Each worker thread has its own queue to push and pull jobs;
      # these queues are put into @ready_workers when that worker is idle
      @ready_workers = Queue.new
    end

    # Returns the number of jobs waiting
    def jobs_waiting
      @jobs.size
    end

    # @return [true, false] true when at least one worker is idle
    def ready_for_work?
      # Busy worker threads are either doing work, or have a single job
      # waiting on them. Workers that are idle with no jobs waiting
      # have their "queues" in @ready_workers
      !@ready_workers.empty?
    end

    # Runs the given block on the queue with the provided args.
    #
    # Does nothing when no block is given, and only logs a warning when the
    # pool has already been stopped.
    #
    # @param args the args passed blk when it is called
    # @param blk the block to call
    # @raise [RuntimeError] if no worker is idle, or if the chosen worker
    #   unexpectedly already has a job queued
    def schedule(*args, &blk)
      return if blk.nil?
      @stop_mutex.synchronize do
        if @stopped
          GRPC.logger.warn('did not schedule job, already stopped')
          return
        end
        GRPC.logger.info('schedule another job')
        fail 'No worker threads available' if @ready_workers.empty?
        worker_queue = @ready_workers.pop

        fail 'worker already has a task waiting' unless worker_queue.empty?
        worker_queue << [blk, args]
      end
    end

    # Starts running the jobs in the thread pool.
    #
    # Spawns worker threads until @size of them exist; each new worker's
    # queue is registered as ready before its thread is created.
    #
    # @raise [RuntimeError] if the pool has already been stopped
    def start
      @stop_mutex.synchronize do
        fail 'already stopped' if @stopped
      end
      until @workers.size == @size.to_i
        new_worker_queue = Queue.new
        @ready_workers << new_worker_queue
        next_thread = Thread.new(new_worker_queue) do |jobs|
          catch(:exit) do # allows { throw :exit } to kill a thread
            loop_execute_jobs(jobs)
          end
          remove_current_thread
        end
        @workers << next_thread
      end
    end

    # Stops the jobs in the pool
    #
    # Idle workers are handed a job that throws :exit. The caller then waits
    # up to @keep_alive seconds for the remaining workers to finish before
    # any survivors are terminated forcibly.
    def stop
      GRPC.logger.info('stopping, will wait for all the workers to exit')
      @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop
        @stopped = true
        loop do
          break unless ready_for_work?
          worker_queue = @ready_workers.pop
          worker_queue << [proc { throw :exit }, []]
        end
        @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
      end
      forcibly_stop_workers
      GRPC.logger.info('stopped, all workers are shutdown')
    end

    protected

    # Forcibly shutdown any threads that are still alive.
    def forcibly_stop_workers
      return unless @workers.size > 0
      GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
      @workers.each do |t|
        next unless t.alive?
        begin
          t.exit
        rescue StandardError => e
          GRPC.logger.warn('error while terminating a worker')
          GRPC.logger.warn(e)
        end
      end
    end

    # removes the threads from workers, and signal when all the
    # threads are complete.
    def remove_current_thread
      @stop_mutex.synchronize do
        @workers.delete(Thread.current)
        @stop_cond.signal if @workers.size.zero?
      end
    end

    # Worker main loop: pops one job from worker_queue, runs it, then
    # re-registers the queue as ready. Returns once the pool is stopped.
    #
    # @param worker_queue [Queue] this worker's private job queue
    # @raise [RuntimeError] if a second job arrived while the worker was busy
    def loop_execute_jobs(worker_queue)
      loop do
        begin
          blk, args = worker_queue.pop
          blk.call(*args)
        rescue StandardError, GRPC::Core::CallError => e
          # a failing job is logged; the worker keeps running
          GRPC.logger.warn('Error in worker thread')
          GRPC.logger.warn(e)
        end
        # there shouldn't be any work given to this thread while its busy
        fail('received a task while busy') unless worker_queue.empty?
        @stop_mutex.synchronize do
          return if @stopped
          @ready_workers << worker_queue
        end
      end
    end
  end

  # RpcServer hosts a number of services and makes them available on the
  # network.
  class RpcServer
    include Core::CallOps
    include Core::TimeConsts
    extend ::Forwardable

    def_delegators :@server, :add_http2_port

    # Default thread pool size is 30
    DEFAULT_POOL_SIZE = 30

    # Deprecated due to internal changes to the thread pool
    DEFAULT_MAX_WAITING_REQUESTS = 20

    # Default poll period is 1s
    DEFAULT_POLL_PERIOD = 1

    # Signal check period is 0.25s
    SIGNAL_CHECK_PERIOD = 0.25

    # setup_connect_md_proc is used by #initialize to validate the
    # connect_md_proc.
    #
    # @param a_proc [Proc, nil]
    # @return [Proc, nil] a_proc unchanged
    # @raise [TypeError] if a_proc is neither nil nor a Proc
    def self.setup_connect_md_proc(a_proc)
      return nil if a_proc.nil?
      fail(TypeError, '!Proc') unless a_proc.is_a? Proc
      a_proc
    end

    # Creates a new RpcServer.
    #
    # The RPC server is configured using keyword arguments.
    #
    # There are some specific keyword args used to configure the RpcServer
    # instance.
    #
    # * pool_size: the size of the thread pool the server uses to run its
    # threads. No more concurrent requests can be made than the size
    # of the thread pool
    #
    # * max_waiting_requests: Deprecated due to internal changes to the thread
    # pool. This is still an argument for compatibility but is ignored.
    #
    # * poll_period: The amount of time in seconds to wait for
    # currently-serviced RPC's to finish before cancelling them when shutting
    # down the server.
    #
    # * pool_keep_alive: The amount of time in seconds to wait
    # for currently busy thread-pool threads to finish before
    # forcing an abrupt exit to each thread. Defaults to
    # Pool::DEFAULT_KEEP_ALIVE.
    #
    # * connect_md_proc:
    # when non-nil is a proc for determining metadata to send back to the
    # client on receiving an invocation req. The proc signature is:
    #   {key: val, ..} func(method_name, {key: val, ...})
    #
    # * server_args:
    # A server arguments hash to be passed down to the underlying core server
    #
    # * interceptors:
    # An array of GRPC::ServerInterceptor objects that will be used for
    # intercepting server handlers to provide extra functionality.
    # Interceptors are an EXPERIMENTAL API.
    #
    def initialize(pool_size: DEFAULT_POOL_SIZE,
                   max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
                   poll_period: DEFAULT_POLL_PERIOD,
                   pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
                   connect_md_proc: nil,
                   server_args: {},
                   interceptors: [])
      @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
      @max_waiting_requests = max_waiting_requests
      @poll_period = poll_period
      @pool_size = pool_size
      @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
      @run_cond = ConditionVariable.new
      @run_mutex = Mutex.new
      # running_state can take 4 values: :not_started, :running, :stopping, and
      # :stopped. State transitions can only proceed in that order.
      @running_state = :not_started
      @server = Core::Server.new(server_args)
      @interceptors = InterceptorRegistry.new(interceptors)
    end

    # stops a running server
    #
    # the call has no impact if the server is already stopping or stopped,
    # otherwise the server's current call loop is its last.
    #
    # @raise [RuntimeError] if the server was never started
    def stop
      @run_mutex.synchronize do
        fail 'Cannot stop before starting' if @running_state == :not_started
        return if @running_state != :running
        transition_running_state(:stopping)
        deadline = from_relative_time(@poll_period)
        @server.shutdown_and_notify(deadline)
      end
      @pool.stop
    end

    # @return [Symbol] the current running state, read under @run_mutex
    def running_state
      @run_mutex.synchronize do
        return @running_state
      end
    end

    # Can only be called while holding @run_mutex
    #
    # @param target_state [Symbol] the state to move to
    # @raise [RuntimeError] unless target_state is the immediate successor
    #   of the current state
    def transition_running_state(target_state)
      state_transitions = {
        not_started: :running,
        running: :stopping,
        stopping: :stopped
      }
      if state_transitions[@running_state] == target_state
        @running_state = target_state
      else
        fail "Bad server state transition: #{@running_state}->#{target_state}"
      end
    end

    # @return [true, false] true while the server is in the :running state
    def running?
      running_state == :running
    end

    # @return [true, false] true once the server has reached :stopped
    def stopped?
      running_state == :stopped
    end

    # Is called from other threads to wait for #run to start up the server.
    #
    # If the server has not been started yet, this waits on the run
    # condition (signalled by #run) for up to timeout seconds; otherwise it
    # returns immediately.
    #
    # @param timeout [Numeric] number of seconds to wait
    # @return [true, false] true if the server is running, false otherwise
    def wait_till_running(timeout = nil)
      @run_mutex.synchronize do
        @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
        return @running_state == :running
      end
    end

    # handle registration of classes
    #
    # service is either a class that includes GRPC::GenericService and whose
    # #new function can be called without argument or any instance of such a
    # class.
    #
    # E.g, after
    #
    # class Divider
    #   include GRPC::GenericService
    #   rpc :div DivArgs, DivReply    # single request, single response
    #   def initialize(optional_arg='default option') # no args
    #     ...
    #   end
    #
    # srv = GRPC::RpcServer.new(...)
    #
    # # Either of these works
    #
    # srv.handle(Divider)
    #
    # # or
    #
    # srv.handle(Divider.new('replace optional arg'))
    #
    # It raises RuntimeError:
    # - if service is not valid service class or object
    # - its handler methods are already registered
    # - if the server is already running
    #
    # @param service [Object|Class] a service class or object as described
    #        above
    def handle(service)
      @run_mutex.synchronize do
        unless @running_state == :not_started
          fail 'cannot add services if the server has been started'
        end
        cls = service.is_a?(Class) ? service : service.class
        assert_valid_service_class(cls)
        add_rpc_descs_for(service)
      end
    end

    # runs the server
    #
    # - if no rpc_descs are registered, this raises RuntimeError; otherwise
    #   it starts the pool and the core server and then blocks handling
    #   calls until the server leaves the :running state.
    #
    # - #running? returns true after this is called, until #stop cause the
    #   the server to stop.
    def run
      @run_mutex.synchronize do
        fail 'cannot run without registering services' if rpc_descs.size.zero?
        @pool.start
        @server.start
        transition_running_state(:running)
        # wake every thread blocked in #wait_till_running
        @run_cond.broadcast
      end
      loop_handle_server_calls
    end

    alias_method :run_till_terminated, :run

    # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
    #
    # @return an_rpc when a worker is idle, nil after rejecting the call
    def available?(an_rpc)
      return an_rpc if @pool.ready_for_work?
      GRPC.logger.warn('no free worker threads currently')
      noop = proc { |x| x }

      # Create a new active call that knows that metadata hasn't been
      # sent yet
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
                    'No free threads in thread pool')
      nil
    end

    # Sends UNIMPLEMENTED if the method is not implemented by this server
    #
    # @return an_rpc when its method is registered, nil after rejecting it
    def implemented?(an_rpc)
      mth = an_rpc.method.to_sym
      return an_rpc if rpc_descs.key?(mth)
      GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
      noop = proc { |x| x }

      # Create a new active call that knows that
      # metadata hasn't been sent yet
      c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                         metadata_received: true, started: false)
      c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
      nil
    end

    # handles calls to the server
    #
    # Requests calls from the core server while the state is :running and
    # schedules each accepted call on the pool. When the loop ends, moves
    # the server to :stopped and closes the core server.
    #
    # @raise [RuntimeError] if the server has not been started
    def loop_handle_server_calls
      fail 'not started' if running_state == :not_started
      while running_state == :running
        begin
          an_rpc = @server.request_call
          # a non-nil rpc carrying no call ends the loop
          break if (!an_rpc.nil?) && an_rpc.call.nil?
          active_call = new_active_server_call(an_rpc)
          unless active_call.nil?
            @pool.schedule(active_call) do |ac|
              c, mth = ac
              begin
                rpc_descs[mth].run_server_method(
                  c,
                  rpc_handlers[mth],
                  @interceptors.build_context
                )
              rescue StandardError
                c.send_status(GRPC::Core::StatusCodes::INTERNAL,
                              'Server handler failed')
              end
            end
          end
        rescue Core::CallError, RuntimeError => e
          # these might happen for various reasons.  The correct behavior of
          # the server is to log them and continue, if it's not shutting down.
          if running_state == :running
            GRPC.logger.warn("server call failed: #{e}")
          end
          next
        end
      end
      # @running_state should be :stopping here
      @run_mutex.synchronize do
        transition_running_state(:stopped)
        GRPC.logger.info("stopped: #{self}")
        @server.close
      end
    end

    # Builds the [ActiveCall, method symbol] pair for an incoming rpc.
    #
    # @return [Array, nil] nil when an_rpc (or its call) is nil, when no
    #   worker is available, or when the method is not registered
    def new_active_server_call(an_rpc)
      return nil if an_rpc.nil? || an_rpc.call.nil?

      # allow the metadata to be accessed from the call
      an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
      connect_md = nil
      unless @connect_md_proc.nil?
        connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
      end

      return nil unless available?(an_rpc)
      return nil unless implemented?(an_rpc)

      # Create the ActiveCall. Indicate that metadata hasnt been sent yet.
      GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
      rpc_desc = rpc_descs[an_rpc.method.to_sym]
      c = ActiveCall.new(an_rpc.call,
                         rpc_desc.marshal_proc,
                         rpc_desc.unmarshal_proc(:input),
                         an_rpc.deadline,
                         metadata_received: true,
                         started: false,
                         metadata_to_send: connect_md)
      c.attach_peer_cert(an_rpc.call.peer_cert)
      mth = an_rpc.method.to_sym
      [c, mth]
    end

    protected

    # @return [Hash] registered rpc descriptions keyed by route symbol
    def rpc_descs
      @rpc_descs ||= {}
    end

    # @return [Hash] registered handler methods keyed by route symbol
    def rpc_handlers
      @rpc_handlers ||= {}
    end

    # @raise [RuntimeError] if cls does not include GenericService or
    #   declares no rpc descriptions
    def assert_valid_service_class(cls)
      unless cls.include?(GenericService)
        fail "#{cls} must 'include GenericService'"
      end
      fail "#{cls} should specify some rpc descriptions" if
        cls.rpc_descs.size.zero?
    end

    # This should be called while holding @run_mutex
    #
    # Registers every rpc description of the service under the route
    # "/<service_name>/<rpc name>" together with its handler method. When a
    # class is given, a fresh instance is created per route via cls.new.
    #
    # @raise [RuntimeError] if a route is already registered
    def add_rpc_descs_for(service)
      cls = service.is_a?(Class) ? service : service.class
      specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
      cls.rpc_descs.each_pair do |name, spec|
        route = "/#{cls.service_name}/#{name}".to_sym
        fail "already registered: rpc #{route} from #{spec}" if specs.key? route
        specs[route] = spec
        rpc_name = GenericService.underscore(name.to_s).to_sym
        if service.is_a?(Class)
          handlers[route] = cls.new.method(rpc_name)
        else
          handlers[route] = service.method(rpc_name)
        end
        GRPC.logger.info("handling #{route} with #{handlers[route]}")
      end
    end
  end
end