diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-04-24 00:12:30 +0200 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-04-24 00:38:19 +0200 |
commit | 23be2803662c19f73a66c82c9d5dbd62b537515f (patch) | |
tree | 9d8afeb18c21cddcbfb9bb014f08f1474021db98 /src/ruby/lib | |
parent | b7c2035e83a9b3e346f1fd37f9ad55c2070fb02e (diff) | |
parent | 3afd92ff511f52db3ecf892d9af65053323c89cb (diff) |
Merge branch 'master' of github.com:grpc/grpc into the-purge-2
Conflicts:
src/cpp/client/channel.cc
vsprojects/vs2010/grpc++.vcxproj
vsprojects/vs2013/grpc++.vcxproj.filters
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc.rb | 1 | ||||
-rw-r--r-- | src/ruby/lib/grpc/core/event.rb | 44 | ||||
-rw-r--r-- | src/ruby/lib/grpc/errors.rb | 15 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 273 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 93 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/client_stub.rb | 73 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_desc.rb | 19 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 422 | ||||
-rw-r--r-- | src/ruby/lib/grpc/version.rb | 2 |
9 files changed, 446 insertions, 496 deletions
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index dd02ef7666..b0f68035cd 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -31,7 +31,6 @@ require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' require 'grpc/version' -require 'grpc/core/event' require 'grpc/core/time_consts' require 'grpc/generic/active_call' require 'grpc/generic/client_stub' diff --git a/src/ruby/lib/grpc/core/event.rb b/src/ruby/lib/grpc/core/event.rb deleted file mode 100644 index 194aa8ecac..0000000000 --- a/src/ruby/lib/grpc/core/event.rb +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'grpc' - -# GRPC contains the General RPC module. -module GRPC - module Core - # Event is a class defined in the c extension - # - # Here, we add an inspect method. - class Event - def inspect - "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>" - end - end - end -end diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb index 58944872b5..f1201c1704 100644 --- a/src/ruby/lib/grpc/errors.rb +++ b/src/ruby/lib/grpc/errors.rb @@ -31,23 +31,20 @@ require 'grpc' # GRPC contains the General RPC module. module GRPC - # OutOfTime is an exception class that indicates that an RPC exceeded its - # deadline. - OutOfTime = Class.new(StandardError) - # BadStatus is an exception class that indicates that an error occurred at # either end of a GRPC connection. When raised, it indicates that a status # error should be returned to the other end of a GRPC connection; when # caught it means that this end received a status error. class BadStatus < StandardError - attr_reader :code, :details + attr_reader :code, :details, :metadata # @param code [Numeric] the status code # @param details [String] the details of the exception - def initialize(code, details = 'unknown cause') + def initialize(code, details = 'unknown cause', **kw) super("#{code}:#{details}") @code = code @details = details + @metadata = kw end # Converts the exception to a GRPC::Status for use in the networking @@ -55,7 +52,11 @@ module GRPC # # @return [Status] with the same code and details def to_status - Status.new(code, details) + Struct::Status.new(code, details, @metadata) end end + + # Cancelled is an exception class that indicates that an rpc was cancelled. + class Cancelled < StandardError + end end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 6256330e88..43ba549905 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,10 +30,23 @@ require 'forwardable' require 'grpc/generic/bidi_call' -def assert_event_type(ev, want) - fail OutOfTime if ev.nil? - got = ev.type - fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want +class Struct + # BatchResult is the struct returned by calls to call#start_batch. + class BatchResult + # check_status returns the status, raising an error if the status + # is non-nil and not OK. + def check_status + return nil if status.nil? + fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED + if status.code != GRPC::Core::StatusCodes::OK + # raise BadStatus, propagating the metadata if present. + md = status.metadata + with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }] + fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys) + end + status + end + end end # GRPC contains the General RPC module. @@ -41,10 +54,12 @@ module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call class ActiveCall - include Core::CompletionType include Core::StatusCodes include Core::TimeConsts + include Core::CallOps + extend Forwardable attr_reader(:deadline) + def_delegators :@call, :cancel, :metadata # client_invoke begins a client invocation. # @@ -61,15 +76,14 @@ module GRPC # @param q [CompletionQueue] the completion queue # @param deadline [Fixnum,TimeSpec] the deadline def self.client_invoke(call, q, _deadline, **kw) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + fail(TypeError, '!Core::Call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') + fail(TypeError, '!Core::CompletionQueue') end - call.add_metadata(kw) if kw.length > 0 - client_metadata_read = Object.new - finished_tag = Object.new - call.invoke(q, client_metadata_read, finished_tag) - [finished_tag, client_metadata_read] + metadata_tag = Object.new + call.run_batch(q, metadata_tag, INFINITE_FUTURE, + SEND_INITIAL_METADATA => kw) + metadata_tag end # Creates an ActiveCall. @@ -91,69 +105,27 @@ module GRPC # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param deadline [Fixnum] the deadline for the call to complete - # @param finished_tag [Object] the object used as the call's finish tag, - # if the call has begun - # @param read_metadata_tag [Object] the object used as the call's finish - # tag, if the call has begun + # @param metadata_tag [Object] the object use obtain metadata for clients # @param started [true|false] indicates if the call has begun - def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil, - read_metadata_tag: nil, started: true) - fail(ArgumentError, 'not a call') unless call.is_a? Core::Call + def initialize(call, q, marshal, unmarshal, deadline, started: true, + metadata_tag: nil) + fail(TypeError, '!Core::Call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') + fail(TypeError, '!Core::CompletionQueue') end @call = call @cq = q @deadline = deadline - @finished_tag = finished_tag - @read_metadata_tag = read_metadata_tag @marshal = marshal @started = started @unmarshal = unmarshal + @metadata_tag = metadata_tag end - # Obtains the status of the call. - # - # this value is nil until the call completes - # @return this call's status - def status - @call.status - end - - # Obtains the metadata of the call. - # - # At the start of the call this will be nil. During the call this gets - # some values as soon as the other end of the connection acknowledges the - # request. - # - # @return this calls's metadata - def metadata - @call.metadata - end - - # Cancels the call. - # - # Cancels the call. The call does not return any result, but once this it - # has been called, the call should eventually terminate. Due to potential - # races between the execution of the cancel and the in-flight request, the - # result of the call after calling #cancel is indeterminate: - # - # - the call may terminate with a BadStatus exception, with code=CANCELLED - # - the call may terminate with OK Status, and return a response - # - the call may terminate with a different BadStatus exception if that - # was happening - def cancel - @call.cancel - end - - # indicates if the call is shutdown - def shutdown - @shutdown ||= false - end - - # indicates if the call is cancelled. - def cancelled - @cancelled ||= false + # output_metadata are provides access to hash that can be used to + # save metadata to be sent as trailer + def output_metadata + @output_metadata ||= {} end # multi_req_view provides a restricted view of this ActiveCall for use @@ -176,128 +148,94 @@ module GRPC # writes_done indicates that all writes are completed. # - # It blocks until the remote endpoint acknowledges by sending a FINISHED - # event, unless assert_finished is set to false. Any calls to - # #remote_send after this call will fail. + # It blocks until the remote endpoint acknowledges with at status unless + # assert_finished is set to false. Any calls to #remote_send after this + # call will fail. # # @param assert_finished [true, false] when true(default), waits for # FINISHED. def writes_done(assert_finished = true) - @call.writes_done(self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - logger.debug("Writes done: waiting for finish? #{assert_finished}") - ensure - ev.close - end - + ops = { + SEND_CLOSE_FROM_CLIENT => nil + } + ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - fail 'unexpected nil event' if ev.nil? - ev.close - @call.status + batch_result.check_status end - # finished waits until the call is completed. + # finished waits until a client call is completed. # - # It blocks until the remote endpoint acknowledges by sending a FINISHED - # event. + # It blocks until the remote endpoint acknowledges by sending a status. def finished - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - begin - fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, + RECV_STATUS_ON_CLIENT => nil) + unless batch_result.status.nil? if @call.metadata.nil? - @call.metadata = ev.result.metadata + @call.metadata = batch_result.status.metadata else - @call.metadata.merge!(ev.result.metadata) + @call.metadata.merge!(batch_result.status.metadata) end - - if ev.result.code != Core::StatusCodes::OK - fail BadStatus.new(ev.result.code, ev.result.details) - end - res = ev.result - ensure - ev.close end - res + batch_result.check_status end # remote_send sends a request to the remote endpoint. # - # It blocks until the remote endpoint acknowledges by sending a - # WRITE_ACCEPTED. req can be marshalled already. + # It blocks until the remote endpoint accepts the message. # # @param req [Object, String] the object to send or it's marshal form. # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - assert_queue_is_ready logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") if marshalled payload = req else payload = @marshal.call(req) end - @call.start_write(Core::ByteBuffer.new(payload), self) - - # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return - # until the flow control allows another send on this call. - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, WRITE_ACCEPTED) - ensure - ev.close - end + @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload) end - # send_status sends a status to the remote endpoint + # send_status sends a status to the remote endpoint. # # @param code [int] the status code to send # @param details [String] details # @param assert_finished [true, false] when true(default), waits for # FINISHED. - def send_status(code = OK, details = '', assert_finished = false) - assert_queue_is_ready - @call.start_write_status(code, details, self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - ensure - ev.close - end - logger.debug("Status sent: #{code}:'#{details}'") - return finished if assert_finished + # + # == Keyword Arguments == + # any keyword arguments are treated as metadata to be sent to the server + # if a keyword value is a list, multiple metadata for it's key are sent + def send_status(code = OK, details = '', assert_finished = false, **kw) + ops = { + SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw) + } + ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished + @call.run_batch(@cq, self, INFINITE_FUTURE, ops) nil end # remote_read reads a response from the remote endpoint. # - # It blocks until the remote endpoint sends a READ or FINISHED event. On - # a READ, it returns the response after unmarshalling it. On - # FINISHED, it returns nil if the status is OK, otherwise raising - # BadStatus + # It blocks until the remote endpoint replies with a message or status. + # On receiving a message, it returns the response after unmarshalling it. + # On receiving a status, it returns nil if the status is OK, otherwise + # raising BadStatus def remote_read - if @call.metadata.nil? && !@read_metadata_tag.nil? - ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE) - assert_event_type(ev, CLIENT_METADATA_READ) - @call.metadata = ev.result - @read_metadata_tag = nil + ops = { RECV_MESSAGE => nil } + ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil? + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + unless @metadata_tag.nil? + @call.metadata = batch_result.metadata + @metadata_tag = nil end - - @call.start_read(self) - ev = @cq.pluck(self, INFINITE_FUTURE) - begin - assert_event_type(ev, READ) - logger.debug("received req: #{ev.result.inspect}") - unless ev.result.nil? - logger.debug("received req.to_s: #{ev.result}") - res = @unmarshal.call(ev.result.to_s) - logger.debug("received_req (unmarshalled): #{res.inspect}") - return res - end - ensure - ev.close + logger.debug("received req: #{batch_result}") + unless batch_result.nil? || batch_result.message.nil? + logger.debug("received req.to_s: #{batch_result.message}") + res = @unmarshal.call(batch_result.message) + logger.debug("received_req (unmarshalled): #{res.inspect}") + return res end logger.debug('found nil; the final response has been sent') nil @@ -324,7 +262,6 @@ module GRPC return enum_for(:each_remote_read) unless block_given? loop do resp = remote_read - break if resp.is_a? Struct::Status # is an OK status break if resp.nil? # the last response was received yield resp end @@ -379,6 +316,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # client_streamer sends a stream of requests to a GRPC server, and @@ -402,6 +342,9 @@ module GRPC response = remote_read finished unless response.is_a? Struct::Status response + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -428,6 +371,9 @@ module GRPC replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -461,9 +407,11 @@ module GRPC # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, - @finished_tag) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_client(requests, &blk) + rescue GRPC::Core::CallError => e + finished # checks for Cancelled + raise e end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -478,16 +426,16 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline, - @finished_tag) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd.run_on_server(gen_each_reply) end private + # Starts the call if not already started def start_call(**kw) - tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) - @finished_tag, @read_metadata_tag = tags + return if @started + @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @started = true end @@ -505,32 +453,17 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. - SingleReqView = view_class(:cancelled, :deadline, :metadata) + SingleReqView = view_class(:cancelled, :deadline, :metadata, + :output_metadata) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg, - :each_remote_read, :metadata) + :each_remote_read, :metadata, :output_metadata) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status) - - # confirms that no events are enqueued, and that the queue is not - # shutdown. - def assert_queue_is_ready - ev = nil - begin - ev = @cq.pluck(self, ZERO) - fail "unexpected event #{ev.inspect}" unless ev.nil? - rescue OutOfTime - logging.debug('timed out waiting for next event') - # expected, nothing should be on the queue and the deadline was ZERO, - # except things using another tag - ensure - ev.close unless ev.nil? - end - end + :metadata, :status, :start_call) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index c66deaae60..b813ab5b54 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -30,18 +30,12 @@ require 'forwardable' require 'grpc/grpc' -def assert_event_type(ev, want) - fail OutOfTime if ev.nil? - got = ev.type - fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want -end - # GRPC contains the General RPC module. module GRPC # The BiDiCall class orchestrates exection of a BiDi stream on a client or # server. class BidiCall - include Core::CompletionType + include Core::CallOps include Core::StatusCodes include Core::TimeConsts @@ -63,8 +57,7 @@ module GRPC # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param deadline [Fixnum] the deadline for the call to complete - # @param finished_tag [Object] the object used as the call's finish tag, - def initialize(call, q, marshal, unmarshal, deadline, finished_tag) + def initialize(call, q, marshal, unmarshal, deadline) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') @@ -72,7 +65,6 @@ module GRPC @call = call @cq = q @deadline = deadline - @finished_tag = finished_tag @marshal = marshal @readq = Queue.new @unmarshal = unmarshal @@ -86,13 +78,11 @@ module GRPC # @param requests the Enumerable of requests to send # @return an Enumerator of requests to yield def run_on_client(requests, &blk) - enq_th = start_write_loop(requests) - loop_th = start_read_loop + @enq_th = start_write_loop(requests) + @loop_th = start_read_loop replies = each_queued_msg return replies if blk.nil? replies.each { |r| blk.call(r) } - enq_th.join - loop_th.join end # Begins orchestration of the Bidi stream for a server generating replies. @@ -108,10 +98,8 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - enq_th = start_write_loop(replys, is_client: false) - loop_th = start_read_loop - loop_th.join - enq_th.join + @enq_th = start_write_loop(replys, is_client: false) + @loop_th = start_read_loop end private @@ -130,10 +118,13 @@ module GRPC logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop + logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end + @loop_th.join + @enq_th.join end # during bidi-streaming, read the requests to send from a separate thread @@ -144,36 +135,23 @@ module GRPC begin count = 0 requests.each do |req| + logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) - @call.start_write(Core::ByteBuffer.new(payload), write_tag) - ev = @cq.pluck(write_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, WRITE_ACCEPTED) - ensure - ev.close - end + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_MESSAGE => payload) end if is_client - @call.writes_done(write_tag) - ev = @cq.pluck(write_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISH_ACCEPTED) - ensure - ev.close - end - logger.debug("bidi-client: sent #{count} reqs, waiting to finish") - ev = @cq.pluck(@finished_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, FINISHED) - ensure - ev.close - end - logger.debug('bidi-client: finished received') + logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil, + RECV_STATUS_ON_CLIENT => nil) + batch_result.check_status end rescue StandardError => e - logger.warn('bidi: write_loop failed') + logger.warn('bidi-write_loop: failed') logger.warn(e) + raise e end end end @@ -187,27 +165,22 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("waiting for read #{count}") + logger.debug("bidi-read_loop: #{count}") count += 1 - @call.start_read(read_tag) - ev = @cq.pluck(read_tag, INFINITE_FUTURE) - begin - assert_event_type(ev, READ) - - # handle the next event. - if ev.result.nil? - @readq.push(END_OF_READS) - logger.debug('done reading!') - break - end - - # push the latest read onto the queue and continue reading - logger.debug("received req: #{ev.result}") - res = @unmarshal.call(ev.result.to_s) - @readq.push(res) - ensure - ev.close + # TODO: ensure metadata is read if available, currently it's not + batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, + RECV_MESSAGE => nil) + # handle the next message + if batch_result.message.nil? + @readq.push(END_OF_READS) + logger.debug('bidi-read-loop: done reading!') + break end + + # push the latest read onto the queue and continue reading + logger.debug("received req: #{batch_result.message}") + res = @unmarshal.call(batch_result.message) + @readq.push(res) end rescue StandardError => e diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 01328d4a5b..7b2c04aa22 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -28,16 +28,16 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc/generic/active_call' -require 'xray/thread_dump_signal_handler' # GRPC contains the General RPC module. module GRPC # ClientStub represents an endpoint used to send requests to GRPC servers. class ClientStub include Core::StatusCodes + include Core::TimeConsts - # Default deadline is 5 seconds. - DEFAULT_DEADLINE = 5 + # Default timeout is 5 seconds. + DEFAULT_TIMEOUT = 5 # setup_channel is used by #initialize to constuct a channel from its # arguments. @@ -51,6 +51,14 @@ module GRPC Core::Channel.new(host, kw, creds) end + def self.update_with_jwt_aud_uri(a_hash, host, method) + last_slash_idx, res = method.rindex('/'), a_hash.clone + return res if last_slash_idx.nil? + service_name = method[0..(last_slash_idx - 1)] + res[:jwt_aud_uri] = "https://#{host}#{service_name}" + res + end + # check_update_metadata is used by #initialize verify that it's a Proc. def self.check_update_metadata(update_metadata) return update_metadata if update_metadata.nil? @@ -76,8 +84,8 @@ module GRPC # present the host and arbitrary keyword arg areignored, and the RPC # connection uses this channel. # - # - :deadline - # when present, this is the default deadline used for calls + # - :timeout + # when present, this is the default timeout used for calls # # - :update_metadata # when present, this a func that takes a hash and returns a hash @@ -87,13 +95,13 @@ module GRPC # @param host [String] the host the stub connects to # @param q [Core::CompletionQueue] used to wait for events # @param channel_override [Core::Channel] a pre-created channel - # @param deadline [Number] the default deadline to use in requests + # @param timeout [Number] the default timeout to use in requests # @param creds [Core::Credentials] the channel # @param update_metadata a func that updates metadata as described above # @param kw [KeywordArgs]the channel arguments def initialize(host, q, channel_override: nil, - deadline: DEFAULT_DEADLINE, + timeout: nil, creds: nil, update_metadata: nil, **kw) @@ -103,7 +111,7 @@ module GRPC @update_metadata = ClientStub.check_update_metadata(update_metadata) alt_host = kw[Core::Channel::SSL_TARGET] @host = alt_host.nil? ? host : alt_host - @deadline = deadline + @timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout end # request_response sends a request to a GRPC server, and returns the @@ -140,13 +148,14 @@ module GRPC # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] (optional) the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds # @param return_op [true|false] return an Operation if true # @return [Object] the response received from the server - def request_response(method, req, marshal, unmarshal, deadline = nil, + def request_response(method, req, marshal, unmarshal, timeout = nil, return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.request_response(req, **md) unless return_op # return the operation view of the active_call; define #execute as a @@ -197,13 +206,14 @@ module GRPC # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] the max completion time in seconds + # @param timeout [Numeric] the max completion time in seconds # @param return_op [true|false] return an Operation if true # @return [Object|Operation] the response received from the server - def client_streamer(method, requests, marshal, unmarshal, deadline = nil, + def client_streamer(method, requests, marshal, unmarshal, timeout = nil, return_op: false, **kw) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.client_streamer(requests, **md) unless return_op # return the operation view of the active_call; define #execute as a @@ -262,14 +272,15 @@ module GRPC # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] the max completion time in seconds + # @param timeout [Numeric] the max completion time in seconds # @param return_op [true|false]return an Operation if true # @param blk [Block] when provided, is executed for each response # @return [Enumerator|Operation|nil] as discussed above - def server_streamer(method, req, marshal, unmarshal, deadline = nil, + def server_streamer(method, req, marshal, unmarshal, timeout = nil, return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.server_streamer(req, **md, &blk) unless return_op # return the operation view of the active_call; define #execute @@ -367,14 +378,15 @@ module GRPC # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Numeric] (optional) the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds # @param blk [Block] when provided, is executed for each response # @param return_op [true|false] return an Operation if true # @return [Enumerator|nil|Operation] as discussed above - def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil, + def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, return_op: false, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, deadline || @deadline) - md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone) + c = new_active_call(method, marshal, unmarshal, timeout) + kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) + md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.bidi_streamer(requests, **md, &blk) unless return_op # return the operation view of the active_call; define #execute @@ -390,15 +402,14 @@ module GRPC # Creates a new active stub # - # @param ch [GRPC::Channel] the channel used to create the stub. + # @param method [string] the method being called. # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [TimeConst] - def new_active_call(ch, marshal, unmarshal, deadline = nil) - absolute_deadline = Core::TimeConsts.from_relative_time(deadline) - call = @ch.create_call(ch, @host, absolute_deadline) - ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline, - started: false) + # @param timeout [TimeConst] + def new_active_call(method, marshal, unmarshal, timeout = nil) + deadline = from_relative_time(timeout.nil? ? @timeout : timeout) + call = @ch.create_call(@queue, method, @host, deadline) + ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false) end end end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 2cb3d2eebf..10211ae239 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -80,26 +80,21 @@ module GRPC else # is a bidi_stream active_call.run_server_bidi(mth) end - send_status(active_call, OK, 'OK') - active_call.finished + send_status(active_call, OK, 'OK', **active_call.output_metadata) rescue BadStatus => e - # this is raised by handlers that want GRPC to send an application - # error code and detail message. + # this is raised by handlers that want GRPC to send an application error + # code and detail message and some additional app-specific metadata. logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") - send_status(active_call, e.code, e.details) + send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. logger.warn("failed call: #{active_call}\n#{e}") - rescue OutOfTime + rescue Core::OutOfTime # This is raised when active_call#method.call exceeeds the deadline # event. Send a status of deadline exceeded logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') - rescue Core::EventError => e - # This is raised by GRPC internals but should rarely, if ever happen. - # Log it, but don't notify the other endpoint.. - logger.warn("failed call: #{active_call}\n#{e}") rescue StandardError => e # This will usuaally be an unhandled error in the handling code. # Send back a UNKNOWN status to the client @@ -140,9 +135,9 @@ module GRPC "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}" end - def send_status(active_client, code, details) + def send_status(active_client, code, details, **kw) details = 'Not sure why' if details.nil? - active_client.send_status(code, details) + active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e logger.warn("Could not send status #{code}:#{details}") logger.warn(e) diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 35e84023be..88c24aa92b 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -31,14 +31,130 @@ require 'grpc/grpc' require 'grpc/generic/active_call' require 'grpc/generic/service' require 'thread' -require 'xray/thread_dump_signal_handler' + +# A global that contains signals the gRPC servers should respond to. +$grpc_signals = [] # GRPC contains the General RPC module. module GRPC + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + module_function :handle_signals + + # Pool is a simple thread pool. + class Pool + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) + fail 'pool size must be positive' unless size > 0 + @jobs = Queue.new + @size = size + @stopped = false + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @workers = [] + @keep_alive = keep_alive + end + + # Returns the number of jobs waiting + def jobs_waiting + @jobs.size + end + + # Runs the given block on the queue with the provided args. + # + # @param args the args passed blk when it is called + # @param blk the block to call + def schedule(*args, &blk) + fail 'already stopped' if @stopped + return if blk.nil? + logger.info('schedule another job') + @jobs << [blk, args] + end + + # Starts running the jobs in the thread pool. + def start + fail 'already stopped' if @stopped + until @workers.size == @size.to_i + next_thread = Thread.new do + catch(:exit) do # allows { throw :exit } to kill a thread + loop_execute_jobs + end + remove_current_thread + end + @workers << next_thread + end + end + + # Stops the jobs in the pool + def stop + logger.info('stopping, will wait for all the workers to exit') + @workers.size.times { schedule { throw :exit } } + @stopped = true + @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 + end + forcibly_stop_workers + logger.info('stopped, all workers are shutdown') + end + + protected + + # Forcibly shutdown any threads that are still alive. + def forcibly_stop_workers + return unless @workers.size > 0 + logger.info("forcibly terminating #{@workers.size} worker(s)") + @workers.each do |t| + next unless t.alive? + begin + t.exit + rescue StandardError => e + logger.warn('error while terminating a worker') + logger.warn(e) + end + end + end + + # removes the threads from workers, and signal when all the + # threads are complete. + def remove_current_thread + @stop_mutex.synchronize do + @workers.delete(Thread.current) + @stop_cond.signal if @workers.size == 0 + end + end + + def loop_execute_jobs + loop do + begin + blk, args = @jobs.pop + blk.call(*args) + rescue StandardError => e + logger.warn('Error in worker thread') + logger.warn(e) + end + end + end + end + # RpcServer hosts a number of services and makes them available on the # network. class RpcServer - include Core::CompletionType + include Core::CallOps include Core::TimeConsts extend ::Forwardable @@ -50,6 +166,49 @@ module GRPC # Default max_waiting_requests size is 20 DEFAULT_MAX_WAITING_REQUESTS = 20 + # Default poll period is 1s + DEFAULT_POLL_PERIOD = 1 + + # Signal check period is 0.25s + SIGNAL_CHECK_PERIOD = 0.25 + + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def self.trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + + # setup_cq is used by #initialize to constuct a Core::CompletionQueue from + # its arguments. + def self.setup_cq(alt_cq) + return Core::CompletionQueue.new if alt_cq.nil? + unless alt_cq.is_a? Core::CompletionQueue + fail(TypeError, '!CompletionQueue') + end + alt_cq + end + + # setup_srv is used by #initialize to constuct a Core::Server from its + # arguments. + def self.setup_srv(alt_srv, cq, **kw) + return Core::Server.new(cq, kw) if alt_srv.nil? + fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server + alt_srv + end + + # setup_connect_md_proc is used by #initialize to validate the + # connect_md_proc. + def self.setup_connect_md_proc(a_proc) + return nil if a_proc.nil? + fail(TypeError, '!Proc') unless a_proc.is_a? Proc + a_proc + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -77,30 +236,21 @@ module GRPC # * max_waiting_requests: the maximum number of requests that are not # being handled to allow. When this limit is exceeded, the server responds # with not available to new requests + # + # * connect_md_proc: + # when non-nil is a proc for determining metadata to to send back the client + # on receiving an invocation req. The proc signature is: + # {key: val, ..} func(method_name, {key: val, ...}) def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:INFINITE_FUTURE, + poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, + connect_md_proc:nil, **kw) - if completion_queue_override.nil? - cq = Core::CompletionQueue.new - else - cq = completion_queue_override - unless cq.is_a? Core::CompletionQueue - fail(ArgumentError, 'not a CompletionQueue') - end - end - @cq = cq - - if server_override.nil? - srv = Core::Server.new(@cq, kw) - else - srv = server_override - fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server - end - @server = srv - + @cq = RpcServer.setup_cq(completion_queue_override) + @server = RpcServer.setup_srv(server_override, @cq, **kw) + @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @pool_size = pool_size @max_waiting_requests = max_waiting_requests @poll_period = poll_period @@ -117,6 +267,13 @@ module GRPC return unless @running @stopped = true @pool.stop + + # TODO: uncomment this: + # + # This segfaults in the c layer, so its commented out for now. Shutdown + # still occurs, but the c layer has to do the cleanup. + # + # @server.close end # determines if the server is currently running @@ -139,7 +296,21 @@ module GRPC running? end - # determines if the server is currently stopped + # Runs the server in its own thread, then waits for signal INT or TERM on + # the current thread to terminate it. + def run_till_terminated + self.class.trap_signals + t = Thread.new { run } + wait_till_running + loop do + sleep SIGNAL_CHECK_PERIOD + break unless handle_signals + end + stop + t.join + end + + # Determines if the server is currently stopped def stopped? @stopped ||= false end @@ -202,154 +373,71 @@ module GRPC end @pool.start @server.start - server_tag = Object.new - until stopped? - @server.request_call(server_tag) - ev = @cq.pluck(server_tag, @poll_period) - next if ev.nil? - if ev.type != SERVER_RPC_NEW - logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}") - ev.close - next - end - c = new_active_server_call(ev.call, ev.result) - unless c.nil? - mth = ev.result.method.to_sym - ev.close - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) - end - end - end + loop_handle_server_calls @running = false end - def new_active_server_call(call, new_server_rpc) - # Accept the call. This is necessary even if a status is to be sent - # back immediately - finished_tag = Object.new - call_queue = Core::CompletionQueue.new - call.metadata = new_server_rpc.metadata # store the metadata - call.server_accept(call_queue, finished_tag) - call.server_end_initial_metadata - - # Send UNAVAILABLE if there are too many unprocessed jobs + # Sends UNAVAILABLE if there are too many unprocessed jobs + def available?(an_rpc) jobs_count, max = @pool.jobs_waiting, @max_waiting_requests logger.info("waiting: #{jobs_count}, max: #{max}") - if @pool.jobs_waiting > @max_waiting_requests - logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::UNAVAILABLE, '') - return nil - end - - # Send NOT_FOUND if the method does not exist - mth = new_server_rpc.method.to_sym - unless rpc_descs.key?(mth) - logger.warn("NOT_FOUND: #{new_server_rpc}") - noop = proc { |x| x } - c = ActiveCall.new(call, call_queue, noop, noop, - new_server_rpc.deadline, - finished_tag: finished_tag) - c.send_status(StatusCodes::NOT_FOUND, '') - return nil - end - - # Create the ActiveCall - rpc_desc = rpc_descs[mth] - logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})") - ActiveCall.new(call, call_queue, - rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), - new_server_rpc.deadline, finished_tag: finished_tag) + return an_rpc if @pool.jobs_waiting <= @max_waiting_requests + logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) + c.send_status(StatusCodes::UNAVAILABLE, '') + nil end - # Pool is a simple thread pool for running server requests. - class Pool - def initialize(size) - fail 'pool size must be positive' unless size > 0 - @jobs = Queue.new - @size = size - @stopped = false - @stop_mutex = Mutex.new - @stop_cond = ConditionVariable.new - @workers = [] - end - - # Returns the number of jobs waiting - def jobs_waiting - @jobs.size - end - - # Runs the given block on the queue with the provided args. - # - # @param args the args passed blk when it is called - # @param blk the block to call - def schedule(*args, &blk) - fail 'already stopped' if @stopped - return if blk.nil? - logger.info('schedule another job') - @jobs << [blk, args] - end + # Sends NOT_FOUND if the method can't be found + def found?(an_rpc) + mth = an_rpc.method.to_sym + return an_rpc if rpc_descs.key?(mth) + logger.warn("NOT_FOUND: #{an_rpc}") + noop = proc { |x| x } + c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) + c.send_status(StatusCodes::NOT_FOUND, '') + nil + end - # Starts running the jobs in the thread pool. - def start - fail 'already stopped' if @stopped - until @workers.size == @size.to_i - next_thread = Thread.new do - catch(:exit) do # allows { throw :exit } to kill a thread - loop do - begin - blk, args = @jobs.pop - blk.call(*args) - rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) - end - end - end - - # removes the threads from workers, and signal when all the - # threads are complete. - @stop_mutex.synchronize do - @workers.delete(Thread.current) - @stop_cond.signal if @workers.size == 0 - end + # handles calls to the server + def loop_handle_server_calls + fail 'not running' unless @running + request_call_tag = Object.new + until stopped? + deadline = from_relative_time(@poll_period) + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + c = new_active_server_call(an_rpc) + unless c.nil? + mth = an_rpc.method.to_sym + @pool.schedule(c) do |call| + rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) end - @workers << next_thread end end + end - # Stops the jobs in the pool - def stop - logger.info('stopping, will wait for all the workers to exit') - @workers.size.times { schedule { throw :exit } } - @stopped = true - - # TODO: allow configuration of the keepalive period - keep_alive = 5 - @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 - end - - # Forcibly shutdown any threads that are still alive. - if @workers.size > 0 - logger.warn("forcibly terminating #{@workers.size} worker(s)") - @workers.each do |t| - next unless t.alive? - begin - t.exit - rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) - end - end - end + def new_active_server_call(an_rpc) + return nil if an_rpc.nil? || an_rpc.call.nil? - logger.info('stopped, all workers are shutdown') + # allow the metadata to be accessed from the call + handle_call_tag = Object.new + an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + connect_md = nil + unless @connect_md_proc.nil? + connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) end + an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE, + SEND_INITIAL_METADATA => connect_md) + return nil unless available?(an_rpc) + return nil unless found?(an_rpc) + + # Create the ActiveCall + logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") + rpc_desc = rpc_descs[an_rpc.method.to_sym] + ActiveCall.new(an_rpc.call, @cq, + rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), + an_rpc.deadline) end protected @@ -362,11 +450,9 @@ module GRPC @rpc_handlers ||= {} end - private - def assert_valid_service_class(cls) unless cls.include?(GenericService) - fail "#{cls} should 'include GenericService'" + fail "#{cls} must 'include GenericService'" end if cls.rpc_descs.size == 0 fail "#{cls} should specify some rpc descriptions" @@ -376,21 +462,17 @@ module GRPC def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs = rpc_descs - handlers = rpc_handlers + specs, handlers = rpc_descs, rpc_handlers cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym - if specs.key? route - fail "Cannot add rpc #{route} from #{spec}, already registered" + fail "already registered: rpc #{route} from #{spec}" if specs.key? route + specs[route] = spec + if service.is_a?(Class) + handlers[route] = cls.new.method(name.to_s.underscore.to_sym) else - specs[route] = spec - if service.is_a?(Class) - handlers[route] = cls.new.method(name.to_s.underscore.to_sym) - else - handlers[route] = service.method(name.to_s.underscore.to_sym) - end - logger.info("handling #{route} with #{handlers[route]}") + handlers[route] = service.method(name.to_s.underscore.to_sym) end + logger.info("handling #{route} with #{handlers[route]}") end end end diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index bfd0cbb393..072fb9b1aa 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '0.6.0' + VERSION = '0.6.1' end |