aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/spec
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2018-04-03 16:08:01 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2018-04-03 20:47:24 -0700
commitae937d34d6d517ffb29b8a22f947cdc3461aa1ca (patch)
tree12f66ab9cea170b4ac46f3fdd5e80d8b23e4d1a4 /src/ruby/spec
parent786126dd41822155493c175eda2d0e54a2b1012f (diff)
Never throw CallErrors for failed bidi reads or writes
Diffstat (limited to 'src/ruby/spec')
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb133
1 files changed, 133 insertions, 0 deletions
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index d858c4e3fe..da50f8d0c9 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -750,6 +750,90 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
expected_error_message)
end
end
+
+ # Prompted by grpc/github #14853
+ describe 'client-side error handling on bidi streams' do
+ class EnumeratorQueue
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def each
+ loop do
+ msg = @queue.pop
+ break if msg.nil?
+ yield msg
+ end
+ end
+ end
+
+ def run_server_bidi_shutdown_after_one_read
+ @server.start
+ recvd_rpc = @server.request_call
+ recvd_call = recvd_rpc.call
+ server_call = GRPC::ActiveCall.new(
+ recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true, started: false)
+ expect(server_call.remote_read).to eq('first message')
+ @server.shutdown_and_notify(from_relative_time(0))
+ @server.close
+ end
+
+ it 'receives a grpc status code when writes to a bidi stream fail' do
+ # This test tries to trigger the case when a 'SEND_MESSAGE' op
+ # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
+ # In this case, iteration through the response stream should result
+ # in a grpc status code, and the writer thread should not raise an
+ # exception.
+ server_thread = Thread.new do
+ run_server_bidi_shutdown_after_one_read
+ end
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ request_queue = Queue.new
+ @sent_msgs = EnumeratorQueue.new(request_queue)
+ responses = get_responses(stub)
+ request_queue.push('first message')
+ # Now wait for the server to shut down.
+ server_thread.join
+ # Sanity check. This test is not interesting if
+ # Thread.abort_on_exception is not set.
+ expect(Thread.abort_on_exception).to be(true)
+ # An attempt to send a second message should fail now that the
+ # server is down.
+ request_queue.push('second message')
+ request_queue.push(nil)
+ expect { responses.next }.to raise_error(GRPC::BadStatus)
+ end
+
+ def run_server_bidi_shutdown_after_one_write
+ @server.start
+ recvd_rpc = @server.request_call
+ recvd_call = recvd_rpc.call
+ server_call = GRPC::ActiveCall.new(
+ recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true, started: false)
+ server_call.send_initial_metadata
+ server_call.remote_send('message')
+ @server.shutdown_and_notify(from_relative_time(0))
+ @server.close
+ end
+
+ it 'receives a grpc status code when reading from a failed bidi call' do
+ server_thread = Thread.new do
+ run_server_bidi_shutdown_after_one_write
+ end
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ request_queue = Queue.new
+ @sent_msgs = EnumeratorQueue.new(request_queue)
+ responses = get_responses(stub)
+ expect(responses.next).to eq('message')
+ # Wait for the server to shut down
+ server_thread.join
+ expect { responses.next }.to raise_error(GRPC::BadStatus)
+ # Push a sentinel to allow the writer thread to finish
+ request_queue.push(nil)
+ end
+ end
end
describe 'without a call operation' do
@@ -810,6 +894,55 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength
responses.each { |r| p r }
end
end
+
+ def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
+ @server.start
+ recvd_rpc = @server.request_call
+ recvd_call = recvd_rpc.call
+ server_call = GRPC::ActiveCall.new(
+ recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true, started: false)
+ server_call.send_initial_metadata
+ server_call.remote_send('server call received')
+ wait_for_shutdown_ok_callback.call
+ # since the client is cancelling the call,
+ # we should be able to shut down cleanly
+ @server.shutdown_and_notify(nil)
+ @server.close
+ end
+
+ it 'receives a grpc status code when reading from a cancelled bidi call' do
+ # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
+ # 'RECV_MESSAGE' op failure.
+ # An attempt to read a message might fail; in that case, iteration
+ # through the response stream should still result in a grpc status.
+ server_can_shutdown = false
+ server_can_shutdown_mu = Mutex.new
+ server_can_shutdown_cv = ConditionVariable.new
+ wait_for_shutdown_ok_callback = proc do
+ server_can_shutdown_mu.synchronize do
+ server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
+ end
+ end
+ server_thread = Thread.new do
+ run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
+ end
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ request_queue = Queue.new
+ @sent_msgs = EnumeratorQueue.new(request_queue)
+ responses = get_responses(stub)
+ expect(responses.next).to eq('server call received')
+ @op.cancel
+ expect { responses.next }.to raise_error(GRPC::Cancelled)
+ # Now let the server proceed to shut down.
+ server_can_shutdown_mu.synchronize do
+ server_can_shutdown = true
+ server_can_shutdown_cv.broadcast
+ end
+ server_thread.join
+ # Push a sentinel to allow the writer thread to finish
+ request_queue.push(nil)
+ end
end
end