diff options
Diffstat (limited to 'src/ruby/lib/grpc/generic/active_call.rb')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 42 |
1 files changed, 19 insertions, 23 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 8c3aa284aa..688726ef4a 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -116,10 +116,11 @@ module GRPC # Sends the initial metadata that has yet to be sent. # Does nothing if metadata has already been sent for this call. - def send_initial_metadata + def send_initial_metadata(new_metadata = {}) @send_initial_md_mutex.synchronize do return if @metadata_sent - @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send) + @metadata_to_send.merge!(new_metadata) + ActiveCall.client_invoke(@call, @metadata_to_send) @metadata_sent = true end end @@ -321,18 +322,22 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - begin - loop do - resp = remote_read - if resp.nil? # the last response was received - receive_and_check_status - break + loop do + resp = + begin + remote_read + rescue GRPC::Core::CallError => e + GRPC.logger.warn("In each_remote_read_then_finish: #{e}") + nil end - yield resp - end - ensure - set_input_stream_done + + break if resp.nil? # the last response was received + yield resp end + + receive_and_check_status + ensure + set_input_stream_done end # request_response sends a request to a GRPC server, and returns the @@ -388,7 +393,7 @@ module GRPC def client_streamer(requests, metadata: {}) raise_error_if_already_executed begin - merge_metadata_and_send_if_not_already_sent(metadata) + send_initial_metadata(metadata) requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } rescue GRPC::Core::CallError => e receive_and_check_status # check for Cancelled @@ -490,7 +495,7 @@ module GRPC raise_error_if_already_executed # Metadata might have already been sent if this is an operation view begin - merge_metadata_and_send_if_not_already_sent(metadata) + send_initial_metadata(metadata) rescue GRPC::Core::CallError => e batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) set_input_stream_done @@ -571,15 +576,6 @@ module GRPC end end - def merge_metadata_and_send_if_not_already_sent(new_metadata = {}) - @send_initial_md_mutex.synchronize do - return if @metadata_sent - @metadata_to_send.merge!(new_metadata) - @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send) - @metadata_sent = true - end - end - def attach_peer_cert(peer_cert) @peer_cert = peer_cert end |