aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/rpc_server.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib/grpc/generic/rpc_server.rb')
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb40
1 files changed, 11 insertions, 29 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ab7333d133..c92a532a50 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -159,16 +159,6 @@ module GRPC
# Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25
- # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
- # its arguments.
- def self.setup_cq(alt_cq)
- return Core::CompletionQueue.new if alt_cq.nil?
- unless alt_cq.is_a? Core::CompletionQueue
- fail(TypeError, '!CompletionQueue')
- end
- alt_cq
- end
-
# setup_connect_md_proc is used by #initialize to validate the
# connect_md_proc.
def self.setup_connect_md_proc(a_proc)
@@ -191,10 +181,6 @@ module GRPC
# * pool_size: the size of the thread pool the server uses to run its
# threads
#
- # * completion_queue_override: when supplied, this will be used as the
- # completion_queue that the server uses to receive network events,
- # otherwise its creates a new instance itself
- #
# * creds: [GRPC::Core::ServerCredentials]
# the credentials used to secure the server
#
@@ -212,11 +198,9 @@ module GRPC
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
- completion_queue_override:nil,
connect_md_proc:nil,
server_args:{})
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
- @cq = RpcServer.setup_cq(completion_queue_override)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@pool_size = pool_size
@@ -226,7 +210,7 @@ module GRPC
# running_state can take 4 values: :not_started, :running, :stopping, and
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
- @server = Core::Server.new(@cq, server_args)
+ @server = Core::Server.new(server_args)
end
# stops a running server
@@ -240,7 +224,7 @@ module GRPC
transition_running_state(:stopping)
end
deadline = from_relative_time(@poll_period)
- @server.close(@cq, deadline)
+ @server.close(deadline)
@pool.stop
end
@@ -355,7 +339,8 @@ module GRPC
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
- c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
+ c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
+ metadata_received: true)
c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
@@ -366,7 +351,8 @@ module GRPC
return an_rpc if rpc_descs.key?(mth)
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
- c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
+ c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
+ metadata_received: true)
c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
@@ -374,11 +360,9 @@ module GRPC
# handles calls to the server
def loop_handle_server_calls
fail 'not started' if running_state == :not_started
- loop_tag = Object.new
while running_state == :running
begin
- comp_queue = Core::CompletionQueue.new
- an_rpc = @server.request_call(comp_queue, loop_tag, INFINITE_FUTURE)
+ an_rpc = @server.request_call
break if (!an_rpc.nil?) && an_rpc.call.nil?
active_call = new_active_server_call(an_rpc)
unless active_call.nil?
@@ -410,15 +394,13 @@ module GRPC
return nil if an_rpc.nil? || an_rpc.call.nil?
# allow the metadata to be accessed from the call
- handle_call_tag = Object.new
an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
GRPC.logger.debug("call md is #{an_rpc.metadata}")
connect_md = nil
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
- an_rpc.call.run_batch(an_rpc.cq, handle_call_tag, INFINITE_FUTURE,
- SEND_INITIAL_METADATA => connect_md)
+ an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md)
return nil unless available?(an_rpc)
return nil unless implemented?(an_rpc)
@@ -426,9 +408,9 @@ module GRPC
# Create the ActiveCall
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
- c = ActiveCall.new(an_rpc.call, an_rpc.cq,
- rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
- an_rpc.deadline)
+ c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc,
+ rpc_desc.unmarshal_proc(:input), an_rpc.deadline,
+ metadata_received: true)
mth = an_rpc.method.to_sym
[c, mth]
end