diff options
Diffstat (limited to 'src/ruby/lib/grpc/generic/active_call.rb')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 146 |
1 files changed, 94 insertions, 52 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index dfc2644c46..3b31f77ec0 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -43,7 +43,8 @@ class Struct GRPC.logger.debug("Failing with status #{status}") # raise BadStatus, propagating the metadata if present. md = status.metadata - fail GRPC::BadStatus.new(status.code, status.details, md) + fail GRPC::BadStatus.new_status_exception( + status.code, status.details, md) end status end @@ -156,41 +157,25 @@ module GRPC Operation.new(self) end - # writes_done indicates that all writes are completed. - # - # It blocks until the remote endpoint acknowledges with at status unless - # assert_finished is set to false. Any calls to #remote_send after this - # call will fail. - # - # @param assert_finished [true, false] when true(default), waits for - # FINISHED. - def writes_done(assert_finished = true) - ops = { - SEND_CLOSE_FROM_CLIENT => nil - } - ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished - batch_result = @call.run_batch(ops) - return unless assert_finished - unless batch_result.status.nil? - @call.trailing_metadata = batch_result.status.metadata - end - @call.status = batch_result.status - op_is_done - batch_result.check_status - end - # finished waits until a client call is completed. # # It blocks until the remote endpoint acknowledges by sending a status. def finished batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) - unless batch_result.status.nil? - @call.trailing_metadata = batch_result.status.metadata + attach_status_results_and_complete_call(batch_result) + end + + def attach_status_results_and_complete_call(recv_status_batch_result) + unless recv_status_batch_result.status.nil? + @call.trailing_metadata = recv_status_batch_result.status.metadata end - @call.status = batch_result.status - op_is_done - batch_result.check_status + @call.status = recv_status_batch_result.status @call.close + op_is_done + + # The RECV_STATUS in run_batch always succeeds + # Check the status for a bad status or failed run batch + recv_status_batch_result.check_status end # remote_send sends a request to the remote endpoint. @@ -226,6 +211,23 @@ module GRPC nil end + def server_unary_response(req, trailing_metadata: {}, + code: Core::StatusCodes::OK, details: 'OK') + ops = {} + @send_initial_md_mutex.synchronize do + ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent + @metadata_sent = true + end + + payload = @marshal.call(req) + ops[SEND_MESSAGE] = payload + ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new( + code, details, trailing_metadata) + ops[RECV_CLOSE_ON_SERVER] = nil + + @call.run_batch(ops) + end + # remote_read reads a response from the remote endpoint. # # It blocks until the remote endpoint replies with a message or status. @@ -240,9 +242,13 @@ module GRPC @call.metadata = batch_result.metadata @metadata_received = true end - unless batch_result.nil? || batch_result.message.nil? - res = @unmarshal.call(batch_result.message) - return res + get_message_from_batch_result(batch_result) + end + + def get_message_from_batch_result(recv_message_batch_result) + unless recv_message_batch_result.nil? || + recv_message_batch_result.message.nil? + return @unmarshal.call(recv_message_batch_result.message) end GRPC.logger.debug('found nil; the final response has been sent') nil @@ -298,7 +304,6 @@ module GRPC return enum_for(:each_remote_read_then_finish) unless block_given? loop do resp = remote_read - break if resp.is_a? Struct::Status # is an OK status if resp.nil? # the last response was received, but not finished yet finished break @@ -315,15 +320,25 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) - merge_metadata_to_send(metadata) && send_initial_metadata - remote_send(req) - writes_done(false) - response = remote_read - finished unless response.is_a? Struct::Status - response - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e + ops = { + SEND_MESSAGE => @marshal.call(req), + SEND_CLOSE_FROM_CLIENT => nil, + RECV_INITIAL_METADATA => nil, + RECV_MESSAGE => nil, + RECV_STATUS_ON_CLIENT => nil + } + @send_initial_md_mutex.synchronize do + # Metadata might have already been sent if this is an operation view + unless @metadata_sent + ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata) + end + @metadata_sent = true + end + batch_result = @call.run_batch(ops) + + @call.metadata = batch_result.metadata + attach_status_results_and_complete_call(batch_result) + get_message_from_batch_result(batch_result) end # client_streamer sends a stream of requests to a GRPC server, and @@ -339,12 +354,20 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - merge_metadata_to_send(metadata) && send_initial_metadata - requests.each { |r| remote_send(r) } - writes_done(false) - response = remote_read - finished unless response.is_a? Struct::Status - response + # Metadata might have already been sent if this is an operation view + merge_metadata_and_send_if_not_already_sent(metadata) + + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + batch_result = @call.run_batch( + SEND_CLOSE_FROM_CLIENT => nil, + RECV_INITIAL_METADATA => nil, + RECV_MESSAGE => nil, + RECV_STATUS_ON_CLIENT => nil + ) + + @call.metadata = batch_result.metadata + attach_status_results_and_complete_call(batch_result) + get_message_from_batch_result(batch_result) rescue GRPC::Core::CallError => e finished # checks for Cancelled raise e @@ -365,9 +388,18 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) - merge_metadata_to_send(metadata) && send_initial_metadata - remote_send(req) - writes_done(false) + ops = { + SEND_MESSAGE => @marshal.call(req), + SEND_CLOSE_FROM_CLIENT => nil + } + @send_initial_md_mutex.synchronize do + # Metadata might have already been sent if this is an operation view + unless @metadata_sent + ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata) + end + @metadata_sent = true + end + @call.run_batch(ops) replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } @@ -404,7 +436,8 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) - merge_metadata_to_send(metadata) && send_initial_metadata + # Metadata might have already been sent if this is an operation view + merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @marshal, @unmarshal, @@ -457,6 +490,15 @@ module GRPC end end + def merge_metadata_and_send_if_not_already_sent(new_metadata = {}) + @send_initial_md_mutex.synchronize do + return if @metadata_sent + @metadata_to_send.merge!(new_metadata) + @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send) + @metadata_sent = true + end + end + private # Starts the call if not already started |