diff options
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 42 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 6 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 5 | ||||
-rw-r--r-- | src/ruby/lib/grpc/version.rb | 2 |
4 files changed, 26 insertions, 29 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 8c3aa284aa..688726ef4a 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -116,10 +116,11 @@ module GRPC # Sends the initial metadata that has yet to be sent. # Does nothing if metadata has already been sent for this call. - def send_initial_metadata + def send_initial_metadata(new_metadata = {}) @send_initial_md_mutex.synchronize do return if @metadata_sent - @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send) + @metadata_to_send.merge!(new_metadata) + ActiveCall.client_invoke(@call, @metadata_to_send) @metadata_sent = true end end @@ -321,18 +322,22 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - begin - loop do - resp = remote_read - if resp.nil? # the last response was received - receive_and_check_status - break + loop do + resp = + begin + remote_read + rescue GRPC::Core::CallError => e + GRPC.logger.warn("In each_remote_read_then_finish: #{e}") + nil end - yield resp - end - ensure - set_input_stream_done + + break if resp.nil? # the last response was received + yield resp end + + receive_and_check_status + ensure + set_input_stream_done end # request_response sends a request to a GRPC server, and returns the @@ -388,7 +393,7 @@ module GRPC def client_streamer(requests, metadata: {}) raise_error_if_already_executed begin - merge_metadata_and_send_if_not_already_sent(metadata) + send_initial_metadata(metadata) requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } rescue GRPC::Core::CallError => e receive_and_check_status # check for Cancelled @@ -490,7 +495,7 @@ module GRPC raise_error_if_already_executed # Metadata might have already been sent if this is an operation view begin - merge_metadata_and_send_if_not_already_sent(metadata) + send_initial_metadata(metadata) rescue GRPC::Core::CallError => e batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) set_input_stream_done @@ -571,15 +576,6 @@ module GRPC end end - def merge_metadata_and_send_if_not_already_sent(new_metadata = {}) - @send_initial_md_mutex.synchronize do - return if @metadata_sent - @metadata_to_send.merge!(new_metadata) - @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send) - @metadata_sent = true - end - end - def attach_peer_cert(peer_cert) @peer_cert = peer_cert end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 5fd1805aab..efb0e4233d 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -32,7 +32,7 @@ module GRPC # @return [Proc] { |instance| marshalled(instance) } def marshal_proc - proc { |o| o.class.method(marshal_method).call(o).to_s } + proc { |o| o.class.send(marshal_method, o).to_s } end # @param [:input, :output] target determines whether to produce the an @@ -42,9 +42,9 @@ module GRPC # @return [Proc] An unmarshal proc { |marshalled(instance)| instance } def unmarshal_proc(target) fail ArgumentError unless [:input, :output].include?(target) - unmarshal_class = method(target).call + unmarshal_class = send(target) unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream - proc { |o| unmarshal_class.method(unmarshal_method).call(o) } + proc { |o| unmarshal_class.send(unmarshal_method, o) } end def handle_request_response(active_call, mth, inter_ctx) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 31ab6a302b..3b5a0ce27f 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -136,7 +136,7 @@ module GRPC begin blk, args = worker_queue.pop blk.call(*args) - rescue StandardError => e + rescue StandardError, GRPC::Core::CallError => e GRPC.logger.warn('Error in worker thread') GRPC.logger.warn(e) end @@ -364,7 +364,8 @@ module GRPC # sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, metadata_received: true, started: false) - c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '') + c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, + 'No free threads in thread pool') nil end diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index 15f375100a..0c3e1ef734 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -14,5 +14,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '1.13.0.dev' + VERSION = '1.16.0.dev' end |