diff options
author | Tim Emiola <temiola@google.com> | 2015-04-16 14:14:34 -0700 |
---|---|---|
committer | Tim Emiola <temiola@google.com> | 2015-04-16 14:27:34 -0700 |
commit | e6be7f31e45ecc180a3dff7c2d0d6ae8f72017df (patch) | |
tree | 977c509b4bd94cc17c342142fd20c25b32e6421b /src/ruby/lib | |
parent | bf6d78c8e4908670de1fe02eaa639dde93166816 (diff) |
Refactor: Move the Pool out from RpcServer
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 184 |
1 files changed, 97 insertions, 87 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index bc2211ef7e..6910d95283 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -38,6 +38,103 @@ $grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end + + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 + end + end + + def loop_execute_jobs + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) + end + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer @@ -320,93 +417,6 @@ module GRPC an_rpc.deadline) end - # Pool is a simple thread pool for running server requests. - class Pool - # Default keep alive period is 1s - DEFAULT_KEEP_ALIVE = 1 - - def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - @keep_alive = keep_alive - end - - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end - - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end - - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end - end - end - - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end - end - @workers << next_thread - end - end - - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true - - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 - end - - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.info("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end - end - end - logger.info('stopped, all workers are shutdown') - end - end - protected def rpc_descs |