aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb25
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb133
2 files changed, 151 insertions, 7 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 086455db0b..ffb232b827 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -124,12 +124,18 @@ module GRPC
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
- batch_result = @call.run_batch(ops)
- unless @metadata_received
- @call.metadata = batch_result.metadata
- @metadata_received = true
+ begin
+ batch_result = @call.run_batch(ops)
+ unless @metadata_received
+ @call.metadata = batch_result.metadata
+ @metadata_received = true
+ end
+ batch_result
+ rescue GRPC::Core::CallError => e
+ GRPC.logger.warn('bidi call: read_using_run_batch failed')
+ GRPC.logger.warn(e)
+ nil
end
- batch_result
end
# set_output_stream_done is relevant on client-side
@@ -155,7 +161,12 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
- @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
+ begin
+ @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
+ rescue GRPC::Core::CallError => e
+ GRPC.logger.warn('bidi-write-loop: send close failed')
+ GRPC.logger.warn(e)
+ end
GRPC.logger.debug('bidi-write-loop: done')
end
GRPC.logger.debug('bidi-write-loop: finished')
@@ -187,7 +198,7 @@ module GRPC
batch_result = read_using_run_batch
# handle the next message
- if batch_result.message.nil?
+ if batch_result.nil? || batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
if is_client
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index d858c4e3fe..da50f8d0c9 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -750,6 +750,90 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
expected_error_message)
end
end
+
+ # Prompted by grpc/github #14853
+ describe 'client-side error handling on bidi streams' do
+ class EnumeratorQueue
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def each
+ loop do
+ msg = @queue.pop
+ break if msg.nil?
+ yield msg
+ end
+ end
+ end
+
+ def run_server_bidi_shutdown_after_one_read
+ @server.start
+ recvd_rpc = @server.request_call
+ recvd_call = recvd_rpc.call
+ server_call = GRPC::ActiveCall.new(
+ recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true, started: false)
+ expect(server_call.remote_read).to eq('first message')
+ @server.shutdown_and_notify(from_relative_time(0))
+ @server.close
+ end
+
+ it 'receives a grpc status code when writes to a bidi stream fail' do
+ # This test tries to trigger the case when a 'SEND_MESSAGE' op
+ # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
+ # In this case, iteration through the response stream should result
+ # in a grpc status code, and the writer thread should not raise an
+ # exception.
+ server_thread = Thread.new do
+ run_server_bidi_shutdown_after_one_read
+ end
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ request_queue = Queue.new
+ @sent_msgs = EnumeratorQueue.new(request_queue)
+ responses = get_responses(stub)
+ request_queue.push('first message')
+ # Now wait for the server to shut down.
+ server_thread.join
+ # Sanity check. This test is not interesting if
+ # Thread.abort_on_exception is not set.
+ expect(Thread.abort_on_exception).to be(true)
+ # An attempt to send a second message should fail now that the
+ # server is down.
+ request_queue.push('second message')
+ request_queue.push(nil)
+ expect { responses.next }.to raise_error(GRPC::BadStatus)
+ end
+
+ def run_server_bidi_shutdown_after_one_write
+ @server.start
+ recvd_rpc = @server.request_call
+ recvd_call = recvd_rpc.call
+ server_call = GRPC::ActiveCall.new(
+ recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true, started: false)
+ server_call.send_initial_metadata
+ server_call.remote_send('message')
+ @server.shutdown_and_notify(from_relative_time(0))
+ @server.close
+ end
+
+ it 'receives a grpc status code when reading from a failed bidi call' do
+ server_thread = Thread.new do
+ run_server_bidi_shutdown_after_one_write
+ end
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ request_queue = Queue.new
+ @sent_msgs = EnumeratorQueue.new(request_queue)
+ responses = get_responses(stub)
+ expect(responses.next).to eq('message')
+ # Wait for the server to shut down
+ server_thread.join
+ expect { responses.next }.to raise_error(GRPC::BadStatus)
+ # Push a sentinel to allow the writer thread to finish
+ request_queue.push(nil)
+ end
+ end
end
describe 'without a call operation' do
@@ -810,6 +894,55 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
responses.each { |r| p r }
end
end
+
+ def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
+ @server.start
+ recvd_rpc = @server.request_call
+ recvd_call = recvd_rpc.call
+ server_call = GRPC::ActiveCall.new(
+ recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true, started: false)
+ server_call.send_initial_metadata
+ server_call.remote_send('server call received')
+ wait_for_shutdown_ok_callback.call
+ # since the client is cancelling the call,
+ # we should be able to shut down cleanly
+ @server.shutdown_and_notify(nil)
+ @server.close
+ end
+
+ it 'receives a grpc status code when reading from a cancelled bidi call' do
+ # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
+ # 'RECV_MESSAGE' op failure.
+ # An attempt to read a message might fail; in that case, iteration
+ # through the response stream should still result in a grpc status.
+ server_can_shutdown = false
+ server_can_shutdown_mu = Mutex.new
+ server_can_shutdown_cv = ConditionVariable.new
+ wait_for_shutdown_ok_callback = proc do
+ server_can_shutdown_mu.synchronize do
+ server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
+ end
+ end
+ server_thread = Thread.new do
+ run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
+ end
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ request_queue = Queue.new
+ @sent_msgs = EnumeratorQueue.new(request_queue)
+ responses = get_responses(stub)
+ expect(responses.next).to eq('server call received')
+ @op.cancel
+ expect { responses.next }.to raise_error(GRPC::Cancelled)
+ # Now let the server proceed to shut down.
+ server_can_shutdown_mu.synchronize do
+ server_can_shutdown = true
+ server_can_shutdown_cv.broadcast
+ end
+ server_thread.join
+ # Push a sentinel to allow the writer thread to finish
+ request_queue.push(nil)
+ end
end
end