diff options
author | Alexander Polcyn <apolcyn@google.com> | 2018-04-03 16:08:01 -0700 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2018-04-03 20:47:24 -0700 |
commit | ae937d34d6d517ffb29b8a22f947cdc3461aa1ca (patch) | |
tree | 12f66ab9cea170b4ac46f3fdd5e80d8b23e4d1a4 /src/ruby/spec | |
parent | 786126dd41822155493c175eda2d0e54a2b1012f (diff) |
Never throw CallErrors for failed bidi reads or writes
Diffstat (limited to 'src/ruby/spec')
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index d858c4e3fe..da50f8d0c9 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -750,6 +750,90 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength expected_error_message) end end + + # Prompted by grpc/github #14853 + describe 'client-side error handling on bidi streams' do + class EnumeratorQueue + def initialize(queue) + @queue = queue + end + + def each + loop do + msg = @queue.pop + break if msg.nil? + yield msg + end + end + end + + def run_server_bidi_shutdown_after_one_read + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + expect(server_call.remote_read).to eq('first message') + @server.shutdown_and_notify(from_relative_time(0)) + @server.close + end + + it 'receives a grpc status code when writes to a bidi stream fail' do + # This test tries to trigger the case when a 'SEND_MESSAGE' op + # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails. + # In this case, iteration through the response stream should result + # in a grpc status code, and the writer thread should not raise an + # exception. + server_thread = Thread.new do + run_server_bidi_shutdown_after_one_read + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + request_queue.push('first message') + # Now wait for the server to shut down. + server_thread.join + # Sanity check. This test is not interesting if + # Thread.abort_on_exception is not set. + expect(Thread.abort_on_exception).to be(true) + # An attempt to send a second message should fail now that the + # server is down. + request_queue.push('second message') + request_queue.push(nil) + expect { responses.next }.to raise_error(GRPC::BadStatus) + end + + def run_server_bidi_shutdown_after_one_write + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('message') + @server.shutdown_and_notify(from_relative_time(0)) + @server.close + end + + it 'receives a grpc status code when reading from a failed bidi call' do + server_thread = Thread.new do + run_server_bidi_shutdown_after_one_write + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + expect(responses.next).to eq('message') + # Wait for the server to shut down + server_thread.join + expect { responses.next }.to raise_error(GRPC::BadStatus) + # Push a sentinel to allow the writer thread to finish + request_queue.push(nil) + end + end end describe 'without a call operation' do @@ -810,6 +894,55 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength responses.each { |r| p r } end end + + def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('server call received') + wait_for_shutdown_ok_callback.call + # since the client is cancelling the call, + # we should be able to shut down cleanly + @server.shutdown_and_notify(nil) + @server.close + end + + it 'receives a grpc status code when reading from a cancelled bidi call' do + # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or + # 'RECV_MESSAGE' op failure. + # An attempt to read a message might fail; in that case, iteration + # through the response stream should still result in a grpc status. + server_can_shutdown = false + server_can_shutdown_mu = Mutex.new + server_can_shutdown_cv = ConditionVariable.new + wait_for_shutdown_ok_callback = proc do + server_can_shutdown_mu.synchronize do + server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown + end + end + server_thread = Thread.new do + run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + expect(responses.next).to eq('server call received') + @op.cancel + expect { responses.next }.to raise_error(GRPC::Cancelled) + # Now let the server proceed to shut down. + server_can_shutdown_mu.synchronize do + server_can_shutdown = true + server_can_shutdown_cv.broadcast + end + server_thread.join + # Push a sentinel to allow the writer thread to finish + request_queue.push(nil) + end end end |