From d9892bdd8032485919067748c5eedcf52f56240f Mon Sep 17 00:00:00 2001 From: Alex Polcyn Date: Mon, 4 Jul 2016 00:52:39 -0700 Subject: Moved sending of initial metadata from server into server handler methods --- src/ruby/lib/grpc/generic/active_call.rb | 47 ++++- src/ruby/lib/grpc/generic/bidi_call.rb | 16 +- src/ruby/lib/grpc/generic/rpc_desc.rb | 9 +- src/ruby/lib/grpc/generic/rpc_server.rb | 23 ++- src/ruby/spec/generic/active_call_spec.rb | 329 +++++++++++++++++++++++++++++- src/ruby/spec/generic/rpc_desc_spec.rb | 14 +- 6 files changed, 412 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 4260d85437..d43a9e7a4b 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -58,7 +58,7 @@ module GRPC include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader(:deadline) + attr_reader :deadline, :metadata_sent, :metadata_to_send def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, :peer, :peer_cert, :trailing_metadata @@ -101,7 +101,7 @@ module GRPC # @param metadata_received [true|false] indicates if metadata has already # been received. Should always be true for server calls def initialize(call, marshal, unmarshal, deadline, started: true, - metadata_received: false) + metadata_received: false, metadata_to_send: nil) fail(TypeError, '!Core::Call') unless call.is_a? Core::Call @call = call @deadline = deadline @@ -110,6 +110,14 @@ module GRPC @metadata_received = metadata_received @metadata_sent = started @op_notifier = nil + + fail(ArgumentError, 'Already sent md') if started && metadata_to_send + @metadata_to_send = metadata_to_send || {} unless started + end + + def send_initial_metadata + fail 'Already sent metadata' if @metadata_sent + start_call(@metadata_to_send) end # output_metadata are provides access to hash that can be used to @@ -187,7 +195,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - # TODO(murgatroid99): ensure metadata was sent + start_call(@metadata_to_send) unless @metadata_sent GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") payload = marshalled ? req : @marshal.call(req) @call.run_batch(SEND_MESSAGE => payload) @@ -203,6 +211,7 @@ module GRPC # list, mulitple metadata for its key are sent def send_status(code = OK, details = '', assert_finished = false, metadata: {}) + start_call unless @metadata_sent ops = { SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata) } @@ -392,9 +401,12 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) - start_call(metadata) - bd = BidiCall.new(@call, @marshal, @unmarshal, + start_call(metadata) unless @metadata_sent + bd = BidiCall.new(@call, + @marshal, + @unmarshal, metadata_received: @metadata_received) + bd.run_on_client(requests, @op_notifier, &blk) end @@ -410,8 +422,12 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @marshal, @unmarshal, - metadata_received: @metadata_received) + bd = BidiCall.new(@call, + @marshal, + @unmarshal, + metadata_received: @metadata_received, + req_view: MultiReqView.new(self)) + bd.run_on_server(gen_each_reply) end @@ -428,6 +444,11 @@ module GRPC @op_notifier.notify(self) end + def merge_metadata_to_send(new_metadata = {}) + fail('cant change metadata after already sent') if @metadata_sent + @metadata_to_send.merge!(new_metadata) + end + private # Starts the call if not already started @@ -454,12 +475,20 @@ module GRPC # SingleReqView limits access to an ActiveCall's methods for use in server # handlers that receive just one request. SingleReqView = view_class(:cancelled?, :deadline, :metadata, - :output_metadata, :peer, :peer_cert) + :output_metadata, :peer, :peer_cert, + :send_initial_metadata, + :metadata_to_send, + :merge_metadata_to_send, + :metadata_sent) # MultiReqView limits access to an ActiveCall's methods for use in # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg, - :each_remote_read, :metadata, :output_metadata) + :each_remote_read, :metadata, :output_metadata, + :send_initial_metadata, + :metadata_to_send, + :merge_metadata_to_send, + :metadata_sent) # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 425dc3e519..0b6ef4918c 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -56,7 +56,8 @@ module GRPC # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param metadata_received [true|false] indicates if metadata has already # been received. Should always be true for server calls - def initialize(call, marshal, unmarshal, metadata_received: false) + def initialize(call, marshal, unmarshal, metadata_received: false, + req_view: nil) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call @call = call @marshal = marshal @@ -68,6 +69,7 @@ module GRPC @writes_complete = false @complete = false @done_mutex = Mutex.new + @req_view = req_view end # Begins orchestration of the Bidi stream for a client sending requests. @@ -97,7 +99,15 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) - replys = gen_each_reply.call(each_queued_msg) + # Pass in the optional call object parameter if possible + if gen_each_reply.arity == 1 + replys = gen_each_reply.call(each_queued_msg) + elsif gen_each_reply.arity == 2 + replys = gen_each_reply.call(each_queued_msg, @req_view) + else + fail 'Illegal arity of reply generator' + end + @loop_th = start_read_loop(is_client: false) write_loop(replys, is_client: false) end @@ -162,6 +172,8 @@ module GRPC payload = @marshal.call(req) # Fails if status already received begin + @req_view.send_initial_metadata unless + @req_view.nil? || @req_view.metadata_sent @call.run_batch(SEND_MESSAGE => payload) rescue GRPC::Core::CallError => e # This is almost definitely caused by a status arriving while still diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 913f55d0d3..584fe78169 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -104,7 +104,14 @@ module GRPC end def assert_arity_matches(mth) - if request_response? || server_streamer? + # A bidi handler function can optionally be passed a second + # call object parameter for access to metadata, cancelling, etc. + if bidi_streamer? + if mth.arity != 2 && mth.arity != 1 + fail arity_error(mth, 2, "should be #{mth.name}(req, call) or " \ + "#{mth.name}(req)") + end + elsif request_response? || server_streamer? if mth.arity != 2 fail arity_error(mth, 2, "should be #{mth.name}(req, call)") end diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index c92a532a50..b1d30b8e38 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -339,8 +339,11 @@ module GRPC return an_rpc if @pool.jobs_waiting <= @max_waiting_requests GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") noop = proc { |x| x } + + # Create a new active call that knows that metadata hasn't been + # sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, - metadata_received: true) + metadata_received: true, started: false) c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '') nil end @@ -351,8 +354,11 @@ module GRPC return an_rpc if rpc_descs.key?(mth) GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}") noop = proc { |x| x } + + # Create a new active call that knows that + # metadata hasn't been sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, - metadata_received: true) + metadata_received: true, started: false) c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '') nil end @@ -400,17 +406,20 @@ module GRPC unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) end - an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md) return nil unless available?(an_rpc) return nil unless implemented?(an_rpc) - # Create the ActiveCall + # Create the ActiveCall. Indicate that metadata hasnt been sent yet. GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] - c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc, - rpc_desc.unmarshal_proc(:input), an_rpc.deadline, - metadata_received: true) + c = ActiveCall.new(an_rpc.call, + rpc_desc.marshal_proc, + rpc_desc.unmarshal_proc(:input), + an_rpc.deadline, + metadata_received: true, + started: false, + metadata_to_send: connect_md) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 018580e0df..0c72be9a98 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -60,8 +60,10 @@ describe GRPC::ActiveCall do end describe '#multi_req_view' do - it 'exposes a fixed subset of the ActiveCall methods' do - want = %w(cancelled?, deadline, each_remote_read, metadata, shutdown) + it 'exposes a fixed subset of the ActiveCall.methods' do + want = %w(cancelled?, deadline, each_remote_read, metadata, \ + shutdown, peer, peer_cert, send_initial_metadata, \ + initial_metadata_sent) v = @client_call.multi_req_view want.each do |w| expect(v.methods.include?(w)) @@ -70,8 +72,10 @@ describe GRPC::ActiveCall do end describe '#single_req_view' do - it 'exposes a fixed subset of the ActiveCall methods' do - want = %w(cancelled?, deadline, metadata, shutdown) + it 'exposes a fixed subset of the ActiveCall.methods' do + want = %w(cancelled?, deadline, metadata, shutdown, \ + send_initial_metadata, metadata_to_send, \ + merge_metadata_to_send, initial_metadata_sent) v = @client_call.single_req_view want.each do |w| expect(v.methods.include?(w)) @@ -149,6 +153,158 @@ describe GRPC::ActiveCall do end end + describe 'sending initial metadata', send_initial_metadata: true do + it 'sends metadata before sending a message if it hasnt been sent yet' do + call = make_test_call + @client_call = ActiveCall.new( + call, + @pass_through, + @pass_through, + deadline, + started: false) + + metadata = { key: 'dummy_val', other: 'other_val' } + expect(@client_call.metadata_sent).to eq(false) + @client_call.merge_metadata_to_send(metadata) + + message = 'dummy message' + + expect(call).to( + receive(:run_batch) + .with( + hash_including( + CallOps::SEND_INITIAL_METADATA => metadata)).once) + + expect(call).to( + receive(:run_batch).with(hash_including( + CallOps::SEND_MESSAGE => message)).once) + @client_call.remote_send(message) + + expect(@client_call.metadata_sent).to eq(true) + end + + it 'doesnt send metadata if it thinks its already been sent' do + call = make_test_call + + @client_call = ActiveCall.new(call, + @pass_through, + @pass_through, + deadline) + + expect(@client_call.metadata_sent).to eql(true) + expect(call).to( + receive(:run_batch).with(hash_including( + CallOps::SEND_INITIAL_METADATA)).never) + + @client_call.remote_send('test message') + end + + it 'sends metadata if it is explicitly sent and ok to do so' do + call = make_test_call + + @client_call = ActiveCall.new(call, + @pass_through, + @pass_through, + deadline, + started: false) + + expect(@client_call.metadata_sent).to eql(false) + + metadata = { test_key: 'val' } + @client_call.merge_metadata_to_send(metadata) + expect(@client_call.metadata_to_send).to eq(metadata) + + expect(call).to( + receive(:run_batch).with(hash_including( + CallOps::SEND_INITIAL_METADATA => + metadata)).once) + @client_call.send_initial_metadata + end + + it 'explicit sending fails if metadata has already been sent' do + call = make_test_call + + @client_call = ActiveCall.new(call, + @pass_through, + @pass_through, + deadline) + + expect(@client_call.metadata_sent).to eql(true) + + blk = proc do + @client_call.send_initial_metadata + end + + expect { blk.call }.to raise_error + end + end + + describe '#merge_metadata_to_send', merge_metadata_to_send: true do + it 'adds to existing metadata when there is existing metadata to send' do + call = make_test_call + starting_metadata = { k1: 'key1_val', k2: 'key2_val' } + @client_call = ActiveCall.new( + call, + @pass_through, @pass_through, + deadline, + started: false, + metadata_to_send: starting_metadata) + + expect(@client_call.metadata_to_send).to eq(starting_metadata) + + @client_call.merge_metadata_to_send( + k3: 'key3_val', + k4: 'key4_val') + + expected_md_to_send = { + k1: 'key1_val', + k2: 'key2_val', + k3: 'key3_val', + k4: 'key4_val' } + + expect(@client_call.metadata_to_send).to eq(expected_md_to_send) + + @client_call.merge_metadata_to_send(k5: 'key5_val') + expected_md_to_send.merge!(k5: 'key5_val') + expect(@client_call.metadata_to_send).to eq(expected_md_to_send) + end + + it 'overrides existing metadata if adding metadata with an existing key' do + call = make_test_call + starting_metadata = { k1: 'key1_val', k2: 'key2_val' } + @client_call = ActiveCall.new( + call, + @pass_through, + @pass_through, + deadline, + started: false, + metadata_to_send: starting_metadata) + + expect(@client_call.metadata_to_send).to eq(starting_metadata) + @client_call.merge_metadata_to_send(k1: 'key1_new_val') + expect(@client_call.metadata_to_send).to eq(k1: 'key1_new_val', + k2: 'key2_val') + end + + it 'fails when initial metadata has already been sent' do + call = make_test_call + @client_call = ActiveCall.new( + call, + @pass_through, + @pass_through, + deadline, + started: true) + + expect(@client_call.metadata_sent).to eq(true) + + blk = proc do + @client_call.merge_metadata_to_send(k1: 'key1_val') + end + + expect { blk.call }.to raise_error + end + end + describe '#client_invoke' do it 'sends metadata to the server when present' do call = make_test_call @@ -163,7 +319,26 @@ describe GRPC::ActiveCall do end end - describe '#remote_read' do + describe '#send_status', send_status: true do + it 'works when no metadata or messages have been sent yet' do + call = make_test_call + ActiveCall.client_invoke(call) + + recvd_rpc = @server.request_call + server_call = ActiveCall.new( + recvd_rpc.call, + @pass_through, + @pass_through, + deadline, + started: false) + + expect(server_call.metadata_sent).to eq(false) + blk = proc { server_call.send_status(OK) } + expect { blk.call }.to_not raise_error + end + end + + describe '#remote_read', remote_read: true do it 'reads the response sent by a server' do call = make_test_call ActiveCall.client_invoke(call) @@ -205,6 +380,31 @@ describe GRPC::ActiveCall do expect(client_call.metadata).to eq(expected) end + it 'get a status from server when nothing else sent from server' do + client_call = make_test_call + ActiveCall.client_invoke(client_call) + + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + + server_call = ActiveCall.new( + recvd_call, + @pass_through, + @pass_through, + deadline, + started: false) + + server_call.send_status(OK, 'OK') + + # Check that we can receive initial metadata and a status + client_call.run_batch( + CallOps::RECV_INITIAL_METADATA => nil) + batch_result = client_call.run_batch( + CallOps::RECV_STATUS_ON_CLIENT => nil) + + expect(batch_result.status.code).to eq(OK) + end + it 'get a nil msg before a status when an OK status is sent' do call = make_test_call ActiveCall.client_invoke(call) @@ -329,6 +529,125 @@ describe GRPC::ActiveCall do end end + # Test sending of the initial metadata in #run_server_bidi + # from the server handler both implicitly and explicitly, + # when the server handler function has one argument and two arguments + describe '#run_server_bidi sanity tests', run_server_bidi: true do + it 'sends the initial metadata implicitly if not already sent' do + requests = ['first message', 'second message'] + server_to_client_metadata = { 'test_key' => 'test_val' } + server_status = OK + + client_call = make_test_call + client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) + + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = ActiveCall.new(recvd_call, + @pass_through, + @pass_through, + deadline, + metadata_received: true, + started: false, + metadata_to_send: server_to_client_metadata) + + # Server handler that doesn't have access to a "call" + # It echoes the requests + fake_gen_each_reply_with_no_call_param = proc do |msgs| + msgs + end + + server_thread = Thread.new do + server_call.run_server_bidi( + fake_gen_each_reply_with_no_call_param) + server_call.send_status(server_status) + end + + # Send the requests and send a close so the server can send a status + requests.each do |message| + client_call.run_batch(CallOps::SEND_MESSAGE => message) + end + client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + + server_thread.join + + # Expect that initial metadata was sent, + # the requests were echoed, and a status was sent + batch_result = client_call.run_batch( + CallOps::RECV_INITIAL_METADATA => nil) + expect(batch_result.metadata).to eq(server_to_client_metadata) + + requests.each do |message| + batch_result = client_call.run_batch( + CallOps::RECV_MESSAGE => nil) + expect(batch_result.message).to eq(message) + end + + batch_result = client_call.run_batch( + CallOps::RECV_STATUS_ON_CLIENT => nil) + expect(batch_result.status.code).to eq(server_status) + end + + it 'sends the metadata when sent explicitly and not already sent' do + requests = ['first message', 'second message'] + server_to_client_metadata = { 'test_key' => 'test_val' } + server_status = OK + + client_call = make_test_call + client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) + + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = ActiveCall.new(recvd_call, + @pass_through, + @pass_through, + deadline, + metadata_received: true, + started: false) + + # Fake server handler that has access to a "call" object and + # uses it to explicitly update and sent the initial metadata + fake_gen_each_reply_with_call_param = proc do |msgs, call_param| + call_param.merge_metadata_to_send(server_to_client_metadata) + call_param.send_initial_metadata + msgs + end + + server_thread = Thread.new do + server_call.run_server_bidi( + fake_gen_each_reply_with_call_param) + server_call.send_status(server_status) + end + + # Send requests and a close from the client so the server + # can send a status + requests.each do |message| + client_call.run_batch( + CallOps::SEND_MESSAGE => message) + end + client_call.run_batch( + CallOps::SEND_CLOSE_FROM_CLIENT => nil) + + server_thread.join + + # Verify that the correct metadata was sent, the requests + # were echoed, and the correct status was sent + batch_result = client_call.run_batch( + CallOps::RECV_INITIAL_METADATA => nil) + expect(batch_result.metadata).to eq(server_to_client_metadata) + + requests.each do |message| + batch_result = client_call.run_batch( + CallOps::RECV_MESSAGE => nil) + expect(batch_result.message).to eq(message) + end + + batch_result = client_call.run_batch( + CallOps::RECV_STATUS_ON_CLIENT => nil) + expect(batch_result.status.code).to eq(server_status) + end + end + def expect_server_to_receive(sent_text, **kw) c = expect_server_to_be_invoked(**kw) expect(c.remote_read).to eq(sent_text) diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index d2080b7ca2..1a895005bc 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -196,6 +196,9 @@ describe GRPC::RpcDesc do def fake_svstream(_arg1, _arg2) end + def fake_three_args(_arg1, _arg2, _arg3) + end + it 'raises when a request_response does not have 2 args' do [:fake_clstream, :no_arg].each do |mth| blk = proc do @@ -244,8 +247,8 @@ describe GRPC::RpcDesc do expect(&blk).to_not raise_error end - it 'raises when a bidi streamer does not have 1 arg' do - [:fake_svstream, :no_arg].each do |mth| + it 'raises when a bidi streamer does not have 1 or 2 args' do + [:fake_three_args, :no_arg].each do |mth| blk = proc do @bidi_streamer.assert_arity_matches(method(mth)) end @@ -259,6 +262,13 @@ describe GRPC::RpcDesc do end expect(&blk).to_not raise_error end + + it 'passes when a bidi streamer has 2 args' do + blk = proc do + @bidi_streamer.assert_arity_matches(method(:fake_svstream)) + end + expect(&blk).to_not raise_error + end end describe '#request_response?' do -- cgit v1.2.3 From a9bc030a3a9eb876c4ca00b40d68013ed91a8d66 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 11 Aug 2016 12:05:55 -0700 Subject: add mutex wrapper around sending and modifying of initial metadata --- src/ruby/lib/grpc/generic/active_call.rb | 37 ++++--- src/ruby/spec/generic/active_call_spec.rb | 159 ++++++++++-------------------- 2 files changed, 80 insertions(+), 116 deletions(-) (limited to 'src') diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index d43a9e7a4b..f9c41f0c0e 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -113,11 +113,17 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started + @send_initial_md_mutex = Mutex.new end + # Sends the initial metadata that has yet to be sent. + # Fails if metadata has already been sent for this call. def send_initial_metadata - fail 'Already sent metadata' if @metadata_sent - start_call(@metadata_to_send) + @send_initial_md_mutex.synchronize do + fail('Already send initial metadata') if @metadata_sent + @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send) + @metadata_sent = true + end end # output_metadata are provides access to hash that can be used to @@ -195,7 +201,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - start_call(@metadata_to_send) unless @metadata_sent + send_initial_metadata unless @metadata_sent GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") payload = marshalled ? req : @marshal.call(req) @call.run_batch(SEND_MESSAGE => payload) @@ -211,7 +217,7 @@ module GRPC # list, mulitple metadata for its key are sent def send_status(code = OK, details = '', assert_finished = false, metadata: {}) - start_call unless @metadata_sent + send_initial_metadata unless @metadata_sent ops = { SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata) } @@ -312,7 +318,8 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) - start_call(metadata) + merge_metadata_to_send(metadata) && + send_initial_metadata unless @metadata_sent remote_send(req) writes_done(false) response = remote_read @@ -336,7 +343,8 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - start_call(metadata) + merge_metadata_to_send(metadata) && + send_initial_metadata unless @metadata_sent requests.each { |r| remote_send(r) } writes_done(false) response = remote_read @@ -362,7 +370,8 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) - start_call(metadata) + merge_metadata_to_send(metadata) && + send_initial_metadata unless @metadata_sent remote_send(req) writes_done(false) replies = enum_for(:each_remote_read_then_finish) @@ -401,7 +410,8 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) - start_call(metadata) unless @metadata_sent + merge_metadata_to_send(metadata) && + send_initial_metadata unless @metadata_sent bd = BidiCall.new(@call, @marshal, @unmarshal, @@ -444,9 +454,14 @@ module GRPC @op_notifier.notify(self) end + # Add to the metadata that will be sent from the server. + # Fails if metadata has already been sent. + # Unused by client calls. def merge_metadata_to_send(new_metadata = {}) - fail('cant change metadata after already sent') if @metadata_sent - @metadata_to_send.merge!(new_metadata) + @send_initial_md_mutex.synchronize do + fail('cant change metadata after already sent') if @metadata_sent + @metadata_to_send.merge!(new_metadata) + end end private @@ -456,7 +471,7 @@ module GRPC # a list, multiple metadata for its key are sent def start_call(metadata = {}) return if @metadata_sent - @metadata_tag = ActiveCall.client_invoke(@call, metadata) + merge_metadata_to_send(metadata) && send_initial_metadata @metadata_sent = true end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 0c72be9a98..79f739e8fa 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -242,7 +242,12 @@ describe GRPC::ActiveCall do describe '#merge_metadata_to_send', merge_metadata_to_send: true do it 'adds to existing metadata when there is existing metadata to send' do call = make_test_call - starting_metadata = { k1: 'key1_val', k2: 'key2_val' } + starting_metadata = { + k1: 'key1_val', + k2: 'key2_val', + k3: 'key3_val' + } + @client_call = ActiveCall.new( call, @pass_through, @pass_through, @@ -253,13 +258,13 @@ describe GRPC::ActiveCall do expect(@client_call.metadata_to_send).to eq(starting_metadata) @client_call.merge_metadata_to_send( - k3: 'key3_val', + k3: 'key3_new_val', k4: 'key4_val') expected_md_to_send = { k1: 'key1_val', k2: 'key2_val', - k3: 'key3_val', + k3: 'key3_new_val', k4: 'key4_val' } expect(@client_call.metadata_to_send).to eq(expected_md_to_send) @@ -269,23 +274,6 @@ describe GRPC::ActiveCall do expect(@client_call.metadata_to_send).to eq(expected_md_to_send) end - it 'overrides existing metadata if adding metadata with an existing key' do - call = make_test_call - starting_metadata = { k1: 'key1_val', k2: 'key2_val' } - @client_call = ActiveCall.new( - call, - @pass_through, - @pass_through, - deadline, - started: false, - metadata_to_send: starting_metadata) - - expect(@client_call.metadata_to_send).to eq(starting_metadata) - @client_call.merge_metadata_to_send(k1: 'key1_new_val') - expect(@client_call.metadata_to_send).to eq(k1: 'key1_new_val', - k2: 'key2_val') - end - it 'fails when initial metadata has already been sent' do call = make_test_call @client_call = ActiveCall.new( @@ -530,121 +518,82 @@ describe GRPC::ActiveCall do end # Test sending of the initial metadata in #run_server_bidi - # from the server handler both implicitly and explicitly, - # when the server handler function has one argument and two arguments - describe '#run_server_bidi sanity tests', run_server_bidi: true do - it 'sends the initial metadata implicitly if not already sent' do - requests = ['first message', 'second message'] - server_to_client_metadata = { 'test_key' => 'test_val' } - server_status = OK + # from the server handler both implicitly and explicitly. + describe '#run_server_bidi metadata sending tests', run_server_bidi: true do + before(:each) do + @requests = ['first message', 'second message'] + @server_to_client_metadata = { 'test_key' => 'test_val' } + @server_status = OK - client_call = make_test_call - client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) + @client_call = make_test_call + @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) recvd_rpc = @server.request_call recvd_call = recvd_rpc.call - server_call = ActiveCall.new(recvd_call, - @pass_through, - @pass_through, - deadline, - metadata_received: true, - started: false, - metadata_to_send: server_to_client_metadata) - - # Server handler that doesn't have access to a "call" - # It echoes the requests - fake_gen_each_reply_with_no_call_param = proc do |msgs| - msgs - end - - server_thread = Thread.new do - server_call.run_server_bidi( - fake_gen_each_reply_with_no_call_param) - server_call.send_status(server_status) - end + @server_call = ActiveCall.new( + recvd_call, + @pass_through, + @pass_through, + deadline, + metadata_received: true, + started: false, + metadata_to_send: @server_to_client_metadata) + end + after(:each) do # Send the requests and send a close so the server can send a status - requests.each do |message| - client_call.run_batch(CallOps::SEND_MESSAGE => message) + @requests.each do |message| + @client_call.run_batch(CallOps::SEND_MESSAGE => message) end - client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) + @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) - server_thread.join + @server_thread.join # Expect that initial metadata was sent, # the requests were echoed, and a status was sent - batch_result = client_call.run_batch( + batch_result = @client_call.run_batch( CallOps::RECV_INITIAL_METADATA => nil) - expect(batch_result.metadata).to eq(server_to_client_metadata) + expect(batch_result.metadata).to eq(@server_to_client_metadata) - requests.each do |message| - batch_result = client_call.run_batch( + @requests.each do |message| + batch_result = @client_call.run_batch( CallOps::RECV_MESSAGE => nil) expect(batch_result.message).to eq(message) end - batch_result = client_call.run_batch( + batch_result = @client_call.run_batch( CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(batch_result.status.code).to eq(server_status) + expect(batch_result.status.code).to eq(@server_status) end - it 'sends the metadata when sent explicitly and not already sent' do - requests = ['first message', 'second message'] - server_to_client_metadata = { 'test_key' => 'test_val' } - server_status = OK - - client_call = make_test_call - client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) + it 'sends the initial metadata implicitly if not already sent' do + # Server handler that doesn't have access to a "call" + # It echoes the requests + fake_gen_each_reply_with_no_call_param = proc do |msgs| + msgs + end - recvd_rpc = @server.request_call - recvd_call = recvd_rpc.call - server_call = ActiveCall.new(recvd_call, - @pass_through, - @pass_through, - deadline, - metadata_received: true, - started: false) + @server_thread = Thread.new do + @server_call.run_server_bidi( + fake_gen_each_reply_with_no_call_param) + @server_call.send_status(@server_status) + end + end + it 'sends the metadata when sent explicitly and not already sent' do # Fake server handler that has access to a "call" object and - # uses it to explicitly update and sent the initial metadata + # uses it to explicitly update and send the initial metadata fake_gen_each_reply_with_call_param = proc do |msgs, call_param| - call_param.merge_metadata_to_send(server_to_client_metadata) + call_param.merge_metadata_to_send(@server_to_client_metadata) call_param.send_initial_metadata msgs end - server_thread = Thread.new do - server_call.run_server_bidi( + @server_thread = Thread.new do + @server_call.run_server_bidi( fake_gen_each_reply_with_call_param) - server_call.send_status(server_status) - end - - # Send requests and a close from the client so the server - # can send a status - requests.each do |message| - client_call.run_batch( - CallOps::SEND_MESSAGE => message) - end - client_call.run_batch( - CallOps::SEND_CLOSE_FROM_CLIENT => nil) - - server_thread.join - - # Verify that the correct metadata was sent, the requests - # were echoed, and the correct status was sent - batch_result = client_call.run_batch( - CallOps::RECV_INITIAL_METADATA => nil) - expect(batch_result.metadata).to eq(server_to_client_metadata) - - requests.each do |message| - batch_result = client_call.run_batch( - CallOps::RECV_MESSAGE => nil) - expect(batch_result.message).to eq(message) + @server_call.send_status(@server_status) end - - batch_result = client_call.run_batch( - CallOps::RECV_STATUS_ON_CLIENT => nil) - expect(batch_result.status.code).to eq(server_status) end end -- cgit v1.2.3 From 17d5c071153b540ee502712a4b0521b911d148bf Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 11 Aug 2016 15:21:43 -0700 Subject: use sent flag only under mutex and dont fail in send_initial_metadata --- src/ruby/lib/grpc/generic/active_call.rb | 22 ++++++++-------------- src/ruby/lib/grpc/generic/bidi_call.rb | 3 +-- src/ruby/spec/generic/active_call_spec.rb | 4 ++-- 3 files changed, 11 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index f9c41f0c0e..23688dc924 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -117,10 +117,10 @@ module GRPC end # Sends the initial metadata that has yet to be sent. - # Fails if metadata has already been sent for this call. + # Does nothing if metadata has already been sent for this call. def send_initial_metadata @send_initial_md_mutex.synchronize do - fail('Already send initial metadata') if @metadata_sent + return if @metadata_sent @metadata_tag = ActiveCall.client_invoke(@call, @metadata_to_send) @metadata_sent = true end @@ -201,7 +201,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - send_initial_metadata unless @metadata_sent + send_initial_metadata GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") payload = marshalled ? req : @marshal.call(req) @call.run_batch(SEND_MESSAGE => payload) @@ -217,7 +217,7 @@ module GRPC # list, mulitple metadata for its key are sent def send_status(code = OK, details = '', assert_finished = false, metadata: {}) - send_initial_metadata unless @metadata_sent + send_initial_metadata ops = { SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata) } @@ -318,8 +318,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) - merge_metadata_to_send(metadata) && - send_initial_metadata unless @metadata_sent + merge_metadata_to_send(metadata) && send_initial_metadata remote_send(req) writes_done(false) response = remote_read @@ -343,8 +342,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - merge_metadata_to_send(metadata) && - send_initial_metadata unless @metadata_sent + merge_metadata_to_send(metadata) && send_initial_metadata requests.each { |r| remote_send(r) } writes_done(false) response = remote_read @@ -370,8 +368,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) - merge_metadata_to_send(metadata) && - send_initial_metadata unless @metadata_sent + merge_metadata_to_send(metadata) && send_initial_metadata remote_send(req) writes_done(false) replies = enum_for(:each_remote_read_then_finish) @@ -410,8 +407,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) - merge_metadata_to_send(metadata) && - send_initial_metadata unless @metadata_sent + merge_metadata_to_send(metadata) && send_initial_metadata bd = BidiCall.new(@call, @marshal, @unmarshal, @@ -470,9 +466,7 @@ module GRPC # @param metadata [Hash] metadata to be sent to the server. If a value is # a list, multiple metadata for its key are sent def start_call(metadata = {}) - return if @metadata_sent merge_metadata_to_send(metadata) && send_initial_metadata - @metadata_sent = true end def self.view_class(*visible_methods) diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 0b6ef4918c..14905b721c 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -172,8 +172,7 @@ module GRPC payload = @marshal.call(req) # Fails if status already received begin - @req_view.send_initial_metadata unless - @req_view.nil? || @req_view.metadata_sent + @req_view.send_initial_metadata unless @req_view.nil? @call.run_batch(SEND_MESSAGE => payload) rescue GRPC::Core::CallError => e # This is almost definitely caused by a status arriving while still diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 79f739e8fa..48bc61e494 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -221,7 +221,7 @@ describe GRPC::ActiveCall do @client_call.send_initial_metadata end - it 'explicit sending fails if metadata has already been sent' do + it 'explicit sending does nothing if metadata has already been sent' do call = make_test_call @client_call = ActiveCall.new(call, @@ -235,7 +235,7 @@ describe GRPC::ActiveCall do @client_call.send_initial_metadata end - expect { blk.call }.to raise_error + expect { blk.call }.to_not raise_error end end -- cgit v1.2.3