aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-04-02 21:45:26 -0700
committerGravatar Tim Emiola <temiola@google.com>2015-04-10 11:23:43 -0700
commitb22a21ebe7ba21c769046f5ccc0de0b15f1028ff (patch)
tree65eea9c75cd96c3d18abbdb03f57c7331bc98d17
parent5684b4073ce35b17f53a3250f37ead26b0b05e5d (diff)
Update RPC server to use the new call API
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb91
1 files changed, 44 insertions, 47 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 35e84023be..30a4bf1532 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -38,7 +38,7 @@ module GRPC
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
- include Core::CompletionType
+ include Core::CallOps
include Core::TimeConsts
extend ::Forwardable
@@ -202,20 +202,14 @@ module GRPC
end
@pool.start
@server.start
- server_tag = Object.new
+ request_call_tag = Object.new
until stopped?
- @server.request_call(server_tag)
- ev = @cq.pluck(server_tag, @poll_period)
- next if ev.nil?
- if ev.type != SERVER_RPC_NEW
- logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
- ev.close
- next
- end
- c = new_active_server_call(ev.call, ev.result)
+ deadline = from_relative_time(@poll_period)
+ an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ next if an_rpc.nil?
+ c = new_active_server_call(an_rpc)
unless c.nil?
- mth = ev.result.method.to_sym
- ev.close
+ mth = an_rpc.method.to_sym
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
@@ -224,46 +218,49 @@ module GRPC
@running = false
end
- def new_active_server_call(call, new_server_rpc)
- # Accept the call. This is necessary even if a status is to be sent
- # back immediately
- finished_tag = Object.new
- call_queue = Core::CompletionQueue.new
- call.metadata = new_server_rpc.metadata # store the metadata
- call.server_accept(call_queue, finished_tag)
- call.server_end_initial_metadata
-
- # Send UNAVAILABLE if there are too many unprocessed jobs
+ # Sends UNAVAILABLE if there are too many unprocessed jobs
+ def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
- if @pool.jobs_waiting > @max_waiting_requests
- logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::UNAVAILABLE, '')
- return nil
- end
+ return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
+ logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::UNAVAILABLE, '')
+ nil
+ end
- # Send NOT_FOUND if the method does not exist
- mth = new_server_rpc.method.to_sym
- unless rpc_descs.key?(mth)
- logger.warn("NOT_FOUND: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::NOT_FOUND, '')
- return nil
- end
+ # Sends NOT_FOUND if the method can't be found
+ def found?(an_rpc)
+ mth = an_rpc.method.to_sym
+ return an_rpc if rpc_descs.key?(mth)
+ logger.warn("NOT_FOUND: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::NOT_FOUND, '')
+ nil
+ end
+
+ def new_active_server_call(an_rpc)
+ # Accept the call. This is necessary even if a status is to be sent
+ # back immediately
+ return nil if an_rpc.nil? || an_rpc.call.nil?
+
+ # allow the metadata to be accessed from the call
+ handle_call_tag = Object.new
+ an_rpc.call.metadata = an_rpc.metadata
+ # TODO: add a hook to send md
+ an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => nil)
+ return nil unless available?(an_rpc)
+ return nil unless found?(an_rpc)
# Create the ActiveCall
- rpc_desc = rpc_descs[mth]
- logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
- ActiveCall.new(call, call_queue,
+ logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
+ rpc_desc = rpc_descs[an_rpc.method.to_sym]
+ ActiveCall.new(an_rpc.call, @cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
- new_server_rpc.deadline, finished_tag: finished_tag)
+ an_rpc.deadline)
end
# Pool is a simple thread pool for running server requests.