aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/ruby/lib/grpc.rb2
-rw-r--r--src/ruby/lib/grpc/auth/compute_engine.rb60
-rw-r--r--src/ruby/lib/grpc/auth/service_account.rb42
-rw-r--r--src/ruby/lib/grpc/core/event.rb21
-rw-r--r--src/ruby/lib/grpc/core/time_consts.rb69
-rw-r--r--src/ruby/lib/grpc/errors.rb50
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb904
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb318
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb704
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb201
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb640
-rw-r--r--src/ruby/lib/grpc/generic/service.rb312
-rw-r--r--src/ruby/lib/grpc/logconfig.rb6
-rw-r--r--src/ruby/lib/grpc/version.rb8
-rw-r--r--src/ruby/spec/auth/compute_engine_spec.rb4
-rw-r--r--src/ruby/spec/auth/service_account_spec.rb4
16 files changed, 1663 insertions, 1682 deletions
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index 758ac0c2d1..21253848a7 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -41,4 +41,4 @@ require 'grpc/generic/service'
require 'grpc/generic/rpc_server'
# alias GRPC
-GRPC = Google::RPC
+GRPC = GRPC
diff --git a/src/ruby/lib/grpc/auth/compute_engine.rb b/src/ruby/lib/grpc/auth/compute_engine.rb
index 9004bef46e..5cb1e1a4dc 100644
--- a/src/ruby/lib/grpc/auth/compute_engine.rb
+++ b/src/ruby/lib/grpc/auth/compute_engine.rb
@@ -30,39 +30,37 @@
require 'faraday'
require 'grpc/auth/signet'
-module Google
- module RPC
- # Module Auth provides classes that provide Google-specific authentication
- # used to access Google gRPC services.
- module Auth
- # Extends Signet::OAuth2::Client so that the auth token is obtained from
- # the GCE metadata server.
- class GCECredentials < Signet::OAuth2::Client
- COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\
- 'instance/service-accounts/default/token'
- COMPUTE_CHECK_URI = 'http://metadata.google.internal'
+module GRPC
+ # Module Auth provides classes that provide Google-specific authentication
+ # used to access Google gRPC services.
+ module Auth
+ # Extends Signet::OAuth2::Client so that the auth token is obtained from
+ # the GCE metadata server.
+ class GCECredentials < Signet::OAuth2::Client
+ COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\
+ 'instance/service-accounts/default/token'
+ COMPUTE_CHECK_URI = 'http://metadata.google.internal'
- # Detect if this appear to be a GCE instance, by checking if metadata
- # is available
- def self.on_gce?(options = {})
- c = options[:connection] || Faraday.default_connection
- resp = c.get(COMPUTE_CHECK_URI)
- return false unless resp.status == 200
- return false unless resp.headers.key?('Metadata-Flavor')
- return resp.headers['Metadata-Flavor'] == 'Google'
- rescue Faraday::ConnectionFailed
- return false
- end
+ # Detect if this appear to be a GCE instance, by checking if metadata
+ # is available
+ def self.on_gce?(options = {})
+ c = options[:connection] || Faraday.default_connection
+ resp = c.get(COMPUTE_CHECK_URI)
+ return false unless resp.status == 200
+ return false unless resp.headers.key?('Metadata-Flavor')
+ return resp.headers['Metadata-Flavor'] == 'Google'
+ rescue Faraday::ConnectionFailed
+ return false
+ end
- # Overrides the super class method to change how access tokens are
- # fetched.
- def fetch_access_token(options = {})
- c = options[:connection] || Faraday.default_connection
- c.headers = { 'Metadata-Flavor' => 'Google' }
- resp = c.get(COMPUTE_AUTH_TOKEN_URI)
- Signet::OAuth2.parse_credentials(resp.body,
- resp.headers['content-type'])
- end
+ # Overrides the super class method to change how access tokens are
+ # fetched.
+ def fetch_access_token(options = {})
+ c = options[:connection] || Faraday.default_connection
+ c.headers = { 'Metadata-Flavor' => 'Google' }
+ resp = c.get(COMPUTE_AUTH_TOKEN_URI)
+ Signet::OAuth2.parse_credentials(resp.body,
+ resp.headers['content-type'])
end
end
end
diff --git a/src/ruby/lib/grpc/auth/service_account.rb b/src/ruby/lib/grpc/auth/service_account.rb
index 35b5cbfe2d..14b81a9e03 100644
--- a/src/ruby/lib/grpc/auth/service_account.rb
+++ b/src/ruby/lib/grpc/auth/service_account.rb
@@ -39,29 +39,27 @@ def read_json_key(json_key_io)
[json_key['private_key'], json_key['client_email']]
end
-module Google
- module RPC
- # Module Auth provides classes that provide Google-specific authentication
- # used to access Google gRPC services.
- module Auth
- # Authenticates requests using Google's Service Account credentials.
- # (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount)
- class ServiceAccountCredentials < Signet::OAuth2::Client
- TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token'
- AUDIENCE = TOKEN_CRED_URI
+module GRPC
+ # Module Auth provides classes that provide Google-specific authentication
+ # used to access Google gRPC services.
+ module Auth
+ # Authenticates requests using Google's Service Account credentials.
+ # (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount)
+ class ServiceAccountCredentials < Signet::OAuth2::Client
+ TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token'
+ AUDIENCE = TOKEN_CRED_URI
- # Initializes a ServiceAccountCredentials.
- #
- # @param scope [string|array] the scope(s) to access
- # @param json_key_io [IO] an IO from which the JSON key can be read
- def initialize(scope, json_key_io)
- private_key, client_email = read_json_key(json_key_io)
- super(token_credential_uri: TOKEN_CRED_URI,
- audience: AUDIENCE,
- scope: scope,
- issuer: client_email,
- signing_key: OpenSSL::PKey::RSA.new(private_key))
- end
+ # Initializes a ServiceAccountCredentials.
+ #
+ # @param scope [string|array] the scope(s) to access
+ # @param json_key_io [IO] an IO from which the JSON key can be read
+ def initialize(scope, json_key_io)
+ private_key, client_email = read_json_key(json_key_io)
+ super(token_credential_uri: TOKEN_CRED_URI,
+ audience: AUDIENCE,
+ scope: scope,
+ issuer: client_email,
+ signing_key: OpenSSL::PKey::RSA.new(private_key))
end
end
end
diff --git a/src/ruby/lib/grpc/core/event.rb b/src/ruby/lib/grpc/core/event.rb
index 9a333589c2..27df86b4b6 100644
--- a/src/ruby/lib/grpc/core/event.rb
+++ b/src/ruby/lib/grpc/core/event.rb
@@ -27,16 +27,17 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-module Google
- module RPC
- module Core
- # Event is a class defined in the c extension
- #
- # Here, we add an inspect method.
- class Event
- def inspect
- "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
- end
+require 'grpc'
+
+# GRPC contains the General RPC module.
+module GRPC
+ module Core
+ # Event is a class defined in the c extension
+ #
+ # Here, we add an inspect method.
+ class Event
+ def inspect
+ "<#{self.class}: type:#{type}, tag:#{tag} result:#{result}>"
end
end
end
diff --git a/src/ruby/lib/grpc/core/time_consts.rb b/src/ruby/lib/grpc/core/time_consts.rb
index 6876dcb02e..3f3414cfda 100644
--- a/src/ruby/lib/grpc/core/time_consts.rb
+++ b/src/ruby/lib/grpc/core/time_consts.rb
@@ -29,44 +29,43 @@
require 'grpc'
-module Google
- module RPC
- module Core
- # TimeConsts is a module from the C extension.
+# GRPC contains the General RPC module.
+module GRPC
+ module Core
+ # TimeConsts is a module from the C extension.
+ #
+ # Here it's re-opened to add a utility func.
+ module TimeConsts
+ # Converts a time delta to an absolute deadline.
#
- # Here it's re-opened to add a utility func.
- module TimeConsts
- # Converts a time delta to an absolute deadline.
- #
- # Assumes timeish is a relative time, and converts its to an absolute,
- # with following exceptions:
- #
- # * if timish is one of the TimeConsts.TimeSpec constants the value is
- # preserved.
- # * timish < 0 => TimeConsts.INFINITE_FUTURE
- # * timish == 0 => TimeConsts.ZERO
- #
- # @param timeish [Number|TimeSpec]
- # @return timeish [Number|TimeSpec]
- def from_relative_time(timeish)
- if timeish.is_a? TimeSpec
- timeish
- elsif timeish.nil?
- TimeConsts::ZERO
- elsif !timeish.is_a? Numeric
- fail(TypeError,
- "Cannot make an absolute deadline from #{timeish.inspect}")
- elsif timeish < 0
- TimeConsts::INFINITE_FUTURE
- elsif timeish == 0
- TimeConsts::ZERO
- else
- Time.now + timeish
- end
+ # Assumes timeish is a relative time, and converts its to an absolute,
+ # with following exceptions:
+ #
+ # * if timish is one of the TimeConsts.TimeSpec constants the value is
+ # preserved.
+ # * timish < 0 => TimeConsts.INFINITE_FUTURE
+ # * timish == 0 => TimeConsts.ZERO
+ #
+ # @param timeish [Number|TimeSpec]
+ # @return timeish [Number|TimeSpec]
+ def from_relative_time(timeish)
+ if timeish.is_a? TimeSpec
+ timeish
+ elsif timeish.nil?
+ TimeConsts::ZERO
+ elsif !timeish.is_a? Numeric
+ fail(TypeError,
+ "Cannot make an absolute deadline from #{timeish.inspect}")
+ elsif timeish < 0
+ TimeConsts::INFINITE_FUTURE
+ elsif timeish == 0
+ TimeConsts::ZERO
+ else
+ Time.now + timeish
end
-
- module_function :from_relative_time
end
+
+ module_function :from_relative_time
end
end
end
diff --git a/src/ruby/lib/grpc/errors.rb b/src/ruby/lib/grpc/errors.rb
index 70a92bfed7..c2356b7004 100644
--- a/src/ruby/lib/grpc/errors.rb
+++ b/src/ruby/lib/grpc/errors.rb
@@ -29,35 +29,33 @@
require 'grpc'
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- # OutOfTime is an exception class that indicates that an RPC exceeded its
- # deadline.
- OutOfTime = Class.new(StandardError)
+# GRPC contains the General RPC module.
+module GRPC
+ # OutOfTime is an exception class that indicates that an RPC exceeded its
+ # deadline.
+ OutOfTime = Class.new(StandardError)
- # BadStatus is an exception class that indicates that an error occurred at
- # either end of a GRPC connection. When raised, it indicates that a status
- # error should be returned to the other end of a GRPC connection; when
- # caught it means that this end received a status error.
- class BadStatus < StandardError
- attr_reader :code, :details
+ # BadStatus is an exception class that indicates that an error occurred at
+ # either end of a GRPC connection. When raised, it indicates that a status
+ # error should be returned to the other end of a GRPC connection; when
+ # caught it means that this end received a status error.
+ class BadStatus < StandardError
+ attr_reader :code, :details
- # @param code [Numeric] the status code
- # @param details [String] the details of the exception
- def initialize(code, details = 'unknown cause')
- super("#{code}:#{details}")
- @code = code
- @details = details
- end
+ # @param code [Numeric] the status code
+ # @param details [String] the details of the exception
+ def initialize(code, details = 'unknown cause')
+ super("#{code}:#{details}")
+ @code = code
+ @details = details
+ end
- # Converts the exception to a GRPC::Status for use in the networking
- # wrapper layer.
- #
- # @return [Status] with the same code and details
- def to_status
- Status.new(code, details)
- end
+ # Converts the exception to a GRPC::Status for use in the networking
+ # wrapper layer.
+ #
+ # @return [Status] with the same code and details
+ def to_status
+ Status.new(code, details)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 6c2b6e91c2..5a4f129dce 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -36,502 +36,500 @@ def assert_event_type(ev, want)
fail "Unexpected rpc event: got #{got}, want #{want}" unless got == want
end
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- # The ActiveCall class provides simple methods for sending marshallable
- # data to a call
- class ActiveCall
- include Core::CompletionType
- include Core::StatusCodes
- include Core::TimeConsts
- attr_reader(:deadline)
-
- # client_invoke begins a client invocation.
- #
- # Flow Control note: this blocks until flow control accepts that client
- # request can go ahead.
- #
- # deadline is the absolute deadline for the call.
- #
- # == Keyword Arguments ==
- # any keyword arguments are treated as metadata to be sent to the server
- # if a keyword value is a list, multiple metadata for it's key are sent
- #
- # @param call [Call] a call on which to start and invocation
- # @param q [CompletionQueue] the completion queue
- # @param deadline [Fixnum,TimeSpec] the deadline
- def self.client_invoke(call, q, _deadline, **kw)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
- unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
- call.add_metadata(kw) if kw.length > 0
- client_metadata_read = Object.new
- finished_tag = Object.new
- call.invoke(q, client_metadata_read, finished_tag)
- [finished_tag, client_metadata_read]
- end
-
- # Creates an ActiveCall.
- #
- # ActiveCall should only be created after a call is accepted. That
- # means different things on a client and a server. On the client, the
- # call is accepted after calling call.invoke. On the server, this is
- # after call.accept.
- #
- # #initialize cannot determine if the call is accepted or not; so if a
- # call that's not accepted is used here, the error won't be visible until
- # the ActiveCall methods are called.
- #
- # deadline is the absolute deadline for the call.
- #
- # @param call [Call] the call used by the ActiveCall
- # @param q [CompletionQueue] the completion queue used to accept
- # the call
- # @param marshal [Function] f(obj)->string that marshal requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- # if the call has begun
- # @param read_metadata_tag [Object] the object used as the call's finish
- # tag, if the call has begun
- # @param started [true|false] indicates if the call has begun
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
- read_metadata_tag: nil, started: true)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
- unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
- @call = call
- @cq = q
- @deadline = deadline
- @finished_tag = finished_tag
- @read_metadata_tag = read_metadata_tag
- @marshal = marshal
- @started = started
- @unmarshal = unmarshal
+# GRPC contains the General RPC module.
+module GRPC
+ # The ActiveCall class provides simple methods for sending marshallable
+ # data to a call
+ class ActiveCall
+ include Core::CompletionType
+ include Core::StatusCodes
+ include Core::TimeConsts
+ attr_reader(:deadline)
+
+ # client_invoke begins a client invocation.
+ #
+ # Flow Control note: this blocks until flow control accepts that client
+ # request can go ahead.
+ #
+ # deadline is the absolute deadline for the call.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param call [Call] a call on which to start and invocation
+ # @param q [CompletionQueue] the completion queue
+ # @param deadline [Fixnum,TimeSpec] the deadline
+ def self.client_invoke(call, q, _deadline, **kw)
+ fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ unless q.is_a? Core::CompletionQueue
+ fail(ArgumentError, 'not a CompletionQueue')
end
+ call.add_metadata(kw) if kw.length > 0
+ client_metadata_read = Object.new
+ finished_tag = Object.new
+ call.invoke(q, client_metadata_read, finished_tag)
+ [finished_tag, client_metadata_read]
+ end
- # Obtains the status of the call.
- #
- # this value is nil until the call completes
- # @return this call's status
- def status
- @call.status
+ # Creates an ActiveCall.
+ #
+ # ActiveCall should only be created after a call is accepted. That
+ # means different things on a client and a server. On the client, the
+ # call is accepted after calling call.invoke. On the server, this is
+ # after call.accept.
+ #
+ # #initialize cannot determine if the call is accepted or not; so if a
+ # call that's not accepted is used here, the error won't be visible until
+ # the ActiveCall methods are called.
+ #
+ # deadline is the absolute deadline for the call.
+ #
+ # @param call [Call] the call used by the ActiveCall
+ # @param q [CompletionQueue] the completion queue used to accept
+ # the call
+ # @param marshal [Function] f(obj)->string that marshal requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Fixnum] the deadline for the call to complete
+ # @param finished_tag [Object] the object used as the call's finish tag,
+ # if the call has begun
+ # @param read_metadata_tag [Object] the object used as the call's finish
+ # tag, if the call has begun
+ # @param started [true|false] indicates if the call has begun
+ def initialize(call, q, marshal, unmarshal, deadline, finished_tag: nil,
+ read_metadata_tag: nil, started: true)
+ fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ unless q.is_a? Core::CompletionQueue
+ fail(ArgumentError, 'not a CompletionQueue')
end
+ @call = call
+ @cq = q
+ @deadline = deadline
+ @finished_tag = finished_tag
+ @read_metadata_tag = read_metadata_tag
+ @marshal = marshal
+ @started = started
+ @unmarshal = unmarshal
+ end
- # Obtains the metadata of the call.
- #
- # At the start of the call this will be nil. During the call this gets
- # some values as soon as the other end of the connection acknowledges the
- # request.
- #
- # @return this calls's metadata
- def metadata
- @call.metadata
- end
+ # Obtains the status of the call.
+ #
+ # this value is nil until the call completes
+ # @return this call's status
+ def status
+ @call.status
+ end
- # Cancels the call.
- #
- # Cancels the call. The call does not return any result, but once this it
- # has been called, the call should eventually terminate. Due to potential
- # races between the execution of the cancel and the in-flight request, the
- # result of the call after calling #cancel is indeterminate:
- #
- # - the call may terminate with a BadStatus exception, with code=CANCELLED
- # - the call may terminate with OK Status, and return a response
- # - the call may terminate with a different BadStatus exception if that
- # was happening
- def cancel
- @call.cancel
- end
+ # Obtains the metadata of the call.
+ #
+ # At the start of the call this will be nil. During the call this gets
+ # some values as soon as the other end of the connection acknowledges the
+ # request.
+ #
+ # @return this calls's metadata
+ def metadata
+ @call.metadata
+ end
- # indicates if the call is shutdown
- def shutdown
- @shutdown ||= false
- end
+ # Cancels the call.
+ #
+ # Cancels the call. The call does not return any result, but once this it
+ # has been called, the call should eventually terminate. Due to potential
+ # races between the execution of the cancel and the in-flight request, the
+ # result of the call after calling #cancel is indeterminate:
+ #
+ # - the call may terminate with a BadStatus exception, with code=CANCELLED
+ # - the call may terminate with OK Status, and return a response
+ # - the call may terminate with a different BadStatus exception if that
+ # was happening
+ def cancel
+ @call.cancel
+ end
- # indicates if the call is cancelled.
- def cancelled
- @cancelled ||= false
- end
+ # indicates if the call is shutdown
+ def shutdown
+ @shutdown ||= false
+ end
- # multi_req_view provides a restricted view of this ActiveCall for use
- # in a server client-streaming handler.
- def multi_req_view
- MultiReqView.new(self)
- end
+ # indicates if the call is cancelled.
+ def cancelled
+ @cancelled ||= false
+ end
- # single_req_view provides a restricted view of this ActiveCall for use in
- # a server request-response handler.
- def single_req_view
- SingleReqView.new(self)
- end
+ # multi_req_view provides a restricted view of this ActiveCall for use
+ # in a server client-streaming handler.
+ def multi_req_view
+ MultiReqView.new(self)
+ end
- # operation provides a restricted view of this ActiveCall for use as
- # a Operation.
- def operation
- Operation.new(self)
- end
+ # single_req_view provides a restricted view of this ActiveCall for use in
+ # a server request-response handler.
+ def single_req_view
+ SingleReqView.new(self)
+ end
- # writes_done indicates that all writes are completed.
- #
- # It blocks until the remote endpoint acknowledges by sending a FINISHED
- # event, unless assert_finished is set to false. Any calls to
- # #remote_send after this call will fail.
- #
- # @param assert_finished [true, false] when true(default), waits for
- # FINISHED.
- def writes_done(assert_finished = true)
- @call.writes_done(self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- logger.debug("Writes done: waiting for finish? #{assert_finished}")
- ensure
- ev.close
- end
+ # operation provides a restricted view of this ActiveCall for use as
+ # a Operation.
+ def operation
+ Operation.new(self)
+ end
- return unless assert_finished
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- fail 'unexpected nil event' if ev.nil?
+ # writes_done indicates that all writes are completed.
+ #
+ # It blocks until the remote endpoint acknowledges by sending a FINISHED
+ # event, unless assert_finished is set to false. Any calls to
+ # #remote_send after this call will fail.
+ #
+ # @param assert_finished [true, false] when true(default), waits for
+ # FINISHED.
+ def writes_done(assert_finished = true)
+ @call.writes_done(self)
+ ev = @cq.pluck(self, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, FINISH_ACCEPTED)
+ logger.debug("Writes done: waiting for finish? #{assert_finished}")
+ ensure
ev.close
- @call.status
end
- # finished waits until the call is completed.
- #
- # It blocks until the remote endpoint acknowledges by sending a FINISHED
- # event.
- def finished
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
- if @call.metadata.nil?
- @call.metadata = ev.result.metadata
- else
- @call.metadata.merge!(ev.result.metadata)
- end
-
- if ev.result.code != Core::StatusCodes::OK
- fail BadStatus.new(ev.result.code, ev.result.details)
- end
- res = ev.result
- ensure
- ev.close
- end
- res
- end
+ return unless assert_finished
+ ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
+ fail 'unexpected nil event' if ev.nil?
+ ev.close
+ @call.status
+ end
- # remote_send sends a request to the remote endpoint.
- #
- # It blocks until the remote endpoint acknowledges by sending a
- # WRITE_ACCEPTED. req can be marshalled already.
- #
- # @param req [Object, String] the object to send or it's marshal form.
- # @param marshalled [false, true] indicates if the object is already
- # marshalled.
- def remote_send(req, marshalled = false)
- assert_queue_is_ready
- logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
- if marshalled
- payload = req
+ # finished waits until the call is completed.
+ #
+ # It blocks until the remote endpoint acknowledges by sending a FINISHED
+ # event.
+ def finished
+ ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
+ begin
+ fail "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
+ if @call.metadata.nil?
+ @call.metadata = ev.result.metadata
else
- payload = @marshal.call(req)
- end
- @call.start_write(Core::ByteBuffer.new(payload), self)
-
- # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
- # until the flow control allows another send on this call.
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
+ @call.metadata.merge!(ev.result.metadata)
end
- end
- # send_status sends a status to the remote endpoint
- #
- # @param code [int] the status code to send
- # @param details [String] details
- # @param assert_finished [true, false] when true(default), waits for
- # FINISHED.
- def send_status(code = OK, details = '', assert_finished = false)
- assert_queue_is_ready
- @call.start_write_status(code, details, self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
+ if ev.result.code != Core::StatusCodes::OK
+ fail BadStatus.new(ev.result.code, ev.result.details)
end
- logger.debug("Status sent: #{code}:'#{details}'")
- return finished if assert_finished
- nil
+ res = ev.result
+ ensure
+ ev.close
end
+ res
+ end
- # remote_read reads a response from the remote endpoint.
- #
- # It blocks until the remote endpoint sends a READ or FINISHED event. On
- # a READ, it returns the response after unmarshalling it. On
- # FINISHED, it returns nil if the status is OK, otherwise raising
- # BadStatus
- def remote_read
- if @call.metadata.nil? && !@read_metadata_tag.nil?
- ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
- assert_event_type(ev, CLIENT_METADATA_READ)
- @call.metadata = ev.result
- @read_metadata_tag = nil
- end
+ # remote_send sends a request to the remote endpoint.
+ #
+ # It blocks until the remote endpoint acknowledges by sending a
+ # WRITE_ACCEPTED. req can be marshalled already.
+ #
+ # @param req [Object, String] the object to send or it's marshal form.
+ # @param marshalled [false, true] indicates if the object is already
+ # marshalled.
+ def remote_send(req, marshalled = false)
+ assert_queue_is_ready
+ logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
+ if marshalled
+ payload = req
+ else
+ payload = @marshal.call(req)
+ end
+ @call.start_write(Core::ByteBuffer.new(payload), self)
+
+ # call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
+ # until the flow control allows another send on this call.
+ ev = @cq.pluck(self, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, WRITE_ACCEPTED)
+ ensure
+ ev.close
+ end
+ end
- @call.start_read(self)
- ev = @cq.pluck(self, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
- logger.debug("received req: #{ev.result.inspect}")
- unless ev.result.nil?
- logger.debug("received req.to_s: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- logger.debug("received_req (unmarshalled): #{res.inspect}")
- return res
- end
- ensure
- ev.close
- end
- logger.debug('found nil; the final response has been sent')
- nil
+ # send_status sends a status to the remote endpoint
+ #
+ # @param code [int] the status code to send
+ # @param details [String] details
+ # @param assert_finished [true, false] when true(default), waits for
+ # FINISHED.
+ def send_status(code = OK, details = '', assert_finished = false)
+ assert_queue_is_ready
+ @call.start_write_status(code, details, self)
+ ev = @cq.pluck(self, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, FINISH_ACCEPTED)
+ ensure
+ ev.close
end
+ logger.debug("Status sent: #{code}:'#{details}'")
+ return finished if assert_finished
+ nil
+ end
- # each_remote_read passes each response to the given block or returns an
- # enumerator the responses if no block is given.
- #
- # == Enumerator ==
- #
- # * #next blocks until the remote endpoint sends a READ or FINISHED
- # * for each read, enumerator#next yields the response
- # * on status
- # * if it's is OK, enumerator#next raises StopException
- # * if is not OK, enumerator#next raises RuntimeException
- #
- # == Block ==
- #
- # * if provided it is executed for each response
- # * the call blocks until no more responses are provided
- #
- # @return [Enumerator] if no block was given
- def each_remote_read
- return enum_for(:each_remote_read) unless block_given?
- loop do
- resp = remote_read
- break if resp.is_a? Struct::Status # is an OK status
- break if resp.nil? # the last response was received
- yield resp
- end
+ # remote_read reads a response from the remote endpoint.
+ #
+ # It blocks until the remote endpoint sends a READ or FINISHED event. On
+ # a READ, it returns the response after unmarshalling it. On
+ # FINISHED, it returns nil if the status is OK, otherwise raising
+ # BadStatus
+ def remote_read
+ if @call.metadata.nil? && !@read_metadata_tag.nil?
+ ev = @cq.pluck(@read_metadata_tag, INFINITE_FUTURE)
+ assert_event_type(ev, CLIENT_METADATA_READ)
+ @call.metadata = ev.result
+ @read_metadata_tag = nil
end
- # each_remote_read_then_finish passes each response to the given block or
- # returns an enumerator of the responses if no block is given.
- #
- # It is like each_remote_read, but it blocks on finishing on detecting
- # the final message.
- #
- # == Enumerator ==
- #
- # * #next blocks until the remote endpoint sends a READ or FINISHED
- # * for each read, enumerator#next yields the response
- # * on status
- # * if it's is OK, enumerator#next raises StopException
- # * if is not OK, enumerator#next raises RuntimeException
- #
- # == Block ==
- #
- # * if provided it is executed for each response
- # * the call blocks until no more responses are provided
- #
- # @return [Enumerator] if no block was given
- def each_remote_read_then_finish
- return enum_for(:each_remote_read_then_finish) unless block_given?
- loop do
- resp = remote_read
- break if resp.is_a? Struct::Status # is an OK status
- if resp.nil? # the last response was received, but not finished yet
- finished
- break
- end
- yield resp
+ @call.start_read(self)
+ ev = @cq.pluck(self, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, READ)
+ logger.debug("received req: #{ev.result.inspect}")
+ unless ev.result.nil?
+ logger.debug("received req.to_s: #{ev.result}")
+ res = @unmarshal.call(ev.result.to_s)
+ logger.debug("received_req (unmarshalled): #{res.inspect}")
+ return res
end
+ ensure
+ ev.close
end
+ logger.debug('found nil; the final response has been sent')
+ nil
+ end
- # request_response sends a request to a GRPC server, and returns the
- # response.
- #
- # == Keyword Arguments ==
- # any keyword arguments are treated as metadata to be sent to the server
- # if a keyword value is a list, multiple metadata for it's key are sent
- #
- # @param req [Object] the request sent to the server
- # @return [Object] the response received from the server
- def request_response(req, **kw)
- start_call(**kw) unless @started
- remote_send(req)
- writes_done(false)
- response = remote_read
- finished unless response.is_a? Struct::Status
- response
+ # each_remote_read passes each response to the given block or returns an
+ # enumerator the responses if no block is given.
+ #
+ # == Enumerator ==
+ #
+ # * #next blocks until the remote endpoint sends a READ or FINISHED
+ # * for each read, enumerator#next yields the response
+ # * on status
+ # * if it's is OK, enumerator#next raises StopException
+ # * if is not OK, enumerator#next raises RuntimeException
+ #
+ # == Block ==
+ #
+ # * if provided it is executed for each response
+ # * the call blocks until no more responses are provided
+ #
+ # @return [Enumerator] if no block was given
+ def each_remote_read
+ return enum_for(:each_remote_read) unless block_given?
+ loop do
+ resp = remote_read
+ break if resp.is_a? Struct::Status # is an OK status
+ break if resp.nil? # the last response was received
+ yield resp
end
+ end
- # client_streamer sends a stream of requests to a GRPC server, and
- # returns a single response.
- #
- # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
- # #each enumeration protocol. In the simplest case, requests will be an
- # array of marshallable objects; in typical case it will be an Enumerable
- # that allows dynamic construction of the marshallable objects.
- #
- # == Keyword Arguments ==
- # any keyword arguments are treated as metadata to be sent to the server
- # if a keyword value is a list, multiple metadata for it's key are sent
- #
- # @param requests [Object] an Enumerable of requests to send
- # @return [Object] the response received from the server
- def client_streamer(requests, **kw)
- start_call(**kw) unless @started
- requests.each { |r| remote_send(r) }
- writes_done(false)
- response = remote_read
- finished unless response.is_a? Struct::Status
- response
+ # each_remote_read_then_finish passes each response to the given block or
+ # returns an enumerator of the responses if no block is given.
+ #
+ # It is like each_remote_read, but it blocks on finishing on detecting
+ # the final message.
+ #
+ # == Enumerator ==
+ #
+ # * #next blocks until the remote endpoint sends a READ or FINISHED
+ # * for each read, enumerator#next yields the response
+ # * on status
+ # * if it's is OK, enumerator#next raises StopException
+ # * if is not OK, enumerator#next raises RuntimeException
+ #
+ # == Block ==
+ #
+ # * if provided it is executed for each response
+ # * the call blocks until no more responses are provided
+ #
+ # @return [Enumerator] if no block was given
+ def each_remote_read_then_finish
+ return enum_for(:each_remote_read_then_finish) unless block_given?
+ loop do
+ resp = remote_read
+ break if resp.is_a? Struct::Status # is an OK status
+ if resp.nil? # the last response was received, but not finished yet
+ finished
+ break
+ end
+ yield resp
end
+ end
- # server_streamer sends one request to the GRPC server, which yields a
- # stream of responses.
- #
- # responses provides an enumerator over the streamed responses, i.e. it
- # follows Ruby's #each iteration protocol. The enumerator blocks while
- # waiting for each response, stops when the server signals that no
- # further responses will be supplied. If the implicit block is provided,
- # it is executed with each response as the argument and no result is
- # returned.
- #
- # == Keyword Arguments ==
- # any keyword arguments are treated as metadata to be sent to the server
- # if a keyword value is a list, multiple metadata for it's key are sent
- # any keyword arguments are treated as metadata to be sent to the server.
- #
- # @param req [Object] the request sent to the server
- # @return [Enumerator|nil] a response Enumerator
- def server_streamer(req, **kw)
- start_call(**kw) unless @started
- remote_send(req)
- writes_done(false)
- replies = enum_for(:each_remote_read_then_finish)
- return replies unless block_given?
- replies.each { |r| yield r }
- end
+ # request_response sends a request to a GRPC server, and returns the
+ # response.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param req [Object] the request sent to the server
+ # @return [Object] the response received from the server
+ def request_response(req, **kw)
+ start_call(**kw) unless @started
+ remote_send(req)
+ writes_done(false)
+ response = remote_read
+ finished unless response.is_a? Struct::Status
+ response
+ end
- # bidi_streamer sends a stream of requests to the GRPC server, and yields
- # a stream of responses.
- #
- # This method takes an Enumerable of requests, and returns and enumerable
- # of responses.
- #
- # == requests ==
- #
- # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
- # #each enumeration protocol. In the simplest case, requests will be an
- # array of marshallable objects; in typical case it will be an
- # Enumerable that allows dynamic construction of the marshallable
- # objects.
- #
- # == responses ==
- #
- # This is an enumerator of responses. I.e, its #next method blocks
- # waiting for the next response. Also, if at any point the block needs
- # to consume all the remaining responses, this can be done using #each or
- # #collect. Calling #each or #collect should only be done if
- # the_call#writes_done has been called, otherwise the block will loop
- # forever.
- #
- # == Keyword Arguments ==
- # any keyword arguments are treated as metadata to be sent to the server
- # if a keyword value is a list, multiple metadata for it's key are sent
- #
- # @param requests [Object] an Enumerable of requests to send
- # @return [Enumerator, nil] a response Enumerator
- def bidi_streamer(requests, **kw, &blk)
- start_call(**kw) unless @started
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
- @finished_tag)
- bd.run_on_client(requests, &blk)
- end
+ # client_streamer sends a stream of requests to a GRPC server, and
+ # returns a single response.
+ #
+ # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
+ # #each enumeration protocol. In the simplest case, requests will be an
+ # array of marshallable objects; in typical case it will be an Enumerable
+ # that allows dynamic construction of the marshallable objects.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param requests [Object] an Enumerable of requests to send
+ # @return [Object] the response received from the server
+ def client_streamer(requests, **kw)
+ start_call(**kw) unless @started
+ requests.each { |r| remote_send(r) }
+ writes_done(false)
+ response = remote_read
+ finished unless response.is_a? Struct::Status
+ response
+ end
- # run_server_bidi orchestrates a BiDi stream processing on a server.
- #
- # N.B. gen_each_reply is a func(Enumerable<Requests>)
- #
- # It takes an enumerable of requests as an arg, in case there is a
- # relationship between the stream of requests and the stream of replies.
- #
- # This does not mean that must necessarily be one. E.g, the replies
- # produced by gen_each_reply could ignore the received_msgs
- #
- # @param gen_each_reply [Proc] generates the BiDi stream replies
- def run_server_bidi(gen_each_reply)
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
- @finished_tag)
- bd.run_on_server(gen_each_reply)
- end
+ # server_streamer sends one request to the GRPC server, which yields a
+ # stream of responses.
+ #
+ # responses provides an enumerator over the streamed responses, i.e. it
+ # follows Ruby's #each iteration protocol. The enumerator blocks while
+ # waiting for each response, stops when the server signals that no
+ # further responses will be supplied. If the implicit block is provided,
+ # it is executed with each response as the argument and no result is
+ # returned.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ # any keyword arguments are treated as metadata to be sent to the server.
+ #
+ # @param req [Object] the request sent to the server
+ # @return [Enumerator|nil] a response Enumerator
+ def server_streamer(req, **kw)
+ start_call(**kw) unless @started
+ remote_send(req)
+ writes_done(false)
+ replies = enum_for(:each_remote_read_then_finish)
+ return replies unless block_given?
+ replies.each { |r| yield r }
+ end
+
+ # bidi_streamer sends a stream of requests to the GRPC server, and yields
+ # a stream of responses.
+ #
+ # This method takes an Enumerable of requests, and returns and enumerable
+ # of responses.
+ #
+ # == requests ==
+ #
+ # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
+ # #each enumeration protocol. In the simplest case, requests will be an
+ # array of marshallable objects; in typical case it will be an
+ # Enumerable that allows dynamic construction of the marshallable
+ # objects.
+ #
+ # == responses ==
+ #
+ # This is an enumerator of responses. I.e, its #next method blocks
+ # waiting for the next response. Also, if at any point the block needs
+ # to consume all the remaining responses, this can be done using #each or
+ # #collect. Calling #each or #collect should only be done if
+ # the_call#writes_done has been called, otherwise the block will loop
+ # forever.
+ #
+ # == Keyword Arguments ==
+ # any keyword arguments are treated as metadata to be sent to the server
+ # if a keyword value is a list, multiple metadata for it's key are sent
+ #
+ # @param requests [Object] an Enumerable of requests to send
+ # @return [Enumerator, nil] a response Enumerator
+ def bidi_streamer(requests, **kw, &blk)
+ start_call(**kw) unless @started
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
+ @finished_tag)
+ bd.run_on_client(requests, &blk)
+ end
- private
+ # run_server_bidi orchestrates a BiDi stream processing on a server.
+ #
+ # N.B. gen_each_reply is a func(Enumerable<Requests>)
+ #
+ # It takes an enumerable of requests as an arg, in case there is a
+ # relationship between the stream of requests and the stream of replies.
+ #
+ # This does not mean that must necessarily be one. E.g, the replies
+ # produced by gen_each_reply could ignore the received_msgs
+ #
+ # @param gen_each_reply [Proc] generates the BiDi stream replies
+ def run_server_bidi(gen_each_reply)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline,
+ @finished_tag)
+ bd.run_on_server(gen_each_reply)
+ end
- def start_call(**kw)
- tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
- @finished_tag, @read_metadata_tag = tags
- @started = true
- end
+ private
- def self.view_class(*visible_methods)
- Class.new do
- extend ::Forwardable
- def_delegators :@wrapped, *visible_methods
+ def start_call(**kw)
+ tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
+ @finished_tag, @read_metadata_tag = tags
+ @started = true
+ end
- # @param wrapped [ActiveCall] the call whose methods are shielded
- def initialize(wrapped)
- @wrapped = wrapped
- end
+ def self.view_class(*visible_methods)
+ Class.new do
+ extend ::Forwardable
+ def_delegators :@wrapped, *visible_methods
+
+ # @param wrapped [ActiveCall] the call whose methods are shielded
+ def initialize(wrapped)
+ @wrapped = wrapped
end
end
+ end
- # SingleReqView limits access to an ActiveCall's methods for use in server
- # handlers that receive just one request.
- SingleReqView = view_class(:cancelled, :deadline)
-
- # MultiReqView limits access to an ActiveCall's methods for use in
- # server client_streamer handlers.
- MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
- :each_remote_read)
-
- # Operation limits access to an ActiveCall's methods for use as
- # a Operation on the client.
- Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status)
-
- # confirms that no events are enqueued, and that the queue is not
- # shutdown.
- def assert_queue_is_ready
- ev = nil
- begin
- ev = @cq.pluck(self, ZERO)
- fail "unexpected event #{ev.inspect}" unless ev.nil?
- rescue OutOfTime
- logging.debug('timed out waiting for next event')
- # expected, nothing should be on the queue and the deadline was ZERO,
- # except things using another tag
- ensure
- ev.close unless ev.nil?
- end
+ # SingleReqView limits access to an ActiveCall's methods for use in server
+ # handlers that receive just one request.
+ SingleReqView = view_class(:cancelled, :deadline)
+
+ # MultiReqView limits access to an ActiveCall's methods for use in
+ # server client_streamer handlers.
+ MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
+ :each_remote_read)
+
+ # Operation limits access to an ActiveCall's methods for use as
+ # a Operation on the client.
+ Operation = view_class(:cancel, :cancelled, :deadline, :execute,
+ :metadata, :status)
+
+ # confirms that no events are enqueued, and that the queue is not
+ # shutdown.
+ def assert_queue_is_ready
+ ev = nil
+ begin
+ ev = @cq.pluck(self, ZERO)
+ fail "unexpected event #{ev.inspect}" unless ev.nil?
+ rescue OutOfTime
+ logging.debug('timed out waiting for next event')
+ # expected, nothing should be on the queue and the deadline was ZERO,
+ # except things using another tag
+ ensure
+ ev.close unless ev.nil?
end
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 099d57151c..8350690fdf 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -36,186 +36,184 @@ def assert_event_type(ev, want)
fail("Unexpected rpc event: got #{got}, want #{want}") unless got == want
end
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- # The BiDiCall class orchestrates exection of a BiDi stream on a client or
- # server.
- class BidiCall
- include Core::CompletionType
- include Core::StatusCodes
- include Core::TimeConsts
+# GRPC contains the General RPC module.
+module GRPC
+ # The BiDiCall class orchestrates exection of a BiDi stream on a client or
+ # server.
+ class BidiCall
+ include Core::CompletionType
+ include Core::StatusCodes
+ include Core::TimeConsts
- # Creates a BidiCall.
- #
- # BidiCall should only be created after a call is accepted. That means
- # different things on a client and a server. On the client, the call is
- # accepted after call.invoke. On the server, this is after call.accept.
- #
- # #initialize cannot determine if the call is accepted or not; so if a
- # call that's not accepted is used here, the error won't be visible until
- # the BidiCall#run is called.
- #
- # deadline is the absolute deadline for the call.
- #
- # @param call [Call] the call used by the ActiveCall
- # @param q [CompletionQueue] the completion queue used to accept
- # the call
- # @param marshal [Function] f(obj)->string that marshal requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Fixnum] the deadline for the call to complete
- # @param finished_tag [Object] the object used as the call's finish tag,
- def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
- fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
- unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
- @call = call
- @cq = q
- @deadline = deadline
- @finished_tag = finished_tag
- @marshal = marshal
- @readq = Queue.new
- @unmarshal = unmarshal
+ # Creates a BidiCall.
+ #
+ # BidiCall should only be created after a call is accepted. That means
+ # different things on a client and a server. On the client, the call is
+ # accepted after call.invoke. On the server, this is after call.accept.
+ #
+ # #initialize cannot determine if the call is accepted or not; so if a
+ # call that's not accepted is used here, the error won't be visible until
+ # the BidiCall#run is called.
+ #
+ # deadline is the absolute deadline for the call.
+ #
+ # @param call [Call] the call used by the ActiveCall
+ # @param q [CompletionQueue] the completion queue used to accept
+ # the call
+ # @param marshal [Function] f(obj)->string that marshal requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Fixnum] the deadline for the call to complete
+ # @param finished_tag [Object] the object used as the call's finish tag,
+ def initialize(call, q, marshal, unmarshal, deadline, finished_tag)
+ fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
+ unless q.is_a? Core::CompletionQueue
+ fail(ArgumentError, 'not a CompletionQueue')
end
+ @call = call
+ @cq = q
+ @deadline = deadline
+ @finished_tag = finished_tag
+ @marshal = marshal
+ @readq = Queue.new
+ @unmarshal = unmarshal
+ end
- # Begins orchestration of the Bidi stream for a client sending requests.
- #
- # The method either returns an Enumerator of the responses, or accepts a
- # block that can be invoked with each response.
- #
- # @param requests the Enumerable of requests to send
- # @return an Enumerator of requests to yield
- def run_on_client(requests, &blk)
- enq_th = start_write_loop(requests)
- loop_th = start_read_loop
- replies = each_queued_msg
- return replies if blk.nil?
- replies.each { |r| blk.call(r) }
- enq_th.join
- loop_th.join
- end
+ # Begins orchestration of the Bidi stream for a client sending requests.
+ #
+ # The method either returns an Enumerator of the responses, or accepts a
+ # block that can be invoked with each response.
+ #
+ # @param requests the Enumerable of requests to send
+ # @return an Enumerator of requests to yield
+ def run_on_client(requests, &blk)
+ enq_th = start_write_loop(requests)
+ loop_th = start_read_loop
+ replies = each_queued_msg
+ return replies if blk.nil?
+ replies.each { |r| blk.call(r) }
+ enq_th.join
+ loop_th.join
+ end
- # Begins orchestration of the Bidi stream for a server generating replies.
- #
- # N.B. gen_each_reply is a func(Enumerable<Requests>)
- #
- # It takes an enumerable of requests as an arg, in case there is a
- # relationship between the stream of requests and the stream of replies.
- #
- # This does not mean that must necessarily be one. E.g, the replies
- # produced by gen_each_reply could ignore the received_msgs
- #
- # @param gen_each_reply [Proc] generates the BiDi stream replies.
- def run_on_server(gen_each_reply)
- replys = gen_each_reply.call(each_queued_msg)
- enq_th = start_write_loop(replys, is_client: false)
- loop_th = start_read_loop
- loop_th.join
- enq_th.join
- end
+ # Begins orchestration of the Bidi stream for a server generating replies.
+ #
+ # N.B. gen_each_reply is a func(Enumerable<Requests>)
+ #
+ # It takes an enumerable of requests as an arg, in case there is a
+ # relationship between the stream of requests and the stream of replies.
+ #
+ # This does not mean that must necessarily be one. E.g, the replies
+ # produced by gen_each_reply could ignore the received_msgs
+ #
+ # @param gen_each_reply [Proc] generates the BiDi stream replies.
+ def run_on_server(gen_each_reply)
+ replys = gen_each_reply.call(each_queued_msg)
+ enq_th = start_write_loop(replys, is_client: false)
+ loop_th = start_read_loop
+ loop_th.join
+ enq_th.join
+ end
- private
+ private
- END_OF_READS = :end_of_reads
- END_OF_WRITES = :end_of_writes
+ END_OF_READS = :end_of_reads
+ END_OF_WRITES = :end_of_writes
- # each_queued_msg yields each message on this instances readq
- #
- # - messages are added to the readq by #read_loop
- # - iteration ends when the instance itself is added
- def each_queued_msg
- return enum_for(:each_queued_msg) unless block_given?
- count = 0
- loop do
- logger.debug("each_queued_msg: msg##{count}")
- count += 1
- req = @readq.pop
- throw req if req.is_a? StandardError
- break if req.equal?(END_OF_READS)
- yield req
- end
+ # each_queued_msg yields each message on this instances readq
+ #
+ # - messages are added to the readq by #read_loop
+ # - iteration ends when the instance itself is added
+ def each_queued_msg
+ return enum_for(:each_queued_msg) unless block_given?
+ count = 0
+ loop do
+ logger.debug("each_queued_msg: msg##{count}")
+ count += 1
+ req = @readq.pop
+ throw req if req.is_a? StandardError
+ break if req.equal?(END_OF_READS)
+ yield req
end
+ end
- # during bidi-streaming, read the requests to send from a separate thread
- # read so that read_loop does not block waiting for requests to read.
- def start_write_loop(requests, is_client: true)
- Thread.new do # TODO: run on a thread pool
- write_tag = Object.new
- begin
- count = 0
- requests.each do |req|
- count += 1
- payload = @marshal.call(req)
- @call.start_write(Core::ByteBuffer.new(payload), write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, WRITE_ACCEPTED)
- ensure
- ev.close
- end
+ # during bidi-streaming, read the requests to send from a separate thread
+ # read so that read_loop does not block waiting for requests to read.
+ def start_write_loop(requests, is_client: true)
+ Thread.new do # TODO: run on a thread pool
+ write_tag = Object.new
+ begin
+ count = 0
+ requests.each do |req|
+ count += 1
+ payload = @marshal.call(req)
+ @call.start_write(Core::ByteBuffer.new(payload), write_tag)
+ ev = @cq.pluck(write_tag, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, WRITE_ACCEPTED)
+ ensure
+ ev.close
end
- if is_client
- @call.writes_done(write_tag)
- ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISH_ACCEPTED)
- ensure
- ev.close
- end
- logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
- ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, FINISHED)
- ensure
- ev.close
- end
- logger.debug('bidi-client: finished received')
+ end
+ if is_client
+ @call.writes_done(write_tag)
+ ev = @cq.pluck(write_tag, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, FINISH_ACCEPTED)
+ ensure
+ ev.close
end
- rescue StandardError => e
- logger.warn('bidi: write_loop failed')
- logger.warn(e)
+ logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
+ ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, FINISHED)
+ ensure
+ ev.close
+ end
+ logger.debug('bidi-client: finished received')
end
+ rescue StandardError => e
+ logger.warn('bidi: write_loop failed')
+ logger.warn(e)
end
end
+ end
- # starts the read loop
- def start_read_loop
- Thread.new do
- begin
- read_tag = Object.new
- count = 0
-
- # queue the initial read before beginning the loop
- loop do
- logger.debug("waiting for read #{count}")
- count += 1
- @call.start_read(read_tag)
- ev = @cq.pluck(read_tag, INFINITE_FUTURE)
- begin
- assert_event_type(ev, READ)
+ # starts the read loop
+ def start_read_loop
+ Thread.new do
+ begin
+ read_tag = Object.new
+ count = 0
- # handle the next event.
- if ev.result.nil?
- @readq.push(END_OF_READS)
- logger.debug('done reading!')
- break
- end
+ # queue the initial read before beginning the loop
+ loop do
+ logger.debug("waiting for read #{count}")
+ count += 1
+ @call.start_read(read_tag)
+ ev = @cq.pluck(read_tag, INFINITE_FUTURE)
+ begin
+ assert_event_type(ev, READ)
- # push the latest read onto the queue and continue reading
- logger.debug("received req: #{ev.result}")
- res = @unmarshal.call(ev.result.to_s)
- @readq.push(res)
- ensure
- ev.close
+ # handle the next event.
+ if ev.result.nil?
+ @readq.push(END_OF_READS)
+ logger.debug('done reading!')
+ break
end
- end
- rescue StandardError => e
- logger.warn('bidi: read_loop failed')
- logger.warn(e)
- @readq.push(e) # let each_queued_msg terminate with this error
+ # push the latest read onto the queue and continue reading
+ logger.debug("received req: #{ev.result}")
+ res = @unmarshal.call(ev.result.to_s)
+ @readq.push(res)
+ ensure
+ ev.close
+ end
end
+
+ rescue StandardError => e
+ logger.warn('bidi: read_loop failed')
+ logger.warn(e)
+ @readq.push(e) # let each_queued_msg terminate with this error
end
end
end
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 7e13de19ca..1a4ed58fa3 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -30,381 +30,379 @@
require 'grpc/generic/active_call'
require 'xray/thread_dump_signal_handler'
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- # ClientStub represents an endpoint used to send requests to GRPC servers.
- class ClientStub
- include Core::StatusCodes
+# GRPC contains the General RPC module.
+module GRPC
+ # ClientStub represents an endpoint used to send requests to GRPC servers.
+ class ClientStub
+ include Core::StatusCodes
- # Default deadline is 5 seconds.
- DEFAULT_DEADLINE = 5
+ # Default deadline is 5 seconds.
+ DEFAULT_DEADLINE = 5
- # Creates a new ClientStub.
- #
- # Minimally, a stub is created with the just the host of the gRPC service
- # it wishes to access, e.g.,
- #
- # my_stub = ClientStub.new(example.host.com:50505)
- #
- # Any arbitrary keyword arguments are treated as channel arguments used to
- # configure the RPC connection to the host.
- #
- # There are some specific keyword args that are not used to configure the
- # channel:
- #
- # - :channel_override
- # when present, this must be a pre-created GRPC::Channel. If it's
- # present the host and arbitrary keyword arg areignored, and the RPC
- # connection uses this channel.
- #
- # - :deadline
- # when present, this is the default deadline used for calls
- #
- # - :update_metadata
- # when present, this a func that takes a hash and returns a hash
- # it can be used to update metadata, i.e, remove, change or update
- # amend metadata values.
- #
- # @param host [String] the host the stub connects to
- # @param q [Core::CompletionQueue] used to wait for events
- # @param channel_override [Core::Channel] a pre-created channel
- # @param deadline [Number] the default deadline to use in requests
- # @param creds [Core::Credentials] the channel
- # @param update_metadata a func that updates metadata as described above
- # @param kw [KeywordArgs]the channel arguments
- def initialize(host, q,
- channel_override:nil,
- deadline: DEFAULT_DEADLINE,
- creds: nil,
- update_metadata: nil,
- **kw)
- unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
- @queue = q
+ # Creates a new ClientStub.
+ #
+ # Minimally, a stub is created with the just the host of the gRPC service
+ # it wishes to access, e.g.,
+ #
+ # my_stub = ClientStub.new(example.host.com:50505)
+ #
+ # Any arbitrary keyword arguments are treated as channel arguments used to
+ # configure the RPC connection to the host.
+ #
+ # There are some specific keyword args that are not used to configure the
+ # channel:
+ #
+ # - :channel_override
+ # when present, this must be a pre-created GRPC::Channel. If it's
+ # present the host and arbitrary keyword arg areignored, and the RPC
+ # connection uses this channel.
+ #
+ # - :deadline
+ # when present, this is the default deadline used for calls
+ #
+ # - :update_metadata
+ # when present, this a func that takes a hash and returns a hash
+ # it can be used to update metadata, i.e, remove, change or update
+ # amend metadata values.
+ #
+ # @param host [String] the host the stub connects to
+ # @param q [Core::CompletionQueue] used to wait for events
+ # @param channel_override [Core::Channel] a pre-created channel
+ # @param deadline [Number] the default deadline to use in requests
+ # @param creds [Core::Credentials] the channel
+ # @param update_metadata a func that updates metadata as described above
+ # @param kw [KeywordArgs]the channel arguments
+ def initialize(host, q,
+ channel_override:nil,
+ deadline: DEFAULT_DEADLINE,
+ creds: nil,
+ update_metadata: nil,
+ **kw)
+ unless q.is_a? Core::CompletionQueue
+ fail(ArgumentError, 'not a CompletionQueue')
+ end
+ @queue = q
- # set the channel instance
- if !channel_override.nil?
- ch = channel_override
- fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
+ # set the channel instance
+ if !channel_override.nil?
+ ch = channel_override
+ fail(ArgumentError, 'not a Channel') unless ch.is_a? Core::Channel
+ else
+ if creds.nil?
+ ch = Core::Channel.new(host, kw)
+ elsif !creds.is_a?(Core::Credentials)
+ fail(ArgumentError, 'not a Credentials')
else
- if creds.nil?
- ch = Core::Channel.new(host, kw)
- elsif !creds.is_a?(Core::Credentials)
- fail(ArgumentError, 'not a Credentials')
- else
- ch = Core::Channel.new(host, kw, creds)
- end
+ ch = Core::Channel.new(host, kw, creds)
end
- @ch = ch
-
- @update_metadata = nil
- unless update_metadata.nil?
- unless update_metadata.is_a? Proc
- fail(ArgumentError, 'update_metadata is not a Proc')
- end
- @update_metadata = update_metadata
- end
-
- @host = host
- @deadline = deadline
end
+ @ch = ch
- # request_response sends a request to a GRPC server, and returns the
- # response.
- #
- # == Flow Control ==
- # This is a blocking call.
- #
- # * it does not return until a response is received.
- #
- # * the requests is sent only when GRPC core's flow control allows it to
- # be sent.
- #
- # == Errors ==
- # An RuntimeError is raised if
- #
- # * the server responds with a non-OK status
- #
- # * the deadline is exceeded
- #
- # == Return Value ==
- #
- # If return_op is false, the call returns the response
- #
- # If return_op is true, the call returns an Operation, calling execute
- # on the Operation returns the response.
- #
- # == Keyword Args ==
- #
- # Unspecified keyword arguments are treated as metadata to be sent to the
- # server.
- #
- # @param method [String] the RPC method to call on the GRPC server
- # @param req [Object] the request sent to the server
- # @param marshal [Function] f(obj)->string that marshals requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] (optional) the max completion time in seconds
- # @param return_op [true|false] return an Operation if true
- # @return [Object] the response received from the server
- def request_response(method, req, marshal, unmarshal, deadline = nil,
- return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
- return c.request_response(req, **md) unless return_op
-
- # return the operation view of the active_call; define #execute as a
- # new method for this instance that invokes #request_response.
- op = c.operation
- op.define_singleton_method(:execute) do
- c.request_response(req, **md)
+ @update_metadata = nil
+ unless update_metadata.nil?
+ unless update_metadata.is_a? Proc
+ fail(ArgumentError, 'update_metadata is not a Proc')
end
- op
+ @update_metadata = update_metadata
end
- # client_streamer sends a stream of requests to a GRPC server, and
- # returns a single response.
- #
- # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
- # #each enumeration protocol. In the simplest case, requests will be an
- # array of marshallable objects; in typical case it will be an Enumerable
- # that allows dynamic construction of the marshallable objects.
- #
- # == Flow Control ==
- # This is a blocking call.
- #
- # * it does not return until a response is received.
- #
- # * each requests is sent only when GRPC core's flow control allows it to
- # be sent.
- #
- # == Errors ==
- # An RuntimeError is raised if
- #
- # * the server responds with a non-OK status
- #
- # * the deadline is exceeded
- #
- # == Return Value ==
- #
- # If return_op is false, the call consumes the requests and returns
- # the response.
- #
- # If return_op is true, the call returns the response.
- #
- # == Keyword Args ==
- #
- # Unspecified keyword arguments are treated as metadata to be sent to the
- # server.
- #
- # @param method [String] the RPC method to call on the GRPC server
- # @param requests [Object] an Enumerable of requests to send
- # @param marshal [Function] f(obj)->string that marshals requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] the max completion time in seconds
- # @param return_op [true|false] return an Operation if true
- # @return [Object|Operation] the response received from the server
- def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
- return_op: false, **kw)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
- return c.client_streamer(requests, **md) unless return_op
+ @host = host
+ @deadline = deadline
+ end
+
+ # request_response sends a request to a GRPC server, and returns the
+ # response.
+ #
+ # == Flow Control ==
+ # This is a blocking call.
+ #
+ # * it does not return until a response is received.
+ #
+ # * the requests is sent only when GRPC core's flow control allows it to
+ # be sent.
+ #
+ # == Errors ==
+ # An RuntimeError is raised if
+ #
+ # * the server responds with a non-OK status
+ #
+ # * the deadline is exceeded
+ #
+ # == Return Value ==
+ #
+ # If return_op is false, the call returns the response
+ #
+ # If return_op is true, the call returns an Operation, calling execute
+ # on the Operation returns the response.
+ #
+ # == Keyword Args ==
+ #
+ # Unspecified keyword arguments are treated as metadata to be sent to the
+ # server.
+ #
+ # @param method [String] the RPC method to call on the GRPC server
+ # @param req [Object] the request sent to the server
+ # @param marshal [Function] f(obj)->string that marshals requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Numeric] (optional) the max completion time in seconds
+ # @param return_op [true|false] return an Operation if true
+ # @return [Object] the response received from the server
+ def request_response(method, req, marshal, unmarshal, deadline = nil,
+ return_op: false, **kw)
+ c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ return c.request_response(req, **md) unless return_op
- # return the operation view of the active_call; define #execute as a
- # new method for this instance that invokes #client_streamer.
- op = c.operation
- op.define_singleton_method(:execute) do
- c.client_streamer(requests, **md)
- end
- op
+ # return the operation view of the active_call; define #execute as a
+ # new method for this instance that invokes #request_response.
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ c.request_response(req, **md)
end
+ op
+ end
- # server_streamer sends one request to the GRPC server, which yields a
- # stream of responses.
- #
- # responses provides an enumerator over the streamed responses, i.e. it
- # follows Ruby's #each iteration protocol. The enumerator blocks while
- # waiting for each response, stops when the server signals that no
- # further responses will be supplied. If the implicit block is provided,
- # it is executed with each response as the argument and no result is
- # returned.
- #
- # == Flow Control ==
- # This is a blocking call.
- #
- # * the request is sent only when GRPC core's flow control allows it to
- # be sent.
- #
- # * the request will not complete until the server sends the final
- # response followed by a status message.
- #
- # == Errors ==
- # An RuntimeError is raised if
- #
- # * the server responds with a non-OK status when any response is
- # * retrieved
- #
- # * the deadline is exceeded
- #
- # == Return Value ==
- #
- # if the return_op is false, the return value is an Enumerator of the
- # results, unless a block is provided, in which case the block is
- # executed with each response.
- #
- # if return_op is true, the function returns an Operation whose #execute
- # method runs server streamer call. Again, Operation#execute either
- # calls the given block with each response or returns an Enumerator of the
- # responses.
- #
- # == Keyword Args ==
- #
- # Unspecified keyword arguments are treated as metadata to be sent to the
- # server.
- #
- # @param method [String] the RPC method to call on the GRPC server
- # @param req [Object] the request sent to the server
- # @param marshal [Function] f(obj)->string that marshals requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] the max completion time in seconds
- # @param return_op [true|false]return an Operation if true
- # @param blk [Block] when provided, is executed for each response
- # @return [Enumerator|Operation|nil] as discussed above
- def server_streamer(method, req, marshal, unmarshal, deadline = nil,
- return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
- return c.server_streamer(req, **md, &blk) unless return_op
+ # client_streamer sends a stream of requests to a GRPC server, and
+ # returns a single response.
+ #
+ # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
+ # #each enumeration protocol. In the simplest case, requests will be an
+ # array of marshallable objects; in typical case it will be an Enumerable
+ # that allows dynamic construction of the marshallable objects.
+ #
+ # == Flow Control ==
+ # This is a blocking call.
+ #
+ # * it does not return until a response is received.
+ #
+ # * each requests is sent only when GRPC core's flow control allows it to
+ # be sent.
+ #
+ # == Errors ==
+ # An RuntimeError is raised if
+ #
+ # * the server responds with a non-OK status
+ #
+ # * the deadline is exceeded
+ #
+ # == Return Value ==
+ #
+ # If return_op is false, the call consumes the requests and returns
+ # the response.
+ #
+ # If return_op is true, the call returns the response.
+ #
+ # == Keyword Args ==
+ #
+ # Unspecified keyword arguments are treated as metadata to be sent to the
+ # server.
+ #
+ # @param method [String] the RPC method to call on the GRPC server
+ # @param requests [Object] an Enumerable of requests to send
+ # @param marshal [Function] f(obj)->string that marshals requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Numeric] the max completion time in seconds
+ # @param return_op [true|false] return an Operation if true
+ # @return [Object|Operation] the response received from the server
+ def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ return_op: false, **kw)
+ c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ return c.client_streamer(requests, **md) unless return_op
- # return the operation view of the active_call; define #execute
- # as a new method for this instance that invokes #server_streamer
- op = c.operation
- op.define_singleton_method(:execute) do
- c.server_streamer(req, **md, &blk)
- end
- op
+ # return the operation view of the active_call; define #execute as a
+ # new method for this instance that invokes #client_streamer.
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ c.client_streamer(requests, **md)
end
+ op
+ end
- # bidi_streamer sends a stream of requests to the GRPC server, and yields
- # a stream of responses.
- #
- # This method takes an Enumerable of requests, and returns and enumerable
- # of responses.
- #
- # == requests ==
- #
- # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
- # #each enumeration protocol. In the simplest case, requests will be an
- # array of marshallable objects; in typical case it will be an
- # Enumerable that allows dynamic construction of the marshallable
- # objects.
- #
- # == responses ==
- #
- # This is an enumerator of responses. I.e, its #next method blocks
- # waiting for the next response. Also, if at any point the block needs
- # to consume all the remaining responses, this can be done using #each or
- # #collect. Calling #each or #collect should only be done if
- # the_call#writes_done has been called, otherwise the block will loop
- # forever.
- #
- # == Flow Control ==
- # This is a blocking call.
- #
- # * the call completes when the next call to provided block returns
- # * [False]
- #
- # * the execution block parameters are two objects for sending and
- # receiving responses, each of which blocks waiting for flow control.
- # E.g, calles to bidi_call#remote_send will wait until flow control
- # allows another write before returning; and obviously calls to
- # responses#next block until the next response is available.
- #
- # == Termination ==
- #
- # As well as sending and receiving messages, the block passed to the
- # function is also responsible for:
- #
- # * calling bidi_call#writes_done to indicate no further reqs will be
- # sent.
- #
- # * returning false if once the bidi stream is functionally completed.
- #
- # Note that response#next will indicate that there are no further
- # responses by throwing StopIteration, but can only happen either
- # if bidi_call#writes_done is called.
- #
- # To terminate the RPC correctly the block:
- #
- # * must call bidi#writes_done and then
- #
- # * either return false as soon as there is no need for other responses
- #
- # * loop on responses#next until no further responses are available
- #
- # == Errors ==
- # An RuntimeError is raised if
- #
- # * the server responds with a non-OK status when any response is
- # * retrieved
- #
- # * the deadline is exceeded
- #
- #
- # == Keyword Args ==
- #
- # Unspecified keyword arguments are treated as metadata to be sent to the
- # server.
- #
- # == Return Value ==
- #
- # if the return_op is false, the return value is an Enumerator of the
- # results, unless a block is provided, in which case the block is
- # executed with each response.
- #
- # if return_op is true, the function returns an Operation whose #execute
- # method runs the Bidi call. Again, Operation#execute either calls a
- # given block with each response or returns an Enumerator of the
- # responses.
- #
- # @param method [String] the RPC method to call on the GRPC server
- # @param requests [Object] an Enumerable of requests to send
- # @param marshal [Function] f(obj)->string that marshals requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [Numeric] (optional) the max completion time in seconds
- # @param blk [Block] when provided, is executed for each response
- # @param return_op [true|false] return an Operation if true
- # @return [Enumerator|nil|Operation] as discussed above
- def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ # server_streamer sends one request to the GRPC server, which yields a
+ # stream of responses.
+ #
+ # responses provides an enumerator over the streamed responses, i.e. it
+ # follows Ruby's #each iteration protocol. The enumerator blocks while
+ # waiting for each response, stops when the server signals that no
+ # further responses will be supplied. If the implicit block is provided,
+ # it is executed with each response as the argument and no result is
+ # returned.
+ #
+ # == Flow Control ==
+ # This is a blocking call.
+ #
+ # * the request is sent only when GRPC core's flow control allows it to
+ # be sent.
+ #
+ # * the request will not complete until the server sends the final
+ # response followed by a status message.
+ #
+ # == Errors ==
+ # An RuntimeError is raised if
+ #
+ # * the server responds with a non-OK status when any response is
+ # * retrieved
+ #
+ # * the deadline is exceeded
+ #
+ # == Return Value ==
+ #
+ # if the return_op is false, the return value is an Enumerator of the
+ # results, unless a block is provided, in which case the block is
+ # executed with each response.
+ #
+ # if return_op is true, the function returns an Operation whose #execute
+ # method runs server streamer call. Again, Operation#execute either
+ # calls the given block with each response or returns an Enumerator of the
+ # responses.
+ #
+ # == Keyword Args ==
+ #
+ # Unspecified keyword arguments are treated as metadata to be sent to the
+ # server.
+ #
+ # @param method [String] the RPC method to call on the GRPC server
+ # @param req [Object] the request sent to the server
+ # @param marshal [Function] f(obj)->string that marshals requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Numeric] the max completion time in seconds
+ # @param return_op [true|false]return an Operation if true
+ # @param blk [Block] when provided, is executed for each response
+ # @return [Enumerator|Operation|nil] as discussed above
+ def server_streamer(method, req, marshal, unmarshal, deadline = nil,
return_op: false, **kw, &blk)
- c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
- md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
- return c.bidi_streamer(requests, **md, &blk) unless return_op
+ c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ return c.server_streamer(req, **md, &blk) unless return_op
- # return the operation view of the active_call; define #execute
- # as a new method for this instance that invokes #bidi_streamer
- op = c.operation
- op.define_singleton_method(:execute) do
- c.bidi_streamer(requests, **md, &blk)
- end
- op
+ # return the operation view of the active_call; define #execute
+ # as a new method for this instance that invokes #server_streamer
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ c.server_streamer(req, **md, &blk)
end
+ op
+ end
- private
+ # bidi_streamer sends a stream of requests to the GRPC server, and yields
+ # a stream of responses.
+ #
+ # This method takes an Enumerable of requests, and returns and enumerable
+ # of responses.
+ #
+ # == requests ==
+ #
+ # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
+ # #each enumeration protocol. In the simplest case, requests will be an
+ # array of marshallable objects; in typical case it will be an
+ # Enumerable that allows dynamic construction of the marshallable
+ # objects.
+ #
+ # == responses ==
+ #
+ # This is an enumerator of responses. I.e, its #next method blocks
+ # waiting for the next response. Also, if at any point the block needs
+ # to consume all the remaining responses, this can be done using #each or
+ # #collect. Calling #each or #collect should only be done if
+ # the_call#writes_done has been called, otherwise the block will loop
+ # forever.
+ #
+ # == Flow Control ==
+ # This is a blocking call.
+ #
+ # * the call completes when the next call to provided block returns
+ # * [False]
+ #
+ # * the execution block parameters are two objects for sending and
+ # receiving responses, each of which blocks waiting for flow control.
+ # E.g, calles to bidi_call#remote_send will wait until flow control
+ # allows another write before returning; and obviously calls to
+ # responses#next block until the next response is available.
+ #
+ # == Termination ==
+ #
+ # As well as sending and receiving messages, the block passed to the
+ # function is also responsible for:
+ #
+ # * calling bidi_call#writes_done to indicate no further reqs will be
+ # sent.
+ #
+ # * returning false if once the bidi stream is functionally completed.
+ #
+ # Note that response#next will indicate that there are no further
+ # responses by throwing StopIteration, but can only happen either
+ # if bidi_call#writes_done is called.
+ #
+ # To terminate the RPC correctly the block:
+ #
+ # * must call bidi#writes_done and then
+ #
+ # * either return false as soon as there is no need for other responses
+ #
+ # * loop on responses#next until no further responses are available
+ #
+ # == Errors ==
+ # An RuntimeError is raised if
+ #
+ # * the server responds with a non-OK status when any response is
+ # * retrieved
+ #
+ # * the deadline is exceeded
+ #
+ #
+ # == Keyword Args ==
+ #
+ # Unspecified keyword arguments are treated as metadata to be sent to the
+ # server.
+ #
+ # == Return Value ==
+ #
+ # if the return_op is false, the return value is an Enumerator of the
+ # results, unless a block is provided, in which case the block is
+ # executed with each response.
+ #
+ # if return_op is true, the function returns an Operation whose #execute
+ # method runs the Bidi call. Again, Operation#execute either calls a
+ # given block with each response or returns an Enumerator of the
+ # responses.
+ #
+ # @param method [String] the RPC method to call on the GRPC server
+ # @param requests [Object] an Enumerable of requests to send
+ # @param marshal [Function] f(obj)->string that marshals requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [Numeric] (optional) the max completion time in seconds
+ # @param blk [Block] when provided, is executed for each response
+ # @param return_op [true|false] return an Operation if true
+ # @return [Enumerator|nil|Operation] as discussed above
+ def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
+ return_op: false, **kw, &blk)
+ c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
+ md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
+ return c.bidi_streamer(requests, **md, &blk) unless return_op
- # Creates a new active stub
- #
- # @param ch [GRPC::Channel] the channel used to create the stub.
- # @param marshal [Function] f(obj)->string that marshals requests
- # @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param deadline [TimeConst]
- def new_active_call(ch, marshal, unmarshal, deadline = nil)
- absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
- call = @ch.create_call(ch, @host, absolute_deadline)
- ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
- started: false)
+ # return the operation view of the active_call; define #execute
+ # as a new method for this instance that invokes #bidi_streamer
+ op = c.operation
+ op.define_singleton_method(:execute) do
+ c.bidi_streamer(requests, **md, &blk)
end
+ op
+ end
+
+ private
+
+ # Creates a new active stub
+ #
+ # @param ch [GRPC::Channel] the channel used to create the stub.
+ # @param marshal [Function] f(obj)->string that marshals requests
+ # @param unmarshal [Function] f(string)->obj that unmarshals responses
+ # @param deadline [TimeConst]
+ def new_active_call(ch, marshal, unmarshal, deadline = nil)
+ absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
+ call = @ch.create_call(ch, @host, absolute_deadline)
+ ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
+ started: false)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 876397a6e7..6d8bdeb3cc 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -29,123 +29,122 @@
require 'grpc/grpc'
-module Google
- module RPC
- # RpcDesc is a Descriptor of an RPC method.
- class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
- :unmarshal_method)
- include Core::StatusCodes
+# GRPC contains the General RPC module.
+module GRPC
+ # RpcDesc is a Descriptor of an RPC method.
+ class RpcDesc < Struct.new(:name, :input, :output, :marshal_method,
+ :unmarshal_method)
+ include Core::StatusCodes
- # Used to wrap a message class to indicate that it needs to be streamed.
- class Stream
- attr_accessor :type
+ # Used to wrap a message class to indicate that it needs to be streamed.
+ class Stream
+ attr_accessor :type
- def initialize(type)
- @type = type
- end
+ def initialize(type)
+ @type = type
end
+ end
- # @return [Proc] { |instance| marshalled(instance) }
- def marshal_proc
- proc { |o| o.class.method(marshal_method).call(o).to_s }
- end
+ # @return [Proc] { |instance| marshalled(instance) }
+ def marshal_proc
+ proc { |o| o.class.method(marshal_method).call(o).to_s }
+ end
- # @param [:input, :output] target determines whether to produce the an
- # unmarshal Proc for the rpc input parameter or
- # its output parameter
- #
- # @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
- def unmarshal_proc(target)
- fail ArgumentError unless [:input, :output].include?(target)
- unmarshal_class = method(target).call
- unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
- proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
- end
+ # @param [:input, :output] target determines whether to produce the an
+ # unmarshal Proc for the rpc input parameter or
+ # its output parameter
+ #
+ # @return [Proc] An unmarshal proc { |marshalled(instance)| instance }
+ def unmarshal_proc(target)
+ fail ArgumentError unless [:input, :output].include?(target)
+ unmarshal_class = method(target).call
+ unmarshal_class = unmarshal_class.type if unmarshal_class.is_a? Stream
+ proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
+ end
- def run_server_method(active_call, mth)
- # While a server method is running, it might be cancelled, its deadline
- # might be reached, the handler could throw an unknown error, or a
- # well-behaved handler could throw a StatusError.
- if request_response?
- req = active_call.remote_read
- resp = mth.call(req, active_call.single_req_view)
- active_call.remote_send(resp)
- elsif client_streamer?
- resp = mth.call(active_call.multi_req_view)
- active_call.remote_send(resp)
- elsif server_streamer?
- req = active_call.remote_read
- replys = mth.call(req, active_call.single_req_view)
- replys.each { |r| active_call.remote_send(r) }
- else # is a bidi_stream
- active_call.run_server_bidi(mth)
- end
- send_status(active_call, OK, 'OK')
- rescue BadStatus => e
- # this is raised by handlers that want GRPC to send an application
- # error code and detail message.
- logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
- send_status(active_call, e.code, e.details)
- rescue Core::CallError => e
- # This is raised by GRPC internals but should rarely, if ever happen.
- # Log it, but don't notify the other endpoint..
- logger.warn("failed call: #{active_call}\n#{e}")
- rescue OutOfTime
- # This is raised when active_call#method.call exceeeds the deadline
- # event. Send a status of deadline exceeded
- logger.warn("late call: #{active_call}")
- send_status(active_call, DEADLINE_EXCEEDED, 'late')
- rescue Core::EventError => e
- # This is raised by GRPC internals but should rarely, if ever happen.
- # Log it, but don't notify the other endpoint..
- logger.warn("failed call: #{active_call}\n#{e}")
- rescue StandardError => e
- # This will usuaally be an unhandled error in the handling code.
- # Send back a UNKNOWN status to the client
- logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
- logger.warn(e)
- send_status(active_call, UNKNOWN, 'no reason given')
+ def run_server_method(active_call, mth)
+ # While a server method is running, it might be cancelled, its deadline
+ # might be reached, the handler could throw an unknown error, or a
+ # well-behaved handler could throw a StatusError.
+ if request_response?
+ req = active_call.remote_read
+ resp = mth.call(req, active_call.single_req_view)
+ active_call.remote_send(resp)
+ elsif client_streamer?
+ resp = mth.call(active_call.multi_req_view)
+ active_call.remote_send(resp)
+ elsif server_streamer?
+ req = active_call.remote_read
+ replys = mth.call(req, active_call.single_req_view)
+ replys.each { |r| active_call.remote_send(r) }
+ else # is a bidi_stream
+ active_call.run_server_bidi(mth)
end
+ send_status(active_call, OK, 'OK')
+ rescue BadStatus => e
+ # this is raised by handlers that want GRPC to send an application
+ # error code and detail message.
+ logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
+ send_status(active_call, e.code, e.details)
+ rescue Core::CallError => e
+ # This is raised by GRPC internals but should rarely, if ever happen.
+ # Log it, but don't notify the other endpoint..
+ logger.warn("failed call: #{active_call}\n#{e}")
+ rescue OutOfTime
+ # This is raised when active_call#method.call exceeeds the deadline
+ # event. Send a status of deadline exceeded
+ logger.warn("late call: #{active_call}")
+ send_status(active_call, DEADLINE_EXCEEDED, 'late')
+ rescue Core::EventError => e
+ # This is raised by GRPC internals but should rarely, if ever happen.
+ # Log it, but don't notify the other endpoint..
+ logger.warn("failed call: #{active_call}\n#{e}")
+ rescue StandardError => e
+ # This will usuaally be an unhandled error in the handling code.
+ # Send back a UNKNOWN status to the client
+ logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
+ logger.warn(e)
+ send_status(active_call, UNKNOWN, 'no reason given')
+ end
- def assert_arity_matches(mth)
- if request_response? || server_streamer?
- if mth.arity != 2
- fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
- end
- else
- if mth.arity != 1
- fail arity_error(mth, 1, "should be #{mth.name}(call)")
- end
+ def assert_arity_matches(mth)
+ if request_response? || server_streamer?
+ if mth.arity != 2
+ fail arity_error(mth, 2, "should be #{mth.name}(req, call)")
+ end
+ else
+ if mth.arity != 1
+ fail arity_error(mth, 1, "should be #{mth.name}(call)")
end
end
+ end
- def request_response?
- !input.is_a?(Stream) && !output.is_a?(Stream)
- end
+ def request_response?
+ !input.is_a?(Stream) && !output.is_a?(Stream)
+ end
- def client_streamer?
- input.is_a?(Stream) && !output.is_a?(Stream)
- end
+ def client_streamer?
+ input.is_a?(Stream) && !output.is_a?(Stream)
+ end
- def server_streamer?
- !input.is_a?(Stream) && output.is_a?(Stream)
- end
+ def server_streamer?
+ !input.is_a?(Stream) && output.is_a?(Stream)
+ end
- def bidi_streamer?
- input.is_a?(Stream) && output.is_a?(Stream)
- end
+ def bidi_streamer?
+ input.is_a?(Stream) && output.is_a?(Stream)
+ end
- def arity_error(mth, want, msg)
- "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
- end
+ def arity_error(mth, want, msg)
+ "##{mth.name}: bad arg count; got:#{mth.arity}, want:#{want}, #{msg}"
+ end
- def send_status(active_client, code, details)
- details = 'Not sure why' if details.nil?
- active_client.send_status(code, details)
- rescue StandardError => e
- logger.warn("Could not send status #{code}:#{details}")
- logger.warn(e)
- end
+ def send_status(active_client, code, details)
+ details = 'Not sure why' if details.nil?
+ active_client.send_status(code, details)
+ rescue StandardError => e
+ logger.warn("Could not send status #{code}:#{details}")
+ logger.warn(e)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 40c5ec118e..dd40eba57f 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -33,372 +33,370 @@ require 'grpc/generic/service'
require 'thread'
require 'xray/thread_dump_signal_handler'
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- # RpcServer hosts a number of services and makes them available on the
- # network.
- class RpcServer
- include Core::CompletionType
- include Core::TimeConsts
- extend ::Forwardable
-
- def_delegators :@server, :add_http2_port
-
- # Default thread pool size is 3
- DEFAULT_POOL_SIZE = 3
-
- # Default max_waiting_requests size is 20
- DEFAULT_MAX_WAITING_REQUESTS = 20
-
- # Creates a new RpcServer.
- #
- # The RPC server is configured using keyword arguments.
- #
- # There are some specific keyword args used to configure the RpcServer
- # instance, however other arbitrary are allowed and when present are used
- # to configure the listeninng connection set up by the RpcServer.
- #
- # * server_override: which if passed must be a [GRPC::Core::Server]. When
- # present.
- #
- # * poll_period: when present, the server polls for new events with this
- # period
- #
- # * pool_size: the size of the thread pool the server uses to run its
- # threads
- #
- # * completion_queue_override: when supplied, this will be used as the
- # completion_queue that the server uses to receive network events,
- # otherwise its creates a new instance itself
- #
- # * creds: [GRPC::Core::ServerCredentials]
- # the credentials used to secure the server
- #
- # * max_waiting_requests: the maximum number of requests that are not
- # being handled to allow. When this limit is exceeded, the server responds
- # with not available to new requests
- def initialize(pool_size:DEFAULT_POOL_SIZE,
- max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
- poll_period:INFINITE_FUTURE,
- completion_queue_override:nil,
- creds:nil,
- server_override:nil,
- **kw)
- if completion_queue_override.nil?
- cq = Core::CompletionQueue.new
- else
- cq = completion_queue_override
- unless cq.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
+# GRPC contains the General RPC module.
+module GRPC
+ # RpcServer hosts a number of services and makes them available on the
+ # network.
+ class RpcServer
+ include Core::CompletionType
+ include Core::TimeConsts
+ extend ::Forwardable
+
+ def_delegators :@server, :add_http2_port
+
+ # Default thread pool size is 3
+ DEFAULT_POOL_SIZE = 3
+
+ # Default max_waiting_requests size is 20
+ DEFAULT_MAX_WAITING_REQUESTS = 20
+
+ # Creates a new RpcServer.
+ #
+ # The RPC server is configured using keyword arguments.
+ #
+ # There are some specific keyword args used to configure the RpcServer
+ # instance, however other arbitrary are allowed and when present are used
+ # to configure the listeninng connection set up by the RpcServer.
+ #
+ # * server_override: which if passed must be a [GRPC::Core::Server]. When
+ # present.
+ #
+ # * poll_period: when present, the server polls for new events with this
+ # period
+ #
+ # * pool_size: the size of the thread pool the server uses to run its
+ # threads
+ #
+ # * completion_queue_override: when supplied, this will be used as the
+ # completion_queue that the server uses to receive network events,
+ # otherwise its creates a new instance itself
+ #
+ # * creds: [GRPC::Core::ServerCredentials]
+ # the credentials used to secure the server
+ #
+ # * max_waiting_requests: the maximum number of requests that are not
+ # being handled to allow. When this limit is exceeded, the server responds
+ # with not available to new requests
+ def initialize(pool_size:DEFAULT_POOL_SIZE,
+ max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
+ poll_period:INFINITE_FUTURE,
+ completion_queue_override:nil,
+ creds:nil,
+ server_override:nil,
+ **kw)
+ if completion_queue_override.nil?
+ cq = Core::CompletionQueue.new
+ else
+ cq = completion_queue_override
+ unless cq.is_a? Core::CompletionQueue
+ fail(ArgumentError, 'not a CompletionQueue')
end
- @cq = cq
+ end
+ @cq = cq
- if server_override.nil?
- if creds.nil?
- srv = Core::Server.new(@cq, kw)
- elsif !creds.is_a? Core::ServerCredentials
- fail(ArgumentError, 'not a ServerCredentials')
- else
- srv = Core::Server.new(@cq, kw, creds)
- end
+ if server_override.nil?
+ if creds.nil?
+ srv = Core::Server.new(@cq, kw)
+ elsif !creds.is_a? Core::ServerCredentials
+ fail(ArgumentError, 'not a ServerCredentials')
else
- srv = server_override
- fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
+ srv = Core::Server.new(@cq, kw, creds)
end
- @server = srv
-
- @pool_size = pool_size
- @max_waiting_requests = max_waiting_requests
- @poll_period = poll_period
- @run_mutex = Mutex.new
- @run_cond = ConditionVariable.new
- @pool = Pool.new(@pool_size)
+ else
+ srv = server_override
+ fail(ArgumentError, 'not a Server') unless srv.is_a? Core::Server
end
+ @server = srv
+
+ @pool_size = pool_size
+ @max_waiting_requests = max_waiting_requests
+ @poll_period = poll_period
+ @run_mutex = Mutex.new
+ @run_cond = ConditionVariable.new
+ @pool = Pool.new(@pool_size)
+ end
- # stops a running server
- #
- # the call has no impact if the server is already stopped, otherwise
- # server's current call loop is it's last.
- def stop
- return unless @running
- @stopped = true
- @pool.stop
- end
+ # stops a running server
+ #
+ # the call has no impact if the server is already stopped, otherwise
+ # server's current call loop is it's last.
+ def stop
+ return unless @running
+ @stopped = true
+ @pool.stop
+ end
- # determines if the server is currently running
- def running?
- @running ||= false
- end
+ # determines if the server is currently running
+ def running?
+ @running ||= false
+ end
- # Is called from other threads to wait for #run to start up the server.
- #
- # If run has not been called, this returns immediately.
- #
- # @param timeout [Numeric] number of seconds to wait
- # @result [true, false] true if the server is running, false otherwise
- def wait_till_running(timeout = 0.1)
- end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
- while Time.now < end_time
- @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
- sleep(sleep_period)
- end
- running?
+ # Is called from other threads to wait for #run to start up the server.
+ #
+ # If run has not been called, this returns immediately.
+ #
+ # @param timeout [Numeric] number of seconds to wait
+ # @result [true, false] true if the server is running, false otherwise
+ def wait_till_running(timeout = 0.1)
+ end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
+ while Time.now < end_time
+ @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
+ sleep(sleep_period)
end
+ running?
+ end
- # determines if the server is currently stopped
- def stopped?
- @stopped ||= false
- end
+ # determines if the server is currently stopped
+ def stopped?
+ @stopped ||= false
+ end
- # handle registration of classes
- #
- # service is either a class that includes GRPC::GenericService and whose
- # #new function can be called without argument or any instance of such a
- # class.
- #
- # E.g, after
- #
- # class Divider
- # include GRPC::GenericService
- # rpc :div DivArgs, DivReply # single request, single response
- # def initialize(optional_arg='default option') # no args
- # ...
- # end
- #
- # srv = GRPC::RpcServer.new(...)
- #
- # # Either of these works
- #
- # srv.handle(Divider)
- #
- # # or
- #
- # srv.handle(Divider.new('replace optional arg'))
- #
- # It raises RuntimeError:
- # - if service is not valid service class or object
- # - its handler methods are already registered
- # - if the server is already running
- #
- # @param service [Object|Class] a service class or object as described
- # above
- def handle(service)
- fail 'cannot add services if the server is running' if running?
- fail 'cannot add services if the server is stopped' if stopped?
- cls = service.is_a?(Class) ? service : service.class
- assert_valid_service_class(cls)
- add_rpc_descs_for(service)
- end
+ # handle registration of classes
+ #
+ # service is either a class that includes GRPC::GenericService and whose
+ # #new function can be called without argument or any instance of such a
+ # class.
+ #
+ # E.g, after
+ #
+ # class Divider
+ # include GRPC::GenericService
+ # rpc :div DivArgs, DivReply # single request, single response
+ # def initialize(optional_arg='default option') # no args
+ # ...
+ # end
+ #
+ # srv = GRPC::RpcServer.new(...)
+ #
+ # # Either of these works
+ #
+ # srv.handle(Divider)
+ #
+ # # or
+ #
+ # srv.handle(Divider.new('replace optional arg'))
+ #
+ # It raises RuntimeError:
+ # - if service is not valid service class or object
+ # - its handler methods are already registered
+ # - if the server is already running
+ #
+ # @param service [Object|Class] a service class or object as described
+ # above
+ def handle(service)
+ fail 'cannot add services if the server is running' if running?
+ fail 'cannot add services if the server is stopped' if stopped?
+ cls = service.is_a?(Class) ? service : service.class
+ assert_valid_service_class(cls)
+ add_rpc_descs_for(service)
+ end
- # runs the server
- #
- # - if no rpc_descs are registered, this exits immediately, otherwise it
- # continues running permanently and does not return until program exit.
- #
- # - #running? returns true after this is called, until #stop cause the
- # the server to stop.
- def run
- if rpc_descs.size == 0
- logger.warn('did not run as no services were present')
- return
- end
- @run_mutex.synchronize do
- @running = true
- @run_cond.signal
+ # runs the server
+ #
+ # - if no rpc_descs are registered, this exits immediately, otherwise it
+ # continues running permanently and does not return until program exit.
+ #
+ # - #running? returns true after this is called, until #stop cause the
+ # the server to stop.
+ def run
+ if rpc_descs.size == 0
+ logger.warn('did not run as no services were present')
+ return
+ end
+ @run_mutex.synchronize do
+ @running = true
+ @run_cond.signal
+ end
+ @pool.start
+ @server.start
+ server_tag = Object.new
+ until stopped?
+ @server.request_call(server_tag)
+ ev = @cq.pluck(server_tag, @poll_period)
+ next if ev.nil?
+ if ev.type != SERVER_RPC_NEW
+ logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
+ ev.close
+ next
end
- @pool.start
- @server.start
- server_tag = Object.new
- until stopped?
- @server.request_call(server_tag)
- ev = @cq.pluck(server_tag, @poll_period)
- next if ev.nil?
- if ev.type != SERVER_RPC_NEW
- logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
- ev.close
- next
- end
- c = new_active_server_call(ev.call, ev.result)
- unless c.nil?
- mth = ev.result.method.to_sym
- ev.close
- @pool.schedule(c) do |call|
- rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
- end
+ c = new_active_server_call(ev.call, ev.result)
+ unless c.nil?
+ mth = ev.result.method.to_sym
+ ev.close
+ @pool.schedule(c) do |call|
+ rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end
end
- @running = false
end
+ @running = false
+ end
- def new_active_server_call(call, new_server_rpc)
- # Accept the call. This is necessary even if a status is to be sent
- # back immediately
- finished_tag = Object.new
- call_queue = Core::CompletionQueue.new
- call.metadata = new_server_rpc.metadata # store the metadata
- call.server_accept(call_queue, finished_tag)
- call.server_end_initial_metadata
-
- # Send UNAVAILABLE if there are too many unprocessed jobs
- jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
- logger.info("waiting: #{jobs_count}, max: #{max}")
- if @pool.jobs_waiting > @max_waiting_requests
- logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::UNAVAILABLE, '')
- return nil
- end
-
- # Send NOT_FOUND if the method does not exist
- mth = new_server_rpc.method.to_sym
- unless rpc_descs.key?(mth)
- logger.warn("NOT_FOUND: #{new_server_rpc}")
- noop = proc { |x| x }
- c = ActiveCall.new(call, call_queue, noop, noop,
- new_server_rpc.deadline,
- finished_tag: finished_tag)
- c.send_status(StatusCodes::NOT_FOUND, '')
- return nil
- end
+ def new_active_server_call(call, new_server_rpc)
+ # Accept the call. This is necessary even if a status is to be sent
+ # back immediately
+ finished_tag = Object.new
+ call_queue = Core::CompletionQueue.new
+ call.metadata = new_server_rpc.metadata # store the metadata
+ call.server_accept(call_queue, finished_tag)
+ call.server_end_initial_metadata
+
+ # Send UNAVAILABLE if there are too many unprocessed jobs
+ jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
+ logger.info("waiting: #{jobs_count}, max: #{max}")
+ if @pool.jobs_waiting > @max_waiting_requests
+ logger.warn("NOT AVAILABLE: too many jobs_waiting: #{new_server_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(call, call_queue, noop, noop,
+ new_server_rpc.deadline,
+ finished_tag: finished_tag)
+ c.send_status(StatusCodes::UNAVAILABLE, '')
+ return nil
+ end
- # Create the ActiveCall
- rpc_desc = rpc_descs[mth]
- logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
- ActiveCall.new(call, call_queue,
- rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
- new_server_rpc.deadline, finished_tag: finished_tag)
+ # Send NOT_FOUND if the method does not exist
+ mth = new_server_rpc.method.to_sym
+ unless rpc_descs.key?(mth)
+ logger.warn("NOT_FOUND: #{new_server_rpc}")
+ noop = proc { |x| x }
+ c = ActiveCall.new(call, call_queue, noop, noop,
+ new_server_rpc.deadline,
+ finished_tag: finished_tag)
+ c.send_status(StatusCodes::NOT_FOUND, '')
+ return nil
end
- # Pool is a simple thread pool for running server requests.
- class Pool
- def initialize(size)
- fail 'pool size must be positive' unless size > 0
- @jobs = Queue.new
- @size = size
- @stopped = false
- @stop_mutex = Mutex.new
- @stop_cond = ConditionVariable.new
- @workers = []
- end
+ # Create the ActiveCall
+ rpc_desc = rpc_descs[mth]
+ logger.info("deadline is #{new_server_rpc.deadline}; (now=#{Time.now})")
+ ActiveCall.new(call, call_queue,
+ rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
+ new_server_rpc.deadline, finished_tag: finished_tag)
+ end
- # Returns the number of jobs waiting
- def jobs_waiting
- @jobs.size
- end
+ # Pool is a simple thread pool for running server requests.
+ class Pool
+ def initialize(size)
+ fail 'pool size must be positive' unless size > 0
+ @jobs = Queue.new
+ @size = size
+ @stopped = false
+ @stop_mutex = Mutex.new
+ @stop_cond = ConditionVariable.new
+ @workers = []
+ end
- # Runs the given block on the queue with the provided args.
- #
- # @param args the args passed blk when it is called
- # @param blk the block to call
- def schedule(*args, &blk)
- fail 'already stopped' if @stopped
- return if blk.nil?
- logger.info('schedule another job')
- @jobs << [blk, args]
- end
+ # Returns the number of jobs waiting
+ def jobs_waiting
+ @jobs.size
+ end
+
+ # Runs the given block on the queue with the provided args.
+ #
+ # @param args the args passed blk when it is called
+ # @param blk the block to call
+ def schedule(*args, &blk)
+ fail 'already stopped' if @stopped
+ return if blk.nil?
+ logger.info('schedule another job')
+ @jobs << [blk, args]
+ end
- # Starts running the jobs in the thread pool.
- def start
- fail 'already stopped' if @stopped
- until @workers.size == @size.to_i
- next_thread = Thread.new do
- catch(:exit) do # allows { throw :exit } to kill a thread
- loop do
- begin
- blk, args = @jobs.pop
- blk.call(*args)
- rescue StandardError => e
- logger.warn('Error in worker thread')
- logger.warn(e)
- end
+ # Starts running the jobs in the thread pool.
+ def start
+ fail 'already stopped' if @stopped
+ until @workers.size == @size.to_i
+ next_thread = Thread.new do
+ catch(:exit) do # allows { throw :exit } to kill a thread
+ loop do
+ begin
+ blk, args = @jobs.pop
+ blk.call(*args)
+ rescue StandardError => e
+ logger.warn('Error in worker thread')
+ logger.warn(e)
end
end
+ end
- # removes the threads from workers, and signal when all the
- # threads are complete.
- @stop_mutex.synchronize do
- @workers.delete(Thread.current)
- @stop_cond.signal if @workers.size == 0
- end
+ # removes the threads from workers, and signal when all the
+ # threads are complete.
+ @stop_mutex.synchronize do
+ @workers.delete(Thread.current)
+ @stop_cond.signal if @workers.size == 0
end
- @workers << next_thread
end
+ @workers << next_thread
end
+ end
- # Stops the jobs in the pool
- def stop
- logger.info('stopping, will wait for all the workers to exit')
- @workers.size.times { schedule { throw :exit } }
- @stopped = true
+ # Stops the jobs in the pool
+ def stop
+ logger.info('stopping, will wait for all the workers to exit')
+ @workers.size.times { schedule { throw :exit } }
+ @stopped = true
- # TODO: allow configuration of the keepalive period
- keep_alive = 5
- @stop_mutex.synchronize do
- @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
- end
+ # TODO: allow configuration of the keepalive period
+ keep_alive = 5
+ @stop_mutex.synchronize do
+ @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
+ end
- # Forcibly shutdown any threads that are still alive.
- if @workers.size > 0
- logger.warn("forcibly terminating #{@workers.size} worker(s)")
- @workers.each do |t|
- next unless t.alive?
- begin
- t.exit
- rescue StandardError => e
- logger.warn('error while terminating a worker')
- logger.warn(e)
- end
+ # Forcibly shutdown any threads that are still alive.
+ if @workers.size > 0
+ logger.warn("forcibly terminating #{@workers.size} worker(s)")
+ @workers.each do |t|
+ next unless t.alive?
+ begin
+ t.exit
+ rescue StandardError => e
+ logger.warn('error while terminating a worker')
+ logger.warn(e)
end
end
-
- logger.info('stopped, all workers are shutdown')
end
+
+ logger.info('stopped, all workers are shutdown')
end
+ end
- protected
+ protected
- def rpc_descs
- @rpc_descs ||= {}
- end
+ def rpc_descs
+ @rpc_descs ||= {}
+ end
- def rpc_handlers
- @rpc_handlers ||= {}
- end
+ def rpc_handlers
+ @rpc_handlers ||= {}
+ end
- private
+ private
- def assert_valid_service_class(cls)
- unless cls.include?(GenericService)
- fail "#{cls} should 'include GenericService'"
- end
- if cls.rpc_descs.size == 0
- fail "#{cls} should specify some rpc descriptions"
- end
- cls.assert_rpc_descs_have_methods
+ def assert_valid_service_class(cls)
+ unless cls.include?(GenericService)
+ fail "#{cls} should 'include GenericService'"
+ end
+ if cls.rpc_descs.size == 0
+ fail "#{cls} should specify some rpc descriptions"
end
+ cls.assert_rpc_descs_have_methods
+ end
- def add_rpc_descs_for(service)
- cls = service.is_a?(Class) ? service : service.class
- specs = rpc_descs
- handlers = rpc_handlers
- cls.rpc_descs.each_pair do |name, spec|
- route = "/#{cls.service_name}/#{name}".to_sym
- if specs.key? route
- fail "Cannot add rpc #{route} from #{spec}, already registered"
+ def add_rpc_descs_for(service)
+ cls = service.is_a?(Class) ? service : service.class
+ specs = rpc_descs
+ handlers = rpc_handlers
+ cls.rpc_descs.each_pair do |name, spec|
+ route = "/#{cls.service_name}/#{name}".to_sym
+ if specs.key? route
+ fail "Cannot add rpc #{route} from #{spec}, already registered"
+ else
+ specs[route] = spec
+ if service.is_a?(Class)
+ handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
else
- specs[route] = spec
- if service.is_a?(Class)
- handlers[route] = cls.new.method(name.to_s.underscore.to_sym)
- else
- handlers[route] = service.method(name.to_s.underscore.to_sym)
- end
- logger.info("handling #{route} with #{handlers[route]}")
+ handlers[route] = service.method(name.to_s.underscore.to_sym)
end
+ logger.info("handling #{route} with #{handlers[route]}")
end
end
end
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index ff37617ccf..f0b85a0fa2 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -48,188 +48,186 @@ class String
end
end
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- # Provides behaviour used to implement schema-derived service classes.
- #
- # Is intended to be used to support both client and server
- # IDL-schema-derived servers.
- module GenericService
- # Used to indicate that a name has already been specified
- class DuplicateRpcName < StandardError
- def initialize(name)
- super("rpc (#{name}) is already defined")
- end
+# GRPC contains the General RPC module.
+module GRPC
+ # Provides behaviour used to implement schema-derived service classes.
+ #
+ # Is intended to be used to support both client and server
+ # IDL-schema-derived servers.
+ module GenericService
+ # Used to indicate that a name has already been specified
+ class DuplicateRpcName < StandardError
+ def initialize(name)
+ super("rpc (#{name}) is already defined")
end
+ end
- # Provides a simple DSL to describe RPC services.
+ # Provides a simple DSL to describe RPC services.
+ #
+ # E.g, a Maths service that uses the serializable messages DivArgs,
+ # DivReply and Num might define its endpoint uses the following way:
+ #
+ # rpc :div DivArgs, DivReply # single request, single response
+ # rpc :sum stream(Num), Num # streamed input, single response
+ # rpc :fib FibArgs, stream(Num) # single request, streamed response
+ # rpc :div_many stream(DivArgs), stream(DivReply)
+ # # streamed req and resp
+ #
+ # Each 'rpc' adds an RpcDesc to classes including this module, and
+ # #assert_rpc_descs_have_methods is used to ensure the including class
+ # provides methods with signatures that support all the descriptors.
+ module Dsl
+ # This configures the method names that the serializable message
+ # implementation uses to marshal and unmarshal messages.
#
- # E.g, a Maths service that uses the serializable messages DivArgs,
- # DivReply and Num might define its endpoint uses the following way:
+ # - unmarshal_class method must be a class method on the serializable
+ # message type that takes a string (byte stream) and produces and object
#
- # rpc :div DivArgs, DivReply # single request, single response
- # rpc :sum stream(Num), Num # streamed input, single response
- # rpc :fib FibArgs, stream(Num) # single request, streamed response
- # rpc :div_many stream(DivArgs), stream(DivReply)
- # # streamed req and resp
+ # - marshal_class_method is called on a serializable message instance
+ # and produces a serialized string.
#
- # Each 'rpc' adds an RpcDesc to classes including this module, and
- # #assert_rpc_descs_have_methods is used to ensure the including class
- # provides methods with signatures that support all the descriptors.
- module Dsl
- # This configures the method names that the serializable message
- # implementation uses to marshal and unmarshal messages.
- #
- # - unmarshal_class method must be a class method on the serializable
- # message type that takes a string (byte stream) and produces and object
- #
- # - marshal_class_method is called on a serializable message instance
- # and produces a serialized string.
- #
- # The Dsl verifies that the types in the descriptor have both the
- # unmarshal and marshal methods.
- attr_writer(:marshal_class_method, :unmarshal_class_method)
-
- # This allows configuration of the service name.
- attr_accessor(:service_name)
-
- # Adds an RPC spec.
- #
- # Takes the RPC name and the classes representing the types to be
- # serialized, and adds them to the including classes rpc_desc hash.
- #
- # input and output should both have the methods #marshal and #unmarshal
- # that are responsible for writing and reading an object instance from a
- # byte buffer respectively.
- #
- # @param name [String] the name of the rpc
- # @param input [Object] the input parameter's class
- # @param output [Object] the output parameter's class
- def rpc(name, input, output)
- fail(DuplicateRpcName, name) if rpc_descs.key? name
- assert_can_marshal(input)
- assert_can_marshal(output)
- rpc_descs[name] = RpcDesc.new(name, input, output,
- marshal_class_method,
- unmarshal_class_method)
- end
+ # The Dsl verifies that the types in the descriptor have both the
+ # unmarshal and marshal methods.
+ attr_writer(:marshal_class_method, :unmarshal_class_method)
- def inherited(subclass)
- # Each subclass should have a distinct class variable with its own
- # rpc_descs
- subclass.rpc_descs.merge!(rpc_descs)
- subclass.service_name = service_name
- end
+ # This allows configuration of the service name.
+ attr_accessor(:service_name)
- # the name of the instance method used to marshal events to a byte
- # stream.
- def marshal_class_method
- @marshal_class_method ||= :marshal
- end
+ # Adds an RPC spec.
+ #
+ # Takes the RPC name and the classes representing the types to be
+ # serialized, and adds them to the including classes rpc_desc hash.
+ #
+ # input and output should both have the methods #marshal and #unmarshal
+ # that are responsible for writing and reading an object instance from a
+ # byte buffer respectively.
+ #
+ # @param name [String] the name of the rpc
+ # @param input [Object] the input parameter's class
+ # @param output [Object] the output parameter's class
+ def rpc(name, input, output)
+ fail(DuplicateRpcName, name) if rpc_descs.key? name
+ assert_can_marshal(input)
+ assert_can_marshal(output)
+ rpc_descs[name] = RpcDesc.new(name, input, output,
+ marshal_class_method,
+ unmarshal_class_method)
+ end
- # the name of the class method used to unmarshal from a byte stream.
- def unmarshal_class_method
- @unmarshal_class_method ||= :unmarshal
- end
+ def inherited(subclass)
+ # Each subclass should have a distinct class variable with its own
+ # rpc_descs
+ subclass.rpc_descs.merge!(rpc_descs)
+ subclass.service_name = service_name
+ end
- def assert_can_marshal(cls)
- cls = cls.type if cls.is_a? RpcDesc::Stream
- mth = unmarshal_class_method
- unless cls.methods.include? mth
- fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
- end
- mth = marshal_class_method
- return if cls.methods.include? mth
+ # the name of the instance method used to marshal events to a byte
+ # stream.
+ def marshal_class_method
+ @marshal_class_method ||= :marshal
+ end
+
+ # the name of the class method used to unmarshal from a byte stream.
+ def unmarshal_class_method
+ @unmarshal_class_method ||= :unmarshal
+ end
+
+ def assert_can_marshal(cls)
+ cls = cls.type if cls.is_a? RpcDesc::Stream
+ mth = unmarshal_class_method
+ unless cls.methods.include? mth
fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
end
+ mth = marshal_class_method
+ return if cls.methods.include? mth
+ fail(ArgumentError, "#{cls} needs #{cls}.#{mth}")
+ end
- # @param cls [Class] the class of a serializable type
- # @return cls wrapped in a RpcDesc::Stream
- def stream(cls)
- assert_can_marshal(cls)
- RpcDesc::Stream.new(cls)
- end
+ # @param cls [Class] the class of a serializable type
+ # @return cls wrapped in a RpcDesc::Stream
+ def stream(cls)
+ assert_can_marshal(cls)
+ RpcDesc::Stream.new(cls)
+ end
- # the RpcDescs defined for this GenericService, keyed by name.
- def rpc_descs
- @rpc_descs ||= {}
- end
+ # the RpcDescs defined for this GenericService, keyed by name.
+ def rpc_descs
+ @rpc_descs ||= {}
+ end
- # Creates a rpc client class with methods for accessing the methods
- # currently in rpc_descs.
- def rpc_stub_class
- descs = rpc_descs
- route_prefix = service_name
- Class.new(ClientStub) do
- # @param host [String] the host the stub connects to
- # @param kw [KeywordArgs] the channel arguments, plus any optional
- # args for configuring the client's channel
- def initialize(host, **kw)
- super(host, Core::CompletionQueue.new, **kw)
- end
+ # Creates a rpc client class with methods for accessing the methods
+ # currently in rpc_descs.
+ def rpc_stub_class
+ descs = rpc_descs
+ route_prefix = service_name
+ Class.new(ClientStub) do
+ # @param host [String] the host the stub connects to
+ # @param kw [KeywordArgs] the channel arguments, plus any optional
+ # args for configuring the client's channel
+ def initialize(host, **kw)
+ super(host, Core::CompletionQueue.new, **kw)
+ end
- # Used define_method to add a method for each rpc_desc. Each method
- # calls the base class method for the given descriptor.
- descs.each_pair do |name, desc|
- mth_name = name.to_s.underscore.to_sym
- marshal = desc.marshal_proc
- unmarshal = desc.unmarshal_proc(:output)
- route = "/#{route_prefix}/#{name}"
- if desc.request_response?
- define_method(mth_name) do |req, deadline = nil|
- logger.debug("calling #{@host}:#{route}")
- request_response(route, req, marshal, unmarshal, deadline)
- end
- elsif desc.client_streamer?
- define_method(mth_name) do |reqs, deadline = nil|
- logger.debug("calling #{@host}:#{route}")
- client_streamer(route, reqs, marshal, unmarshal, deadline)
- end
- elsif desc.server_streamer?
- define_method(mth_name) do |req, deadline = nil, &blk|
- logger.debug("calling #{@host}:#{route}")
- server_streamer(route, req, marshal, unmarshal, deadline,
- &blk)
- end
- else # is a bidi_stream
- define_method(mth_name) do |reqs, deadline = nil, &blk|
- logger.debug("calling #{@host}:#{route}")
- bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
- end
+ # Used define_method to add a method for each rpc_desc. Each method
+ # calls the base class method for the given descriptor.
+ descs.each_pair do |name, desc|
+ mth_name = name.to_s.underscore.to_sym
+ marshal = desc.marshal_proc
+ unmarshal = desc.unmarshal_proc(:output)
+ route = "/#{route_prefix}/#{name}"
+ if desc.request_response?
+ define_method(mth_name) do |req, deadline = nil|
+ logger.debug("calling #{@host}:#{route}")
+ request_response(route, req, marshal, unmarshal, deadline)
+ end
+ elsif desc.client_streamer?
+ define_method(mth_name) do |reqs, deadline = nil|
+ logger.debug("calling #{@host}:#{route}")
+ client_streamer(route, reqs, marshal, unmarshal, deadline)
+ end
+ elsif desc.server_streamer?
+ define_method(mth_name) do |req, deadline = nil, &blk|
+ logger.debug("calling #{@host}:#{route}")
+ server_streamer(route, req, marshal, unmarshal, deadline,
+ &blk)
+ end
+ else # is a bidi_stream
+ define_method(mth_name) do |reqs, deadline = nil, &blk|
+ logger.debug("calling #{@host}:#{route}")
+ bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
end
end
end
end
+ end
- # Asserts that the appropriate methods are defined for each added rpc
- # spec. Is intended to aid verifying that server classes are correctly
- # implemented.
- def assert_rpc_descs_have_methods
- rpc_descs.each_pair do |m, spec|
- mth_name = m.to_s.underscore.to_sym
- unless instance_methods.include?(mth_name)
- fail "#{self} does not provide instance method '#{mth_name}'"
- end
- spec.assert_arity_matches(instance_method(mth_name))
+ # Asserts that the appropriate methods are defined for each added rpc
+ # spec. Is intended to aid verifying that server classes are correctly
+ # implemented.
+ def assert_rpc_descs_have_methods
+ rpc_descs.each_pair do |m, spec|
+ mth_name = m.to_s.underscore.to_sym
+ unless instance_methods.include?(mth_name)
+ fail "#{self} does not provide instance method '#{mth_name}'"
end
+ spec.assert_arity_matches(instance_method(mth_name))
end
end
+ end
- def self.included(o)
- o.extend(Dsl)
- # Update to the use the service name including module. Proivde a default
- # that can be nil e,g. when modules are declared dynamically.
- return unless o.service_name.nil?
- if o.name.nil?
- o.service_name = 'GenericService'
+ def self.included(o)
+ o.extend(Dsl)
+ # Update to the use the service name including module. Proivde a default
+ # that can be nil e,g. when modules are declared dynamically.
+ return unless o.service_name.nil?
+ if o.name.nil?
+ o.service_name = 'GenericService'
+ else
+ modules = o.name.split('::')
+ if modules.length > 2
+ o.service_name = modules[modules.length - 2]
else
- modules = o.name.split('::')
- if modules.length > 2
- o.service_name = modules[modules.length - 2]
- else
- o.service_name = modules.first
- end
+ o.service_name = modules.first
end
end
end
diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb
index 6442f23e89..cf280feae6 100644
--- a/src/ruby/lib/grpc/logconfig.rb
+++ b/src/ruby/lib/grpc/logconfig.rb
@@ -35,6 +35,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info
# TODO: provide command-line configuration for logging
-Logging.logger['Google::RPC'].level = :debug
-Logging.logger['Google::RPC::ActiveCall'].level = :info
-Logging.logger['Google::RPC::BidiCall'].level = :info
+Logging.logger['GRPC'].level = :debug
+Logging.logger['GRPC::ActiveCall'].level = :info
+Logging.logger['GRPC::BidiCall'].level = :info
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index dd526e583a..0107bc4ada 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -27,9 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-module Google
- # Google::RPC contains the General RPC module.
- module RPC
- VERSION = '0.0.1'
- end
+# GRPC contains the General RPC module.
+module GRPC
+ VERSION = '0.0.1'
end
diff --git a/src/ruby/spec/auth/compute_engine_spec.rb b/src/ruby/spec/auth/compute_engine_spec.rb
index 9e0b4660fa..c43214d086 100644
--- a/src/ruby/spec/auth/compute_engine_spec.rb
+++ b/src/ruby/spec/auth/compute_engine_spec.rb
@@ -36,9 +36,9 @@ require 'faraday'
require 'grpc/auth/compute_engine'
require 'spec_helper'
-describe Google::RPC::Auth::GCECredentials do
+describe GRPC::Auth::GCECredentials do
MD_URI = '/computeMetadata/v1/instance/service-accounts/default/token'
- GCECredentials = Google::RPC::Auth::GCECredentials
+ GCECredentials = GRPC::Auth::GCECredentials
before(:example) do
@client = GCECredentials.new
diff --git a/src/ruby/spec/auth/service_account_spec.rb b/src/ruby/spec/auth/service_account_spec.rb
index cbc6a73ac2..2f14a1ae05 100644
--- a/src/ruby/spec/auth/service_account_spec.rb
+++ b/src/ruby/spec/auth/service_account_spec.rb
@@ -38,7 +38,7 @@ require 'multi_json'
require 'openssl'
require 'spec_helper'
-describe Google::RPC::Auth::ServiceAccountCredentials do
+describe GRPC::Auth::ServiceAccountCredentials do
before(:example) do
@key = OpenSSL::PKey::RSA.new(2048)
cred_json = {
@@ -49,7 +49,7 @@ describe Google::RPC::Auth::ServiceAccountCredentials do
type: 'service_account'
}
cred_json_text = MultiJson.dump(cred_json)
- @client = Google::RPC::Auth::ServiceAccountCredentials.new(
+ @client = GRPC::Auth::ServiceAccountCredentials.new(
'https://www.googleapis.com/auth/userinfo.profile',
StringIO.new(cred_json_text))
end