aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-11-05 14:39:44 -0800
committerGravatar Vijay Pai <vpai@google.com>2018-11-30 16:14:21 -0800
commit2a0c0d7ad6a1256ef6c0398e1900eca2be077a51 (patch)
tree031f2a881661ead48fd9d86d56a6b33a01f64ca9 /test/cpp
parentcdea58eb9a06240062793198739f5278d7193c3a (diff)
Streaming API for callback servers
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/codegen/compiler_test_golden56
-rw-r--r--test/cpp/end2end/end2end_test.cc90
-rw-r--r--test/cpp/end2end/test_service_impl.cc314
-rw-r--r--test/cpp/end2end/test_service_impl.h21
4 files changed, 396 insertions, 85 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index 5f0eb6c35c..1871e1375e 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -322,13 +322,13 @@ class ServiceA final {
public:
ExperimentalWithCallbackMethod_MethodA1() {
::grpc::Service::experimental().MarkMethodCallback(0,
- new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodA1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>(
+ new ::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
[this](::grpc::ServerContext* context,
const ::grpc::testing::Request* request,
::grpc::testing::Response* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
- this->MethodA1(context, request, response, controller);
- }, this));
+ return this->MethodA1(context, request, response, controller);
+ }));
}
~ExperimentalWithCallbackMethod_MethodA1() override {
BaseClassMustBeDerivedFromService(this);
@@ -346,6 +346,9 @@ class ServiceA final {
void BaseClassMustBeDerivedFromService(const Service *service) {}
public:
ExperimentalWithCallbackMethod_MethodA2() {
+ ::grpc::Service::experimental().MarkMethodCallback(1,
+ new ::grpc::internal::CallbackClientStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
+ [this] { return this->MethodA2(); }));
}
~ExperimentalWithCallbackMethod_MethodA2() override {
BaseClassMustBeDerivedFromService(this);
@@ -355,6 +358,9 @@ class ServiceA final {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
+ virtual ::grpc::experimental::ServerReadReactor< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA2() {
+ return new ::grpc::internal::UnimplementedReadReactor<
+ ::grpc::testing::Request, ::grpc::testing::Response>;}
};
template <class BaseClass>
class ExperimentalWithCallbackMethod_MethodA3 : public BaseClass {
@@ -362,6 +368,9 @@ class ServiceA final {
void BaseClassMustBeDerivedFromService(const Service *service) {}
public:
ExperimentalWithCallbackMethod_MethodA3() {
+ ::grpc::Service::experimental().MarkMethodCallback(2,
+ new ::grpc::internal::CallbackServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
+ [this] { return this->MethodA3(); }));
}
~ExperimentalWithCallbackMethod_MethodA3() override {
BaseClassMustBeDerivedFromService(this);
@@ -371,6 +380,9 @@ class ServiceA final {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
+ virtual ::grpc::experimental::ServerWriteReactor< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA3() {
+ return new ::grpc::internal::UnimplementedWriteReactor<
+ ::grpc::testing::Request, ::grpc::testing::Response>;}
};
template <class BaseClass>
class ExperimentalWithCallbackMethod_MethodA4 : public BaseClass {
@@ -378,6 +390,9 @@ class ServiceA final {
void BaseClassMustBeDerivedFromService(const Service *service) {}
public:
ExperimentalWithCallbackMethod_MethodA4() {
+ ::grpc::Service::experimental().MarkMethodCallback(3,
+ new ::grpc::internal::CallbackBidiHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
+ [this] { return this->MethodA4(); }));
}
~ExperimentalWithCallbackMethod_MethodA4() override {
BaseClassMustBeDerivedFromService(this);
@@ -387,6 +402,9 @@ class ServiceA final {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
+ virtual ::grpc::experimental::ServerBidiReactor< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4() {
+ return new ::grpc::internal::UnimplementedBidiReactor<
+ ::grpc::testing::Request, ::grpc::testing::Response>;}
};
typedef ExperimentalWithCallbackMethod_MethodA1<ExperimentalWithCallbackMethod_MethodA2<ExperimentalWithCallbackMethod_MethodA3<ExperimentalWithCallbackMethod_MethodA4<Service > > > > ExperimentalCallbackService;
template <class BaseClass>
@@ -544,13 +562,13 @@ class ServiceA final {
public:
ExperimentalWithRawCallbackMethod_MethodA1() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
- new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodA1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
+ new ::grpc::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this](::grpc::ServerContext* context,
const ::grpc::ByteBuffer* request,
::grpc::ByteBuffer* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
this->MethodA1(context, request, response, controller);
- }, this));
+ }));
}
~ExperimentalWithRawCallbackMethod_MethodA1() override {
BaseClassMustBeDerivedFromService(this);
@@ -568,6 +586,9 @@ class ServiceA final {
void BaseClassMustBeDerivedFromService(const Service *service) {}
public:
ExperimentalWithRawCallbackMethod_MethodA2() {
+ ::grpc::Service::experimental().MarkMethodRawCallback(1,
+ new ::grpc::internal::CallbackClientStreamingHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
+ [this] { return this->MethodA2(); }));
}
~ExperimentalWithRawCallbackMethod_MethodA2() override {
BaseClassMustBeDerivedFromService(this);
@@ -577,6 +598,9 @@ class ServiceA final {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
+ virtual ::grpc::experimental::ServerReadReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* MethodA2() {
+ return new ::grpc::internal::UnimplementedReadReactor<
+ ::grpc::ByteBuffer, ::grpc::ByteBuffer>;}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_MethodA3 : public BaseClass {
@@ -584,6 +608,9 @@ class ServiceA final {
void BaseClassMustBeDerivedFromService(const Service *service) {}
public:
ExperimentalWithRawCallbackMethod_MethodA3() {
+ ::grpc::Service::experimental().MarkMethodRawCallback(2,
+ new ::grpc::internal::CallbackServerStreamingHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
+ [this] { return this->MethodA3(); }));
}
~ExperimentalWithRawCallbackMethod_MethodA3() override {
BaseClassMustBeDerivedFromService(this);
@@ -593,6 +620,9 @@ class ServiceA final {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
+ virtual ::grpc::experimental::ServerWriteReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* MethodA3() {
+ return new ::grpc::internal::UnimplementedWriteReactor<
+ ::grpc::ByteBuffer, ::grpc::ByteBuffer>;}
};
template <class BaseClass>
class ExperimentalWithRawCallbackMethod_MethodA4 : public BaseClass {
@@ -600,6 +630,9 @@ class ServiceA final {
void BaseClassMustBeDerivedFromService(const Service *service) {}
public:
ExperimentalWithRawCallbackMethod_MethodA4() {
+ ::grpc::Service::experimental().MarkMethodRawCallback(3,
+ new ::grpc::internal::CallbackBidiHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
+ [this] { return this->MethodA4(); }));
}
~ExperimentalWithRawCallbackMethod_MethodA4() override {
BaseClassMustBeDerivedFromService(this);
@@ -609,6 +642,9 @@ class ServiceA final {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
+ virtual ::grpc::experimental::ServerBidiReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* MethodA4() {
+ return new ::grpc::internal::UnimplementedBidiReactor<
+ ::grpc::ByteBuffer, ::grpc::ByteBuffer>;}
};
template <class BaseClass>
class WithStreamedUnaryMethod_MethodA1 : public BaseClass {
@@ -752,13 +788,13 @@ class ServiceB final {
public:
ExperimentalWithCallbackMethod_MethodB1() {
::grpc::Service::experimental().MarkMethodCallback(0,
- new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodB1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>(
+ new ::grpc::internal::CallbackUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(
[this](::grpc::ServerContext* context,
const ::grpc::testing::Request* request,
::grpc::testing::Response* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
- this->MethodB1(context, request, response, controller);
- }, this));
+ return this->MethodB1(context, request, response, controller);
+ }));
}
~ExperimentalWithCallbackMethod_MethodB1() override {
BaseClassMustBeDerivedFromService(this);
@@ -815,13 +851,13 @@ class ServiceB final {
public:
ExperimentalWithRawCallbackMethod_MethodB1() {
::grpc::Service::experimental().MarkMethodRawCallback(0,
- new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodB1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
+ new ::grpc::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>(
[this](::grpc::ServerContext* context,
const ::grpc::ByteBuffer* request,
::grpc::ByteBuffer* response,
::grpc::experimental::ServerCallbackRpcController* controller) {
this->MethodB1(context, request, response, controller);
- }, this));
+ }));
}
~ExperimentalWithRawCallbackMethod_MethodB1() override {
BaseClassMustBeDerivedFromService(this);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 03291e1785..4d37bae217 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -196,16 +196,18 @@ class TestServiceImplDupPkg
class TestScenario {
public:
TestScenario(bool interceptors, bool proxy, bool inproc_stub,
- const grpc::string& creds_type)
+ const grpc::string& creds_type, bool use_callback_server)
: use_interceptors(interceptors),
use_proxy(proxy),
inproc(inproc_stub),
- credentials_type(creds_type) {}
+ credentials_type(creds_type),
+ callback_server(use_callback_server) {}
void Log() const;
bool use_interceptors;
bool use_proxy;
bool inproc;
const grpc::string credentials_type;
+ bool callback_server;
};
static std::ostream& operator<<(std::ostream& out,
@@ -214,6 +216,8 @@ static std::ostream& operator<<(std::ostream& out,
<< (scenario.use_interceptors ? "true" : "false")
<< ", use_proxy=" << (scenario.use_proxy ? "true" : "false")
<< ", inproc=" << (scenario.inproc ? "true" : "false")
+ << ", server_type="
+ << (scenario.callback_server ? "callback" : "sync")
<< ", credentials='" << scenario.credentials_type << "'}";
}
@@ -280,7 +284,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
builder.experimental().SetInterceptorCreators(std::move(creators));
}
builder.AddListeningPort(server_address_.str(), server_creds);
- builder.RegisterService(&service_);
+ if (!GetParam().callback_server) {
+ builder.RegisterService(&service_);
+ } else {
+ builder.RegisterService(&callback_service_);
+ }
builder.RegisterService("foo.test.youtube.com", &special_service_);
builder.RegisterService(&dup_pkg_service_);
@@ -362,6 +370,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
std::ostringstream server_address_;
const int kMaxMessageSize_;
TestServiceImpl service_;
+ CallbackTestServiceImpl callback_service_;
TestServiceImpl special_service_;
TestServiceImplDupPkg dup_pkg_service_;
grpc::string user_agent_prefix_;
@@ -1016,7 +1025,8 @@ TEST_P(End2endTest, DiffPackageServices) {
EXPECT_TRUE(s.ok());
}
-void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
+template <class ServiceType>
+void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
while (!service->signal_client()) {
@@ -1446,7 +1456,24 @@ TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
ClientContext context;
- std::thread cancel_thread(CancelRpc, &context, kCancelDelayUs, &service_);
+ std::thread cancel_thread;
+ if (!GetParam().callback_server) {
+ cancel_thread = std::thread(
+ [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
+ kCancelDelayUs);
+ // Note: the unusual pattern above (and below) is caused by a conflict
+ // between two sets of compiler expectations. clang allows const to be
+ // captured without mention, so there is no need to capture kCancelDelayUs
+ // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
+ // in our tests requires an explicit capture even for const. We square this
+ // circle by passing the const value in as an argument to the lambda.
+ } else {
+ cancel_thread = std::thread(
+ [&context, this](int delay) {
+ CancelRpc(&context, delay, &callback_service_);
+ },
+ kCancelDelayUs);
+ }
Status s = stub_->Echo(&context, request, &response);
cancel_thread.join();
EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
@@ -1838,10 +1865,12 @@ TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
EXPECT_TRUE(s.ok());
}
+// TODO(vjpai): refactor arguments into a struct if it makes sense
std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
bool test_insecure,
bool test_secure,
- bool test_inproc) {
+ bool test_inproc,
+ bool test_callback_server) {
std::vector<TestScenario> scenarios;
std::vector<grpc::string> credentials_types;
if (test_secure) {
@@ -1857,41 +1886,48 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
if (test_insecure && insec_ok()) {
credentials_types.push_back(kInsecureCredentialsType);
}
+
+ // For now test callback server only with inproc
GPR_ASSERT(!credentials_types.empty());
for (const auto& cred : credentials_types) {
- scenarios.emplace_back(false, false, false, cred);
- scenarios.emplace_back(true, false, false, cred);
+ scenarios.emplace_back(false, false, false, cred, false);
+ scenarios.emplace_back(true, false, false, cred, false);
if (use_proxy) {
- scenarios.emplace_back(false, true, false, cred);
- scenarios.emplace_back(true, true, false, cred);
+ scenarios.emplace_back(false, true, false, cred, false);
+ scenarios.emplace_back(true, true, false, cred, false);
}
}
if (test_inproc && insec_ok()) {
- scenarios.emplace_back(false, false, true, kInsecureCredentialsType);
- scenarios.emplace_back(true, false, true, kInsecureCredentialsType);
+ scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
+ scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
+ if (test_callback_server) {
+ scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
+ true);
+ scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
+ }
}
return scenarios;
}
-INSTANTIATE_TEST_CASE_P(End2end, End2endTest,
- ::testing::ValuesIn(CreateTestScenarios(false, true,
- true, true)));
+INSTANTIATE_TEST_CASE_P(
+ End2end, End2endTest,
+ ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
-INSTANTIATE_TEST_CASE_P(End2endServerTryCancel, End2endServerTryCancelTest,
- ::testing::ValuesIn(CreateTestScenarios(false, true,
- true, true)));
+INSTANTIATE_TEST_CASE_P(
+ End2endServerTryCancel, End2endServerTryCancelTest,
+ ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
-INSTANTIATE_TEST_CASE_P(ProxyEnd2end, ProxyEnd2endTest,
- ::testing::ValuesIn(CreateTestScenarios(true, true,
- true, true)));
+INSTANTIATE_TEST_CASE_P(
+ ProxyEnd2end, ProxyEnd2endTest,
+ ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, false)));
-INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest,
- ::testing::ValuesIn(CreateTestScenarios(false, false,
- true, false)));
+INSTANTIATE_TEST_CASE_P(
+ SecureEnd2end, SecureEnd2endTest,
+ ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)));
-INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
- ::testing::ValuesIn(CreateTestScenarios(false, true,
- true, true)));
+INSTANTIATE_TEST_CASE_P(
+ ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
+ ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, false)));
} // namespace
} // namespace testing
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 1726e87ea6..9d9c01cade 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -71,6 +71,46 @@ void CheckServerAuthContext(
}
} // namespace
+namespace {
+int GetIntValueFromMetadataHelper(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value) {
+ if (metadata.find(key) != metadata.end()) {
+ std::istringstream iss(ToString(metadata.find(key)->second));
+ iss >> default_value;
+ gpr_log(GPR_INFO, "%s : %d", key, default_value);
+ }
+
+ return default_value;
+}
+
+int GetIntValueFromMetadata(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value) {
+ return GetIntValueFromMetadataHelper(key, metadata, default_value);
+}
+
+void ServerTryCancel(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
+ // Now wait until it's really canceled
+ while (!context->IsCancelled()) {
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_micros(1000, GPR_TIMESPAN)));
+ }
+}
+
+void ServerTryCancelNonblocking(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
+}
+
+} // namespace
+
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
// A bit of sleep to make sure that short deadline tests fail
@@ -195,6 +235,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
controller->Finish(Status(static_cast<StatusCode>(error.code()),
error.error_message(),
error.binary_error_details()));
+ return;
}
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
@@ -254,7 +295,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
alarm_.experimental().Set(
gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().client_cancel_after_us(),
+ gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)),
[controller](bool) { controller->Finish(Status::CANCELLED); });
return;
@@ -279,6 +320,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
request->param().debug_info().SerializeAsString();
context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
controller->Finish(Status::CANCELLED);
+ return;
}
}
if (request->has_param() &&
@@ -325,7 +367,7 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
- new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ new std::thread([context] { ServerTryCancel(context); });
}
int num_msgs_read = 0;
@@ -380,7 +422,7 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
- new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ new std::thread([context] { ServerTryCancel(context); });
}
for (int i = 0; i < server_responses_to_send; i++) {
@@ -431,7 +473,7 @@ Status TestServiceImpl::BidiStream(
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
- new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ new std::thread([context] { ServerTryCancel(context); });
}
// kServerFinishAfterNReads suggests after how many reads, the server should
@@ -465,44 +507,244 @@ Status TestServiceImpl::BidiStream(
return Status::OK;
}
-namespace {
-int GetIntValueFromMetadataHelper(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value) {
- if (metadata.find(key) != metadata.end()) {
- std::istringstream iss(ToString(metadata.find(key)->second));
- iss >> default_value;
- gpr_log(GPR_INFO, "%s : %d", key, default_value);
- }
+experimental::ServerReadReactor<EchoRequest, EchoResponse>*
+CallbackTestServiceImpl::RequestStream() {
+ class Reactor : public ::grpc::experimental::ServerReadReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ Reactor() {}
+ void OnStarted(ServerContext* context, EchoResponse* response) override {
+ ctx_ = context;
+ response_ = response;
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the
+ // value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
+ // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
+ // is cancelled while the server is reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ server_try_cancel_ = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ response_->set_message("");
+
+ if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
- return default_value;
-}
-}; // namespace
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ ctx_->TryCancel();
+ // Don't wait for it here
+ }
-int TestServiceImpl::GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value) {
- return GetIntValueFromMetadataHelper(key, metadata, default_value);
+ StartRead(&request_);
+ }
+ void OnDone() override { delete this; }
+ void OnCancel() override { FinishOnce(Status::CANCELLED); }
+ void OnReadDone(bool ok) override {
+ if (ok) {
+ response_->mutable_message()->append(request_.message());
+ num_msgs_read_++;
+ StartRead(&request_);
+ } else {
+ gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_);
+
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ // Let OnCancel recover this
+ return;
+ }
+ if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
+ FinishOnce(Status::OK);
+ }
+ }
+
+ private:
+ void FinishOnce(const Status& s) {
+ std::lock_guard<std::mutex> l(finish_mu_);
+ if (!finished_) {
+ Finish(s);
+ finished_ = true;
+ }
+ }
+
+ ServerContext* ctx_;
+ EchoResponse* response_;
+ EchoRequest request_;
+ int num_msgs_read_{0};
+ int server_try_cancel_;
+ std::mutex finish_mu_;
+ bool finished_{false};
+ };
+
+ return new Reactor;
}
-int CallbackTestServiceImpl::GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value) {
- return GetIntValueFromMetadataHelper(key, metadata, default_value);
+// Return 'kNumResponseStreamMsgs' messages.
+// TODO(yangg) make it generic by adding a parameter into EchoRequest
+experimental::ServerWriteReactor<EchoRequest, EchoResponse>*
+CallbackTestServiceImpl::ResponseStream() {
+ class Reactor
+ : public ::grpc::experimental::ServerWriteReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ Reactor() {}
+ void OnStarted(ServerContext* context,
+ const EchoRequest* request) override {
+ ctx_ = context;
+ request_ = request;
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the
+ // value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
+ // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
+ // is cancelled while the server is reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ server_try_cancel_ = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ server_coalescing_api_ = GetIntValueFromMetadata(
+ kServerUseCoalescingApi, context->client_metadata(), 0);
+ server_responses_to_send_ = GetIntValueFromMetadata(
+ kServerResponseStreamsToSend, context->client_metadata(),
+ kServerDefaultResponseStreamsToSend);
+ if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
+
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ ctx_->TryCancel();
+ }
+ if (num_msgs_sent_ < server_responses_to_send_) {
+ NextWrite();
+ }
+ }
+ void OnDone() override { delete this; }
+ void OnCancel() override { FinishOnce(Status::CANCELLED); }
+ void OnWriteDone(bool ok) override {
+ if (num_msgs_sent_ < server_responses_to_send_) {
+ NextWrite();
+ } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ // Let OnCancel recover this
+ } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ } else {
+ FinishOnce(Status::OK);
+ }
+ }
+
+ private:
+ void FinishOnce(const Status& s) {
+ std::lock_guard<std::mutex> l(finish_mu_);
+ if (!finished_) {
+ Finish(s);
+ finished_ = true;
+ }
+ }
+
+ void NextWrite() {
+ response_.set_message(request_->message() +
+ grpc::to_string(num_msgs_sent_));
+ if (num_msgs_sent_ == server_responses_to_send_ - 1 &&
+ server_coalescing_api_ != 0) {
+ num_msgs_sent_++;
+ StartWriteLast(&response_, WriteOptions());
+ } else {
+ num_msgs_sent_++;
+ StartWrite(&response_);
+ }
+ }
+ ServerContext* ctx_;
+ const EchoRequest* request_;
+ EchoResponse response_;
+ int num_msgs_sent_{0};
+ int server_try_cancel_;
+ int server_coalescing_api_;
+ int server_responses_to_send_;
+ std::mutex finish_mu_;
+ bool finished_{false};
+ };
+ return new Reactor;
}
-void TestServiceImpl::ServerTryCancel(ServerContext* context) {
- EXPECT_FALSE(context->IsCancelled());
- context->TryCancel();
- gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
- // Now wait until it's really canceled
- while (!context->IsCancelled()) {
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(1000, GPR_TIMESPAN)));
- }
+experimental::ServerBidiReactor<EchoRequest, EchoResponse>*
+CallbackTestServiceImpl::BidiStream() {
+ class Reactor : public ::grpc::experimental::ServerBidiReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ Reactor() {}
+ void OnStarted(ServerContext* context) override {
+ ctx_ = context;
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the
+ // value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
+ // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
+ // is cancelled while the server is reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ server_try_cancel_ = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ server_write_last_ = GetIntValueFromMetadata(
+ kServerFinishAfterNReads, context->client_metadata(), 0);
+ if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
+
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ ctx_->TryCancel();
+ }
+
+ StartRead(&request_);
+ }
+ void OnDone() override { delete this; }
+ void OnCancel() override { FinishOnce(Status::CANCELLED); }
+ void OnReadDone(bool ok) override {
+ if (ok) {
+ num_msgs_read_++;
+ gpr_log(GPR_INFO, "recv msg %s", request_.message().c_str());
+ response_.set_message(request_.message());
+ if (num_msgs_read_ == server_write_last_) {
+ StartWriteLast(&response_, WriteOptions());
+ } else {
+ StartWrite(&response_);
+ }
+ } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ // Let OnCancel handle this
+ } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ } else {
+ FinishOnce(Status::OK);
+ }
+ }
+ void OnWriteDone(bool ok) override { StartRead(&request_); }
+
+ private:
+ void FinishOnce(const Status& s) {
+ std::lock_guard<std::mutex> l(finish_mu_);
+ if (!finished_) {
+ Finish(s);
+ finished_ = true;
+ }
+ }
+
+ ServerContext* ctx_;
+ EchoRequest request_;
+ EchoResponse response_;
+ int num_msgs_read_{0};
+ int server_try_cancel_;
+ int server_write_last_;
+ std::mutex finish_mu_;
+ bool finished_{false};
+ };
+
+ return new Reactor;
}
} // namespace testing
diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h
index ddfe94487e..fad7768b87 100644
--- a/test/cpp/end2end/test_service_impl.h
+++ b/test/cpp/end2end/test_service_impl.h
@@ -72,13 +72,6 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
}
private:
- int GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value);
-
- void ServerTryCancel(ServerContext* context);
-
bool signal_client_;
std::mutex mu_;
std::unique_ptr<grpc::string> host_;
@@ -95,6 +88,15 @@ class CallbackTestServiceImpl
EchoResponse* response,
experimental::ServerCallbackRpcController* controller) override;
+ experimental::ServerReadReactor<EchoRequest, EchoResponse>* RequestStream()
+ override;
+
+ experimental::ServerWriteReactor<EchoRequest, EchoResponse>* ResponseStream()
+ override;
+
+ experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream()
+ override;
+
// Unimplemented is left unimplemented to test the returned error.
bool signal_client() {
std::unique_lock<std::mutex> lock(mu_);
@@ -106,11 +108,6 @@ class CallbackTestServiceImpl
EchoResponse* response,
experimental::ServerCallbackRpcController* controller);
- int GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value);
-
Alarm alarm_;
bool signal_client_;
std::mutex mu_;