diff options
author | Vijay Pai <vpai@google.com> | 2018-10-08 13:28:10 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-10-29 13:41:25 -0700 |
commit | 84e763f10a1e10d36c7de35970f9d25958ee2e16 (patch) | |
tree | 0aeb2dfa3ac8a6ec9d829989cb99d70ee20fa8eb /test/cpp | |
parent | ccfd919190babf042c2876e6b94fa38a2508b424 (diff) |
Experimental C++ server callback unary API
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 199 | ||||
-rw-r--r-- | test/cpp/end2end/client_callback_end2end_test.cc | 53 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.cc | 150 | ||||
-rw-r--r-- | test/cpp/end2end/test_service_impl.h | 33 |
4 files changed, 423 insertions, 12 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 93e1e68654..fdc67969d9 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -33,6 +33,7 @@ #include <grpcpp/impl/codegen/method_handler_impl.h> #include <grpcpp/impl/codegen/proto_utils.h> #include <grpcpp/impl/codegen/rpc_method.h> +#include <grpcpp/impl/codegen/server_callback.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/stub_options.h> @@ -308,6 +309,80 @@ class ServiceA final { }; typedef WithAsyncMethod_MethodA1<WithAsyncMethod_MethodA2<WithAsyncMethod_MethodA3<WithAsyncMethod_MethodA4<Service > > > > AsyncService; template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA1() { + ::grpc::Service::experimental().MarkMethodCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodA1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>( + [this](::grpc::ServerContext* context, + const ::grpc::testing::Request* request, + ::grpc::testing::Response* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodA1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithCallbackMethod_MethodA1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodA1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA2 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA2() { + } + ~ExperimentalWithCallbackMethod_MethodA2() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA2(::grpc::ServerContext* context, ::grpc::ServerReader< ::grpc::testing::Request>* reader, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA3 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA3() { + } + ~ExperimentalWithCallbackMethod_MethodA3() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA4 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA4() { + } + ~ExperimentalWithCallbackMethod_MethodA4() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + typedef ExperimentalWithCallbackMethod_MethodA1<ExperimentalWithCallbackMethod_MethodA2<ExperimentalWithCallbackMethod_MethodA3<ExperimentalWithCallbackMethod_MethodA4<Service > > > > ExperimentalCallbackService; + template <class BaseClass> class WithGenericMethod_MethodA1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} @@ -456,6 +531,79 @@ class ServiceA final { } }; template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA1() { + ::grpc::Service::experimental().MarkMethodRawCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodA1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this](::grpc::ServerContext* context, + const ::grpc::ByteBuffer* request, + ::grpc::ByteBuffer* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodA1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithRawCallbackMethod_MethodA1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodA1(::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA2 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA2() { + } + ~ExperimentalWithRawCallbackMethod_MethodA2() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA2(::grpc::ServerContext* context, ::grpc::ServerReader< ::grpc::testing::Request>* reader, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA3 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA3() { + } + ~ExperimentalWithRawCallbackMethod_MethodA3() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA4 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA4() { + } + ~ExperimentalWithRawCallbackMethod_MethodA4() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> class WithStreamedUnaryMethod_MethodA1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} @@ -591,6 +739,32 @@ class ServiceB final { }; typedef WithAsyncMethod_MethodB1<Service > AsyncService; template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodB1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodB1() { + ::grpc::Service::experimental().MarkMethodCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodB1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>( + [this](::grpc::ServerContext* context, + const ::grpc::testing::Request* request, + ::grpc::testing::Response* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodB1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithCallbackMethod_MethodB1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodB1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodB1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + typedef ExperimentalWithCallbackMethod_MethodB1<Service > ExperimentalCallbackService; + template <class BaseClass> class WithGenericMethod_MethodB1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} @@ -628,6 +802,31 @@ class ServiceB final { } }; template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodB1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodB1() { + ::grpc::Service::experimental().MarkMethodRawCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodB1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this](::grpc::ServerContext* context, + const ::grpc::ByteBuffer* request, + ::grpc::ByteBuffer* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodB1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithRawCallbackMethod_MethodB1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodB1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodB1(::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + template <class BaseClass> class WithStreamedUnaryMethod_MethodB1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 62a85641c7..7ffc610ce2 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -41,13 +41,38 @@ namespace grpc { namespace testing { namespace { -class ClientCallbackEnd2endTest : public ::testing::Test { +class TestScenario { + public: + TestScenario(bool serve_callback) : callback_server(serve_callback) {} + void Log() const; + bool callback_server; +}; + +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{callback_server=" + << (scenario.callback_server ? "true" : "false") << "}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_DEBUG, "%s", out.str().c_str()); +} + +class ClientCallbackEnd2endTest + : public ::testing::TestWithParam<TestScenario> { protected: - ClientCallbackEnd2endTest() {} + ClientCallbackEnd2endTest() { GetParam().Log(); } void SetUp() override { ServerBuilder builder; - builder.RegisterService(&service_); + + if (!GetParam().callback_server) { + builder.RegisterService(&service_); + } else { + builder.RegisterService(&callback_service_); + } server_ = builder.BuildAndStart(); is_server_started_ = true; @@ -151,37 +176,38 @@ class ClientCallbackEnd2endTest : public ::testing::Test { std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<grpc::GenericStub> generic_stub_; TestServiceImpl service_; + CallbackTestServiceImpl callback_service_; std::unique_ptr<Server> server_; }; -TEST_F(ClientCallbackEnd2endTest, SimpleRpc) { +TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { ResetStub(); SendRpcs(1, false); } -TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { +TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { ResetStub(); SendRpcs(10, false); } -TEST_F(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { +TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { ResetStub(); SendRpcs(10, true); } -TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) { +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { ResetStub(); SendRpcsGeneric(10, false); } #if GRPC_ALLOW_EXCEPTIONS -TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) { +TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); SendRpcsGeneric(10, true); } #endif -TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { +TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { ResetStub(); std::vector<std::thread> threads; threads.reserve(10); @@ -193,7 +219,7 @@ TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { } } -TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) { +TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) { ResetStub(); std::vector<std::thread> threads; threads.reserve(10); @@ -205,7 +231,7 @@ TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) { } } -TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { +TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { ResetStub(); EchoRequest request; EchoResponse response; @@ -230,6 +256,11 @@ TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } } +TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}}; + +INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, + ::testing::ValuesIn(scenarios)); + } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 3c3a5d9cd4..605356724f 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -165,6 +165,138 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, return Status::OK; } +void CallbackTestServiceImpl::Echo( + ServerContext* context, const EchoRequest* request, EchoResponse* response, + experimental::ServerCallbackRpcController* controller) { + // A bit of sleep to make sure that short deadline tests fail + if (request->has_param() && request->param().server_sleep_us() > 0) { + // Set an alarm for that much time + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(request->param().server_sleep_us(), + GPR_TIMESPAN)), + [this, context, request, response, controller](bool) { + EchoNonDelayed(context, request, response, controller); + }); + } else { + EchoNonDelayed(context, request, response, controller); + } +} + +void CallbackTestServiceImpl::EchoNonDelayed( + ServerContext* context, const EchoRequest* request, EchoResponse* response, + experimental::ServerCallbackRpcController* controller) { + if (request->has_param() && request->param().server_die()) { + gpr_log(GPR_ERROR, "The request should not reach application handler."); + GPR_ASSERT(0); + } + if (request->has_param() && request->param().has_expected_error()) { + const auto& error = request->param().expected_error(); + controller->Finish(Status(static_cast<StatusCode>(error.code()), + error.error_message(), + error.binary_error_details())); + } + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + // Now wait until it's really canceled + + std::function<void(bool)> recurrence = [this, context, controller, + &recurrence](bool) { + if (!context->IsCancelled()) { + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN)), + recurrence); + } else { + controller->Finish(Status::CANCELLED); + } + }; + recurrence(true); + return; + } + + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (host_) { + response->mutable_param()->set_host(*host_); + } + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + std::function<void(bool)> recurrence = [this, context, request, controller, + &recurrence](bool) { + if (!context->IsCancelled()) { + alarm_.experimental().Set( + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN)), + recurrence); + } else { + controller->Finish(Status::CANCELLED); + } + }; + recurrence(true); + return; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + alarm_.experimental().Set( + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN)), + [controller](bool) { controller->Finish(Status::CANCELLED); }); + return; + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { + EXPECT_FALSE(context->IsCancelled()); + } + + if (request->has_param() && request->param().echo_metadata()) { + const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = + context->client_metadata(); + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { + context->AddTrailingMetadata(ToString(iter->first), + ToString(iter->second)); + } + // Terminate rpc with error and debug info in trailer. + if (request->param().debug_info().stack_entries_size() || + !request->param().debug_info().detail().empty()) { + grpc::string serialized_debug_info = + request->param().debug_info().SerializeAsString(); + context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); + controller->Finish(Status::CANCELLED); + } + } + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext(context, + request->param().expected_transport_security_type(), + request->param().expected_client_identity()); + } + if (request->has_param() && request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } + if (request->has_param() && request->param().echo_peer()) { + response->mutable_param()->set_peer(context->peer()); + } + controller->Finish(Status::OK); +} + // Unimplemented is left unimplemented to test the returned error. Status TestServiceImpl::RequestStream(ServerContext* context, @@ -332,7 +464,8 @@ Status TestServiceImpl::BidiStream( return Status::OK; } -int TestServiceImpl::GetIntValueFromMetadata( +namespace { +int GetIntValueFromMetadataHelper( const char* key, const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, int default_value) { @@ -344,6 +477,21 @@ int TestServiceImpl::GetIntValueFromMetadata( return default_value; } +}; // namespace + +int TestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} + +int CallbackTestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} void TestServiceImpl::ServerTryCancel(ServerContext* context) { EXPECT_FALSE(context->IsCancelled()); diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 052543a03e..ddfe94487e 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -22,6 +22,7 @@ #include <mutex> #include <grpc/grpc.h> +#include <grpcpp/alarm.h> #include <grpcpp/server_context.h> #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -78,7 +79,39 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { void ServerTryCancel(ServerContext* context); + bool signal_client_; + std::mutex mu_; + std::unique_ptr<grpc::string> host_; +}; + +class CallbackTestServiceImpl + : public ::grpc::testing::EchoTestService::ExperimentalCallbackService { + public: + CallbackTestServiceImpl() : signal_client_(false), host_() {} + explicit CallbackTestServiceImpl(const grpc::string& host) + : signal_client_(false), host_(new grpc::string(host)) {} + + void Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response, + experimental::ServerCallbackRpcController* controller) override; + + // Unimplemented is left unimplemented to test the returned error. + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + private: + void EchoNonDelayed(ServerContext* context, const EchoRequest* request, + EchoResponse* response, + experimental::ServerCallbackRpcController* controller); + + int GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value); + + Alarm alarm_; bool signal_client_; std::mutex mu_; std::unique_ptr<grpc::string> host_; |