diff options
author | Tim Emiola <temiola@google.com> | 2015-11-10 15:41:41 -0800 |
---|---|---|
committer | Tim Emiola <temiola@google.com> | 2015-11-10 15:41:41 -0800 |
commit | 24fdc179c149ba5323e35708dfa2fd87515264f5 (patch) | |
tree | a081959b27eb81de6634295fe8f10bbdc8dc6e1c | |
parent | b48236a2b4499407ede030c9b9175f84e9797977 (diff) |
Ensures that bidi calls obtain metadata.
Fixes an omission from earlier PRs that adds support metadata.
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 10 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 20 |
2 files changed, 20 insertions, 10 deletions
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index d9cb924735..e80d24edc9 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -199,11 +199,7 @@ module GRPC # marshalled. def remote_send(req, marshalled = false) GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") - if marshalled - payload = req - else - payload = @marshal.call(req) - end + payload = marshalled ? req : @marshal.call(req) @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload) end @@ -417,7 +413,9 @@ module GRPC # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, + metadata_tag: @metadata_tag) + @metadata_tag = nil # run_on_client ensures metadata is read bd.run_on_client(requests, @op_notifier, &blk) end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 9dbbb74caf..6b9b785693 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -56,7 +56,8 @@ module GRPC # the call # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - def initialize(call, q, marshal, unmarshal) + # @param metadata_tag [Object] tag object used to collect metadata + def initialize(call, q, marshal, unmarshal, metadata_tag: nil) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') @@ -67,6 +68,7 @@ module GRPC @op_notifier = nil # signals completion on clients @readq = Queue.new @unmarshal = unmarshal + @metadata_tag = metadata_tag end # Begins orchestration of the Bidi stream for a client sending requests. @@ -113,6 +115,18 @@ module GRPC @op_notifier.notify(self) end + # performs a read using @call.run_batch, ensures metadata is set up + def read_using_run_batch + ops = { RECV_MESSAGE => nil } + ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil? + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + unless @metadata_tag.nil? + @call.metadata = batch_result.metadata + @metadata_tag = nil + end + batch_result + end + # each_queued_msg yields each message on this instances readq # # - messages are added to the readq by #read_loop @@ -169,9 +183,7 @@ module GRPC loop do GRPC.logger.debug("bidi-read-loop: #{count}") count += 1 - # TODO: ensure metadata is read if available, currently it's not - batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, - RECV_MESSAGE => nil) + batch_result = read_using_run_batch # handle the next message if batch_result.message.nil? |