aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/active_call.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib/grpc/generic/active_call.rb')
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb42
1 files changed, 19 insertions, 23 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 8c3aa284aa..688726ef4a 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -116,10 +116,11 @@ module GRPC
# Sends the initial metadata that has yet to be sent.
# Does nothing if metadata has already been sent for this call.
- def send_initial_metadata
+ def send_initial_metadata(new_metadata = {})
@send_initial_md_mutex.synchronize do
return if @metadata_sent
- @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send)
+ @metadata_to_send.merge!(new_metadata)
+ ActiveCall.client_invoke(@call, @metadata_to_send)
@metadata_sent = true
end
end
@@ -321,18 +322,22 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
- begin
- loop do
- resp = remote_read
- if resp.nil? # the last response was received
- receive_and_check_status
- break
+ loop do
+ resp =
+ begin
+ remote_read
+ rescue GRPC::Core::CallError => e
+ GRPC.logger.warn("In each_remote_read_then_finish: #{e}")
+ nil
end
- yield resp
- end
- ensure
- set_input_stream_done
+
+ break if resp.nil? # the last response was received
+ yield resp
end
+
+ receive_and_check_status
+ ensure
+ set_input_stream_done
end
# request_response sends a request to a GRPC server, and returns the
@@ -388,7 +393,7 @@ module GRPC
def client_streamer(requests, metadata: {})
raise_error_if_already_executed
begin
- merge_metadata_and_send_if_not_already_sent(metadata)
+ send_initial_metadata(metadata)
requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
rescue GRPC::Core::CallError => e
receive_and_check_status # check for Cancelled
@@ -490,7 +495,7 @@ module GRPC
raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
begin
- merge_metadata_and_send_if_not_already_sent(metadata)
+ send_initial_metadata(metadata)
rescue GRPC::Core::CallError => e
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
set_input_stream_done
@@ -571,15 +576,6 @@ module GRPC
end
end
- def merge_metadata_and_send_if_not_already_sent(new_metadata = {})
- @send_initial_md_mutex.synchronize do
- return if @metadata_sent
- @metadata_to_send.merge!(new_metadata)
- @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send)
- @metadata_sent = true
- end
- end
-
def attach_peer_cert(peer_cert)
@peer_cert = peer_cert
end