diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2015-04-16 08:28:49 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2015-04-16 08:28:49 -0700 |
commit | 45617040082e194e26e9c3e18ef795ca349be781 (patch) | |
tree | 8e141a254903a13561c430f3b643fc2964be45d7 | |
parent | 0afc3addf4b438d32ab9a7424c0ae588a1c8e212 (diff) | |
parent | b5bcca44df242f7c914e7cc73a67a981f135d3ea (diff) |
Merge pull request #1293 from tbetbetbe/grpc_ruby_add_interop_cancellation_tests
Grpc ruby add interop cancellation tests
-rwxr-xr-x | src/ruby/bin/interop/interop_client.rb | 32 | ||||
-rw-r--r-- | src/ruby/lib/grpc/errors.rb | 4 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 94 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 32 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 17 |
5 files changed, 105 insertions, 74 deletions
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index af7a1d5b15..6f1fe2614f 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -136,12 +136,14 @@ class PingPongPlayer include Grpc::Testing::PayloadType attr_accessor :assertions # required by Minitest::Assertions attr_accessor :queue + attr_accessor :canceller_op # reqs is the enumerator over the requests def initialize(msg_sizes) @queue = Queue.new @msg_sizes = msg_sizes @assertions = 0 # required by Minitest::Assertions + @canceller_op = nil # used to cancel after the first response end def each_item @@ -155,12 +157,15 @@ class PingPongPlayer response_parameters: [p_cls.new(size: resp_size)]) yield req resp = @queue.pop - assert_equal(:COMPRESSABLE, resp.payload.type, - 'payload type is wrong') + assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong') assert_equal(resp_size, resp.payload.body.length, - 'payload body #{i} has the wrong length') + "payload body #{count} has the wrong length") p "OK: ping_pong #{count}" count += 1 + unless @canceller_op.nil? + canceller_op.cancel + break + end end end end @@ -260,6 +265,27 @@ class NamedTests p 'OK: ping_pong' end + def cancel_after_begin + msg_sizes = [27_182, 8, 1828, 45_904] + reqs = msg_sizes.map do |x| + req = Payload.new(body: nulls(x)) + StreamingInputCallRequest.new(payload: req) + end + op = @stub.streaming_input_call(reqs, return_op: true) + op.cancel + assert_raises(GRPC::Cancelled) { op.execute } + p 'OK: cancel_after_begin' + end + + def cancel_after_first + msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] + ppp = PingPongPlayer.new(msg_sizes) + op = @stub.full_duplex_call(ppp.each_item, return_op: true) + ppp.canceller_op = op # causes ppp to cancel after the 1st message + assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } } + p 'OK: cancel_after_first' + end + def all all_methods = NamedTests.instance_methods(false).map(&:to_s) all_methods.each do |m| diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index b23793730f..35e9c02a94 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -54,4 +54,8 @@ module GRPC Status.new(code, details) end end + + # Cancelled is an exception class that indicates that an rpc was cancelled. + class Cancelled < StandardError + end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 489349c2c9..8d63de4145 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,6 +30,22 @@ require 'forwardable' require 'grpc/generic/bidi_call' +class Struct + # BatchResult is the struct returned by calls to call#start_batch. + class BatchResult + # check_status returns the status, raising an error if the status + # is non-nil and not OK. + def check_status + return nil if status.nil? + fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED + if status.code != GRPC::Core::StatusCodes::OK + fail GRPC::BadStatus.new(status.code, status.details) + end + status + end + end +end + # GRPC contains the General RPC module. module GRPC # The ActiveCall class provides simple methods for sending marshallable @@ -38,7 +54,9 @@ module GRPC include Core::StatusCodes include Core::TimeConsts include Core::CallOps + extend Forwardable attr_reader(:deadline) + def_delegators :@call, :cancel, :metadata # client_invoke begins a client invocation. # @@ -101,50 +119,6 @@ module GRPC @metadata_tag = metadata_tag end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status - end - - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end - - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end - - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end - - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false - end - # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -176,9 +150,9 @@ module GRPC SEND_CLOSE_FROM_CLIENT => nil } ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished - @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished - @call.status + batch_result.check_status end # finished waits until a client call is completed. @@ -192,17 +166,12 @@ module GRPC elsif !batch_result.metadata.nil? @call.metadata.merge!(batch_result.metadata) end - if batch_result.status.code != Core::StatusCodes::OK - fail BadStatus.new(batch_result.status.code, - batch_result.status.details) - end - batch_result + batch_result.check_status end # remote_send sends a request to the remote endpoint. # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. + # It blocks until the remote endpoint accepts the message. # # @param req [Object, String] the object to send or it's marshal form. # @param marshalled [false, true] indicates if the object is already @@ -332,6 +301,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # client_streamer sends a stream of requests to a GRPC server, and @@ -355,6 +327,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -381,6 +356,9 @@ module GRPC replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -416,6 +394,9 @@ module GRPC start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -436,9 +417,10 @@ module GRPC private + # Starts the call if not already started def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags + return if @started + @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @started = true end @@ -466,6 +448,6 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) + :metadata, :status, :start_call) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 1c1b3b0db7..b813ab5b54 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -78,13 +78,11 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop + @enq_th = start_write_loop(requests) + @loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join end # Begins orchestration of the Bidi stream for a server generating replies. @@ -100,10 +98,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join + @enq_th = start_write_loop(replys, is_client: false) + @loop_th = start_read_loop end private @@ -122,10 +118,13 @@ module GRPC logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop + logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @loop_th.join + @enq_th.join end # during bidi-streaming, read the requests to send from a separate thread @@ -136,20 +135,23 @@ module GRPC begin count = 0 requests.each do |req| + logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end if is_client - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) + logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status end rescue StandardError => e - logger.warn('bidi: write_loop failed') + logger.warn('bidi-write_loop: failed') logger.warn(e) + raise e end end end @@ -163,7 +165,7 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("waiting for read #{count}") + logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, @@ -171,7 +173,7 @@ module GRPC # handle the next message if batch_result.message.nil? @readq.push(END_OF_READS) - logger.debug('done reading!') + logger.debug('bidi-read-loop: done reading!') break end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 245999ea03..1323bacfa6 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -388,6 +388,23 @@ describe GRPC::RpcServer do t.join end + it 'should handle cancellation correctly', server: true do + service = SlowService.new + @srv.handle(service) + t = Thread.new { @srv.run } + @srv.wait_till_running + req = EchoMsg.new + stub = SlowStub.new(@host, **@client_opts) + op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) + Thread.new do # cancel the call + sleep 0.1 + op.cancel + end + expect { op.execute }.to raise_error GRPC::Cancelled + @srv.stop + t.join + end + it 'should receive updated metadata', server: true do service = EchoService.new @srv.handle(service) |