diff options
Diffstat (limited to 'src/ruby')
-rwxr-xr-x | src/ruby/end2end/channel_closing_driver.rb | 2 | ||||
-rwxr-xr-x | src/ruby/end2end/channel_state_driver.rb | 2 | ||||
-rwxr-xr-x | src/ruby/end2end/end2end_common.rb | 5 | ||||
-rwxr-xr-x | src/ruby/end2end/killed_client_thread_client.rb | 58 | ||||
-rwxr-xr-x | src/ruby/end2end/killed_client_thread_driver.rb | 114 | ||||
-rwxr-xr-x | src/ruby/end2end/sig_handling_driver.rb | 2 | ||||
-rwxr-xr-x | src/ruby/end2end/sig_int_during_channel_watch_driver.rb | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 18 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_pool_spec.rb | 44 |
9 files changed, 217 insertions, 30 deletions
diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 43e2fe8cbb..d3e5373b0b 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -36,7 +36,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index c3184bf939..80fb62899e 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -35,7 +35,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index 9534bb2078..1c87ceddf1 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -55,13 +55,14 @@ end # ServerRunner starts an "echo server" that test clients can make calls to class ServerRunner - def initialize + def initialize(service_impl) + @service_impl = service_impl end def run @srv = GRPC::RpcServer.new port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) - @srv.handle(EchoServerImpl) + @srv.handle(@service_impl) @thd = Thread.new do @srv.run diff --git a/src/ruby/end2end/killed_client_thread_client.rb b/src/ruby/end2end/killed_client_thread_client.rb new file mode 100755 index 0000000000..d5a7db7d58 --- /dev/null +++ b/src/ruby/end2end/killed_client_thread_client.rb @@ -0,0 +1,58 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Attempt to reproduce +# https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/1327 + +require_relative './end2end_common' + +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do + STDERR.puts 'client control port not used' + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + thd = Thread.new do + stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", + :this_channel_is_insecure) + stub.echo(Echo::EchoRequest.new(request: 'hello')) + fail 'the clients rpc in this test shouldnt complete. ' \ + 'expecting SIGINT to happen in the middle of the call' + end + thd.join +end + +main diff --git a/src/ruby/end2end/killed_client_thread_driver.rb b/src/ruby/end2end/killed_client_thread_driver.rb new file mode 100755 index 0000000000..f76d3e1746 --- /dev/null +++ b/src/ruby/end2end/killed_client_thread_driver.rb @@ -0,0 +1,114 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +# Service that sleeps for a long time upon receiving an 'echo request' +# Also, this notifies @call_started_cv once it has received a request. +class SleepingEchoServerImpl < Echo::EchoServer::Service + def initialize(call_started, call_started_mu, call_started_cv) + @call_started = call_started + @call_started_mu = call_started_mu + @call_started_cv = call_started_cv + end + + def echo(echo_req, _) + @call_started_mu.synchronize do + @call_started.set_true + @call_started_cv.signal + end + sleep 1000 + Echo::EchoReply.new(response: echo_req.request) + end +end + +# Mutable boolean +class BoolHolder + attr_reader :val + + def init + @val = false + end + + def set_true + @val = true + end +end + +def main + STDERR.puts 'start server' + + call_started = BoolHolder.new + call_started_mu = Mutex.new + call_started_cv = ConditionVariable.new + + service_impl = SleepingEchoServerImpl.new(call_started, + call_started_mu, + call_started_cv) + server_runner = ServerRunner.new(service_impl) + server_port = server_runner.run + + STDERR.puts 'start client' + _, client_pid = start_client('killed_client_thread_client.rb', + server_port) + + call_started_mu.synchronize do + call_started_cv.wait(call_started_mu) until call_started.val + end + + # SIGINT the child process now that it's + # in the middle of an RPC (happening on a non-main thread) + Process.kill('SIGINT', client_pid) + STDERR.puts 'sent shutdown' + + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when killed while in the middle of an rpc' + end + + client_exit_code = $CHILD_STATUS + if client_exit_code.termsig != 2 # SIGINT + fail 'expected client exit from SIGINT ' \ + "but got child status: #{client_exit_code}" + end + + server_runner.stop +end + +main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index c5d46e074c..6691464dc6 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -36,7 +36,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index 84d039bf19..670cda0919 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -36,7 +36,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index e4b2cfed00..997600b459 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -784,7 +784,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { Only one operation of each type can be active at once in any given batch */ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { - run_batch_stack st; + run_batch_stack *st = NULL; grpc_rb_call *call = NULL; grpc_event ev; grpc_call_error err; @@ -792,6 +792,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); unsigned write_flag = 0; void *tag = (void*)&st; + if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call"); return Qnil; @@ -806,14 +807,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { if (rb_write_flag != Qnil) { write_flag = NUM2UINT(rb_write_flag); } - grpc_run_batch_stack_init(&st, write_flag); - grpc_run_batch_stack_fill_ops(&st, ops_hash); + st = gpr_malloc(sizeof(run_batch_stack)); + grpc_run_batch_stack_init(st, write_flag); + grpc_run_batch_stack_fill_ops(st, ops_hash); /* call grpc_call_start_batch, then wait for it to complete using * pluck_event */ - err = grpc_call_start_batch(call->wrapped, st.ops, st.op_num, tag, NULL); + err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL); if (err != GRPC_CALL_OK) { - grpc_run_batch_stack_cleanup(&st); + grpc_run_batch_stack_cleanup(st); + gpr_free(st); rb_raise(grpc_rb_eCallError, "grpc_call_start_batch failed with %s (code=%d)", grpc_call_error_detail_of(err), err); @@ -826,8 +829,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { } /* Build and return the BatchResult struct result, if there is an error, it's reflected in the status */ - result = grpc_run_batch_stack_build_result(&st); - grpc_run_batch_stack_cleanup(&st); + result = grpc_run_batch_stack_build_result(st); + grpc_run_batch_stack_cleanup(st); + gpr_free(st); return result; } diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index 69e8222cb9..0803ca74ed 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -52,28 +52,31 @@ describe GRPC::Pool do expect(p.ready_for_work?).to be(false) end - it 'it stops being ready after all workers jobs waiting or running' do + it 'it stops being ready after all workers are busy' do p = Pool.new(5) p.start - job = proc { sleep(3) } # sleep so workers busy when done scheduling - 5.times do - expect(p.ready_for_work?).to be(true) - p.schedule(&job) + + wait_mu = Mutex.new + wait_cv = ConditionVariable.new + wait = true + + job = proc do + wait_mu.synchronize do + wait_cv.wait(wait_mu) while wait + end end - expect(p.ready_for_work?).to be(false) - end - it 'it becomes ready again after jobs complete' do - p = Pool.new(5) - p.start - job = proc {} 5.times do expect(p.ready_for_work?).to be(true) p.schedule(&job) end + expect(p.ready_for_work?).to be(false) - sleep 5 # give the pool time do get at least one task done - expect(p.ready_for_work?).to be(true) + + wait_mu.synchronize do + wait = false + wait_cv.broadcast + end end end @@ -105,13 +108,20 @@ describe GRPC::Pool do it 'stops jobs when there are long running jobs' do p = Pool.new(1) p.start - o, q = Object.new, Queue.new + + wait_forever_mu = Mutex.new + wait_forever_cv = ConditionVariable.new + wait_forever = true + + job_running = Queue.new job = proc do - sleep(5) # long running - q.push(o) + job_running.push(Object.new) + wait_forever_mu.synchronize do + wait_forever_cv.wait while wait_forever + end end p.schedule(&job) - sleep(1) # should ensure the long job gets scheduled + job_running.pop expect { p.stop }.not_to raise_error end end |