aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2015-04-16 08:28:49 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2015-04-16 08:28:49 -0700
commit45617040082e194e26e9c3e18ef795ca349be781 (patch)
tree8e141a254903a13561c430f3b643fc2964be45d7 /src
parent0afc3addf4b438d32ab9a7424c0ae588a1c8e212 (diff)
parentb5bcca44df242f7c914e7cc73a67a981f135d3ea (diff)
Merge pull request #1293 from tbetbetbe/grpc_ruby_add_interop_cancellation_tests
Grpc ruby add interop cancellation tests
Diffstat (limited to 'src')
-rwxr-xr-xsrc/ruby/bin/interop/interop_client.rb32
-rw-r--r--src/ruby/lib/grpc/errors.rb4
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb94
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb32
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb17
5 files changed, 105 insertions, 74 deletions
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index af7a1d5b15..6f1fe2614f 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -136,12 +136,14 @@ class PingPongPlayer
include Grpc::Testing::PayloadType
attr_accessor :assertions # required by Minitest::Assertions
attr_accessor :queue
+ attr_accessor :canceller_op
# reqs is the enumerator over the requests
def initialize(msg_sizes)
@queue = Queue.new
@msg_sizes = msg_sizes
@assertions = 0 # required by Minitest::Assertions
+ @canceller_op = nil # used to cancel after the first response
end
def each_item
@@ -155,12 +157,15 @@ class PingPongPlayer
response_parameters: [p_cls.new(size: resp_size)])
yield req
resp = @queue.pop
- assert_equal(:COMPRESSABLE, resp.payload.type,
- 'payload type is wrong')
+ assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong')
assert_equal(resp_size, resp.payload.body.length,
- 'payload body #{i} has the wrong length')
+ "payload body #{count} has the wrong length")
p "OK: ping_pong #{count}"
count += 1
+ unless @canceller_op.nil?
+ canceller_op.cancel
+ break
+ end
end
end
end
@@ -260,6 +265,27 @@ class NamedTests
p 'OK: ping_pong'
end
+ def cancel_after_begin
+ msg_sizes = [27_182, 8, 1828, 45_904]
+ reqs = msg_sizes.map do |x|
+ req = Payload.new(body: nulls(x))
+ StreamingInputCallRequest.new(payload: req)
+ end
+ op = @stub.streaming_input_call(reqs, return_op: true)
+ op.cancel
+ assert_raises(GRPC::Cancelled) { op.execute }
+ p 'OK: cancel_after_begin'
+ end
+
+ def cancel_after_first
+ msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]]
+ ppp = PingPongPlayer.new(msg_sizes)
+ op = @stub.full_duplex_call(ppp.each_item, return_op: true)
+ ppp.canceller_op = op # causes ppp to cancel after the 1st message
+ assert_raises(GRPC::Cancelled) { op.execute.each { |r| ppp.queue.push(r) } }
+ p 'OK: cancel_after_first'
+ end
+
def all
all_methods = NamedTests.instance_methods(false).map(&:to_s)
all_methods.each do |m|
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index b23793730f..35e9c02a94 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -54,4 +54,8 @@ module GRPC
Status.new(code, details)
end
end
+
+ # Cancelled is an exception class that indicates that an rpc was cancelled.
+ class Cancelled < StandardError
+ end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 489349c2c9..8d63de4145 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,6 +30,22 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
+class Struct
+ # BatchResult is the struct returned by calls to call#start_batch.
+ class BatchResult
+ # check_status returns the status, raising an error if the status
+ # is non-nil and not OK.
+ def check_status
+ return nil if status.nil?
+ fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+ if status.code != GRPC::Core::StatusCodes::OK
+ fail GRPC::BadStatus.new(status.code, status.details)
+ end
+ status
+ end
+ end
+end
+
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
@@ -38,7 +54,9 @@ module GRPC
include Core::StatusCodes
include Core::TimeConsts
include Core::CallOps
+ extend Forwardable
attr_reader(:deadline)
+ def_delegators :@call, :cancel, :metadata
# client_invoke begins a client invocation.
#
@@ -101,50 +119,6 @@ module GRPC
@metadata_tag = metadata_tag
end
- # Obtains the status of the call.
- #
- # this value is nil until the call completes
- # @return this call's status
- def status
- @call.status
- end
-
- # Obtains the metadata of the call.
- #
- # At the start of the call this will be nil. During the call this gets
- # some values as soon as the other end of the connection acknowledges the
- # request.
- #
- # @return this calls's metadata
- def metadata
- @call.metadata
- end
-
- # Cancels the call.
- #
- # Cancels the call. The call does not return any result, but once this it
- # has been called, the call should eventually terminate. Due to potential
- # races between the execution of the cancel and the in-flight request, the
- # result of the call after calling #cancel is indeterminate:
- #
- # - the call may terminate with a BadStatus exception, with code=CANCELLED
- # - the call may terminate with OK Status, and return a response
- # - the call may terminate with a different BadStatus exception if that
- # was happening
- def cancel
- @call.cancel
- end
-
- # indicates if the call is shutdown
- def shutdown
- @shutdown ||= false
- end
-
- # indicates if the call is cancelled.
- def cancelled
- @cancelled ||= false
- end
-
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
@@ -176,9 +150,9 @@ module GRPC
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
- @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
- @call.status
+ batch_result.check_status
end
# finished waits until a client call is completed.
@@ -192,17 +166,12 @@ module GRPC
elsif !batch_result.metadata.nil?
@call.metadata.merge!(batch_result.metadata)
end
- if batch_result.status.code != Core::StatusCodes::OK
- fail BadStatus.new(batch_result.status.code,
- batch_result.status.details)
- end
- batch_result
+ batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
- # It blocks until the remote endpoint acknowledges by sending a
- # WRITE_ACCEPTED. req can be marshalled already.
+ # It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
@@ -332,6 +301,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -355,6 +327,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -381,6 +356,9 @@ module GRPC
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -416,6 +394,9 @@ module GRPC
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -436,9 +417,10 @@ module GRPC
private
+ # Starts the call if not already started
def start_call(**kw)
- tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
- @finished_tag, @read_metadata_tag = tags
+ return if @started
+ @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@started = true
end
@@ -466,6 +448,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status)
+ :metadata, :status, :start_call)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 1c1b3b0db7..b813ab5b54 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -78,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
+ @enq_th = start_write_loop(requests)
+ @loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -100,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
+ @enq_th = start_write_loop(replys, is_client: false)
+ @loop_th = start_read_loop
end
private
@@ -122,10 +118,13 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
+ logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @loop_th.join
+ @enq_th.join
end
# during bidi-streaming, read the requests to send from a separate thread
@@ -136,20 +135,23 @@ module GRPC
begin
count = 0
requests.each do |req|
+ logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil,
- RECV_STATUS_ON_CLIENT => nil)
+ logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi: write_loop failed')
+ logger.warn('bidi-write_loop: failed')
logger.warn(e)
+ raise e
end
end
end
@@ -163,7 +165,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("waiting for read #{count}")
+ logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -171,7 +173,7 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
- logger.debug('done reading!')
+ logger.debug('bidi-read-loop: done reading!')
break
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 245999ea03..1323bacfa6 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -388,6 +388,23 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should handle cancellation correctly', server: true do
+ service = SlowService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = SlowStub.new(@host, **@client_opts)
+ op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
+ Thread.new do # cancel the call
+ sleep 0.1
+ op.cancel
+ end
+ expect { op.execute }.to raise_error GRPC::Cancelled
+ @srv.stop
+ t.join
+ end
+
it 'should receive updated metadata', server: true do
service = EchoService.new
@srv.handle(service)