aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rwxr-xr-xexamples/ruby/route_guide/route_guide_server.rb65
-rwxr-xr-xsrc/ruby/pb/test/server.rb65
-rw-r--r--src/ruby/qps/qps-common.rb16
-rw-r--r--src/ruby/qps/server.rb11
4 files changed, 67 insertions, 90 deletions
diff --git a/examples/ruby/route_guide/route_guide_server.rb b/examples/ruby/route_guide/route_guide_server.rb
index a5a73a8bac..41b9174b1d 100755
--- a/examples/ruby/route_guide/route_guide_server.rb
+++ b/examples/ruby/route_guide/route_guide_server.rb
@@ -100,28 +100,6 @@ class RectangleEnum
end
end
-# A EnumeratorQueue wraps a Queue to yield the items added to it.
-class EnumeratorQueue
- extend Forwardable
- def_delegators :@q, :push
-
- def initialize(sentinel)
- @q = Queue.new
- @sentinel = sentinel
- @received_notes = {}
- end
-
- def each_item
- return enum_for(:each_item) unless block_given?
- loop do
- r = @q.pop
- break if r.equal?(@sentinel)
- fail r if r.is_a? Exception
- yield r
- end
- end
-end
-
# ServerImpl provides an implementation of the RouteGuide service.
class ServerImpl < RouteGuide::Service
# @param [Hash] feature_db {location => name}
@@ -166,28 +144,33 @@ class ServerImpl < RouteGuide::Service
end
def route_chat(notes)
- q = EnumeratorQueue.new(self)
- # run a separate thread that processes the incoming requests
- t = Thread.new do
- begin
- notes.each do |n|
- key = {
- 'latitude' => n.location.latitude,
- 'longitude' => n.location.longitude
- }
- earlier_msgs = @received_notes[key]
- @received_notes[key] << n.message
- # send back the earlier messages at this point
- earlier_msgs.each do |r|
- q.push(RouteNote.new(location: n.location, message: r))
- end
+ RouteChatEnumerator.new(notes, @received_notes).each_item
+ end
+end
+
+class RouteChatEnumerator
+ def initialize(notes, received_notes)
+ @notes = notes
+ @received_notes = received_notes
+ end
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ begin
+ @notes.each do |n|
+ key = {
+ 'latitude' => n.location.latitude,
+ 'longitude' => n.location.longitude
+ }
+ earlier_msgs = @received_notes[key]
+ @received_notes[key] << n.message
+ # send back the earlier messages at this point
+ earlier_msgs.each do |r|
+ yield RouteNote.new(location: n.location, message: r)
end
- q.push(self) # signal completion
- rescue StandardError => e
- q.push(e) # signal completion via an error
end
+ rescue StandardError => e
+ fail e # signal completion via an error
end
- q.each_item
end
end
diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb
index 0808121661..3f1e0a1ccf 100755
--- a/src/ruby/pb/test/server.rb
+++ b/src/ruby/pb/test/server.rb
@@ -129,27 +129,36 @@ def nulls(l)
[].pack('x' * l).force_encoding('ascii-8bit')
end
-# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
-class EnumeratorQueue
- extend Forwardable
- def_delegators :@q, :push
-
- def initialize(sentinel)
- @q = Queue.new
- @sentinel = sentinel
- end
+# A FullDuplexEnumerator passes requests to a block and yields generated responses
+class FullDuplexEnumerator
+ include Grpc::Testing
+ include Grpc::Testing::PayloadType
+ def initialize(requests)
+ @requests = requests
+ end
def each_item
return enum_for(:each_item) unless block_given?
- loop do
- r = @q.pop
- break if r.equal?(@sentinel)
- fail r if r.is_a? Exception
- yield r
+ GRPC.logger.info('interop-server: started receiving')
+ begin
+ cls = StreamingOutputCallResponse
+ @requests.each do |req|
+ req.response_parameters.each do |params|
+ resp_size = params.size
+ GRPC.logger.info("read a req, response size is #{resp_size}")
+ yield cls.new(payload: Payload.new(type: req.response_type,
+ body: nulls(resp_size)))
+ end
+ end
+ GRPC.logger.info('interop-server: finished receiving')
+ rescue StandardError => e
+ GRPC.logger.info('interop-server: failed')
+ GRPC.logger.warn(e)
+ fail e
end
end
end
-
+
# A runnable implementation of the schema-specified testing service, with each
# service method implemented as required by the interop testing spec.
class TestTarget < Grpc::Testing::TestService::Service
@@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service
def full_duplex_call(reqs)
# reqs is a lazy Enumerator of the requests sent by the client.
- q = EnumeratorQueue.new(self)
- cls = StreamingOutputCallResponse
- Thread.new do
- begin
- GRPC.logger.info('interop-server: started receiving')
- reqs.each do |req|
- req.response_parameters.each do |params|
- resp_size = params.size
- GRPC.logger.info("read a req, response size is #{resp_size}")
- resp = cls.new(payload: Payload.new(type: req.response_type,
- body: nulls(resp_size)))
- q.push(resp)
- end
- end
- GRPC.logger.info('interop-server: finished receiving')
- q.push(self)
- rescue StandardError => e
- GRPC.logger.info('interop-server: failed')
- GRPC.logger.warn(e)
- q.push(e) # share the exception with the enumerator
- end
- end
- q.each_item
+ FullDuplexEnumerator.new(reqs).each_item
end
-
+
def half_duplex_call(reqs)
# TODO: update with unique behaviour of the half_duplex_call if that's
# ever required by any of the tests.
diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb
index 4119d600b1..4714ccfdb7 100644
--- a/src/ruby/qps/qps-common.rb
+++ b/src/ruby/qps/qps-common.rb
@@ -52,6 +52,7 @@ def load_test_certs
files.map { |f| File.open(File.join(data_dir, f)).read }
end
+
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue
extend Forwardable
@@ -73,4 +74,19 @@ class EnumeratorQueue
end
end
+# A PingPongEnumerator reads requests and responds one-by-one when enumerated
+# via #each_item
+class PingPongEnumerator
+ def initialize(reqs)
+ @reqs = reqs
+ end
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ sr = Grpc::Testing::SimpleResponse
+ pl = Grpc::Testing::Payload
+ @reqs.each do |req|
+ yield sr.new(payload: pl.new(body: nulls(req.response_size)))
+ end
+ end
+end
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index f51f86d9a9..d0c2073dd1 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
sr.new(payload: pl.new(body: nulls(req.response_size)))
end
def streaming_call(reqs)
- q = EnumeratorQueue.new(self)
- Thread.new {
- sr = Grpc::Testing::SimpleResponse
- pl = Grpc::Testing::Payload
- reqs.each do |req|
- q.push(sr.new(payload: pl.new(body: nulls(req.response_size))))
- end
- q.push(self)
- }
- q.each_item
+ PingPongEnumerator.new(reqs).each_item
end
end