aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-11-10 15:41:41 -0800
committerGravatar Tim Emiola <temiola@google.com>2015-11-10 15:41:41 -0800
commit24fdc179c149ba5323e35708dfa2fd87515264f5 (patch)
treea081959b27eb81de6634295fe8f10bbdc8dc6e1c
parentb48236a2b4499407ede030c9b9175f84e9797977 (diff)
Ensures that bidi calls obtain metadata.
Fixes an omission from earlier PRs that adds support metadata.
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb10
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb20
2 files changed, 20 insertions, 10 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index d9cb924735..e80d24edc9 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -199,11 +199,7 @@ module GRPC
# marshalled.
def remote_send(req, marshalled = false)
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
- if marshalled
- payload = req
- else
- payload = @marshal.call(req)
- end
+ payload = marshalled ? req : @marshal.call(req)
@call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
end
@@ -417,7 +413,9 @@ module GRPC
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
+ bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
+ metadata_tag: @metadata_tag)
+ @metadata_tag = nil # run_on_client ensures metadata is read
bd.run_on_client(requests, @op_notifier, &blk)
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 9dbbb74caf..6b9b785693 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -56,7 +56,8 @@ module GRPC
# the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- def initialize(call, q, marshal, unmarshal)
+ # @param metadata_tag [Object] tag object used to collect metadata
+ def initialize(call, q, marshal, unmarshal, metadata_tag: nil)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
@@ -67,6 +68,7 @@ module GRPC
@op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal
+ @metadata_tag = metadata_tag
end
# Begins orchestration of the Bidi stream for a client sending requests.
@@ -113,6 +115,18 @@ module GRPC
@op_notifier.notify(self)
end
+ # performs a read using @call.run_batch, ensures metadata is set up
+ def read_using_run_batch
+ ops = { RECV_MESSAGE => nil }
+ ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
+ batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ unless @metadata_tag.nil?
+ @call.metadata = batch_result.metadata
+ @metadata_tag = nil
+ end
+ batch_result
+ end
+
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
@@ -169,9 +183,7 @@ module GRPC
loop do
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
- # TODO: ensure metadata is read if available, currently it's not
- batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
- RECV_MESSAGE => nil)
+ batch_result = read_using_run_batch
# handle the next message
if batch_result.message.nil?