aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib')
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb42
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb6
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb5
-rw-r--r--src/ruby/lib/grpc/version.rb2
4 files changed, 26 insertions, 29 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 8c3aa284aa..688726ef4a 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -116,10 +116,11 @@ module GRPC
# Sends the initial metadata that has yet to be sent.
# Does nothing if metadata has already been sent for this call.
- def send_initial_metadata
+ def send_initial_metadata(new_metadata = {})
@send_initial_md_mutex.synchronize do
return if @metadata_sent
- @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send)
+ @metadata_to_send.merge!(new_metadata)
+ ActiveCall.client_invoke(@call, @metadata_to_send)
@metadata_sent = true
end
end
@@ -321,18 +322,22 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
- begin
- loop do
- resp = remote_read
- if resp.nil? # the last response was received
- receive_and_check_status
- break
+ loop do
+ resp =
+ begin
+ remote_read
+ rescue GRPC::Core::CallError => e
+ GRPC.logger.warn("In each_remote_read_then_finish: #{e}")
+ nil
end
- yield resp
- end
- ensure
- set_input_stream_done
+
+ break if resp.nil? # the last response was received
+ yield resp
end
+
+ receive_and_check_status
+ ensure
+ set_input_stream_done
end
# request_response sends a request to a GRPC server, and returns the
@@ -388,7 +393,7 @@ module GRPC
def client_streamer(requests, metadata: {})
raise_error_if_already_executed
begin
- merge_metadata_and_send_if_not_already_sent(metadata)
+ send_initial_metadata(metadata)
requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
rescue GRPC::Core::CallError => e
receive_and_check_status # check for Cancelled
@@ -490,7 +495,7 @@ module GRPC
raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
begin
- merge_metadata_and_send_if_not_already_sent(metadata)
+ send_initial_metadata(metadata)
rescue GRPC::Core::CallError => e
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
set_input_stream_done
@@ -571,15 +576,6 @@ module GRPC
end
end
- def merge_metadata_and_send_if_not_already_sent(new_metadata = {})
- @send_initial_md_mutex.synchronize do
- return if @metadata_sent
- @metadata_to_send.merge!(new_metadata)
- @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send)
- @metadata_sent = true
- end
- end
-
def attach_peer_cert(peer_cert)
@peer_cert = peer_cert
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 5fd1805aab..efb0e4233d 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -32,7 +32,7 @@ module GRPC
# @return [Proc] { |instance| marshalled(instance) }
def marshal_proc
- proc { |o| o.class.method(marshal_method).call(o).to_s }
+ proc { |o| o.class.send(marshal_method, o).to_s }
end
# @param [:input, :output] target determines whether to produce the an
@@ -42,9 +42,9 @@ module GRPC
# @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
def unmarshal_proc(target)
fail ArgumentError unless [:input, :output].include?(target)
- unmarshal_class = method(target).call
+ unmarshal_class = send(target)
unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
- proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
+ proc { |o| unmarshal_class.send(unmarshal_method, o) }
end
def handle_request_response(active_call, mth, inter_ctx)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 31ab6a302b..3b5a0ce27f 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -136,7 +136,7 @@ module GRPC
begin
blk, args = worker_queue.pop
blk.call(*args)
- rescue StandardError => e
+ rescue StandardError, GRPC::Core::CallError => e
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
@@ -364,7 +364,8 @@ module GRPC
# sent yet
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true, started: false)
- c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
+ c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
+ 'No free threads in thread pool')
nil
end
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 15f375100a..0c3e1ef734 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -14,5 +14,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '1.13.0.dev'
+ VERSION = '1.16.0.dev'
end