diff options
author | Tim Emiola <temiola@google.com> | 2015-04-15 08:09:25 -0700 |
---|---|---|
committer | Tim Emiola <temiola@google.com> | 2015-04-15 17:29:56 -0700 |
commit | 1b39916bbaaef36662ce97dd9d7996e8ef6092a8 (patch) | |
tree | 07b8c3c77d6ce25f47b76e7fadd922f644baa17b /src | |
parent | 72d70fc0af2c30609ef765ec938cc231d9dfe255 (diff) |
Adds an explicit Cancellation exception
- uses Forwardable to provide access the @call within an ActiveCall
- removes redundant methods from ActiveCall
Diffstat (limited to 'src')
-rw-r--r-- | src/ruby/lib/grpc/errors.rb | 4 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 94 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 4 |
3 files changed, 44 insertions, 58 deletions
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index b23793730f..35e9c02a94 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -54,4 +54,8 @@ module GRPC Status.new(code, details) end end + + # Cancelled is an exception class that indicates that an rpc was cancelled. + class Cancelled < StandardError + end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 489349c2c9..8d63de4145 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,6 +30,22 @@ require 'forwardable' require 'grpc/generic/bidi_call' +class Struct + # BatchResult is the struct returned by calls to call#start_batch. + class BatchResult + # check_status returns the status, raising an error if the status + # is non-nil and not OK. + def check_status + return nil if status.nil? + fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED + if status.code != GRPC::Core::StatusCodes::OK + fail GRPC::BadStatus.new(status.code, status.details) + end + status + end + end +end + # GRPC contains the General RPC module. module GRPC # The ActiveCall class provides simple methods for sending marshallable @@ -38,7 +54,9 @@ module GRPC include Core::StatusCodes include Core::TimeConsts include Core::CallOps + extend Forwardable attr_reader(:deadline) + def_delegators :@call, :cancel, :metadata # client_invoke begins a client invocation. # @@ -101,50 +119,6 @@ module GRPC @metadata_tag = metadata_tag end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status - end - - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end - - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end - - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end - - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false - end - # multi_req_view provides a restricted view of this ActiveCall for use # in a server client-streaming handler. def multi_req_view @@ -176,9 +150,9 @@ module GRPC SEND_CLOSE_FROM_CLIENT => nil } ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished - @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished - @call.status + batch_result.check_status end # finished waits until a client call is completed. @@ -192,17 +166,12 @@ module GRPC elsif !batch_result.metadata.nil? @call.metadata.merge!(batch_result.metadata) end - if batch_result.status.code != Core::StatusCodes::OK - fail BadStatus.new(batch_result.status.code, - batch_result.status.details) - end - batch_result + batch_result.check_status end # remote_send sends a request to the remote endpoint. # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. + # It blocks until the remote endpoint accepts the message. # # @param req [Object, String] the object to send or it's marshal form. # @param marshalled [false, true] indicates if the object is already @@ -332,6 +301,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # client_streamer sends a stream of requests to a GRPC server, and @@ -355,6 +327,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -381,6 +356,9 @@ module GRPC replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -416,6 +394,9 @@ module GRPC start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -436,9 +417,10 @@ module GRPC private + # Starts the call if not already started def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags + return if @started + @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @started = true end @@ -466,6 +448,6 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) + :metadata, :status, :start_call) end end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 25ad6f7e59..1323bacfa6 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -396,11 +396,11 @@ describe GRPC::RpcServer do req = EchoMsg.new stub = SlowStub.new(@host, **@client_opts) op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true) - cancel_thread = Thread.new do + Thread.new do # cancel the call sleep 0.1 op.cancel end - expect{op.execute}.to raise_error GRPC::Cancelled + expect { op.execute }.to raise_error GRPC::Cancelled @srv.stop t.join end |