diff options
author | Alexander Polcyn <apolcyn@google.com> | 2017-07-18 17:26:08 -0700 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2017-07-18 17:26:08 -0700 |
commit | 59a19a9d5ecc34b60fd6c035a6cb261261dd48fe (patch) | |
tree | f5177b228bfa1ec4c25bde0e48048ec6dd545471 /src/ruby | |
parent | fb1e164cd81bd15b7d2aad595510e0c083fb3d5b (diff) |
make sure that client-side view of calls is robust
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 17 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 97 |
2 files changed, 104 insertions, 10 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 96c773a995..4a748a4ac2 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -46,7 +46,7 @@ module GRPC extend Forwardable attr_reader :deadline, :metadata_sent, :metadata_to_send def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata + :peer, :peer_cert, :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -105,6 +105,8 @@ module GRPC @input_stream_done = false @call_finished = false @call_finished_mu = Mutex.new + @client_call_executed = false + @client_call_executed_mu = Mutex.new end # Sends the initial metadata that has yet to be sent. @@ -327,6 +329,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil, @@ -369,6 +372,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) + raise_error_if_already_executed begin merge_metadata_and_send_if_not_already_sent(metadata) requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } @@ -411,6 +415,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil @@ -468,6 +473,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) + raise_error_if_already_executed # Metadata might have already been sent if this is an operation view merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @@ -572,6 +578,15 @@ module GRPC merge_metadata_to_send(metadata) && send_initial_metadata end + def raise_error_if_already_executed + @client_call_executed_mu.synchronize do + if @client_call_executed + fail GRPC::Core::CallError, 'attempting to re-run a call' + end + @client_call_executed = true + end + end + def self.view_class(*visible_methods) Class.new do extend ::Forwardable diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 3b8f72eda1..7b5e6a95a4 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -36,6 +36,33 @@ include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts include GRPC::Core::CallOps +# check that methods on a finished/closed call t crash +def check_op_view_of_finished_client_call_is_robust(op_view) + # use read_response_stream to try to iterate through + # possible response stream + fail('need something to attempt reads') unless block_given? + expect do + resp = op_view.execute + yield resp + end.to raise_error(GRPC::Core::CallError) + + expect { op_view.start_call }.to raise_error(RuntimeError) + + expect do + op_view.wait + op_view.cancel + + op_view.metadata + op_view.trailing_metadata + op_view.status + + op_view.cancelled? + op_view.deadline + op_view.write_flag + op_view.write_flag = 1 + end.to_not raise_error +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -231,15 +258,27 @@ describe 'ClientStub' do it_behaves_like 'request response' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, k1: 'v1', k2: 'v2') stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + expect( + get_response(stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end end end @@ -307,11 +346,23 @@ describe 'ClientStub' do it_behaves_like 'client streaming' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) - expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) + expect( + get_response(@stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end end end @@ -377,7 +428,7 @@ describe 'ClientStub' do end end - describe 'without a call operation', test2: true do + describe 'without a call operation' do def get_responses(stub, unmarshal: noop) e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, metadata: @metadata) @@ -405,16 +456,30 @@ describe 'ClientStub' do it_behaves_like 'server streaming' - it 'should send metadata to the server ok when start_call is run first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, k1: 'v1', k2: 'v2') stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) + e = get_responses(stub, run_start_call_first: run_start_call_first) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should send metadata to the server ok when start_call is run first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end end end @@ -501,14 +566,28 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' - it 'can run start_call before executing the call' do + def run_op_view_metadata_test(run_start_call_first) th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) + e = get_responses(stub, run_start_call_first: run_start_call_first) expect(e.collect { |r| r }).to eq(@replys) th.join end + + it 'can run start_call before executing the call' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end + + it 'doesnt crash when op_view used after call has finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end end end |