aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-07-18 17:26:08 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-07-18 17:26:08 -0700
commit59a19a9d5ecc34b60fd6c035a6cb261261dd48fe (patch)
treef5177b228bfa1ec4c25bde0e48048ec6dd545471 /src/ruby
parentfb1e164cd81bd15b7d2aad595510e0c083fb3d5b (diff)
make sure that client-side view of calls is robust
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb17
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb97
2 files changed, 104 insertions, 10 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 96c773a995..4a748a4ac2 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -46,7 +46,7 @@ module GRPC
extend Forwardable
attr_reader :deadline, :metadata_sent, :metadata_to_send
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
- :peer, :peer_cert, :trailing_metadata
+ :peer, :peer_cert, :trailing_metadata, :status
# client_invoke begins a client invocation.
#
@@ -105,6 +105,8 @@ module GRPC
@input_stream_done = false
@call_finished = false
@call_finished_mu = Mutex.new
+ @client_call_executed = false
+ @client_call_executed_mu = Mutex.new
end
# Sends the initial metadata that has yet to be sent.
@@ -327,6 +329,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
+ raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil,
@@ -369,6 +372,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
+ raise_error_if_already_executed
begin
merge_metadata_and_send_if_not_already_sent(metadata)
requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
@@ -411,6 +415,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
+ raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil
@@ -468,6 +473,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
+ raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call,
@@ -572,6 +578,15 @@ module GRPC
merge_metadata_to_send(metadata) && send_initial_metadata
end
+ def raise_error_if_already_executed
+ @client_call_executed_mu.synchronize do
+ if @client_call_executed
+ fail GRPC::Core::CallError, 'attempting to re-run a call'
+ end
+ @client_call_executed = true
+ end
+ end
+
def self.view_class(*visible_methods)
Class.new do
extend ::Forwardable
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 3b8f72eda1..7b5e6a95a4 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -36,6 +36,33 @@ include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps
+# check that methods on a finished/closed call t crash
+def check_op_view_of_finished_client_call_is_robust(op_view)
+ # use read_response_stream to try to iterate through
+ # possible response stream
+ fail('need something to attempt reads') unless block_given?
+ expect do
+ resp = op_view.execute
+ yield resp
+ end.to raise_error(GRPC::Core::CallError)
+
+ expect { op_view.start_call }.to raise_error(RuntimeError)
+
+ expect do
+ op_view.wait
+ op_view.cancel
+
+ op_view.metadata
+ op_view.trailing_metadata
+ op_view.status
+
+ op_view.cancelled?
+ op_view.deadline
+ op_view.write_flag
+ op_view.write_flag = 1
+ end.to_not raise_error
+end
+
describe 'ClientStub' do
let(:noop) { proc { |x| x } }
@@ -231,15 +258,27 @@ describe 'ClientStub' do
it_behaves_like 'request response'
- it 'sends metadata to the server ok when running start_call first' do
+ def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- expect(get_response(stub)).to eq(@resp)
+ expect(
+ get_response(stub,
+ run_start_call_first: run_start_call_first)).to eq(@resp)
th.join
end
+
+ it 'sends metadata to the server ok when running start_call first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+ end
end
end
@@ -307,11 +346,23 @@ describe 'ClientStub' do
it_behaves_like 'client streaming'
- it 'sends metadata to the server ok when running start_call first' do
+ def run_op_view_metadata_test(run_start_call_first)
th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
- expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+ expect(
+ get_response(@stub,
+ run_start_call_first: run_start_call_first)).to eq(@resp)
th.join
end
+
+ it 'sends metadata to the server ok when running start_call first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call_is_robust(@op) { |r| p r }
+ end
end
end
@@ -377,7 +428,7 @@ describe 'ClientStub' do
end
end
- describe 'without a call operation', test2: true do
+ describe 'without a call operation' do
def get_responses(stub, unmarshal: noop)
e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
metadata: @metadata)
@@ -405,16 +456,30 @@ describe 'ClientStub' do
it_behaves_like 'server streaming'
- it 'should send metadata to the server ok when start_call is run first' do
+ def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- e = get_responses(stub, run_start_call_first: true)
+ e = get_responses(stub, run_start_call_first: run_start_call_first)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
+
+ it 'should send metadata to the server ok when start_call is run first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+ responses.each { |r| p r }
+ end
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+ responses.each { |r| p r }
+ end
+ end
end
end
@@ -501,14 +566,28 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming'
- it 'can run start_call before executing the call' do
+ def run_op_view_metadata_test(run_start_call_first)
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
- e = get_responses(stub, run_start_call_first: true)
+ e = get_responses(stub, run_start_call_first: run_start_call_first)
expect(e.collect { |r| r }).to eq(@replys)
th.join
end
+
+ it 'can run start_call before executing the call' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+ responses.each { |r| p r }
+ end
+ end
+
+ it 'doesnt crash when op_view used after call has finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call_is_robust(@op) do |responses|
+ responses.each { |r| p r }
+ end
+ end
end
end