diff options
author | Vijay Pai <vpai@google.com> | 2018-11-05 14:39:44 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-11-30 16:14:21 -0800 |
commit | 2a0c0d7ad6a1256ef6c0398e1900eca2be077a51 (patch) | |
tree | 031f2a881661ead48fd9d86d56a6b33a01f64ca9 /test/cpp | |
parent | cdea58eb9a06240062793198739f5278d7193c3a (diff) |
Streaming API for callback servers
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 56 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 90 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 314 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 21 |
4 files changed, 396 insertions, 85 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 5f0eb6c35c..1871e1375e 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -322,13 +322,13 @@ class ServiceA final { public: ExperimentalWithCallbackMethod_MethodA1() { ::grpc::Service::experimental().MarkMethodCallback(0, - new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodA1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>( + new ::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>( [this](::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ServerCallbackRpcController* controller) { - this->MethodA1(context, request, response, controller); - }, this)); + return this->MethodA1(context, request, response, controller); + })); } ~ExperimentalWithCallbackMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -346,6 +346,9 @@ class ServiceA final { void BaseClassMustBeDerivedFromService(const Service *service) {} public: ExperimentalWithCallbackMethod_MethodA2() { + ::grpc::Service::experimental().MarkMethodCallback(1, + new ::grpc::internal::CallbackClientStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>( + [this] { return this->MethodA2(); })); } ~ExperimentalWithCallbackMethod_MethodA2() override { BaseClassMustBeDerivedFromService(this); @@ -355,6 +358,9 @@ class ServiceA final { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } + virtual ::grpc::experimental::ServerReadReactor< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA2() { + return new ::grpc::internal::UnimplementedReadReactor< + ::grpc::testing::Request, ::grpc::testing::Response>;} }; template <class BaseClass> class ExperimentalWithCallbackMethod_MethodA3 : public BaseClass { @@ -362,6 +368,9 @@ class ServiceA final { void BaseClassMustBeDerivedFromService(const Service *service) {} public: ExperimentalWithCallbackMethod_MethodA3() { + ::grpc::Service::experimental().MarkMethodCallback(2, + new ::grpc::internal::CallbackServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>( + [this] { return this->MethodA3(); })); } ~ExperimentalWithCallbackMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -371,6 +380,9 @@ class ServiceA final { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } + virtual ::grpc::experimental::ServerWriteReactor< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA3() { + return new ::grpc::internal::UnimplementedWriteReactor< + ::grpc::testing::Request, ::grpc::testing::Response>;} }; template <class BaseClass> class ExperimentalWithCallbackMethod_MethodA4 : public BaseClass { @@ -378,6 +390,9 @@ class ServiceA final { void BaseClassMustBeDerivedFromService(const Service *service) {} public: ExperimentalWithCallbackMethod_MethodA4() { + ::grpc::Service::experimental().MarkMethodCallback(3, + new ::grpc::internal::CallbackBidiHandler< ::grpc::testing::Request, ::grpc::testing::Response>( + [this] { return this->MethodA4(); })); } ~ExperimentalWithCallbackMethod_MethodA4() override { BaseClassMustBeDerivedFromService(this); @@ -387,6 +402,9 @@ class ServiceA final { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } + virtual ::grpc::experimental::ServerBidiReactor< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4() { + return new ::grpc::internal::UnimplementedBidiReactor< + ::grpc::testing::Request, ::grpc::testing::Response>;} }; typedef ExperimentalWithCallbackMethod_MethodA1<ExperimentalWithCallbackMethod_MethodA2<ExperimentalWithCallbackMethod_MethodA3<ExperimentalWithCallbackMethod_MethodA4<Service > > > > ExperimentalCallbackService; template <class BaseClass> @@ -544,13 +562,13 @@ class ServiceA final { public: ExperimentalWithRawCallbackMethod_MethodA1() { ::grpc::Service::experimental().MarkMethodRawCallback(0, - new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodA1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + new ::grpc::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( [this](::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { this->MethodA1(context, request, response, controller); - }, this)); + })); } ~ExperimentalWithRawCallbackMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -568,6 +586,9 @@ class ServiceA final { void BaseClassMustBeDerivedFromService(const Service *service) {} public: ExperimentalWithRawCallbackMethod_MethodA2() { + ::grpc::Service::experimental().MarkMethodRawCallback(1, + new ::grpc::internal::CallbackClientStreamingHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this] { return this->MethodA2(); })); } ~ExperimentalWithRawCallbackMethod_MethodA2() override { BaseClassMustBeDerivedFromService(this); @@ -577,6 +598,9 @@ class ServiceA final { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } + virtual ::grpc::experimental::ServerReadReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* MethodA2() { + return new ::grpc::internal::UnimplementedReadReactor< + ::grpc::ByteBuffer, ::grpc::ByteBuffer>;} }; template <class BaseClass> class ExperimentalWithRawCallbackMethod_MethodA3 : public BaseClass { @@ -584,6 +608,9 @@ class ServiceA final { void BaseClassMustBeDerivedFromService(const Service *service) {} public: ExperimentalWithRawCallbackMethod_MethodA3() { + ::grpc::Service::experimental().MarkMethodRawCallback(2, + new ::grpc::internal::CallbackServerStreamingHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this] { return this->MethodA3(); })); } ~ExperimentalWithRawCallbackMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -593,6 +620,9 @@ class ServiceA final { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } + virtual ::grpc::experimental::ServerWriteReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* MethodA3() { + return new ::grpc::internal::UnimplementedWriteReactor< + ::grpc::ByteBuffer, ::grpc::ByteBuffer>;} }; template <class BaseClass> class ExperimentalWithRawCallbackMethod_MethodA4 : public BaseClass { @@ -600,6 +630,9 @@ class ServiceA final { void BaseClassMustBeDerivedFromService(const Service *service) {} public: ExperimentalWithRawCallbackMethod_MethodA4() { + ::grpc::Service::experimental().MarkMethodRawCallback(3, + new ::grpc::internal::CallbackBidiHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this] { return this->MethodA4(); })); } ~ExperimentalWithRawCallbackMethod_MethodA4() override { BaseClassMustBeDerivedFromService(this); @@ -609,6 +642,9 @@ class ServiceA final { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } + virtual ::grpc::experimental::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* MethodA4() { + return new ::grpc::internal::UnimplementedBidiReactor< + ::grpc::ByteBuffer, ::grpc::ByteBuffer>;} }; template <class BaseClass> class WithStreamedUnaryMethod_MethodA1 : public BaseClass { @@ -752,13 +788,13 @@ class ServiceB final { public: ExperimentalWithCallbackMethod_MethodB1() { ::grpc::Service::experimental().MarkMethodCallback(0, - new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodB1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>( + new ::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>( [this](::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ServerCallbackRpcController* controller) { - this->MethodB1(context, request, response, controller); - }, this)); + return this->MethodB1(context, request, response, controller); + })); } ~ExperimentalWithCallbackMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); @@ -815,13 +851,13 @@ class ServiceB final { public: ExperimentalWithRawCallbackMethod_MethodB1() { ::grpc::Service::experimental().MarkMethodRawCallback(0, - new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodB1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + new ::grpc::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( [this](::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { this->MethodB1(context, request, response, controller); - }, this)); + })); } ~ExperimentalWithRawCallbackMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 03291e1785..4d37bae217 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -196,16 +196,18 @@ class TestServiceImplDupPkg class TestScenario { public: TestScenario(bool interceptors, bool proxy, bool inproc_stub, - const grpc::string& creds_type) + const grpc::string& creds_type, bool use_callback_server) : use_interceptors(interceptors), use_proxy(proxy), inproc(inproc_stub), - credentials_type(creds_type) {} + credentials_type(creds_type), + callback_server(use_callback_server) {} void Log() const; bool use_interceptors; bool use_proxy; bool inproc; const grpc::string credentials_type; + bool callback_server; }; static std::ostream& operator<<(std::ostream& out, @@ -214,6 +216,8 @@ static std::ostream& operator<<(std::ostream& out, << (scenario.use_interceptors ? "true" : "false") << ", use_proxy=" << (scenario.use_proxy ? "true" : "false") << ", inproc=" << (scenario.inproc ? "true" : "false") + << ", server_type=" + << (scenario.callback_server ? "callback" : "sync") << ", credentials='" << scenario.credentials_type << "'}"; } @@ -280,7 +284,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { builder.experimental().SetInterceptorCreators(std::move(creators)); } builder.AddListeningPort(server_address_.str(), server_creds); - builder.RegisterService(&service_); + if (!GetParam().callback_server) { + builder.RegisterService(&service_); + } else { + builder.RegisterService(&callback_service_); + } builder.RegisterService("foo.test.youtube.com", &special_service_); builder.RegisterService(&dup_pkg_service_); @@ -362,6 +370,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { std::ostringstream server_address_; const int kMaxMessageSize_; TestServiceImpl service_; + CallbackTestServiceImpl callback_service_; TestServiceImpl special_service_; TestServiceImplDupPkg dup_pkg_service_; grpc::string user_agent_prefix_; @@ -1016,7 +1025,8 @@ TEST_P(End2endTest, DiffPackageServices) { EXPECT_TRUE(s.ok()); } -void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { +template <class ServiceType> +void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_micros(delay_us, GPR_TIMESPAN))); while (!service->signal_client()) { @@ -1446,7 +1456,24 @@ TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); ClientContext context; - std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_); + std::thread cancel_thread; + if (!GetParam().callback_server) { + cancel_thread = std::thread( + [&context, this](int delay) { CancelRpc(&context, delay, &service_); }, + kCancelDelayUs); + // Note: the unusual pattern above (and below) is caused by a conflict + // between two sets of compiler expectations. clang allows const to be + // captured without mention, so there is no need to capture kCancelDelayUs + // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler + // in our tests requires an explicit capture even for const. We square this + // circle by passing the const value in as an argument to the lambda. + } else { + cancel_thread = std::thread( + [&context, this](int delay) { + CancelRpc(&context, delay, &callback_service_); + }, + kCancelDelayUs); + } Status s = stub_->Echo(&context, request, &response); cancel_thread.join(); EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); @@ -1838,10 +1865,12 @@ TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { EXPECT_TRUE(s.ok()); } +// TODO(vjpai): refactor arguments into a struct if it makes sense std::vector<TestScenario> CreateTestScenarios(bool use_proxy, bool test_insecure, bool test_secure, - bool test_inproc) { + bool test_inproc, + bool test_callback_server) { std::vector<TestScenario> scenarios; std::vector<grpc::string> credentials_types; if (test_secure) { @@ -1857,41 +1886,48 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy, if (test_insecure && insec_ok()) { credentials_types.push_back(kInsecureCredentialsType); } + + // For now test callback server only with inproc GPR_ASSERT(!credentials_types.empty()); for (const auto& cred : credentials_types) { - scenarios.emplace_back(false, false, false, cred); - scenarios.emplace_back(true, false, false, cred); + scenarios.emplace_back(false, false, false, cred, false); + scenarios.emplace_back(true, false, false, cred, false); if (use_proxy) { - scenarios.emplace_back(false, true, false, cred); - scenarios.emplace_back(true, true, false, cred); + scenarios.emplace_back(false, true, false, cred, false); + scenarios.emplace_back(true, true, false, cred, false); } } if (test_inproc && insec_ok()) { - scenarios.emplace_back(false, false, true, kInsecureCredentialsType); - scenarios.emplace_back(true, false, true, kInsecureCredentialsType); + scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false); + if (test_callback_server) { + scenarios.emplace_back(false, false, true, kInsecureCredentialsType, + true); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true); + } } return scenarios; } -INSTANTIATE_TEST_CASE_P(End2end, End2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + End2end, End2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); -INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + End2endServerTryCancel, End2endServerTryCancelTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); -INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(true, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + ProxyEnd2end, ProxyEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, false))); -INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, false, - true, false))); +INSTANTIATE_TEST_CASE_P( + SecureEnd2end, SecureEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true))); -INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, - true, true))); +INSTANTIATE_TEST_CASE_P( + ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, false))); } // namespace } // namespace testing diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 1726e87ea6..9d9c01cade 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -71,6 +71,46 @@ void CheckServerAuthContext( } } // namespace +namespace { +int GetIntValueFromMetadataHelper( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; +} + +int GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} + +void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + // Now wait until it's really canceled + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); + } +} + +void ServerTryCancelNonblocking(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); +} + +} // namespace + Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) { // A bit of sleep to make sure that short deadline tests fail @@ -195,6 +235,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( controller->Finish(Status(static_cast<StatusCode>(error.code()), error.error_message(), error.binary_error_details())); + return; } int server_try_cancel = GetIntValueFromMetadata( kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); @@ -254,7 +295,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( alarm_.experimental().Set( gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), + gpr_time_from_micros(request->param().server_cancel_after_us(), GPR_TIMESPAN)), [controller](bool) { controller->Finish(Status::CANCELLED); }); return; @@ -279,6 +320,7 @@ void CallbackTestServiceImpl::EchoNonDelayed( request->param().debug_info().SerializeAsString(); context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); controller->Finish(Status::CANCELLED); + return; } } if (request->has_param() && @@ -325,7 +367,7 @@ Status TestServiceImpl::RequestStream(ServerContext* context, std::thread* server_try_cancel_thd = nullptr; if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + new std::thread([context] { ServerTryCancel(context); }); } int num_msgs_read = 0; @@ -380,7 +422,7 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, std::thread* server_try_cancel_thd = nullptr; if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + new std::thread([context] { ServerTryCancel(context); }); } for (int i = 0; i < server_responses_to_send; i++) { @@ -431,7 +473,7 @@ Status TestServiceImpl::BidiStream( std::thread* server_try_cancel_thd = nullptr; if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = - new std::thread(&TestServiceImpl::ServerTryCancel, this, context); + new std::thread([context] { ServerTryCancel(context); }); } // kServerFinishAfterNReads suggests after how many reads, the server should @@ -465,44 +507,244 @@ Status TestServiceImpl::BidiStream( return Status::OK; } -namespace { -int GetIntValueFromMetadataHelper( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - if (metadata.find(key) != metadata.end()) { - std::istringstream iss(ToString(metadata.find(key)->second)); - iss >> default_value; - gpr_log(GPR_INFO, "%s : %d", key, default_value); - } +experimental::ServerReadReactor<EchoRequest, EchoResponse>* +CallbackTestServiceImpl::RequestStream() { + class Reactor : public ::grpc::experimental::ServerReadReactor<EchoRequest, + EchoResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context, EchoResponse* response) override { + ctx_ = context; + response_ = response; + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the + // value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server + // reads any message from the client CANCEL_DURING_PROCESSING: The RPC + // is cancelled while the server is reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + server_try_cancel_ = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + response_->set_message(""); + + if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } - return default_value; -} -}; // namespace + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + ctx_->TryCancel(); + // Don't wait for it here + } -int TestServiceImpl::GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - return GetIntValueFromMetadataHelper(key, metadata, default_value); + StartRead(&request_); + } + void OnDone() override { delete this; } + void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnReadDone(bool ok) override { + if (ok) { + response_->mutable_message()->append(request_.message()); + num_msgs_read_++; + StartRead(&request_); + } else { + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_); + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + // Let OnCancel recover this + return; + } + if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } + FinishOnce(Status::OK); + } + } + + private: + void FinishOnce(const Status& s) { + std::lock_guard<std::mutex> l(finish_mu_); + if (!finished_) { + Finish(s); + finished_ = true; + } + } + + ServerContext* ctx_; + EchoResponse* response_; + EchoRequest request_; + int num_msgs_read_{0}; + int server_try_cancel_; + std::mutex finish_mu_; + bool finished_{false}; + }; + + return new Reactor; } -int CallbackTestServiceImpl::GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value) { - return GetIntValueFromMetadataHelper(key, metadata, default_value); +// Return 'kNumResponseStreamMsgs' messages. +// TODO(yangg) make it generic by adding a parameter into EchoRequest +experimental::ServerWriteReactor<EchoRequest, EchoResponse>* +CallbackTestServiceImpl::ResponseStream() { + class Reactor + : public ::grpc::experimental::ServerWriteReactor<EchoRequest, + EchoResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context, + const EchoRequest* request) override { + ctx_ = context; + request_ = request; + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the + // value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server + // reads any message from the client CANCEL_DURING_PROCESSING: The RPC + // is cancelled while the server is reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + server_try_cancel_ = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + server_coalescing_api_ = GetIntValueFromMetadata( + kServerUseCoalescingApi, context->client_metadata(), 0); + server_responses_to_send_ = GetIntValueFromMetadata( + kServerResponseStreamsToSend, context->client_metadata(), + kServerDefaultResponseStreamsToSend); + if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + ctx_->TryCancel(); + } + if (num_msgs_sent_ < server_responses_to_send_) { + NextWrite(); + } + } + void OnDone() override { delete this; } + void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnWriteDone(bool ok) override { + if (num_msgs_sent_ < server_responses_to_send_) { + NextWrite(); + } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + // Let OnCancel recover this + } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + } else { + FinishOnce(Status::OK); + } + } + + private: + void FinishOnce(const Status& s) { + std::lock_guard<std::mutex> l(finish_mu_); + if (!finished_) { + Finish(s); + finished_ = true; + } + } + + void NextWrite() { + response_.set_message(request_->message() + + grpc::to_string(num_msgs_sent_)); + if (num_msgs_sent_ == server_responses_to_send_ - 1 && + server_coalescing_api_ != 0) { + num_msgs_sent_++; + StartWriteLast(&response_, WriteOptions()); + } else { + num_msgs_sent_++; + StartWrite(&response_); + } + } + ServerContext* ctx_; + const EchoRequest* request_; + EchoResponse response_; + int num_msgs_sent_{0}; + int server_try_cancel_; + int server_coalescing_api_; + int server_responses_to_send_; + std::mutex finish_mu_; + bool finished_{false}; + }; + return new Reactor; } -void TestServiceImpl::ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - // Now wait until it's really canceled - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN))); - } +experimental::ServerBidiReactor<EchoRequest, EchoResponse>* +CallbackTestServiceImpl::BidiStream() { + class Reactor : public ::grpc::experimental::ServerBidiReactor<EchoRequest, + EchoResponse> { + public: + Reactor() {} + void OnStarted(ServerContext* context) override { + ctx_ = context; + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the + // value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server + // reads any message from the client CANCEL_DURING_PROCESSING: The RPC + // is cancelled while the server is reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + server_try_cancel_ = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + server_write_last_ = GetIntValueFromMetadata( + kServerFinishAfterNReads, context->client_metadata(), 0); + if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + return; + } + + if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + ctx_->TryCancel(); + } + + StartRead(&request_); + } + void OnDone() override { delete this; } + void OnCancel() override { FinishOnce(Status::CANCELLED); } + void OnReadDone(bool ok) override { + if (ok) { + num_msgs_read_++; + gpr_log(GPR_INFO, "recv msg %s", request_.message().c_str()); + response_.set_message(request_.message()); + if (num_msgs_read_ == server_write_last_) { + StartWriteLast(&response_, WriteOptions()); + } else { + StartWrite(&response_); + } + } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) { + // Let OnCancel handle this + } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) { + ServerTryCancelNonblocking(ctx_); + } else { + FinishOnce(Status::OK); + } + } + void OnWriteDone(bool ok) override { StartRead(&request_); } + + private: + void FinishOnce(const Status& s) { + std::lock_guard<std::mutex> l(finish_mu_); + if (!finished_) { + Finish(s); + finished_ = true; + } + } + + ServerContext* ctx_; + EchoRequest request_; + EchoResponse response_; + int num_msgs_read_{0}; + int server_try_cancel_; + int server_write_last_; + std::mutex finish_mu_; + bool finished_{false}; + }; + + return new Reactor; } } // namespace testing diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index ddfe94487e..fad7768b87 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -72,13 +72,6 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { } private: - int GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value); - - void ServerTryCancel(ServerContext* context); - bool signal_client_; std::mutex mu_; std::unique_ptr<grpc::string> host_; @@ -95,6 +88,15 @@ class CallbackTestServiceImpl EchoResponse* response, experimental::ServerCallbackRpcController* controller) override; + experimental::ServerReadReactor<EchoRequest, EchoResponse>* RequestStream() + override; + + experimental::ServerWriteReactor<EchoRequest, EchoResponse>* ResponseStream() + override; + + experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream() + override; + // Unimplemented is left unimplemented to test the returned error. bool signal_client() { std::unique_lock<std::mutex> lock(mu_); @@ -106,11 +108,6 @@ class CallbackTestServiceImpl EchoResponse* response, experimental::ServerCallbackRpcController* controller); - int GetIntValueFromMetadata( - const char* key, - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, - int default_value); - Alarm alarm_; bool signal_client_; std::mutex mu_; |