diff options
Diffstat (limited to 'src/ruby/spec/generic/client_stub_spec.rb')
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb new file mode 100644 index 0000000000..c8dee74563 --- /dev/null +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -0,0 +1,484 @@ +# Copyright 2014, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' +require 'grpc/generic/active_call' +require 'grpc/generic/client_stub' +require 'xray/thread_dump_signal_handler' +require_relative '../port_picker' + +NOOP = Proc.new { |x| x } + +def wakey_thread(&blk) + awake_mutex, awake_cond = Mutex.new, ConditionVariable.new + t = Thread.new do + blk.call(awake_mutex, awake_cond) + end + awake_mutex.synchronize { awake_cond.wait(awake_mutex) } + t +end + + +include GRPC::StatusCodes + +describe 'ClientStub' do + BadStatus = GRPC::BadStatus + TimeConsts = GRPC::TimeConsts + + before(:each) do + Thread.abort_on_exception = true + @server = nil + @method = 'an_rpc_method' + @pass = OK + @fail = INTERNAL + @cq = GRPC::CompletionQueue.new + end + + after(:each) do + @server.close unless @server.nil? + end + + describe '#new' do + + it 'can be created from a host and args' do + host = new_test_host + opts = {:a_channel_arg => 'an_arg'} + blk = Proc.new do + GRPC::ClientStub.new(host, @cq, **opts) + end + expect(&blk).not_to raise_error + end + + it 'can be created with a default deadline' do + host = new_test_host + opts = {:a_channel_arg => 'an_arg', :deadline => 5} + blk = Proc.new do + GRPC::ClientStub.new(host, @cq, **opts) + end + expect(&blk).not_to raise_error + end + + it 'can be created with an channel override' do + host = new_test_host + opts = {:a_channel_arg => 'an_arg', :channel_override => @ch} + blk = Proc.new do + GRPC::ClientStub.new(host, @cq, **opts) + end + expect(&blk).not_to raise_error + end + + it 'cannot be created with a bad channel override' do + host = new_test_host + blk = Proc.new do + opts = {:a_channel_arg => 'an_arg', :channel_override => Object.new} + GRPC::ClientStub.new(host, @cq, **opts) + end + expect(&blk).to raise_error + end + + end + + describe '#request_response' do + before(:each) do + @sent_msg, @resp = 'a_msg', 'a_reply' + end + + describe 'without a call operation' do + + it 'should send a request to/receive a_reply from a server' do + host = new_test_host + th = run_request_response(host, @sent_msg, @resp, @pass) + stub = GRPC::ClientStub.new(host, @cq) + resp = stub.request_response(@method, @sent_msg, NOOP, NOOP) + expect(resp).to eq(@resp) + th.join + end + + it 'should send a request when configured using an override channel' do + alt_host = new_test_host + th = run_request_response(alt_host, @sent_msg, @resp, @pass) + ch = GRPC::Channel.new(alt_host, nil) + stub = GRPC::ClientStub.new('ignored-host', @cq, + channel_override:ch) + resp = stub.request_response(@method, @sent_msg, NOOP, NOOP) + expect(resp).to eq(@resp) + th.join + end + + it 'should raise an error if the status is not OK' do + host = new_test_host + th = run_request_response(host, @sent_msg, @resp, @fail) + stub = GRPC::ClientStub.new(host, @cq) + blk = Proc.new do + stub.request_response(@method, @sent_msg, NOOP, NOOP) + end + expect(&blk).to raise_error(BadStatus) + th.join + end + + end + + describe 'via a call operation' do + + it 'should send a request to/receive a_reply from a server' do + host = new_test_host + th = run_request_response(host, @sent_msg, @resp, @pass) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.request_response(@method, @sent_msg, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + resp = op.execute() + expect(resp).to eq(@resp) + th.join + end + + it 'should raise an error if the status is not OK' do + host = new_test_host + th = run_request_response(host, @sent_msg, @resp, @fail) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.request_response(@method, @sent_msg, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + blk = Proc.new do + op.execute() + end + expect(&blk).to raise_error(BadStatus) + th.join + end + + end + + end + + describe '#client_streamer' do + + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s } + @resp = 'a_reply' + end + + describe 'without a call operation' do + + it 'should send requests to/receive a reply from a server' do + host = new_test_host + th = run_client_streamer(host, @sent_msgs, @resp, @pass) + stub = GRPC::ClientStub.new(host, @cq) + resp = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP) + expect(resp).to eq(@resp) + th.join + end + + it 'should raise an error if the status is not ok' do + host = new_test_host + th = run_client_streamer(host, @sent_msgs, @resp, @fail) + stub = GRPC::ClientStub.new(host, @cq) + blk = Proc.new do + stub.client_streamer(@method, @sent_msgs, NOOP, NOOP) + end + expect(&blk).to raise_error(BadStatus) + th.join + end + + end + + describe 'via a call operation' do + + it 'should send requests to/receive a reply from a server' do + host = new_test_host + th = run_client_streamer(host, @sent_msgs, @resp, @pass) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + resp = op.execute() + expect(resp).to eq(@resp) + th.join + end + + it 'should raise an error if the status is not ok' do + host = new_test_host + th = run_client_streamer(host, @sent_msgs, @resp, @fail) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.client_streamer(@method, @sent_msgs, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + blk = Proc.new do + op.execute() + end + expect(&blk).to raise_error(BadStatus) + th.join + end + + end + + end + + describe '#server_streamer' do + + before(:each) do + @sent_msg = 'a_msg' + @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s } + end + + describe 'without a call operation' do + + it 'should send a request to/receive replies from a server' do + host = new_test_host + th = run_server_streamer(host, @sent_msg, @replys, @pass) + stub = GRPC::ClientStub.new(host, @cq) + e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP) + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@replys) + th.join + end + + it 'should raise an error if the status is not ok' do + host = new_test_host + th = run_server_streamer(host, @sent_msg, @replys, @fail) + stub = GRPC::ClientStub.new(host, @cq) + e = stub.server_streamer(@method, @sent_msg, NOOP, NOOP) + expect(e).to be_a(Enumerator) + expect { e.collect { |r| r } }.to raise_error(BadStatus) + th.join + end + + end + + describe 'via a call operation' do + + it 'should send a request to/receive replies from a server' do + host = new_test_host + th = run_server_streamer(host, @sent_msg, @replys, @pass) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + e = op.execute() + expect(e).to be_a(Enumerator) + th.join + end + + it 'should raise an error if the status is not ok' do + host = new_test_host + th = run_server_streamer(host, @sent_msg, @replys, @fail) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.server_streamer(@method, @sent_msg, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + e = op.execute() + expect(e).to be_a(Enumerator) + expect { e.collect { |r| r } }.to raise_error(BadStatus) + th.join + end + + end + + end + + describe '#bidi_streamer' do + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i+1).to_s } + @replys = Array.new(3) { |i| 'reply_' + (i+1).to_s } + end + + describe 'without a call operation' do + + it 'supports a simple scenario with all requests sent first' do + host = new_test_host + th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys, + @pass) + stub = GRPC::ClientStub.new(host, @cq) + e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@replys) + th.join + end + + it 'supports a simple scenario with a client-initiated ping pong' do + host = new_test_host + th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true) + stub = GRPC::ClientStub.new(host, @cq) + e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end + + # disabled because an unresolved wire-protocol implementation feature + # + # - servers should be able initiate messaging, however, as it stand + # servers don't know if all the client metadata has been sent until + # they receive a message from the client. Without receiving all the + # metadata, the server does not accept the call, so this test hangs. + xit 'supports a simple scenario with a server-initiated ping pong' do + host = new_test_host + th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false) + stub = GRPC::ClientStub.new(host, @cq) + e = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP) + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end + + end + + describe 'via a call operation' do + + it 'supports a simple scenario with all requests sent first' do + host = new_test_host + th = run_bidi_streamer_handle_inputs_first(host, @sent_msgs, @replys, + @pass) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + e = op.execute + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@replys) + th.join + end + + it 'supports a simple scenario with a client-initiated ping pong' do + host = new_test_host + th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, true) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + e = op.execute + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end + + # disabled because an unresolved wire-protocol implementation feature + # + # - servers should be able initiate messaging, however, as it stand + # servers don't know if all the client metadata has been sent until + # they receive a message from the client. Without receiving all the + # metadata, the server does not accept the call, so this test hangs. + xit 'supports a simple scenario with a server-initiated ping pong' do + th = run_bidi_streamer_echo_ping_pong(host, @sent_msgs, @pass, false) + stub = GRPC::ClientStub.new(host, @cq) + op = stub.bidi_streamer(@method, @sent_msgs, NOOP, NOOP, + return_op:true) + expect(op).to be_a(GRPC::ActiveCall::Operation) + e = op.execute + expect(e).to be_a(Enumerator) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end + + end + + end + + def run_server_streamer(hostname, expected_input, replys, status) + wakey_thread do |mtx, cnd| + c = expect_server_to_be_invoked(hostname, mtx, cnd) + expect(c.remote_read).to eq(expected_input) + replys.each { |r| c.remote_send(r) } + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + end + end + + def run_bidi_streamer_handle_inputs_first(hostname, expected_inputs, replys, + status) + wakey_thread do |mtx, cnd| + c = expect_server_to_be_invoked(hostname, mtx, cnd) + expected_inputs.each { |i| expect(c.remote_read).to eq(i) } + replys.each { |r| c.remote_send(r) } + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + end + end + + def run_bidi_streamer_echo_ping_pong(hostname, expected_inputs, status, + client_starts) + wakey_thread do |mtx, cnd| + c = expect_server_to_be_invoked(hostname, mtx, cnd) + expected_inputs.each do |i| + if client_starts + expect(c.remote_read).to eq(i) + c.remote_send(i) + else + c.remote_send(i) + expect(c.remote_read).to eq(i) + end + end + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + end + end + + def run_client_streamer(hostname, expected_inputs, resp, status) + wakey_thread do |mtx, cnd| + c = expect_server_to_be_invoked(hostname, mtx, cnd) + expected_inputs.each { |i| expect(c.remote_read).to eq(i) } + c.remote_send(resp) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + end + end + + def run_request_response(hostname, expected_input, resp, status) + wakey_thread do |mtx, cnd| + c = expect_server_to_be_invoked(hostname, mtx, cnd) + expect(c.remote_read).to eq(expected_input) + c.remote_send(resp) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + end + end + + def start_test_server(hostname, awake_mutex, awake_cond) + server_queue = GRPC::CompletionQueue.new + @server = GRPC::Server.new(server_queue, nil) + @server.add_http2_port(hostname) + @server.start + @server_tag = Object.new + @server.request_call(@server_tag) + awake_mutex.synchronize { awake_cond.signal } + server_queue + end + + def expect_server_to_be_invoked(hostname, awake_mutex, awake_cond) + server_queue = start_test_server(hostname, awake_mutex, awake_cond) + test_deadline = Time.now + 10 # fail tests after 10 seconds + ev = server_queue.pluck(@server_tag, TimeConsts::INFINITE_FUTURE) + raise OutOfTime if ev.nil? + finished_tag = Object.new + ev.call.accept(server_queue, finished_tag) + GRPC::ActiveCall.new(ev.call, server_queue, NOOP, + NOOP, TimeConsts::INFINITE_FUTURE, + finished_tag: finished_tag) + end + + def new_test_host + port = find_unused_tcp_port + "localhost:#{port}" + end + +end |