diff options
Diffstat (limited to 'src/ruby/lib/grpc/generic/rpc_server.rb')
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 40 |
1 files changed, 11 insertions, 29 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ab7333d133..c92a532a50 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -159,16 +159,6 @@ module GRPC # Signal check period is 0.25s SIGNAL_CHECK_PERIOD = 0.25 - # setup_cq is used by #initialize to constuct a Core::CompletionQueue from - # its arguments. - def self.setup_cq(alt_cq) - return Core::CompletionQueue.new if alt_cq.nil? - unless alt_cq.is_a? Core::CompletionQueue - fail(TypeError, '!CompletionQueue') - end - alt_cq - end - # setup_connect_md_proc is used by #initialize to validate the # connect_md_proc. def self.setup_connect_md_proc(a_proc) @@ -191,10 +181,6 @@ module GRPC # * pool_size: the size of the thread pool the server uses to run its # threads # - # * completion_queue_override: when supplied, this will be used as the - # completion_queue that the server uses to receive network events, - # otherwise its creates a new instance itself - # # * creds: [GRPC::Core::ServerCredentials] # the credentials used to secure the server # @@ -212,11 +198,9 @@ module GRPC def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, - completion_queue_override:nil, connect_md_proc:nil, server_args:{}) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) - @cq = RpcServer.setup_cq(completion_queue_override) @max_waiting_requests = max_waiting_requests @poll_period = poll_period @pool_size = pool_size @@ -226,7 +210,7 @@ module GRPC # running_state can take 4 values: :not_started, :running, :stopping, and # :stopped. State transitions can only proceed in that order. @running_state = :not_started - @server = Core::Server.new(@cq, server_args) + @server = Core::Server.new(server_args) end # stops a running server @@ -240,7 +224,7 @@ module GRPC transition_running_state(:stopping) end deadline = from_relative_time(@poll_period) - @server.close(@cq, deadline) + @server.close(deadline) @pool.stop end @@ -355,7 +339,8 @@ module GRPC return an_rpc if @pool.jobs_waiting <= @max_waiting_requests GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") noop = proc { |x| x } - c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline) + c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, + metadata_received: true) c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '') nil end @@ -366,7 +351,8 @@ module GRPC return an_rpc if rpc_descs.key?(mth) GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}") noop = proc { |x| x } - c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline) + c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, + metadata_received: true) c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '') nil end @@ -374,11 +360,9 @@ module GRPC # handles calls to the server def loop_handle_server_calls fail 'not started' if running_state == :not_started - loop_tag = Object.new while running_state == :running begin - comp_queue = Core::CompletionQueue.new - an_rpc = @server.request_call(comp_queue, loop_tag, INFINITE_FUTURE) + an_rpc = @server.request_call break if (!an_rpc.nil?) && an_rpc.call.nil? active_call = new_active_server_call(an_rpc) unless active_call.nil? @@ -410,15 +394,13 @@ module GRPC return nil if an_rpc.nil? || an_rpc.call.nil? # allow the metadata to be accessed from the call - handle_call_tag = Object.new an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers GRPC.logger.debug("call md is #{an_rpc.metadata}") connect_md = nil unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) end - an_rpc.call.run_batch(an_rpc.cq, handle_call_tag, INFINITE_FUTURE, - SEND_INITIAL_METADATA => connect_md) + an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless implemented?(an_rpc) @@ -426,9 +408,9 @@ module GRPC # Create the ActiveCall GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] - c = ActiveCall.new(an_rpc.call, an_rpc.cq, - rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), - an_rpc.deadline) + c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc, + rpc_desc.unmarshal_proc(:input), an_rpc.deadline, + metadata_received: true) mth = an_rpc.method.to_sym [c, mth] end |