diff options
author | Michael Lumish <mlumish@google.com> | 2015-06-19 10:54:48 -0700 |
---|---|---|
committer | Michael Lumish <mlumish@google.com> | 2015-06-19 10:54:48 -0700 |
commit | 61a25fbff7be2915c50db154895a984f240ef6c7 (patch) | |
tree | 63cb8a1f03780781e3add6e3b7abff948d4246c5 /src | |
parent | a15f08cc9a8004785ec937b2ee06951b600d87b6 (diff) | |
parent | aa57bab3ca99f621f042b0a3985b36a0e0e2d6f6 (diff) |
Merge pull request #2138 from tbetbetbe/grpc-ruby-fix-cancel-after-first-response
Corrects the cancel_after_first_response behaviour
Diffstat (limited to 'src')
-rw-r--r-- | src/ruby/.rubocop_todo.yml | 2 | ||||
-rwxr-xr-x | src/ruby/bin/interop/interop_client.rb | 3 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 21 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 14 | ||||
-rw-r--r-- | src/ruby/lib/grpc/logconfig.rb | 2 |
5 files changed, 36 insertions, 6 deletions
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index c35e970df6..05db404582 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -12,7 +12,7 @@ Metrics/AbcSize: # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 192 + Max: 200 # Offense count: 35 # Configuration parameters: CountComments. diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index 16fb1b199d..da4caa842b 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -284,7 +284,8 @@ class NamedTests op = @stub.full_duplex_call(ppp.each_item, return_op: true) ppp.canceller_op = op # causes ppp to cancel after the 1st message op.execute.each { |r| ppp.queue.push(r) } - assert(op.cancelled, 'call operation should be CANCELLED') + op.wait + assert(op.cancelled, 'call operation was not CANCELLED') p 'OK: cancel_after_first_response' end diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 3814ef34b4..215c0069a3 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -120,6 +120,7 @@ module GRPC @started = started @unmarshal = unmarshal @metadata_tag = metadata_tag + @op_notifier = nil end # output_metadata are provides access to hash that can be used to @@ -148,6 +149,7 @@ module GRPC # operation provides a restricted view of this ActiveCall for use as # a Operation. def operation + @op_notifier = Notifier.new Operation.new(self) end @@ -167,6 +169,7 @@ module GRPC batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) return unless assert_finished @call.status = batch_result.status + op_is_done batch_result.check_status end @@ -184,6 +187,7 @@ module GRPC end end @call.status = batch_result.status + op_is_done batch_result.check_status end @@ -415,7 +419,7 @@ module GRPC def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) - bd.run_on_client(requests, &blk) + bd.run_on_client(requests, @op_notifier, &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -434,6 +438,19 @@ module GRPC bd.run_on_server(gen_each_reply) end + # Waits till an operation completes + def wait + return if @op_notifier.nil? + GRPC.logger.debug("active_call.wait: on #{@op_notifier}") + @op_notifier.wait + end + + # Signals that an operation is done + def op_is_done + return if @op_notifier.nil? + @op_notifier.notify(self) + end + private # Starts the call if not already started @@ -468,6 +485,6 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status, :start_call) + :metadata, :status, :start_call, :wait) end end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 489dd5162a..3b0c71395c 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -66,6 +66,7 @@ module GRPC @cq = q @deadline = deadline @marshal = marshal + @op_notifier = nil # signals completion on clients @readq = Queue.new @unmarshal = unmarshal end @@ -76,8 +77,10 @@ module GRPC # block that can be invoked with each response. # # @param requests the Enumerable of requests to send + # @op_notifier a Notifier used to signal completion # @return an Enumerator of requests to yield - def run_on_client(requests, &blk) + def run_on_client(requests, op_notifier, &blk) + @op_notifier = op_notifier @enq_th = Thread.new { write_loop(requests) } @loop_th = start_read_loop each_queued_msg(&blk) @@ -105,6 +108,13 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes + # signals that bidi operation is complete + def notify_done + return unless @op_notifier + GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") + @op_notifier.notify(self) + end + # each_queued_msg yields each message on this instances readq # # - messages are added to the readq by #read_loop @@ -143,11 +153,13 @@ module GRPC @call.status = batch_result.status batch_result.check_status GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") + notify_done end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) + notify_done raise e end diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb index 96812170ba..e9b4aa3c95 100644 --- a/src/ruby/lib/grpc/logconfig.rb +++ b/src/ruby/lib/grpc/logconfig.rb @@ -38,6 +38,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout Logging.logger.root.level = :info # TODO: provide command-line configuration for logging -Logging.logger['GRPC'].level = :debug +Logging.logger['GRPC'].level = :info Logging.logger['GRPC::ActiveCall'].level = :info Logging.logger['GRPC::BidiCall'].level = :info |