diff options
Diffstat (limited to 'src/ruby')
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 14 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 1 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 166 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 145 |
4 files changed, 276 insertions, 50 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 4a748a4ac2..84c8887168 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -44,9 +44,9 @@ module GRPC include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader :deadline, :metadata_sent, :metadata_to_send + attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata, :status + :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -105,8 +105,13 @@ module GRPC @input_stream_done = false @call_finished = false @call_finished_mu = Mutex.new + @client_call_executed = false @client_call_executed_mu = Mutex.new + + # set the peer now so that the accessor can still function + # after the server closes the call + @peer = call.peer end # Sends the initial metadata that has yet to be sent. @@ -541,6 +546,10 @@ module GRPC end end + def attach_peer_cert(peer_cert) + @peer_cert = peer_cert + end + private # To be called once the "input stream" has been completelly @@ -612,6 +621,7 @@ module GRPC # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg, :each_remote_read, :metadata, :output_metadata, + :peer, :peer_cert, :send_initial_metadata, :metadata_to_send, :merge_metadata_to_send, diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2cc0ce91..33b3cea1fc 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,6 +418,7 @@ module GRPC metadata_received: true, started: false, metadata_to_send: connect_md) + c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 7b5e6a95a4..a8653e73cf 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -37,7 +37,9 @@ include GRPC::Core::TimeConsts include GRPC::Core::CallOps # check that methods on a finished/closed call t crash -def check_op_view_of_finished_client_call_is_robust(op_view) +def check_op_view_of_finished_client_call(op_view, + expected_metadata, + expected_trailing_metadata) # use read_response_stream to try to iterate through # possible response stream fail('need something to attempt reads') unless block_given? @@ -48,21 +50,39 @@ def check_op_view_of_finished_client_call_is_robust(op_view) expect { op_view.start_call }.to raise_error(RuntimeError) + sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expect do op_view.wait op_view.cancel - - op_view.metadata - op_view.trailing_metadata - op_view.status - - op_view.cancelled? - op_view.deadline - op_view.write_flag op_view.write_flag = 1 end.to_not raise_error end +def sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expected_status = Struct::Status.new + expected_status.code = 0 + expected_status.details = 'OK' + expected_status.metadata = expected_trailing_metadata + + expect(op_view.status).to eq(expected_status) + expect(op_view.metadata).to eq(expected_metadata) + expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) + + expect(op_view.cancelled?).to be(false) + expect(op_view.write_flag).to be(nil) + + # The deadline attribute of a call can be either + # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. + # TODO: fix so that the accessor always returns the same type. + expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || + op_view.deadline.is_a?(Time)).to be(true) +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -154,7 +174,7 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect(get_response(stub)).to eq(@resp) th.join @@ -261,8 +281,14 @@ describe 'ClientStub' do def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_request_response( + @sent_msg, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect( get_response(stub, @@ -272,12 +298,14 @@ describe 'ClientStub' do it 'sends metadata to the server ok when running start_call first' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end it 'does not crash when used after the call has been finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end end end @@ -300,7 +328,8 @@ describe 'ClientStub' do end it 'should send metadata to the server ok' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + th = run_client_streamer(@sent_msgs, @resp, @pass, + expected_metadata: @metadata) expect(get_response(@stub)).to eq(@resp) th.join end @@ -347,7 +376,13 @@ describe 'ClientStub' do it_behaves_like 'client streaming' def run_op_view_metadata_test(run_start_call_first) - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_client_streamer( + @sent_msgs, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) expect( get_response(@stub, run_start_call_first: run_start_call_first)).to eq(@resp) @@ -356,12 +391,14 @@ describe 'ClientStub' do it 'sends metadata to the server ok when running start_call first' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end it 'does not crash when used after the call has been finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end end end @@ -396,7 +433,7 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) @@ -459,24 +496,31 @@ describe 'ClientStub' do def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_server_streamer( + @sent_msg, @replys, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub, run_start_call_first: run_start_call_first) - expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + expect(e.collect { |r| r }).to eq(@replys) th.join end it 'should send metadata to the server ok when start_call is run first' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end it 'does not crash when used after the call has been finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end @@ -530,7 +574,7 @@ describe 'ClientStub' do it 'should send metadata to the server ok' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, - **@metadata) + expected_metadata: @metadata) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) @@ -567,40 +611,52 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' def run_op_view_metadata_test(run_start_call_first) - th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, - @pass) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_bidi_streamer_echo_ping_pong( + @sent_msgs, @pass, true, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub, run_start_call_first: run_start_call_first) - expect(e.collect { |r| r }).to eq(@replys) + expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end it 'can run start_call before executing the call' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end it 'doesnt crash when op_view used after call has finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end end end - def run_server_streamer(expected_input, replys, status, **kw) - wanted_metadata = kw.clone + def run_server_streamer(expected_input, replys, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -615,10 +671,13 @@ describe 'ClientStub' do end def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, - **kw) - wanted_metadata = kw.clone + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end @@ -631,33 +690,44 @@ describe 'ClientStub' do expect(c.remote_read).to eq(i) end end - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_client_streamer(expected_inputs, resp, status, **kw) - wanted_metadata = kw.clone + def run_client_streamer(expected_inputs, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_request_response(expected_input, resp, status, **kw) - wanted_metadata = kw.clone + def run_request_response(expected_input, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -675,13 +745,13 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end - def expect_server_to_be_invoked(notifier) + def expect_server_to_be_invoked(notifier, metadata_to_send: nil) @server.start notifier.notify(nil) recvd_rpc = @server.request_call recvd_call = recvd_rpc.call recvd_call.metadata = recvd_rpc.metadata - recvd_call.run_batch(SEND_INITIAL_METADATA => nil) + recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true) end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 9633a828a2..4258d59851 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,47 @@ end SlowStub = SlowService.rpc_stub_class +# a test service that hangs onto call objects +# and uses them after the server-side call has been +# finished +class CheckCallAfterFinishedService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + attr_reader :server_side_call + + def an_rpc(req, call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + req + end + + def a_client_streaming_rpc(call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + # iterate through requests so call can complete + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class + describe GRPC::RpcServer do RpcServer = GRPC::RpcServer StatusCodes = GRPC::Core::StatusCodes @@ -505,5 +546,109 @@ describe GRPC::RpcServer do t.join end end + + context 'when call objects are used after calls have completed' do + before(:each) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @alt_host = "0.0.0.0:#{alt_port}" + + @service = CheckCallAfterFinishedService.new + @srv.handle(@service) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + end + + # check that the server-side call is still in a usable state even + # after it has finished + def check_single_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect(call.peer).to be_a(String) + expect(call.peer_cert).to be(nil) + end + + def check_multi_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect do + call.each_remote_read.each { |r| p r } + end.to raise_error(GRPC::Core::CallError) + end + + def common_check_of_finished_server_call(call) + expect do + call.merge_metadata_to_send({}) + end.to raise_error(RuntimeError) + + expect do + call.send_initial_metadata + end.to_not raise_error + + expect(call.cancelled?).to be(false) + expect(call.metadata).to be_a(Hash) + expect(call.metadata['user-agent']).to be_a(String) + + expect(call.metadata_sent).to be(true) + expect(call.output_metadata).to eq({}) + expect(call.metadata_to_send).to eq({}) + expect(call.deadline.is_a?(Time)).to be(true) + end + + it 'should not crash when call used after an unary call is finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.an_rpc(req) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after client streaming finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.a_client_streaming_rpc(requests) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after server streaming finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_server_streaming_rpc(req) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after a bidi call is finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_bidi_rpc(requests) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + end end end |