aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/spec
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-07-31 14:45:54 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-07-31 14:58:24 -0700
commit39088914ef71e878ed3184c3b70eda8d332bb368 (patch)
tree85f5c5d51f551437cbbcdbb66264e02d2aacb41f /src/ruby/spec
parent8dc1b7db51c49869ab24404bc810500db51fc86d (diff)
fix cancellation test flake
Diffstat (limited to 'src/ruby/spec')
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb88
1 files changed, 79 insertions, 9 deletions
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index e0646f4599..e4fe594e22 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -111,6 +111,32 @@ end
SlowStub = SlowService.rpc_stub_class
+# A test service that allows a synchronized RPC cancellation
+class SynchronizedCancellationService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ attr_reader :received_md, :delay
+
+ # notify_request_received and wait_until_rpc_cancelled are
+ # callbacks to synchronously allow the client to proceed with
+ # cancellation (after the unary request has been received),
+ # and to synchronously wait until the client has cancelled the
+ # current RPC.
+ def initialize(notify_request_received, wait_until_rpc_cancelled)
+ @notify_request_received = notify_request_received
+ @wait_until_rpc_cancelled = wait_until_rpc_cancelled
+ end
+
+ def an_rpc(req, _call)
+ GRPC.logger.info('starting a synchronusly cancelled rpc')
+ @notify_request_received.call(req)
+ @wait_until_rpc_cancelled.call
+ req # send back the req as the response
+ end
+end
+
+SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class
+
# a test service that hangs onto call objects
# and uses them after the server-side call has been
# finished
@@ -384,20 +410,64 @@ describe GRPC::RpcServer do
end
it 'should handle cancellation correctly', server: true do
- service = SlowService.new
+ request_received = false
+ request_received_mu = Mutex.new
+ request_received_cv = ConditionVariable.new
+ notify_request_received = proc do |req|
+ request_received_mu.synchronize do
+ fail 'req is nil' if req.nil?
+ expect(req.is_a?(EchoMsg)).to be true
+ fail 'test bug - already set' if request_received
+ request_received = true
+ request_received_cv.signal
+ end
+ end
+
+ rpc_cancelled = false
+ rpc_cancelled_mu = Mutex.new
+ rpc_cancelled_cv = ConditionVariable.new
+ wait_until_rpc_cancelled = proc do
+ rpc_cancelled_mu.synchronize do
+ loop do
+ break if rpc_cancelled
+ rpc_cancelled_cv.wait(rpc_cancelled_mu)
+ end
+ end
+ end
+
+ service = SynchronizedCancellationService.new(notify_request_received,
+ wait_until_rpc_cancelled)
@srv.handle(service)
- t = Thread.new { @srv.run }
+ srv_thd = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
- stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
- op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)
- Thread.new do # cancel the call
- sleep 0.1
- op.cancel
+ stub = SynchronizedCancellationStub.new(@host,
+ :this_channel_is_insecure,
+ **client_opts)
+ op = stub.an_rpc(req, return_op: true)
+
+ client_thd = Thread.new do
+ expect { op.execute }.to raise_error GRPC::Cancelled
end
- expect { op.execute }.to raise_error GRPC::Cancelled
+
+ request_received_mu.synchronize do
+ loop do
+ break if request_received
+ request_received_cv.wait(request_received_mu)
+ end
+ end
+
+ op.cancel
+
+ rpc_cancelled_mu.synchronize do
+ fail 'test bug - already set' if rpc_cancelled
+ rpc_cancelled = true
+ rpc_cancelled_cv.signal
+ end
+
+ client_thd.join
@srv.stop
- t.join
+ srv_thd.join
end
it 'should handle multiple parallel requests', server: true do