aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib')
-rw-r--r--src/ruby/lib/grpc.rb2
-rw-r--r--src/ruby/lib/grpc/errors.rb15
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb275
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb92
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb73
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb19
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb423
-rw-r--r--src/ruby/lib/grpc/notifier.rb (renamed from src/ruby/lib/grpc/core/event.rb)34
-rw-r--r--src/ruby/lib/grpc/version.rb2
9 files changed, 473 insertions, 462 deletions
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index dd02ef7666..80b5743e91 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -30,8 +30,8 @@
require 'grpc/errors'
require 'grpc/grpc'
require 'grpc/logconfig'
+require 'grpc/notifier'
require 'grpc/version'
-require 'grpc/core/event'
require 'grpc/core/time_consts'
require 'grpc/generic/active_call'
require 'grpc/generic/client_stub'
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index 58944872b5..f1201c1704 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -31,23 +31,20 @@ require 'grpc'
# GRPC contains the General RPC module.
module GRPC
- # OutOfTime is an exception class that indicates that an RPC exceeded its
- # deadline.
- OutOfTime = Class.new(StandardError)
-
# BadStatus is an exception class that indicates that an error occurred at
# either end of a GRPC connection. When raised, it indicates that a status
# error should be returned to the other end of a GRPC connection; when
# caught it means that this end received a status error.
class BadStatus < StandardError
- attr_reader :code, :details
+ attr_reader :code, :details, :metadata
# @param code [Numeric] the status code
# @param details [String] the details of the exception
- def initialize(code, details = 'unknown cause')
+ def initialize(code, details = 'unknown cause', **kw)
super("#{code}:#{details}")
@code = code
@details = details
+ @metadata = kw
end
# Converts the exception to a GRPC::Status for use in the networking
@@ -55,7 +52,11 @@ module GRPC
#
# @return [Status] with the same code and details
def to_status
- Status.new(code, details)
+ Struct::Status.new(code, details, @metadata)
end
end
+
+ # Cancelled is an exception class that indicates that an rpc was cancelled.
+ class Cancelled < StandardError
+ end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 6256330e88..947c39cd22 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,10 +30,23 @@
require 'forwardable'
require 'grpc/generic/bidi_call'
-def assert_event_type(ev, want)
- fail OutOfTime if ev.nil?
- got = ev.type
- fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
+class Struct
+ # BatchResult is the struct returned by calls to call#start_batch.
+ class BatchResult
+ # check_status returns the status, raising an error if the status
+ # is non-nil and not OK.
+ def check_status
+ return nil if status.nil?
+ fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
+ if status.code != GRPC::Core::StatusCodes::OK
+ # raise BadStatus, propagating the metadata if present.
+ md = status.metadata
+ with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
+ fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys)
+ end
+ status
+ end
+ end
end
# GRPC contains the General RPC module.
@@ -41,10 +54,12 @@ module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall
- include Core::CompletionType
include Core::StatusCodes
include Core::TimeConsts
+ include Core::CallOps
+ extend Forwardable
attr_reader(:deadline)
+ def_delegators :@call, :cancel, :metadata
# client_invoke begins a client invocation.
#
@@ -61,15 +76,14 @@ module GRPC
# @param q [CompletionQueue] the completion queue
# @param deadline [Fixnum,TimeSpec] the deadline
def self.client_invoke(call, q, _deadline, **kw)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
+ fail(TypeError, '!Core::CompletionQueue')
end
- call.add_metadata(kw) if kw.length > 0
- client_metadata_read = Object.new
- finished_tag = Object.new
- call.invoke(q, client_metadata_read, finished_tag)
- [finished_tag, client_metadata_read]
+ metadata_tag = Object.new
+ call.run_batch(q, metadata_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => kw)
+ metadata_tag
end
# Creates an ActiveCall.
@@ -91,69 +105,27 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- # if the call has begun
- # @param read_metadata_tag [Object] the object used as the call's finish
- # tag, if the call has begun
+ # @param metadata_tag [Object] the object use obtain metadata for clients
# @param started [true|false] indicates if the call has begun
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
- read_metadata_tag: nil, started: true)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ def initialize(call, q, marshal, unmarshal, deadline, started: true,
+ metadata_tag: nil)
+ fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
+ fail(TypeError, '!Core::CompletionQueue')
end
@call = call
@cq = q
@deadline = deadline
- @finished_tag = finished_tag
- @read_metadata_tag = read_metadata_tag
@marshal = marshal
@started = started
@unmarshal = unmarshal
+ @metadata_tag = metadata_tag
end
- # Obtains the status of the call.
- #
- # this value is nil until the call completes
- # @return this call's status
- def status
- @call.status
- end
-
- # Obtains the metadata of the call.
- #
- # At the start of the call this will be nil. During the call this gets
- # some values as soon as the other end of the connection acknowledges the
- # request.
- #
- # @return this calls's metadata
- def metadata
- @call.metadata
- end
-
- # Cancels the call.
- #
- # Cancels the call. The call does not return any result, but once this it
- # has been called, the call should eventually terminate. Due to potential
- # races between the execution of the cancel and the in-flight request, the
- # result of the call after calling #cancel is indeterminate:
- #
- # - the call may terminate with a BadStatus exception, with code=CANCELLED
- # - the call may terminate with OK Status, and return a response
- # - the call may terminate with a different BadStatus exception if that
- # was happening
- def cancel
- @call.cancel
- end
-
- # indicates if the call is shutdown
- def shutdown
- @shutdown ||= false
- end
-
- # indicates if the call is cancelled.
- def cancelled
- @cancelled ||= false
+ # output_metadata are provides access to hash that can be used to
+ # save metadata to be sent as trailer
+ def output_metadata
+ @output_metadata ||= {}
end
# multi_req_view provides a restricted view of this ActiveCall for use
@@ -176,128 +148,94 @@ module GRPC
# writes_done indicates that all writes are completed.
#
- # It blocks until the remote endpoint acknowledges by sending a FINISHED
- # event, unless assert_finished is set to false. Any calls to
- # #remote_send after this call will fail.
+ # It blocks until the remote endpoint acknowledges with at status unless
+ # assert_finished is set to false. Any calls to #remote_send after this
+ # call will fail.
#
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
def writes_done(assert_finished = true)
- @call.writes_done(self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- logger.debug("Writes done: waiting for finish? #{assert_finished}")
- ensure
- ev.close
- end
-
+ ops = {
+ SEND_CLOSE_FROM_CLIENT => nil
+ }
+ ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- fail 'unexpected nil event' if ev.nil?
- ev.close
- @call.status
+ batch_result.check_status
end
- # finished waits until the call is completed.
+ # finished waits until a client call is completed.
#
- # It blocks until the remote endpoint acknowledges by sending a FINISHED
- # event.
+ # It blocks until the remote endpoint acknowledges by sending a status.
def finished
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
+ RECV_STATUS_ON_CLIENT => nil)
+ unless batch_result.status.nil?
if @call.metadata.nil?
- @call.metadata = ev.result.metadata
+ @call.metadata = batch_result.status.metadata
else
- @call.metadata.merge!(ev.result.metadata)
+ @call.metadata.merge!(batch_result.status.metadata)
end
-
- if ev.result.code != Core::StatusCodes::OK
- fail BadStatus.new(ev.result.code, ev.result.details)
- end
- res = ev.result
- ensure
- ev.close
end
- res
+ batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
- # It blocks until the remote endpoint acknowledges by sending a
- # WRITE_ACCEPTED. req can be marshalled already.
+ # It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
- assert_queue_is_ready
- logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
+ logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
payload = @marshal.call(req)
end
- @call.start_write(Core::ByteBuffer.new(payload), self)
-
- # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
- # until the flow control allows another send on this call.
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
- end
+ @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
end
- # send_status sends a status to the remote endpoint
+ # send_status sends a status to the remote endpoint.
#
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
- def send_status(code = OK, details = '', assert_finished = false)
- assert_queue_is_ready
- @call.start_write_status(code, details, self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
- end
- logger.debug("Status sent: #{code}:'#{details}'")
- return finished if assert_finished
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ def send_status(code = OK, details = '', assert_finished = false, **kw)
+ ops = {
+ SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw)
+ }
+ ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
+ @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
nil
end
# remote_read reads a response from the remote endpoint.
#
- # It blocks until the remote endpoint sends a READ or FINISHED event. On
- # a READ, it returns the response after unmarshalling it. On
- # FINISHED, it returns nil if the status is OK, otherwise raising
- # BadStatus
+ # It blocks until the remote endpoint replies with a message or status.
+ # On receiving a message, it returns the response after unmarshalling it.
+ # On receiving a status, it returns nil if the status is OK, otherwise
+ # raising BadStatus
def remote_read
- if @call.metadata.nil? && !@read_metadata_tag.nil?
- ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
- assert_event_type(ev, CLIENT_METADATA_READ)
- @call.metadata = ev.result
- @read_metadata_tag = nil
+ ops = { RECV_MESSAGE => nil }
+ ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ unless @metadata_tag.nil?
+ @call.metadata = batch_result.metadata
+ @metadata_tag = nil
end
-
- @call.start_read(self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
- logger.debug("received req: #{ev.result.inspect}")
- unless ev.result.nil?
- logger.debug("received req.to_s: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- logger.debug("received_req (unmarshalled): #{res.inspect}")
- return res
- end
- ensure
- ev.close
+ logger.debug("received req: #{batch_result}")
+ unless batch_result.nil? || batch_result.message.nil?
+ logger.debug("received req.to_s: #{batch_result.message}")
+ res = @unmarshal.call(batch_result.message)
+ logger.debug("received_req (unmarshalled): #{res.inspect}")
+ return res
end
logger.debug('found nil; the final response has been sent')
nil
@@ -324,7 +262,6 @@ module GRPC
return enum_for(:each_remote_read) unless block_given?
loop do
resp = remote_read
- break if resp.is_a? Struct::Status # is an OK status
break if resp.nil? # the last response was received
yield resp
end
@@ -379,6 +316,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -402,6 +342,9 @@ module GRPC
response = remote_read
finished unless response.is_a? Struct::Status
response
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -428,6 +371,9 @@ module GRPC
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -461,9 +407,11 @@ module GRPC
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
- @finished_tag)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk)
+ rescue GRPC::Core::CallError => e
+ finished # checks for Cancelled
+ raise e
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -478,16 +426,16 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
- @finished_tag)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_server(gen_each_reply)
end
private
+ # Starts the call if not already started
def start_call(**kw)
- tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
- @finished_tag, @read_metadata_tag = tags
+ return if @started
+ @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@started = true
end
@@ -505,32 +453,17 @@ module GRPC
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
- SingleReqView = view_class(:cancelled, :deadline, :metadata)
+ SingleReqView = view_class(:cancelled, :deadline, :metadata,
+ :output_metadata)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
- :each_remote_read, :metadata)
+ :each_remote_read, :metadata, :output_metadata)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status)
-
- # confirms that no events are enqueued, and that the queue is not
- # shutdown.
- def assert_queue_is_ready
- ev = nil
- begin
- ev = @cq.pluck(self, ZERO)
- fail "unexpected event #{ev.inspect}" unless ev.nil?
- rescue OutOfTime
- logging.debug('timed out waiting for next event')
- # expected, nothing should be on the queue and the deadline was ZERO,
- # except things using another tag
- ensure
- ev.close unless ev.nil?
- end
- end
+ :metadata, :status, :start_call)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c66deaae60..4ca3004d6f 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -30,18 +30,12 @@
require 'forwardable'
require 'grpc/grpc'
-def assert_event_type(ev, want)
- fail OutOfTime if ev.nil?
- got = ev.type
- fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
-end
-
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates exection of a BiDi stream on a client or
# server.
class BidiCall
- include Core::CompletionType
+ include Core::CallOps
include Core::StatusCodes
include Core::TimeConsts
@@ -63,8 +57,7 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
+ def initialize(call, q, marshal, unmarshal, deadline)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
@@ -72,7 +65,6 @@ module GRPC
@call = call
@cq = q
@deadline = deadline
- @finished_tag = finished_tag
@marshal = marshal
@readq = Queue.new
@unmarshal = unmarshal
@@ -86,13 +78,11 @@ module GRPC
# @param requests the Enumerable of requests to send
# @return an Enumerator of requests to yield
def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
+ @enq_th = start_write_loop(requests)
+ @loop_th = start_read_loop
replies = each_queued_msg
return replies if blk.nil?
replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -108,10 +98,8 @@ module GRPC
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
+ @enq_th = start_write_loop(replys, is_client: false)
+ @loop_th = start_read_loop
end
private
@@ -130,10 +118,12 @@ module GRPC
logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
+ logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
end
+ @enq_th.join if @enq_th.alive?
end
# during bidi-streaming, read the requests to send from a separate thread
@@ -144,36 +134,23 @@ module GRPC
begin
count = 0
requests.each do |req|
+ logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
- @call.start_write(Core::ByteBuffer.new(payload), write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
- end
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_MESSAGE => payload)
end
if is_client
- @call.writes_done(write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
- end
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISHED)
- ensure
- ev.close
- end
- logger.debug('bidi-client: finished received')
+ logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_STATUS_ON_CLIENT => nil)
+ batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi: write_loop failed')
+ logger.warn('bidi-write_loop: failed')
logger.warn(e)
+ raise e
end
end
end
@@ -187,27 +164,22 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("waiting for read #{count}")
+ logger.debug("bidi-read_loop: #{count}")
count += 1
- @call.start_read(read_tag)
- ev = @cq.pluck(read_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
-
- # handle the next event.
- if ev.result.nil?
- @readq.push(END_OF_READS)
- logger.debug('done reading!')
- break
- end
-
- # push the latest read onto the queue and continue reading
- logger.debug("received req: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- @readq.push(res)
- ensure
- ev.close
+ # TODO: ensure metadata is read if available, currently it's not
+ batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
+ RECV_MESSAGE => nil)
+ # handle the next message
+ if batch_result.message.nil?
+ @readq.push(END_OF_READS)
+ logger.debug('bidi-read-loop: done reading!')
+ break
end
+
+ # push the latest read onto the queue and continue reading
+ logger.debug("received req: #{batch_result.message}")
+ res = @unmarshal.call(batch_result.message)
+ @readq.push(res)
end
rescue StandardError => e
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 01328d4a5b..7b2c04aa22 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -28,16 +28,16 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc/generic/active_call'
-require 'xray/thread_dump_signal_handler'
# GRPC contains the General RPC module.
module GRPC
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
+ include Core::TimeConsts
- # Default deadline is 5 seconds.
- DEFAULT_DEADLINE = 5
+ # Default timeout is 5 seconds.
+ DEFAULT_TIMEOUT = 5
# setup_channel is used by #initialize to constuct a channel from its
# arguments.
@@ -51,6 +51,14 @@ module GRPC
Core::Channel.new(host, kw, creds)
end
+ def self.update_with_jwt_aud_uri(a_hash, host, method)
+ last_slash_idx, res = method.rindex('/'), a_hash.clone
+ return res if last_slash_idx.nil?
+ service_name = method[0..(last_slash_idx - 1)]
+ res[:jwt_aud_uri] = "https://#{host}#{service_name}"
+ res
+ end
+
# check_update_metadata is used by #initialize verify that it's a Proc.
def self.check_update_metadata(update_metadata)
return update_metadata if update_metadata.nil?
@@ -76,8 +84,8 @@ module GRPC
# present the host and arbitrary keyword arg areignored, and the RPC
# connection uses this channel.
#
- # - :deadline
- # when present, this is the default deadline used for calls
+ # - :timeout
+ # when present, this is the default timeout used for calls
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
@@ -87,13 +95,13 @@ module GRPC
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
# @param channel_override [Core::Channel] a pre-created channel
- # @param deadline [Number] the default deadline to use in requests
+ # @param timeout [Number] the default timeout to use in requests
# @param creds [Core::Credentials] the channel
# @param update_metadata a func that updates metadata as described above
# @param kw [KeywordArgs]the channel arguments
def initialize(host, q,
channel_override: nil,
- deadline: DEFAULT_DEADLINE,
+ timeout: nil,
creds: nil,
update_metadata: nil,
**kw)
@@ -103,7 +111,7 @@ module GRPC
@update_metadata = ClientStub.check_update_metadata(update_metadata)
alt_host = kw[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
- @deadline = deadline
+ @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
end
# request_response sends a request to a GRPC server, and returns the
@@ -140,13 +148,14 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] (optional) the max completion time in seconds
+ # @param timeout [Numeric] (optional) the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
- def request_response(method, req, marshal, unmarshal, deadline = nil,
+ def request_response(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.request_response(req, **md) unless return_op
# return the operation view of the active_call; define #execute as a
@@ -197,13 +206,14 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] the max completion time in seconds
+ # @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @return [Object|Operation] the response received from the server
- def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ def client_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.client_streamer(requests, **md) unless return_op
# return the operation view of the active_call; define #execute as a
@@ -262,14 +272,15 @@ module GRPC
# @param req [Object] the request sent to the server
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] the max completion time in seconds
+ # @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
- def server_streamer(method, req, marshal, unmarshal, deadline = nil,
+ def server_streamer(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.server_streamer(req, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
@@ -367,14 +378,15 @@ module GRPC
# @param requests [Object] an Enumerable of requests to send
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] (optional) the max completion time in seconds
+ # @param timeout [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param return_op [true|false] return an Operation if true
# @return [Enumerator|nil|Operation] as discussed above
- def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ c = new_active_call(method, marshal, unmarshal, timeout)
+ kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.bidi_streamer(requests, **md, &blk) unless return_op
# return the operation view of the active_call; define #execute
@@ -390,15 +402,14 @@ module GRPC
# Creates a new active stub
#
- # @param ch [GRPC::Channel] the channel used to create the stub.
+ # @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [TimeConst]
- def new_active_call(ch, marshal, unmarshal, deadline = nil)
- absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
- call = @ch.create_call(ch, @host, absolute_deadline)
- ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
- started: false)
+ # @param timeout [TimeConst]
+ def new_active_call(method, marshal, unmarshal, timeout = nil)
+ deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
+ call = @ch.create_call(@queue, method, @host, deadline)
+ ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 2cb3d2eebf..10211ae239 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -80,26 +80,21 @@ module GRPC
else # is a bidi_stream
active_call.run_server_bidi(mth)
end
- send_status(active_call, OK, 'OK')
- active_call.finished
+ send_status(active_call, OK, 'OK', **active_call.output_metadata)
rescue BadStatus => e
- # this is raised by handlers that want GRPC to send an application
- # error code and detail message.
+ # this is raised by handlers that want GRPC to send an application error
+ # code and detail message and some additional app-specific metadata.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
- send_status(active_call, e.code, e.details)
+ send_status(active_call, e.code, e.details, **e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
logger.warn("failed call: #{active_call}\n#{e}")
- rescue OutOfTime
+ rescue Core::OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
- rescue Core::EventError => e
- # This is raised by GRPC internals but should rarely, if ever happen.
- # Log it, but don't notify the other endpoint..
- logger.warn("failed call: #{active_call}\n#{e}")
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
@@ -140,9 +135,9 @@ module GRPC
"##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
end
- def send_status(active_client, code, details)
+ def send_status(active_client, code, details, **kw)
details = 'Not sure why' if details.nil?
- active_client.send_status(code, details)
+ active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 35e84023be..3375fcf20a 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -31,14 +31,142 @@ require 'grpc/grpc'
require 'grpc/generic/active_call'
require 'grpc/generic/service'
require 'thread'
-require 'xray/thread_dump_signal_handler'
+
+# A global that contains signals the gRPC servers should respond to.
+$grpc_signals = []
# GRPC contains the General RPC module.
module GRPC
+ # Handles the signals in $grpc_signals.
+ #
+ # @return false if the server should exit, true if not.
+ def handle_signals
+ loop do
+ sig = $grpc_signals.shift
+ case sig
+ when 'INT'
+ return false
+ when 'TERM'
+ return false
+ end
+ end
+ true
+ end
+ module_function :handle_signals
+
+ # Sets up a signal handler that adds signals to the signal handling global.
+ #
+ # Signal handlers should do as little as humanly possible.
+ # Here, they just add themselves to $grpc_signals
+ #
+ # RpcServer (and later other parts of gRPC) monitors the signals
+ # $grpc_signals in its own non-signal context.
+ def trap_signals
+ %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
+ end
+ module_function :trap_signals
+
+ # Pool is a simple thread pool.
+ class Pool
+ # Default keep alive period is 1s
+ DEFAULT_KEEP_ALIVE = 1
+
+ def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
+ fail 'pool size must be positive' unless size > 0
+ @jobs = Queue.new
+ @size = size
+ @stopped = false
+ @stop_mutex = Mutex.new
+ @stop_cond = ConditionVariable.new
+ @workers = []
+ @keep_alive = keep_alive
+ end
+
+ # Returns the number of jobs waiting
+ def jobs_waiting
+ @jobs.size
+ end
+
+ # Runs the given block on the queue with the provided args.
+ #
+ # @param args the args passed blk when it is called
+ # @param blk the block to call
+ def schedule(*args, &blk)
+ fail 'already stopped' if @stopped
+ return if blk.nil?
+ logger.info('schedule another job')
+ @jobs << [blk, args]
+ end
+
+ # Starts running the jobs in the thread pool.
+ def start
+ fail 'already stopped' if @stopped
+ until @workers.size == @size.to_i
+ next_thread = Thread.new do
+ catch(:exit) do # allows { throw :exit } to kill a thread
+ loop_execute_jobs
+ end
+ remove_current_thread
+ end
+ @workers << next_thread
+ end
+ end
+
+ # Stops the jobs in the pool
+ def stop
+ logger.info('stopping, will wait for all the workers to exit')
+ @workers.size.times { schedule { throw :exit } }
+ @stopped = true
+ @stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
+ end
+ forcibly_stop_workers
+ logger.info('stopped, all workers are shutdown')
+ end
+
+ protected
+
+ # Forcibly shutdown any threads that are still alive.
+ def forcibly_stop_workers
+ return unless @workers.size > 0
+ logger.info("forcibly terminating #{@workers.size} worker(s)")
+ @workers.each do |t|
+ next unless t.alive?
+ begin
+ t.exit
+ rescue StandardError => e
+ logger.warn('error while terminating a worker')
+ logger.warn(e)
+ end
+ end
+ end
+
+ # removes the threads from workers, and signal when all the
+ # threads are complete.
+ def remove_current_thread
+ @stop_mutex.synchronize do
+ @workers.delete(Thread.current)
+ @stop_cond.signal if @workers.size == 0
+ end
+ end
+
+ def loop_execute_jobs
+ loop do
+ begin
+ blk, args = @jobs.pop
+ blk.call(*args)
+ rescue StandardError => e
+ logger.warn('Error in worker thread')
+ logger.warn(e)
+ end
+ end
+ end
+ end
+
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
- include Core::CompletionType
+ include Core::CallOps
include Core::TimeConsts
extend ::Forwardable
@@ -50,6 +178,38 @@ module GRPC
# Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20
+ # Default poll period is 1s
+ DEFAULT_POLL_PERIOD = 1
+
+ # Signal check period is 0.25s
+ SIGNAL_CHECK_PERIOD = 0.25
+
+ # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
+ # its arguments.
+ def self.setup_cq(alt_cq)
+ return Core::CompletionQueue.new if alt_cq.nil?
+ unless alt_cq.is_a? Core::CompletionQueue
+ fail(TypeError, '!CompletionQueue')
+ end
+ alt_cq
+ end
+
+ # setup_srv is used by #initialize to constuct a Core::Server from its
+ # arguments.
+ def self.setup_srv(alt_srv, cq, **kw)
+ return Core::Server.new(cq, kw) if alt_srv.nil?
+ fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
+ alt_srv
+ end
+
+ # setup_connect_md_proc is used by #initialize to validate the
+ # connect_md_proc.
+ def self.setup_connect_md_proc(a_proc)
+ return nil if a_proc.nil?
+ fail(TypeError, '!Proc') unless a_proc.is_a? Proc
+ a_proc
+ end
+
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
@@ -77,30 +237,21 @@ module GRPC
# * max_waiting_requests: the maximum number of requests that are not
# being handled to allow. When this limit is exceeded, the server responds
# with not available to new requests
+ #
+ # * connect_md_proc:
+ # when non-nil is a proc for determining metadata to to send back the client
+ # on receiving an invocation req. The proc signature is:
+ # {key: val, ..} func(method_name, {key: val, ...})
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
- poll_period:INFINITE_FUTURE,
+ poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil,
server_override:nil,
+ connect_md_proc:nil,
**kw)
- if completion_queue_override.nil?
- cq = Core::CompletionQueue.new
- else
- cq = completion_queue_override
- unless cq.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
- end
- @cq = cq
-
- if server_override.nil?
- srv = Core::Server.new(@cq, kw)
- else
- srv = server_override
- fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
- end
- @server = srv
-
+ @cq = RpcServer.setup_cq(completion_queue_override)
+ @server = RpcServer.setup_srv(server_override, @cq, **kw)
+ @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@pool_size = pool_size
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@@ -117,6 +268,13 @@ module GRPC
return unless @running
@stopped = true
@pool.stop
+
+ # TODO: uncomment this:
+ #
+ # This segfaults in the c layer, so its commented out for now. Shutdown
+ # still occurs, but the c layer has to do the cleanup.
+ #
+ # @server.close
end
# determines if the server is currently running
@@ -139,7 +297,21 @@ module GRPC
running?
end
- # determines if the server is currently stopped
+ # Runs the server in its own thread, then waits for signal INT or TERM on
+ # the current thread to terminate it.
+ def run_till_terminated
+ GRPC.trap_signals
+ t = Thread.new { run }
+ wait_till_running
+ loop do
+ sleep SIGNAL_CHECK_PERIOD
+ break unless GRPC.handle_signals
+ end
+ stop
+ t.join
+ end
+
+ # Determines if the server is currently stopped
def stopped?
@stopped ||= false
end
@@ -202,154 +374,71 @@ module GRPC
end
@pool.start
@server.start
- server_tag = Object.new
- until stopped?
- @server.request_call(server_tag)
- ev = @cq.pluck(server_tag, @poll_period)
- next if ev.nil?
- if ev.type != SERVER_RPC_NEW
- logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
- ev.close
- next
- end
- c = new_active_server_call(ev.call, ev.result)
- unless c.nil?
- mth = ev.result.method.to_sym
- ev.close
- @pool.schedule(c) do |call|
- rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
- end
- end
- end
+ loop_handle_server_calls
@running = false
end
- def new_active_server_call(call, new_server_rpc)
- # Accept the call. This is necessary even if a status is to be sent
- # back immediately
- finished_tag = Object.new
- call_queue = Core::CompletionQueue.new
- call.metadata = new_server_rpc.metadata # store the metadata
- call.server_accept(call_queue, finished_tag)
- call.server_end_initial_metadata
-
- # Send UNAVAILABLE if there are too many unprocessed jobs
+ # Sends UNAVAILABLE if there are too many unprocessed jobs
+ def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
- if @pool.jobs_waiting > @max_waiting_requests
- logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::UNAVAILABLE, '')
- return nil
- end
-
- # Send NOT_FOUND if the method does not exist
- mth = new_server_rpc.method.to_sym
- unless rpc_descs.key?(mth)
- logger.warn("NOT_FOUND: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::NOT_FOUND, '')
- return nil
- end
-
- # Create the ActiveCall
- rpc_desc = rpc_descs[mth]
- logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
- ActiveCall.new(call, call_queue,
- rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
- new_server_rpc.deadline, finished_tag: finished_tag)
+ return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
+ logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::UNAVAILABLE, '')
+ nil
end
- # Pool is a simple thread pool for running server requests.
- class Pool
- def initialize(size)
- fail 'pool size must be positive' unless size > 0
- @jobs = Queue.new
- @size = size
- @stopped = false
- @stop_mutex = Mutex.new
- @stop_cond = ConditionVariable.new
- @workers = []
- end
-
- # Returns the number of jobs waiting
- def jobs_waiting
- @jobs.size
- end
-
- # Runs the given block on the queue with the provided args.
- #
- # @param args the args passed blk when it is called
- # @param blk the block to call
- def schedule(*args, &blk)
- fail 'already stopped' if @stopped
- return if blk.nil?
- logger.info('schedule another job')
- @jobs << [blk, args]
- end
+ # Sends NOT_FOUND if the method can't be found
+ def found?(an_rpc)
+ mth = an_rpc.method.to_sym
+ return an_rpc if rpc_descs.key?(mth)
+ logger.warn("NOT_FOUND: #{an_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
+ c.send_status(StatusCodes::NOT_FOUND, '')
+ nil
+ end
- # Starts running the jobs in the thread pool.
- def start
- fail 'already stopped' if @stopped
- until @workers.size == @size.to_i
- next_thread = Thread.new do
- catch(:exit) do # allows { throw :exit } to kill a thread
- loop do
- begin
- blk, args = @jobs.pop
- blk.call(*args)
- rescue StandardError => e
- logger.warn('Error in worker thread')
- logger.warn(e)
- end
- end
- end
-
- # removes the threads from workers, and signal when all the
- # threads are complete.
- @stop_mutex.synchronize do
- @workers.delete(Thread.current)
- @stop_cond.signal if @workers.size == 0
- end
+ # handles calls to the server
+ def loop_handle_server_calls
+ fail 'not running' unless @running
+ request_call_tag = Object.new
+ until stopped?
+ deadline = from_relative_time(@poll_period)
+ an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ c = new_active_server_call(an_rpc)
+ unless c.nil?
+ mth = an_rpc.method.to_sym
+ @pool.schedule(c) do |call|
+ rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
- @workers << next_thread
end
end
+ end
- # Stops the jobs in the pool
- def stop
- logger.info('stopping, will wait for all the workers to exit')
- @workers.size.times { schedule { throw :exit } }
- @stopped = true
-
- # TODO: allow configuration of the keepalive period
- keep_alive = 5
- @stop_mutex.synchronize do
- @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
- end
-
- # Forcibly shutdown any threads that are still alive.
- if @workers.size > 0
- logger.warn("forcibly terminating #{@workers.size} worker(s)")
- @workers.each do |t|
- next unless t.alive?
- begin
- t.exit
- rescue StandardError => e
- logger.warn('error while terminating a worker')
- logger.warn(e)
- end
- end
- end
+ def new_active_server_call(an_rpc)
+ return nil if an_rpc.nil? || an_rpc.call.nil?
- logger.info('stopped, all workers are shutdown')
+ # allow the metadata to be accessed from the call
+ handle_call_tag = Object.new
+ an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
+ connect_md = nil
+ unless @connect_md_proc.nil?
+ connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
+ an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
+ SEND_INITIAL_METADATA => connect_md)
+ return nil unless available?(an_rpc)
+ return nil unless found?(an_rpc)
+
+ # Create the ActiveCall
+ logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
+ rpc_desc = rpc_descs[an_rpc.method.to_sym]
+ ActiveCall.new(an_rpc.call, @cq,
+ rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
+ an_rpc.deadline)
end
protected
@@ -362,11 +451,9 @@ module GRPC
@rpc_handlers ||= {}
end
- private
-
def assert_valid_service_class(cls)
unless cls.include?(GenericService)
- fail "#{cls} should 'include GenericService'"
+ fail "#{cls} must 'include GenericService'"
end
if cls.rpc_descs.size == 0
fail "#{cls} should specify some rpc descriptions"
@@ -376,21 +463,17 @@ module GRPC
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
- specs = rpc_descs
- handlers = rpc_handlers
+ specs, handlers = rpc_descs, rpc_handlers
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
- if specs.key? route
- fail "Cannot add rpc #{route} from #{spec}, already registered"
+ fail "already registered: rpc #{route} from #{spec}" if specs.key? route
+ specs[route] = spec
+ if service.is_a?(Class)
+ handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
- specs[route] = spec
- if service.is_a?(Class)
- handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
- else
- handlers[route] = service.method(name.to_s.underscore.to_sym)
- end
- logger.info("handling #{route} with #{handlers[route]}")
+ handlers[route] = service.method(name.to_s.underscore.to_sym)
end
+ logger.info("handling #{route} with #{handlers[route]}")
end
end
end
diff --git a/src/ruby/lib/grpc/core/event.rb b/src/ruby/lib/grpc/notifier.rb
index 194aa8ecac..caa18bbed6 100644
--- a/src/ruby/lib/grpc/core/event.rb
+++ b/src/ruby/lib/grpc/notifier.rb
@@ -27,17 +27,33 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-require 'grpc'
-
# GRPC contains the General RPC module.
module GRPC
- module Core
- # Event is a class defined in the c extension
- #
- # Here, we add an inspect method.
- class Event
- def inspect
- "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
+ # Notifier is useful high-level synchronization primitive.
+ class Notifier
+ attr_reader :payload, :notified
+ alias_method :notified?, :notified
+
+ def initialize
+ @mutex = Mutex.new
+ @cvar = ConditionVariable.new
+ @notified = false
+ @payload = nil
+ end
+
+ def wait
+ @mutex.synchronize do
+ @cvar.wait(@mutex) until notified?
+ end
+ end
+
+ def notify(payload)
+ @mutex.synchronize do
+ return Error.new('already notified') if notified?
+ @payload = payload
+ @notified = true
+ @cvar.signal
+ return nil
end
end
end
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 513a53724f..072fb9b1aa 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '0.5.0'
+ VERSION = '0.6.1'
end