aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rwxr-xr-xsrc/ruby/pb/test/server.rb65
-rw-r--r--src/ruby/qps/qps-common.rb16
-rw-r--r--src/ruby/qps/server.rb11
3 files changed, 43 insertions, 49 deletions
diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb
index 0808121661..3f1e0a1ccf 100755
--- a/src/ruby/pb/test/server.rb
+++ b/src/ruby/pb/test/server.rb
@@ -129,27 +129,36 @@ def nulls(l)
[].pack('x' * l).force_encoding('ascii-8bit')
end
-# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
-class EnumeratorQueue
- extend Forwardable
- def_delegators :@q, :push
-
- def initialize(sentinel)
- @q = Queue.new
- @sentinel = sentinel
- end
+# A FullDuplexEnumerator passes requests to a block and yields generated responses
+class FullDuplexEnumerator
+ include Grpc::Testing
+ include Grpc::Testing::PayloadType
+ def initialize(requests)
+ @requests = requests
+ end
def each_item
return enum_for(:each_item) unless block_given?
- loop do
- r = @q.pop
- break if r.equal?(@sentinel)
- fail r if r.is_a? Exception
- yield r
+ GRPC.logger.info('interop-server: started receiving')
+ begin
+ cls = StreamingOutputCallResponse
+ @requests.each do |req|
+ req.response_parameters.each do |params|
+ resp_size = params.size
+ GRPC.logger.info("read a req, response size is #{resp_size}")
+ yield cls.new(payload: Payload.new(type: req.response_type,
+ body: nulls(resp_size)))
+ end
+ end
+ GRPC.logger.info('interop-server: finished receiving')
+ rescue StandardError => e
+ GRPC.logger.info('interop-server: failed')
+ GRPC.logger.warn(e)
+ fail e
end
end
end
-
+
# A runnable implementation of the schema-specified testing service, with each
# service method implemented as required by the interop testing spec.
class TestTarget < Grpc::Testing::TestService::Service
@@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service
def full_duplex_call(reqs)
# reqs is a lazy Enumerator of the requests sent by the client.
- q = EnumeratorQueue.new(self)
- cls = StreamingOutputCallResponse
- Thread.new do
- begin
- GRPC.logger.info('interop-server: started receiving')
- reqs.each do |req|
- req.response_parameters.each do |params|
- resp_size = params.size
- GRPC.logger.info("read a req, response size is #{resp_size}")
- resp = cls.new(payload: Payload.new(type: req.response_type,
- body: nulls(resp_size)))
- q.push(resp)
- end
- end
- GRPC.logger.info('interop-server: finished receiving')
- q.push(self)
- rescue StandardError => e
- GRPC.logger.info('interop-server: failed')
- GRPC.logger.warn(e)
- q.push(e) # share the exception with the enumerator
- end
- end
- q.each_item
+ FullDuplexEnumerator.new(reqs).each_item
end
-
+
def half_duplex_call(reqs)
# TODO: update with unique behaviour of the half_duplex_call if that's
# ever required by any of the tests.
diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb
index 4119d600b1..4714ccfdb7 100644
--- a/src/ruby/qps/qps-common.rb
+++ b/src/ruby/qps/qps-common.rb
@@ -52,6 +52,7 @@ def load_test_certs
files.map { |f| File.open(File.join(data_dir, f)).read }
end
+
# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item.
class EnumeratorQueue
extend Forwardable
@@ -73,4 +74,19 @@ class EnumeratorQueue
end
end
+# A PingPongEnumerator reads requests and responds one-by-one when enumerated
+# via #each_item
+class PingPongEnumerator
+ def initialize(reqs)
+ @reqs = reqs
+ end
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ sr = Grpc::Testing::SimpleResponse
+ pl = Grpc::Testing::Payload
+ @reqs.each do |req|
+ yield sr.new(payload: pl.new(body: nulls(req.response_size)))
+ end
+ end
+end
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index f51f86d9a9..d0c2073dd1 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service
sr.new(payload: pl.new(body: nulls(req.response_size)))
end
def streaming_call(reqs)
- q = EnumeratorQueue.new(self)
- Thread.new {
- sr = Grpc::Testing::SimpleResponse
- pl = Grpc::Testing::Payload
- reqs.each do |req|
- q.push(sr.new(payload: pl.new(body: nulls(req.response_size))))
- end
- q.push(self)
- }
- q.each_item
+ PingPongEnumerator.new(reqs).each_item
end
end