aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Michael Lumish <mlumish@google.com>2015-06-19 10:54:48 -0700
committerGravatar Michael Lumish <mlumish@google.com>2015-06-19 10:54:48 -0700
commit61a25fbff7be2915c50db154895a984f240ef6c7 (patch)
tree63cb8a1f03780781e3add6e3b7abff948d4246c5 /src
parenta15f08cc9a8004785ec937b2ee06951b600d87b6 (diff)
parentaa57bab3ca99f621f042b0a3985b36a0e0e2d6f6 (diff)
Merge pull request #2138 from tbetbetbe/grpc-ruby-fix-cancel-after-first-response
Corrects the cancel_after_first_response behaviour
Diffstat (limited to 'src')
-rw-r--r--src/ruby/.rubocop_todo.yml2
-rwxr-xr-xsrc/ruby/bin/interop/interop_client.rb3
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb21
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb14
-rw-r--r--src/ruby/lib/grpc/logconfig.rb2
5 files changed, 36 insertions, 6 deletions
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index c35e970df6..05db404582 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -12,7 +12,7 @@ Metrics/AbcSize:
# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
- Max: 192
+ Max: 200
# Offense count: 35
# Configuration parameters: CountComments.
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index 16fb1b199d..da4caa842b 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -284,7 +284,8 @@ class NamedTests
op = @stub.full_duplex_call(ppp.each_item, return_op: true)
ppp.canceller_op = op # causes ppp to cancel after the 1st message
op.execute.each { |r| ppp.queue.push(r) }
- assert(op.cancelled, 'call operation should be CANCELLED')
+ op.wait
+ assert(op.cancelled, 'call operation was not CANCELLED')
p 'OK: cancel_after_first_response'
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 3814ef34b4..215c0069a3 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -120,6 +120,7 @@ module GRPC
@started = started
@unmarshal = unmarshal
@metadata_tag = metadata_tag
+ @op_notifier = nil
end
# output_metadata are provides access to hash that can be used to
@@ -148,6 +149,7 @@ module GRPC
# operation provides a restricted view of this ActiveCall for use as
# a Operation.
def operation
+ @op_notifier = Notifier.new
Operation.new(self)
end
@@ -167,6 +169,7 @@ module GRPC
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished
@call.status = batch_result.status
+ op_is_done
batch_result.check_status
end
@@ -184,6 +187,7 @@ module GRPC
end
end
@call.status = batch_result.status
+ op_is_done
batch_result.check_status
end
@@ -415,7 +419,7 @@ module GRPC
def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
- bd.run_on_client(requests, &blk)
+ bd.run_on_client(requests, @op_notifier, &blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -434,6 +438,19 @@ module GRPC
bd.run_on_server(gen_each_reply)
end
+ # Waits till an operation completes
+ def wait
+ return if @op_notifier.nil?
+ GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
+ @op_notifier.wait
+ end
+
+ # Signals that an operation is done
+ def op_is_done
+ return if @op_notifier.nil?
+ @op_notifier.notify(self)
+ end
+
private
# Starts the call if not already started
@@ -468,6 +485,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status, :start_call)
+ :metadata, :status, :start_call, :wait)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 489dd5162a..3b0c71395c 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -66,6 +66,7 @@ module GRPC
@cq = q
@deadline = deadline
@marshal = marshal
+ @op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal
end
@@ -76,8 +77,10 @@ module GRPC
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
+ # @op_notifier a Notifier used to signal completion
# @return an Enumerator of requests to yield
- def run_on_client(requests, &blk)
+ def run_on_client(requests, op_notifier, &blk)
+ @op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop
each_queued_msg(&blk)
@@ -105,6 +108,13 @@ module GRPC
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
+ # signals that bidi operation is complete
+ def notify_done
+ return unless @op_notifier
+ GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
+ @op_notifier.notify(self)
+ end
+
# each_queued_msg yields each message on this instances readq
#
# - messages are added to the readq by #read_loop
@@ -143,11 +153,13 @@ module GRPC
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
+ notify_done
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
+ notify_done
raise e
end
diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb
index 96812170ba..e9b4aa3c95 100644
--- a/src/ruby/lib/grpc/logconfig.rb
+++ b/src/ruby/lib/grpc/logconfig.rb
@@ -38,6 +38,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info
# TODO: provide command-line configuration for logging
-Logging.logger['GRPC'].level = :debug
+Logging.logger['GRPC'].level = :info
Logging.logger['GRPC::ActiveCall'].level = :info
Logging.logger['GRPC::BidiCall'].level = :info