diff options
Diffstat (limited to 'src/ruby/spec/generic')
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 88 |
1 files changed, 79 insertions, 9 deletions
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index e0646f4599..e4fe594e22 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,32 @@ end SlowStub = SlowService.rpc_stub_class +# A test service that allows a synchronized RPC cancellation +class SynchronizedCancellationService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + attr_reader :received_md, :delay + + # notify_request_received and wait_until_rpc_cancelled are + # callbacks to synchronously allow the client to proceed with + # cancellation (after the unary request has been received), + # and to synchronously wait until the client has cancelled the + # current RPC. + def initialize(notify_request_received, wait_until_rpc_cancelled) + @notify_request_received = notify_request_received + @wait_until_rpc_cancelled = wait_until_rpc_cancelled + end + + def an_rpc(req, _call) + GRPC.logger.info('starting a synchronusly cancelled rpc') + @notify_request_received.call(req) + @wait_until_rpc_cancelled.call + req # send back the req as the response + end +end + +SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class + # a test service that hangs onto call objects # and uses them after the server-side call has been # finished @@ -384,20 +410,64 @@ describe GRPC::RpcServer do end it 'should handle cancellation correctly', server: true do - service = SlowService.new + request_received = false + request_received_mu = Mutex.new + request_received_cv = ConditionVariable.new + notify_request_received = proc do |req| + request_received_mu.synchronize do + fail 'req is nil' if req.nil? + expect(req.is_a?(EchoMsg)).to be true + fail 'test bug - already set' if request_received + request_received = true + request_received_cv.signal + end + end + + rpc_cancelled = false + rpc_cancelled_mu = Mutex.new + rpc_cancelled_cv = ConditionVariable.new + wait_until_rpc_cancelled = proc do + rpc_cancelled_mu.synchronize do + loop do + break if rpc_cancelled + rpc_cancelled_cv.wait(rpc_cancelled_mu) + end + end + end + + service = SynchronizedCancellationService.new(notify_request_received, + wait_until_rpc_cancelled) @srv.handle(service) - t = Thread.new { @srv.run } + srv_thd = Thread.new { @srv.run } @srv.wait_till_running req = EchoMsg.new - stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) - op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true) - Thread.new do # cancel the call - sleep 0.1 - op.cancel + stub = SynchronizedCancellationStub.new(@host, + :this_channel_is_insecure, + **client_opts) + op = stub.an_rpc(req, return_op: true) + + client_thd = Thread.new do + expect { op.execute }.to raise_error GRPC::Cancelled end - expect { op.execute }.to raise_error GRPC::Cancelled + + request_received_mu.synchronize do + loop do + break if request_received + request_received_cv.wait(request_received_mu) + end + end + + op.cancel + + rpc_cancelled_mu.synchronize do + fail 'test bug - already set' if rpc_cancelled + rpc_cancelled = true + rpc_cancelled_cv.signal + end + + client_thd.join @srv.stop - t.join + srv_thd.join end it 'should handle multiple parallel requests', server: true do |