aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/ruby/lib/grpc/errors.rb4
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb94
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb4
3 files changed, 44 insertions, 58 deletions
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index b23793730f..35e9c02a94 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -54,4 +54,8 @@ module GRPC
Status.new(code, details)
end
end
+
+ # Cancelled is an exception class that indicates that an rpc was cancelled.
+ class Cancelled < StandardError
+ end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 489349c2c9..8d63de4145 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,6 +30,22 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
+class Struct
+ # BatchResult is the struct returned by calls to call#start_batch.
+ class BatchResult
+ # check_status returns the status, raising an error if the status
+ # is non-nil and not OK.
+ def check_status
+ return nil if status.nil?
+ fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+ if status.code != GRPC::Core::StatusCodes::OK
+ fail GRPC::BadStatus.new(status.code, status.details)
+ end
+ status
+ end
+ end
+end
+
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
@@ -38,7 +54,9 @@ module GRPC
include Core::StatusCodes
include Core::TimeConsts
include Core::CallOps
+ extend Forwardable
attr_reader(:deadline)
+ def_delegators :@call, :cancel, :metadata
# client_invoke begins a client invocation.
#
@@ -101,50 +119,6 @@ module GRPC
@metadata_tag = metadata_tag
end
- # Obtains the status of the call.
- #
- # this value is nil until the call completes
- # @return this call's status
- def status
- @call.status
- end
-
- # Obtains the metadata of the call.
- #
- # At the start of the call this will be nil. During the call this gets
- # some values as soon as the other end of the connection acknowledges the
- # request.
- #
- # @return this calls's metadata
- def metadata
- @call.metadata
- end
-
- # Cancels the call.
- #
- # Cancels the call. The call does not return any result, but once this it
- # has been called, the call should eventually terminate. Due to potential
- # races between the execution of the cancel and the in-flight request, the
- # result of the call after calling #cancel is indeterminate:
- #
- # - the call may terminate with a BadStatus exception, with code=CANCELLED
- # - the call may terminate with OK Status, and return a response
- # - the call may terminate with a different BadStatus exception if that
- # was happening
- def cancel
- @call.cancel
- end
-
- # indicates if the call is shutdown
- def shutdown
- @shutdown ||= false
- end
-
- # indicates if the call is cancelled.
- def cancelled
- @cancelled ||= false
- end
-
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
@@ -176,9 +150,9 @@ module GRPC
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
- @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
- @call.status
+ batch_result.check_status
end
# finished waits until a client call is completed.
@@ -192,17 +166,12 @@ module GRPC
elsif !batch_result.metadata.nil?
@call.metadata.merge!(batch_result.metadata)
end
- if batch_result.status.code != Core::StatusCodes::OK
- fail BadStatus.new(batch_result.status.code,
- batch_result.status.details)
- end
- batch_result
+ batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
- # It blocks until the remote endpoint acknowledges by sending a
- # WRITE_ACCEPTED. req can be marshalled already.
+ # It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
@@ -332,6 +301,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -355,6 +327,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -381,6 +356,9 @@ module GRPC
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -416,6 +394,9 @@ module GRPC
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -436,9 +417,10 @@ module GRPC
private
+ # Starts the call if not already started
def start_call(**kw)
- tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
- @finished_tag, @read_metadata_tag = tags
+ return if @started
+ @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@started = true
end
@@ -466,6 +448,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status)
+ :metadata, :status, :start_call)
end
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 25ad6f7e59..1323bacfa6 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -396,11 +396,11 @@ describe GRPC::RpcServer do
req = EchoMsg.new
stub = SlowStub.new(@host, **@client_opts)
op = stub.an_rpc(req, k1: 'v1', k2: 'v2', return_op: true)
- cancel_thread = Thread.new do
+ Thread.new do # cancel the call
sleep 0.1
op.cancel
end
- expect{op.execute}.to raise_error GRPC::Cancelled
+ expect { op.execute }.to raise_error GRPC::Cancelled
@srv.stop
t.join
end