From b22a21ebe7ba21c769046f5ccc0de0b15f1028ff Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Thu, 2 Apr 2015 21:45:26 -0700 Subject: Update RPC server to use the new call API --- src/ruby/lib/grpc/generic/rpc_server.rb | 91 ++++++++++++++++----------------- 1 file changed, 44 insertions(+), 47 deletions(-) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 35e84023be..30a4bf1532 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -38,7 +38,7 @@ module GRPC # RpcServer hosts a number of services and makes them available on the # network. class RpcServer - include Core::CompletionType + include Core::CallOps include Core::TimeConsts extend ::Forwardable @@ -202,20 +202,14 @@ module GRPC end @pool.start @server.start - server_tag = Object.new + request_call_tag = Object.new until stopped? - @server.request_call(server_tag) - ev = @cq.pluck(server_tag, @poll_period) - next if ev.nil? - if ev.type != SERVER_RPC_NEW - logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}") - ev.close - next - end - c = new_active_server_call(ev.call, ev.result) + deadline = from_relative_time(@poll_period) + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + next if an_rpc.nil? + c = new_active_server_call(an_rpc) unless c.nil? - mth = ev.result.method.to_sym - ev.close + mth = an_rpc.method.to_sym @pool.schedule(c) do |call| rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) end @@ -224,46 +218,49 @@ module GRPC @running = false end - def new_active_server_call(call, new_server_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately - finished_tag = Object.new - call_queue = Core::CompletionQueue.new - call.metadata = new_server_rpc.metadata # store the metadata - call.server_accept(call_queue, finished_tag) - call.server_end_initial_metadata - - # Send UNAVAILABLE if there are too many unprocessed jobs + # Sends UNAVAILABLE if there are too many unprocessed jobs + def available?(an_rpc) jobs_count, max = @pool.jobs_waiting, @max_waiting_requests logger.info("waiting: #{jobs_count}, max: #{max}") - if @pool.jobs_waiting > @max_waiting_requests - logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::UNAVAILABLE, '') - return nil - end + return an_rpc if @pool.jobs_waiting <= @max_waiting_requests + logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) + c.send_status(StatusCodes::UNAVAILABLE, '') + nil + end - # Send NOT_FOUND if the method does not exist - mth = new_server_rpc.method.to_sym - unless rpc_descs.key?(mth) - logger.warn("NOT_FOUND: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::NOT_FOUND, '') - return nil - end + # Sends NOT_FOUND if the method can't be found + def found?(an_rpc) + mth = an_rpc.method.to_sym + return an_rpc if rpc_descs.key?(mth) + logger.warn("NOT_FOUND: #{an_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) + c.send_status(StatusCodes::NOT_FOUND, '') + nil + end + + def new_active_server_call(an_rpc) + # Accept the call. This is necessary even if a status is to be sent + # back immediately + return nil if an_rpc.nil? || an_rpc.call.nil? + + # allow the metadata to be accessed from the call + handle_call_tag = Object.new + an_rpc.call.metadata = an_rpc.metadata + # TODO: add a hook to send md + an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, + SEND_INITIAL_METADATA => nil) + return nil unless available?(an_rpc) + return nil unless found?(an_rpc) # Create the ActiveCall - rpc_desc = rpc_descs[mth] - logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") - ActiveCall.new(call, call_queue, + logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") + rpc_desc = rpc_descs[an_rpc.method.to_sym] + ActiveCall.new(an_rpc.call, @cq, rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), - new_server_rpc.deadline, finished_tag: finished_tag) + an_rpc.deadline) end # Pool is a simple thread pool for running server requests. -- cgit v1.2.3