diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2017-03-24 00:54:47 +0100 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2017-03-24 00:54:47 +0100 |
commit | e44a50ffd740e576938f2d6892350cc337051924 (patch) | |
tree | 19dff6f774731c6c40f229a8373585e32f98c94c /src/ruby | |
parent | 6f1e443a519cd28d97be78c5ca2ca72a45f6b598 (diff) | |
parent | d0432ff5af7f6af26d441bc05454f9f511843628 (diff) |
Merge branch 'v1.2.x' of https://github.com/grpc/grpc into upmerge-1.2.0
Diffstat (limited to 'src/ruby')
21 files changed, 1379 insertions, 69 deletions
diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index 0f61ccfa81..7174f3b15a 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -9,6 +9,7 @@ AllCops: - 'bin/math_services_pb.rb' - 'pb/grpc/health/v1/*' - 'pb/test/**/*' + - 'end2end/lib/*' Metrics/CyclomaticComplexity: Max: 9 diff --git a/src/ruby/end2end/README.md b/src/ruby/end2end/README.md new file mode 100644 index 0000000000..ea5ab6d4bc --- /dev/null +++ b/src/ruby/end2end/README.md @@ -0,0 +1,18 @@ +This directory contains some grpc-ruby end to end tests. + +Each test here involves two files: a "driver" and a "client". For example, +the "channel_closing" test involves channel_closing_driver.rb +and channel_closing_client.rb. + +Typically, the "driver" will start up a simple "echo" server, and then +spawn a client. It gives the client the address of the "echo" server as +well as an address to listen on for control rpcs. Depending on the test, the +client usually starts up a "ClientControl" grpc server for the driver to +interact with (the driver can tell the client process to do strange things at +different times, depending on the test). + +So far these tests are mostly useful for testing process-shutdown related +situations, since the client's run in separate processes. + +These tests are invoked through the "tools/run_tests/run_tests.py" script (the +Rakefile doesn't start these). diff --git a/src/ruby/end2end/channel_closing_client.rb b/src/ruby/end2end/channel_closing_client.rb new file mode 100755 index 0000000000..8449797360 --- /dev/null +++ b/src/ruby/end2end/channel_closing_client.rb @@ -0,0 +1,84 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +# Calls '#close' on a Channel when "shutdown" called. This tries to +# trigger a hang or crash bug by closing a channel actively being watched +class ChannelClosingClientController < ClientControl::ClientController::Service + def initialize(ch) + @ch = ch + end + + def shutdown(_, _) + @ch.close + ClientControl::Void.new + end +end + +def main + client_control_port = '' + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do |p| + client_control_port = p + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, + :this_channel_is_insecure) + + srv = GRPC::RpcServer.new + thd = Thread.new do + srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) + srv.handle(ChannelClosingClientController.new(ch)) + srv.run + end + + # this should break out with an exception once the channel is closed + loop do + begin + state = ch.connectivity_state(true) + ch.watch_connectivity_state(state, Time.now + 360) + rescue RuntimeError => e + STDERR.puts "(expected) error occurred: #{e.inspect}" + break + end + end + + srv.stop + thd.join +end + +main diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb new file mode 100755 index 0000000000..43e2fe8cbb --- /dev/null +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -0,0 +1,67 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# make sure that the client doesn't hang when channel is closed +# explictly while it's used + +require_relative './end2end_common' + +def main + STDERR.puts 'start server' + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts 'start client' + control_stub, client_pid = start_client('channel_closing_client.rb', + server_port) + + sleep 3 + + begin + Timeout.timeout(10) do + control_stub.shutdown(ClientControl::Void.new) + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. It likely hangs when a ' \ + 'channel is closed while connectivity is watched' + end + + server_runner.stop +end + +main diff --git a/src/ruby/end2end/channel_state_client.rb b/src/ruby/end2end/channel_state_client.rb new file mode 100755 index 0000000000..08c21bbb8e --- /dev/null +++ b/src/ruby/end2end/channel_state_client.rb @@ -0,0 +1,54 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do + STDERR.puts 'client_control_port ignored' + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + ch = GRPC::Core::Channel.new("localhost:#{server_port}", {}, + :this_channel_is_insecure) + + loop do + state = ch.connectivity_state + ch.watch_connectivity_state(state, Time.now + 360) + end +end + +main diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb new file mode 100755 index 0000000000..c3184bf939 --- /dev/null +++ b/src/ruby/end2end/channel_state_driver.rb @@ -0,0 +1,64 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# make sure that the client doesn't hang when process ended abruptly + +require_relative './end2end_common' + +def main + STDERR.puts 'start server' + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts 'start client' + _, client_pid = start_client('channel_state_client.rb', server_port) + + sleep 3 + + Process.kill('SIGTERM', client_pid) + + begin + Timeout.timeout(10) { Process.wait(client_pid) } + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when ended abruptly' + end + + server_runner.stop +end + +main diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb new file mode 100755 index 0000000000..9534bb2078 --- /dev/null +++ b/src/ruby/end2end/end2end_common.rb @@ -0,0 +1,109 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +this_dir = File.expand_path(File.dirname(__FILE__)) +protos_lib_dir = File.join(this_dir, 'lib') +grpc_lib_dir = File.join(File.dirname(this_dir), 'lib') +$LOAD_PATH.unshift(grpc_lib_dir) unless $LOAD_PATH.include?(grpc_lib_dir) +$LOAD_PATH.unshift(protos_lib_dir) unless $LOAD_PATH.include?(protos_lib_dir) +$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) + +require 'grpc' +require 'echo_services_pb' +require 'client_control_services_pb' +require 'socket' +require 'optparse' +require 'thread' +require 'timeout' +require 'English' # see https://github.com/bbatsov/rubocop/issues/1747 + +# GreeterServer is simple server that implements the Helloworld Greeter server. +class EchoServerImpl < Echo::EchoServer::Service + # say_hello implements the SayHello rpc method. + def echo(echo_req, _) + Echo::EchoReply.new(response: echo_req.request) + end +end + +# ServerRunner starts an "echo server" that test clients can make calls to +class ServerRunner + def initialize + end + + def run + @srv = GRPC::RpcServer.new + port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @srv.handle(EchoServerImpl) + + @thd = Thread.new do + @srv.run + end + @srv.wait_till_running + port + end + + def stop + @srv.stop + @thd.join + fail 'server not stopped' unless @srv.stopped? + end +end + +def start_client(client_main, server_port) + this_dir = File.expand_path(File.dirname(__FILE__)) + + tmp_server = TCPServer.new(0) + client_control_port = tmp_server.local_address.ip_port + tmp_server.close + + client_path = File.join(this_dir, client_main) + client_pid = Process.spawn(RbConfig.ruby, + client_path, + "--client_control_port=#{client_control_port}", + "--server_port=#{server_port}") + sleep 1 + control_stub = ClientControl::ClientController::Stub.new( + "localhost:#{client_control_port}", :this_channel_is_insecure) + [control_stub, client_pid] +end + +def cleanup(control_stub, client_pid, server_runner) + control_stub.shutdown(ClientControl::Void.new) + Process.wait(client_pid) + + client_exit_code = $CHILD_STATUS + + if client_exit_code != 0 + fail "term sig test failure: client exit code: #{client_exit_code}" + end + + server_runner.stop +end diff --git a/src/ruby/end2end/gen_protos.sh b/src/ruby/end2end/gen_protos.sh new file mode 100644 index 0000000000..f78d9ad394 --- /dev/null +++ b/src/ruby/end2end/gen_protos.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +grpc_tools_ruby_protoc -I protos --ruby_out=lib --grpc_out=lib protos/echo.proto protos/client_control.proto diff --git a/src/ruby/end2end/lib/client_control_pb.rb b/src/ruby/end2end/lib/client_control_pb.rb new file mode 100644 index 0000000000..1a938f1b5a --- /dev/null +++ b/src/ruby/end2end/lib/client_control_pb.rb @@ -0,0 +1,17 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: client_control.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "client_control.DoEchoRpcRequest" do + optional :request, :string, 1 + end + add_message "client_control.Void" do + end +end + +module ClientControl + DoEchoRpcRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.DoEchoRpcRequest").msgclass + Void = Google::Protobuf::DescriptorPool.generated_pool.lookup("client_control.Void").msgclass +end diff --git a/src/ruby/end2end/lib/client_control_services_pb.rb b/src/ruby/end2end/lib/client_control_services_pb.rb new file mode 100644 index 0000000000..04b2291bc7 --- /dev/null +++ b/src/ruby/end2end/lib/client_control_services_pb.rb @@ -0,0 +1,53 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: client_control.proto for package 'client_control' +# Original file comments: +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +require 'grpc' +require 'client_control_pb' + +module ClientControl + module ClientController + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'client_control.ClientController' + + rpc :DoEchoRpc, DoEchoRpcRequest, Void + rpc :Shutdown, Void, Void + end + + Stub = Service.rpc_stub_class + end +end diff --git a/src/ruby/end2end/lib/echo_pb.rb b/src/ruby/end2end/lib/echo_pb.rb new file mode 100644 index 0000000000..c62adc0753 --- /dev/null +++ b/src/ruby/end2end/lib/echo_pb.rb @@ -0,0 +1,18 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: echo.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "echo.EchoRequest" do + optional :request, :string, 1 + end + add_message "echo.EchoReply" do + optional :response, :string, 1 + end +end + +module Echo + EchoRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoRequest").msgclass + EchoReply = Google::Protobuf::DescriptorPool.generated_pool.lookup("echo.EchoReply").msgclass +end diff --git a/src/ruby/end2end/lib/echo_services_pb.rb b/src/ruby/end2end/lib/echo_services_pb.rb new file mode 100644 index 0000000000..c66e975bf3 --- /dev/null +++ b/src/ruby/end2end/lib/echo_services_pb.rb @@ -0,0 +1,52 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: echo.proto for package 'echo' +# Original file comments: +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +require 'grpc' +require 'echo_pb' + +module Echo + module EchoServer + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'echo.EchoServer' + + rpc :Echo, EchoRequest, EchoReply + end + + Stub = Service.rpc_stub_class + end +end diff --git a/src/ruby/end2end/protos/client_control.proto b/src/ruby/end2end/protos/client_control.proto new file mode 100644 index 0000000000..f985bb486d --- /dev/null +++ b/src/ruby/end2end/protos/client_control.proto @@ -0,0 +1,43 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package client_control; + +service ClientController { + rpc DoEchoRpc (DoEchoRpcRequest) returns (Void) {} + rpc Shutdown(Void) returns (Void) {} +} + +message DoEchoRpcRequest { + string request = 1; +} + +message Void{} diff --git a/src/ruby/end2end/protos/echo.proto b/src/ruby/end2end/protos/echo.proto new file mode 100644 index 0000000000..d47afef70f --- /dev/null +++ b/src/ruby/end2end/protos/echo.proto @@ -0,0 +1,46 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package echo; + +service EchoServer { + rpc Echo (EchoRequest) returns (EchoReply) {} +} + +// The request message containing the user's name. +message EchoRequest { + string request = 1; +} + +// The response message containing the greetings +message EchoReply { + string response = 1; +} diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb new file mode 100755 index 0000000000..5b1e54718e --- /dev/null +++ b/src/ruby/end2end/sig_handling_client.rb @@ -0,0 +1,89 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +# Test client. Sends RPC's as normal but process also has signal handlers +class SigHandlingClientController < ClientControl::ClientController::Service + def initialize(srv, stub) + @srv = srv + @stub = stub + end + + def do_echo_rpc(req, _) + response = @stub.echo(Echo::EchoRequest.new(request: req.request)) + fail 'bad response' unless response.response == req.request + ClientControl::Void.new + end + + def shutdown(_, _) + Thread.new do + # TODO(apolcyn) There is a race between stopping the + # server and the "shutdown" rpc completing, + # See if stop method on server can end active RPC cleanly, to + # avoid this sleep. + sleep 3 + @srv.stop + end + ClientControl::Void.new + end +end + +def main + client_control_port = '' + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do |p| + client_control_port = p + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + Signal.trap('TERM') do + STDERR.puts 'SIGTERM received' + end + + Signal.trap('INT') do + STDERR.puts 'SIGINT received' + end + + srv = GRPC::RpcServer.new + srv.add_http2_port("0.0.0.0:#{client_control_port}", + :this_port_is_insecure) + stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", + :this_channel_is_insecure) + srv.handle(SigHandlingClientController.new(srv, stub)) + srv.run +end + +main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb new file mode 100755 index 0000000000..c5d46e074c --- /dev/null +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -0,0 +1,61 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# smoke test for a grpc-using app that receives and +# handles process-ending signals + +require_relative './end2end_common' + +def main + STDERR.puts 'start server' + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts 'start client' + control_stub, client_pid = start_client('sig_handling_client.rb', server_port) + + sleep 1 + + count = 0 + while count < 5 + control_stub.do_echo_rpc( + ClientControl::DoEchoRpcRequest.new(request: 'hello')) + Process.kill('SIGTERM', client_pid) + Process.kill('SIGINT', client_pid) + count += 1 + end + + cleanup(control_stub, client_pid, server_runner) +end + +main diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb new file mode 100755 index 0000000000..389fc5ba33 --- /dev/null +++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb @@ -0,0 +1,70 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +# Start polling the channel state in both the main thread +# and a child thread. Try to get the driver to send process-ending +# interrupt while both a child thread and the main thread are in the +# middle of a blocking connectivity_state call. +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do + STDERR.puts 'client_control_port not used' + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + thd = Thread.new do + child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}", + {}, + :this_channel_is_insecure) + loop do + state = child_thread_channel.connectivity_state(false) + child_thread_channel.watch_connectivity_state(state, Time.now + 360) + end + end + + main_channel = GRPC::Core::Channel.new("localhost:#{server_port}", + {}, + :this_channel_is_insecure) + loop do + state = main_channel.connectivity_state(false) + main_channel.watch_connectivity_state(state, Time.now + 360) + end + + thd.join +end + +main diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb new file mode 100755 index 0000000000..84d039bf19 --- /dev/null +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -0,0 +1,69 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# abruptly end a process that has active calls to +# Channel.watch_connectivity_state + +require_relative './end2end_common' + +def main + STDERR.puts 'start server' + server_runner = ServerRunner.new + server_port = server_runner.run + + sleep 1 + + STDERR.puts 'start client' + _, client_pid = start_client('sig_int_during_channel_watch_client.rb', + server_port) + + # give time for the client to get into the middle + # of a channel state watch call + sleep 1 + Process.kill('SIGINT', client_pid) + + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. It likely hangs when a ' \ + 'SIGINT is sent while there is an active connectivity_state call' + end + + server_runner.stop +end + +main diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 84e43d3f7b..1c20c8813f 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -32,21 +32,22 @@ */ #include <ruby/ruby.h> +#include <ruby/thread.h> #include "rb_grpc_imports.generated.h" -#include "rb_channel.h" #include "rb_byte_buffer.h" +#include "rb_channel.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include "rb_grpc.h" #include "rb_call.h" #include "rb_channel_args.h" #include "rb_channel_credentials.h" #include "rb_completion_queue.h" +#include "rb_grpc.h" #include "rb_server.h" /* id_channel is the name of the hidden ivar that preserves a reference to the @@ -73,9 +74,26 @@ typedef struct grpc_rb_channel { /* The actual channel */ grpc_channel *wrapped; - grpc_completion_queue *queue; + int request_safe_destroy; + int safe_to_destroy; + grpc_connectivity_state current_connectivity_state; + + int mu_init_done; + int abort_watch_connectivity_state; + gpr_mu channel_mu; + gpr_cv channel_cv; } grpc_rb_channel; +/* Forward declarations of functions involved in temporary fix to + * https://github.com/grpc/grpc/issues/9941 */ +static void grpc_rb_channel_try_register_connection_polling( + grpc_rb_channel *wrapper); +static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); + +static grpc_completion_queue *channel_polling_cq; +static gpr_mu global_connection_polling_mu; +static int abort_channel_polling = 0; + /* Destroys Channel instances. */ static void grpc_rb_channel_free(void *p) { grpc_rb_channel *ch = NULL; @@ -85,8 +103,13 @@ static void grpc_rb_channel_free(void *p) { ch = (grpc_rb_channel *)p; if (ch->wrapped != NULL) { - grpc_channel_destroy(ch->wrapped); - grpc_rb_completion_queue_destroy(ch->queue); + grpc_rb_channel_safe_destroy(ch); + ch->wrapped = NULL; + } + + if (ch->mu_init_done) { + gpr_mu_destroy(&ch->channel_mu); + gpr_cv_destroy(&ch->channel_cv); } xfree(p); @@ -104,13 +127,15 @@ static void grpc_rb_channel_mark(void *p) { } } -static rb_data_type_t grpc_channel_data_type = { - "grpc_channel", - {grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE, - {NULL, NULL}}, - NULL, NULL, +static rb_data_type_t grpc_channel_data_type = {"grpc_channel", + {grpc_rb_channel_mark, + grpc_rb_channel_free, + GRPC_RB_MEMSIZE_UNAVAILABLE, + {NULL, NULL}}, + NULL, + NULL, #ifdef RUBY_TYPED_FREE_IMMEDIATELY - RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY #endif }; @@ -145,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); + wrapper->mu_init_done = 0; target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (TYPE(credentials) == T_SYMBOL) { @@ -159,6 +185,27 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { creds = grpc_rb_get_wrapped_channel_credentials(credentials); ch = grpc_secure_channel_create(creds, target_chars, &args, NULL); } + + GPR_ASSERT(ch); + + wrapper->wrapped = ch; + + gpr_mu_init(&wrapper->channel_mu); + gpr_cv_init(&wrapper->channel_cv); + wrapper->mu_init_done = 1; + + gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 0; + wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + wrapper->safe_to_destroy = 0; + wrapper->request_safe_destroy = 0; + + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); + + + grpc_rb_channel_try_register_connection_polling(wrapper); + if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ } @@ -169,25 +216,28 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } rb_ivar_set(self, id_target, target); wrapper->wrapped = ch; - wrapper->queue = grpc_completion_queue_create(NULL); return self; } /* call-seq: - insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'}) - creds = ... - secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds) + ch.connectivity_state -> state + ch.connectivity_state(true) -> state - Creates channel instances. */ + Indicates the current state of the channel, whose value is one of the + constants defined in GRPC::Core::ConnectivityStates. + + It also tries to connect if the chennel is idle in the second form. */ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, VALUE self) { - VALUE try_to_connect = Qfalse; + VALUE try_to_connect_param = Qfalse; + int grpc_try_to_connect = 0; grpc_rb_channel *wrapper = NULL; grpc_channel *ch = NULL; /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ - rb_scan_args(argc, argv, "01", try_to_connect); + rb_scan_args(argc, argv, "01", &try_to_connect_param); + grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; @@ -195,57 +245,88 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return NUM2LONG( - grpc_channel_check_connectivity_state(ch, (int)try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); } -/* Watch for a change in connectivity state. +typedef struct watch_state_stack { + grpc_rb_channel *wrapper; + gpr_timespec deadline; + int last_state; +} watch_state_stack; + +static void *watch_channel_state_without_gvl(void *arg) { + watch_state_stack *stack = (watch_state_stack*)arg; + gpr_timespec deadline = stack->deadline; + grpc_rb_channel *wrapper = stack->wrapper; + int last_state = stack->last_state; + void *return_value = (void*)0; + + gpr_mu_lock(&wrapper->channel_mu); + while(wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && + !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); + } + if (wrapper->current_connectivity_state != last_state) { + return_value = (void*)1; + } + gpr_mu_unlock(&wrapper->channel_mu); + + return return_value; +} - Once the channel connectivity state is different from the last observed - state, tag will be enqueued on cq with success=1 +static void watch_channel_state_unblocking_func(void *arg) { + grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); + gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 1; + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); +} - If deadline expires BEFORE the state is changed, tag will be enqueued on - the completion queue with success=0 */ +/* Wait until the channel's connectivity state becomes different from + * "last_state", or "deadline" expires. + * Returns true if the the channel's connectivity state becomes + * different from "last_state" within "deadline". + * Returns false if "deadline" expires before the channel's connectivity + * state changes from "last_state". + * */ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; - grpc_completion_queue *cq = NULL; - - void *tag = wrapper; - - grpc_event event; + watch_state_stack stack; + void* out; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - ch = wrapper->wrapped; - cq = wrapper->queue; - if (ch == NULL) { + + if (wrapper->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - grpc_channel_watch_connectivity_state( - ch, - (grpc_connectivity_state)NUM2LONG(last_state), - grpc_rb_time_timeval(deadline, /* absolute time */ 0), - cq, - tag); - event = rb_completion_queue_pluck(cq, tag, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (!FIXNUM_P(last_state)) { + rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant"); + return Qnil; + } - if (event.success) { + stack.wrapper = wrapper; + stack.deadline = grpc_rb_time_timeval(deadline, 0); + stack.last_state = NUM2LONG(last_state); + out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); + if (out) { return Qtrue; - } else { - return Qfalse; } + return Qfalse; } /* Create a call given a grpc_channel, in order to call method. The request is not sent until grpc_call_invoke is called. */ -static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, - VALUE mask, VALUE method, - VALUE host, VALUE deadline) { +static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, + VALUE method, VALUE host, + VALUE deadline) { VALUE res = Qnil; grpc_rb_channel *wrapper = NULL; grpc_call *call = NULL; @@ -256,10 +337,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, grpc_slice method_slice; grpc_slice host_slice; grpc_slice *host_slice_ptr = NULL; - char* tmp_str = NULL; + char *tmp_str = NULL; if (host != Qnil) { - host_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host)); + host_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(host), RSTRING_LEN(host)); host_slice_ptr = &host_slice; } if (mask != Qnil) { @@ -277,17 +359,18 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, return Qnil; } - method_slice = grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); + method_slice = + grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method)); call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice, - host_slice_ptr, grpc_rb_time_timeval( - deadline, - /* absolute time */ 0), NULL); + host_slice_ptr, + grpc_rb_time_timeval(deadline, + /* absolute time */ 0), + NULL); if (call == NULL) { tmp_str = grpc_slice_to_c_string(method_slice); - rb_raise(rb_eRuntimeError, "cannot create call with method %s", - tmp_str); + rb_raise(rb_eRuntimeError, "cannot create call with method %s", tmp_str); return Qnil; } @@ -304,7 +387,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, return res; } - /* Closes the channel, calling it's destroy method */ static VALUE grpc_rb_channel_destroy(VALUE self) { grpc_rb_channel *wrapper = NULL; @@ -313,19 +395,18 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch != NULL) { - grpc_channel_destroy(ch); + grpc_rb_channel_safe_destroy(wrapper); wrapper->wrapped = NULL; } return Qnil; } - /* Called to obtain the target that this channel accesses. */ static VALUE grpc_rb_channel_get_target(VALUE self) { grpc_rb_channel *wrapper = NULL; VALUE res = Qnil; - char* target = NULL; + char *target = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); target = grpc_channel_get_target(wrapper->wrapped); @@ -335,10 +416,122 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { return res; } +// Either start polling channel connection state or signal that it's free to +// destroy. +// Not safe to call while a channel's connection state is polled. +static void grpc_rb_channel_try_register_connection_polling( + grpc_rb_channel *wrapper) { + grpc_connectivity_state conn_state; + gpr_timespec sleep_time = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); + + GPR_ASSERT(wrapper); + GPR_ASSERT(wrapper->wrapped); + gpr_mu_lock(&wrapper->channel_mu); + if (wrapper->request_safe_destroy) { + wrapper->safe_to_destroy = 1; + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); + return; + } + gpr_mu_lock(&global_connection_polling_mu); + + conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + if (conn_state != wrapper->current_connectivity_state) { + wrapper->current_connectivity_state = conn_state; + gpr_cv_broadcast(&wrapper->channel_cv); + } + // avoid posting work to the channel polling cq if it's been shutdown + if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_channel_watch_connectivity_state( + wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); + } else { + wrapper->safe_to_destroy = 1; + gpr_cv_broadcast(&wrapper->channel_cv); + } + gpr_mu_unlock(&global_connection_polling_mu); + gpr_mu_unlock(&wrapper->channel_mu); +} + +// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized +static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { + gpr_mu_lock(&wrapper->channel_mu); + wrapper->request_safe_destroy = 1; + + while (!wrapper->safe_to_destroy) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + GPR_ASSERT(wrapper->safe_to_destroy); + gpr_mu_unlock(&wrapper->channel_mu); + + grpc_channel_destroy(wrapper->wrapped); +} + +// Note this loop breaks out with a single call of +// "grpc_rb_event_unblocking_func". +// This assumes that a ruby call the unblocking func +// indicates process shutdown. +// In the worst case, this stops polling channel connectivity +// early and falls back to current behavior. +static void *run_poll_channels_loop_no_gil(void *arg) { + grpc_event event; + (void)arg; + for (;;) { + event = grpc_completion_queue_next( + channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (event.type == GRPC_QUEUE_SHUTDOWN) { + break; + } + if (event.type == GRPC_OP_COMPLETE) { + grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag); + } + } + grpc_completion_queue_destroy(channel_polling_cq); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); + return NULL; +} + +// Notify the channel polling loop to cleanup and shutdown. +static void grpc_rb_event_unblocking_func(void *arg) { + (void)arg; + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling"); + abort_channel_polling = 1; + grpc_completion_queue_shutdown(channel_polling_cq); + gpr_mu_unlock(&global_connection_polling_mu); +} + +// Poll channel connectivity states in background thread without the GIL. +static VALUE run_poll_channels_loop(VALUE arg) { + (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); + rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, + grpc_rb_event_unblocking_func, NULL); + return Qnil; +} + +/* Temporary fix for + * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899. + * Transports in idle channels can get destroyed. Normally c-core re-connects, + * but in grpc-ruby core never gets a thread until an RPC is made, because ruby + * only calls c-core's "completion_queu_pluck" API. + * This uses a global background thread that calls + * "completion_queue_next" on registered "watch_channel_connectivity_state" + * calls - so that c-core can reconnect if needed, when there aren't any RPC's. + * TODO(apolcyn) remove this when core handles new RPCs on dead connections. + */ +static void start_poll_channels_loop() { + channel_polling_cq = grpc_completion_queue_create(NULL); + gpr_mu_init(&global_connection_polling_mu); + abort_channel_polling = 0; + rb_thread_create(run_poll_channels_loop, NULL); +} + static void Init_grpc_propagate_masks() { /* Constants representing call propagation masks in grpc.h */ - VALUE grpc_rb_mPropagateMasks = rb_define_module_under( - grpc_rb_mGrpcCore, "PropagateMasks"); + VALUE grpc_rb_mPropagateMasks = + rb_define_module_under(grpc_rb_mGrpcCore, "PropagateMasks"); rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE", UINT2NUM(GRPC_PROPAGATE_DEADLINE)); rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT", @@ -353,8 +546,8 @@ static void Init_grpc_propagate_masks() { static void Init_grpc_connectivity_states() { /* Constants representing call propagation masks in grpc.h */ - VALUE grpc_rb_mConnectivityStates = rb_define_module_under( - grpc_rb_mGrpcCore, "ConnectivityStates"); + VALUE grpc_rb_mConnectivityStates = + rb_define_module_under(grpc_rb_mGrpcCore, "ConnectivityStates"); rb_define_const(grpc_rb_mConnectivityStates, "IDLE", LONG2NUM(GRPC_CHANNEL_IDLE)); rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING", @@ -382,12 +575,11 @@ void Init_grpc_channel() { /* Add ruby analogues of the Channel methods. */ rb_define_method(grpc_rb_cChannel, "connectivity_state", - grpc_rb_channel_get_connectivity_state, - -1); + grpc_rb_channel_get_connectivity_state, -1); rb_define_method(grpc_rb_cChannel, "watch_connectivity_state", - grpc_rb_channel_watch_connectivity_state, 4); - rb_define_method(grpc_rb_cChannel, "create_call", - grpc_rb_channel_create_call, 5); + grpc_rb_channel_watch_connectivity_state, 2); + rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call, + 5); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_alias(grpc_rb_cChannel, "close", "destroy"); @@ -405,6 +597,7 @@ void Init_grpc_channel() { id_insecure_channel = rb_intern("this_channel_is_insecure"); Init_grpc_propagate_masks(); Init_grpc_connectivity_states(); + start_poll_channels_loop(); } /* Gets the wrapped channel from the ruby wrapper */ diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb new file mode 100644 index 0000000000..940d68b9b0 --- /dev/null +++ b/src/ruby/spec/channel_connection_spec.rb @@ -0,0 +1,141 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' + +# A test message +class EchoMsg + def self.marshal(_o) + '' + end + + def self.unmarshal(_o) + EchoMsg.new + end +end + +# A test service with an echo implementation. +class EchoService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :received_md + + def initialize(**kw) + @trailing_metadata = kw + @received_md = [] + end + + def an_rpc(req, call) + GRPC.logger.info('echo service received a request') + call.output_metadata.update(@trailing_metadata) + @received_md << call.metadata unless call.metadata.nil? + req + end +end + +EchoStub = EchoService.rpc_stub_class + +def start_server(port = 0) + @srv = GRPC::RpcServer.new + server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) + @srv.handle(EchoService) + @server_thd = Thread.new { @srv.run } + @srv.wait_till_running + server_port +end + +def stop_server + expect(@srv.stopped?).to be(false) + @srv.stop + @server_thd.join + expect(@srv.stopped?).to be(true) +end + +describe 'channel connection behavior' do + it 'the client channel handles temporary loss of a transport' do + port = start_server + stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure) + req = EchoMsg.new + expect(stub.an_rpc(req)).to be_a(EchoMsg) + stop_server + sleep 1 + # TODO(apolcyn) grabbing the same port might fail, is this stable enough? + start_server(port) + expect(stub.an_rpc(req)).to be_a(EchoMsg) + stop_server + end + + it 'observably connects and reconnects to transient server' \ + ' when using the channel state API' do + port = start_server + ch = GRPC::Core::Channel.new("localhost:#{port}", {}, + :this_channel_is_insecure) + + expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) + + state = ch.connectivity_state(true) + + count = 0 + while count < 20 && state != GRPC::Core::ConnectivityStates::READY + ch.watch_connectivity_state(state, Time.now + 60) + state = ch.connectivity_state(true) + count += 1 + end + + expect(state).to be(GRPC::Core::ConnectivityStates::READY) + + stop_server + + state = ch.connectivity_state + + count = 0 + while count < 20 && state == GRPC::Core::ConnectivityStates::READY + ch.watch_connectivity_state(state, Time.now + 60) + state = ch.connectivity_state + count += 1 + end + + expect(state).to_not be(GRPC::Core::ConnectivityStates::READY) + + start_server(port) + + state = ch.connectivity_state(true) + + count = 0 + while count < 20 && state != GRPC::Core::ConnectivityStates::READY + ch.watch_connectivity_state(state, Time.now + 60) + state = ch.connectivity_state(true) + count += 1 + end + + expect(state).to be(GRPC::Core::ConnectivityStates::READY) + + stop_server + end +end diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb index 740eac631a..a289a00f04 100644 --- a/src/ruby/spec/channel_spec.rb +++ b/src/ruby/spec/channel_spec.rb @@ -153,6 +153,35 @@ describe GRPC::Core::Channel do end end + describe '#connectivity_state' do + it 'returns an enum' do + ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure) + valid_states = [ + GRPC::Core::ConnectivityStates::IDLE, + GRPC::Core::ConnectivityStates::CONNECTING, + GRPC::Core::ConnectivityStates::READY, + GRPC::Core::ConnectivityStates::TRANSIENT_FAILURE, + GRPC::Core::ConnectivityStates::FATAL_FAILURE + ] + + expect(valid_states).to include(ch.connectivity_state) + end + + it 'returns an enum when trying to connect' do + ch = GRPC::Core::Channel.new(fake_host, nil, :this_channel_is_insecure) + ch.connectivity_state(true) + valid_states = [ + GRPC::Core::ConnectivityStates::IDLE, + GRPC::Core::ConnectivityStates::CONNECTING, + GRPC::Core::ConnectivityStates::READY, + GRPC::Core::ConnectivityStates::TRANSIENT_FAILURE, + GRPC::Core::ConnectivityStates::FATAL_FAILURE + ] + + expect(valid_states).to include(ch.connectivity_state) + end + end + describe '::SSL_TARGET' do it 'is a symbol' do expect(GRPC::Core::Channel::SSL_TARGET).to be_a(Symbol) |