diff options
author | apolcyn <apolcyn@google.com> | 2017-07-20 17:37:14 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-20 17:37:14 -0700 |
commit | d238d842027de8621decd78da19d4f545d73c7fc (patch) | |
tree | 06096b0cc1af12dc766b7f606d138ad26a062d83 /src/ruby/lib | |
parent | ecb1059614f0510551275ee558d429adb7c1087d (diff) | |
parent | 7cc83e0701aafb2cc2412796b72bddbfbda4eac4 (diff) |
Merge pull request #11764 from apolcyn/fix_ruby_socket_leak
Release resources more eagerly on ruby server
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 170 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 62 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 4 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 1 |
4 files changed, 168 insertions, 69 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 67c984ab49..87b29c26ea 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -40,13 +40,13 @@ end module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call - class ActiveCall + class ActiveCall # rubocop:disable Metrics/ClassLength include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader :deadline, :metadata_sent, :metadata_to_send + attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata + :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -100,6 +100,18 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started @send_initial_md_mutex = Mutex.new + + @output_stream_done = false + @input_stream_done = false + @call_finished = false + @call_finished_mu = Mutex.new + + @client_call_executed = false + @client_call_executed_mu = Mutex.new + + # set the peer now so that the accessor can still function + # after the server closes the call + @peer = call.peer end # Sends the initial metadata that has yet to be sent. @@ -142,11 +154,9 @@ module GRPC Operation.new(self) end - # finished waits until a client call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a status. - def finished + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + set_input_stream_done attach_status_results_and_complete_call(batch_result) end @@ -155,8 +165,6 @@ module GRPC @call.trailing_metadata = recv_status_batch_result.status.metadata end @call.status = recv_status_batch_result.status - @call.close - op_is_done # The RECV_STATUS in run_batch always succeeds # Check the status for a bad status or failed run batch @@ -193,9 +201,19 @@ module GRPC } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(ops) + set_output_stream_done + nil end + # Intended for use on server-side calls when a single request from + # the client is expected (i.e., unary and server-streaming RPC types). + def read_unary_request + req = remote_read + set_input_stream_done + req + end + def server_unary_response(req, trailing_metadata: {}, code: Core::StatusCodes::OK, details: 'OK') ops = {} @@ -211,6 +229,7 @@ module GRPC ops[RECV_CLOSE_ON_SERVER] = nil @call.run_batch(ops) + set_output_stream_done end # remote_read reads a response from the remote endpoint. @@ -241,6 +260,8 @@ module GRPC # each_remote_read passes each response to the given block or returns an # enumerator the responses if no block is given. + # Used to generate the request enumerable for + # server-side client-streaming RPC's. # # == Enumerator == # @@ -258,10 +279,14 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read return enum_for(:each_remote_read) unless block_given? - loop do - resp = remote_read - break if resp.nil? # the last response was received - yield resp + begin + loop do + resp = remote_read + break if resp.nil? # the last response was received + yield resp + end + ensure + set_input_stream_done end end @@ -287,13 +312,17 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - if resp.nil? # the last response was received, but not finished yet - finished - break + begin + loop do + resp = remote_read + if resp.nil? # the last response was received + receive_and_check_status + break + end + yield resp end - yield resp + ensure + set_input_stream_done end end @@ -305,6 +334,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil, @@ -319,7 +349,15 @@ module GRPC end @metadata_sent = true end - batch_result = @call.run_batch(ops) + + begin + batch_result = @call.run_batch(ops) + # no need to check for cancellation after a CallError because this + # batch contains a RECV_STATUS op + ensure + set_input_stream_done + set_output_stream_done + end @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) @@ -339,10 +377,20 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - # Metadata might have already been sent if this is an operation view - merge_metadata_and_send_if_not_already_sent(metadata) + raise_error_if_already_executed + begin + merge_metadata_and_send_if_not_already_sent(metadata) + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + rescue GRPC::Core::CallError => e + receive_and_check_status # check for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end - requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } batch_result = @call.run_batch( SEND_CLOSE_FROM_CLIENT => nil, RECV_INITIAL_METADATA => nil, @@ -350,12 +398,11 @@ module GRPC RECV_STATUS_ON_CLIENT => nil ) + set_input_stream_done + @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) get_message_from_batch_result(batch_result) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -373,6 +420,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil @@ -384,13 +432,22 @@ module GRPC end @metadata_sent = true end - @call.run_batch(ops) + + begin + @call.run_batch(ops) + rescue GRPC::Core::CallError => e + receive_and_check_status # checks for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end + replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -421,6 +478,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) + raise_error_if_already_executed # Metadata might have already been sent if this is an operation view merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @@ -428,7 +486,10 @@ module GRPC @unmarshal, metadata_received: @metadata_received) - bd.run_on_client(requests, @op_notifier, &blk) + bd.run_on_client(requests, + proc { set_input_stream_done }, + proc { set_output_stream_done }, + &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -449,7 +510,7 @@ module GRPC metadata_received: @metadata_received, req_view: MultiReqView.new(self)) - bd.run_on_server(gen_each_reply) + bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) end # Waits till an operation completes @@ -459,7 +520,8 @@ module GRPC @op_notifier.wait end - # Signals that an operation is done + # Signals that an operation is done. + # Only relevant on the client-side (this is a no-op on the server-side) def op_is_done return if @op_notifier.nil? @op_notifier.notify(self) @@ -484,8 +546,40 @@ module GRPC end end + def attach_peer_cert(peer_cert) + @peer_cert = peer_cert + end + private + # To be called once the "input stream" has been completelly + # read through (i.e, done reading from client or received status) + # note this is idempotent + def set_input_stream_done + @call_finished_mu.synchronize do + @input_stream_done = true + maybe_finish_and_close_call_locked + end + end + + # To be called once the "output stream" has been completelly + # sent through (i.e, done sending from client or sent status) + # note this is idempotent + def set_output_stream_done + @call_finished_mu.synchronize do + @output_stream_done = true + maybe_finish_and_close_call_locked + end + end + + def maybe_finish_and_close_call_locked + return unless @output_stream_done && @input_stream_done + return if @call_finished + @call_finished = true + op_is_done + @call.close + end + # Starts the call if not already started # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent @@ -493,6 +587,15 @@ module GRPC merge_metadata_to_send(metadata) && send_initial_metadata end + def raise_error_if_already_executed + @client_call_executed_mu.synchronize do + if @client_call_executed + fail GRPC::Core::CallError, 'attempting to re-run a call' + end + @client_call_executed = true + end + end + def self.view_class(*visible_methods) Class.new do extend ::Forwardable @@ -518,6 +621,7 @@ module GRPC # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_remote_read, :metadata, :output_metadata, + :peer, :peer_cert, :send_initial_metadata, :metadata_to_send, :merge_metadata_to_send, diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index e54cf78969..9e125cd986 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -62,12 +62,19 @@ module GRPC # block that can be invoked with each response. # # @param requests the Enumerable of requests to send - # @param op_notifier a Notifier used to signal completion + # @param set_input_stream_done [Proc] called back when we're done + # reading the input stream + # @param set_input_stream_done [Proc] called back when we're done + # sending data on the output stream # @return an Enumerator of requests to yield - def run_on_client(requests, op_notifier, &blk) - @op_notifier = op_notifier - @enq_th = Thread.new { write_loop(requests) } - read_loop(&blk) + def run_on_client(requests, + set_input_stream_done, + set_output_stream_done, + &blk) + @enq_th = Thread.new do + write_loop(requests, set_output_stream_done: set_output_stream_done) + end + read_loop(set_input_stream_done, &blk) end # Begins orchestration of the Bidi stream for a server generating replies. @@ -81,12 +88,17 @@ module GRPC # produced by gen_each_reply could ignore the received_msgs # # @param gen_each_reply [Proc] generates the BiDi stream replies. - def run_on_server(gen_each_reply) + # @param set_input_steam_done [Proc] call back to call when + # the reads have been completely read through. + def run_on_server(gen_each_reply, set_input_stream_done) # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call(read_loop(is_client: false)) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false)) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call(read_loop(is_client: false), @req_view) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false), + @req_view) else fail 'Illegal arity of reply generator' end @@ -99,22 +111,6 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes - # signals that bidi operation is complete - def notify_done - return unless @op_notifier - GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") - @op_notifier.notify(self) - end - - # signals that a bidi operation is complete (read + write) - def finished - @done_mutex.synchronize do - return unless @reads_complete && @writes_complete && !@complete - @call.close - @complete = true - end - end - # performs a read using @call.run_batch, ensures metadata is set up def read_using_run_batch ops = { RECV_MESSAGE => nil } @@ -127,7 +123,8 @@ module GRPC batch_result end - def write_loop(requests, is_client: true) + # set_output_stream_done is relevant on client-side + def write_loop(requests, is_client: true, set_output_stream_done: nil) GRPC.logger.debug('bidi-write-loop: starting') count = 0 requests.each do |req| @@ -151,23 +148,20 @@ module GRPC GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') - notify_done - @writes_complete = true - finished end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - notify_done - @writes_complete = true - finished raise e + ensure + set_output_stream_done.call if is_client end # Provides an enumerator that yields results of remote reads - def read_loop(is_client: true) + def read_loop(set_input_stream_done, is_client: true) return enum_for(:read_loop, + set_input_stream_done, is_client: is_client) unless block_given? GRPC.logger.debug('bidi-read-loop: starting') begin @@ -201,10 +195,10 @@ module GRPC GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) raise e + ensure + set_input_stream_done.call end GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished # Make sure that the write loop is done done before finishing the call. # Note that blocking is ok at this point because we've already received # a status diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index ce0097573a..89cf8ff6a0 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -48,7 +48,7 @@ module GRPC end def handle_request_response(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request resp = mth.call(req, active_call.single_req_view) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata) @@ -61,7 +61,7 @@ module GRPC end def handle_server_streamer(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request replys = mth.call(req, active_call.single_req_view) replys.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2cc0ce91..33b3cea1fc 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,6 +418,7 @@ module GRPC metadata_received: true, started: false, metadata_to_send: connect_md) + c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end |