aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/bidi_call.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib/grpc/generic/bidi_call.rb')
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb15
1 files changed, 13 insertions, 2 deletions
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index c2ac3c4daf..196f84f65f 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -56,7 +56,8 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param metadata_received [true|false] indicates if metadata has already
# been received. Should always be true for server calls
- def initialize(call, marshal, unmarshal, metadata_received: false)
+ def initialize(call, marshal, unmarshal, metadata_received: false,
+ req_view: nil)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
@call = call
@marshal = marshal
@@ -68,6 +69,7 @@ module GRPC
@writes_complete = false
@complete = false
@done_mutex = Mutex.new
+ @req_view = req_view
end
# Begins orchestration of the Bidi stream for a client sending requests.
@@ -97,7 +99,15 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
def run_on_server(gen_each_reply)
- replys = gen_each_reply.call(each_queued_msg)
+ # Pass in the optional call object parameter if possible
+ if gen_each_reply.arity == 1
+ replys = gen_each_reply.call(each_queued_msg)
+ elsif gen_each_reply.arity == 2
+ replys = gen_each_reply.call(each_queued_msg, @req_view)
+ else
+ fail 'Illegal arity of reply generator'
+ end
+
@loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false)
end
@@ -162,6 +172,7 @@ module GRPC
payload = @marshal.call(req)
# Fails if status already received
begin
+ @req_view.send_initial_metadata unless @req_view.nil?
@call.run_batch(SEND_MESSAGE => payload)
rescue GRPC::Core::CallError => e
# This is almost definitely caused by a status arriving while still