aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/end2end/test_service_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/end2end/test_service_impl.cc')
-rw-r--r--test/cpp/end2end/test_service_impl.cc356
1 files changed, 320 insertions, 36 deletions
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 605356724f..6729ad14f4 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -69,6 +69,62 @@ void CheckServerAuthContext(
EXPECT_EQ(expected_client_identity, identity[0]);
}
}
+
+// Returns the number of pairs in metadata that exactly match the given
+// key-value pair. Returns -1 if the pair wasn't found.
+int MetadataMatchCount(
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ const grpc::string& key, const grpc::string& value) {
+ int count = 0;
+ for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
+ metadata.begin();
+ iter != metadata.end(); ++iter) {
+ if (ToString(iter->first) == key && ToString(iter->second) == value) {
+ count++;
+ }
+ }
+ return count;
+}
+} // namespace
+
+namespace {
+int GetIntValueFromMetadataHelper(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value) {
+ if (metadata.find(key) != metadata.end()) {
+ std::istringstream iss(ToString(metadata.find(key)->second));
+ iss >> default_value;
+ gpr_log(GPR_INFO, "%s : %d", key, default_value);
+ }
+
+ return default_value;
+}
+
+int GetIntValueFromMetadata(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value) {
+ return GetIntValueFromMetadataHelper(key, metadata, default_value);
+}
+
+void ServerTryCancel(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
+ // Now wait until it's really canceled
+ while (!context->IsCancelled()) {
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_micros(1000, GPR_TIMESPAN)));
+ }
+}
+
+void ServerTryCancelNonblocking(ServerContext* context) {
+ EXPECT_FALSE(context->IsCancelled());
+ context->TryCancel();
+ gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
+}
+
} // namespace
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
@@ -165,6 +221,18 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
return Status::OK;
}
+Status TestServiceImpl::CheckClientInitialMetadata(ServerContext* context,
+ const SimpleRequest* request,
+ SimpleResponse* response) {
+ EXPECT_EQ(MetadataMatchCount(context->client_metadata(),
+ kCheckClientInitialMetadataKey,
+ kCheckClientInitialMetadataVal),
+ 1);
+ EXPECT_EQ(1u,
+ context->client_metadata().count(kCheckClientInitialMetadataKey));
+ return Status::OK;
+}
+
void CallbackTestServiceImpl::Echo(
ServerContext* context, const EchoRequest* request, EchoResponse* response,
experimental::ServerCallbackRpcController* controller) {
@@ -183,6 +251,19 @@ void CallbackTestServiceImpl::Echo(
}
}
+void CallbackTestServiceImpl::CheckClientInitialMetadata(
+ ServerContext* context, const SimpleRequest* request,
+ SimpleResponse* response,
+ experimental::ServerCallbackRpcController* controller) {
+ EXPECT_EQ(MetadataMatchCount(context->client_metadata(),
+ kCheckClientInitialMetadataKey,
+ kCheckClientInitialMetadataVal),
+ 1);
+ EXPECT_EQ(1u,
+ context->client_metadata().count(kCheckClientInitialMetadataKey));
+ controller->Finish(Status::OK);
+}
+
void CallbackTestServiceImpl::EchoNonDelayed(
ServerContext* context, const EchoRequest* request, EchoResponse* response,
experimental::ServerCallbackRpcController* controller) {
@@ -195,6 +276,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
controller->Finish(Status(static_cast<StatusCode>(error.code()),
error.error_message(),
error.binary_error_details()));
+ return;
}
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
@@ -223,6 +305,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
return;
}
+ gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str());
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (host_) {
@@ -253,7 +336,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
alarm_.experimental().Set(
gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().client_cancel_after_us(),
+ gpr_time_from_micros(request->param().server_cancel_after_us(),
GPR_TIMESPAN)),
[controller](bool) { controller->Finish(Status::CANCELLED); });
return;
@@ -278,6 +361,7 @@ void CallbackTestServiceImpl::EchoNonDelayed(
request->param().debug_info().SerializeAsString();
context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
controller->Finish(Status::CANCELLED);
+ return;
}
}
if (request->has_param() &&
@@ -324,7 +408,7 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
- new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ new std::thread([context] { ServerTryCancel(context); });
}
int num_msgs_read = 0;
@@ -379,7 +463,7 @@ Status TestServiceImpl::ResponseStream(ServerContext* context,
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
- new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ new std::thread([context] { ServerTryCancel(context); });
}
for (int i = 0; i < server_responses_to_send; i++) {
@@ -430,7 +514,7 @@ Status TestServiceImpl::BidiStream(
std::thread* server_try_cancel_thd = nullptr;
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
- new std::thread(&TestServiceImpl::ServerTryCancel, this, context);
+ new std::thread([context] { ServerTryCancel(context); });
}
// kServerFinishAfterNReads suggests after how many reads, the server should
@@ -464,44 +548,244 @@ Status TestServiceImpl::BidiStream(
return Status::OK;
}
-namespace {
-int GetIntValueFromMetadataHelper(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value) {
- if (metadata.find(key) != metadata.end()) {
- std::istringstream iss(ToString(metadata.find(key)->second));
- iss >> default_value;
- gpr_log(GPR_INFO, "%s : %d", key, default_value);
- }
+experimental::ServerReadReactor<EchoRequest, EchoResponse>*
+CallbackTestServiceImpl::RequestStream() {
+ class Reactor : public ::grpc::experimental::ServerReadReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ Reactor() {}
+ void OnStarted(ServerContext* context, EchoResponse* response) override {
+ ctx_ = context;
+ response_ = response;
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the
+ // value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
+ // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
+ // is cancelled while the server is reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ server_try_cancel_ = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ response_->set_message("");
+
+ if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
- return default_value;
-}
-}; // namespace
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ ctx_->TryCancel();
+ // Don't wait for it here
+ }
-int TestServiceImpl::GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value) {
- return GetIntValueFromMetadataHelper(key, metadata, default_value);
+ StartRead(&request_);
+ }
+ void OnDone() override { delete this; }
+ void OnCancel() override { FinishOnce(Status::CANCELLED); }
+ void OnReadDone(bool ok) override {
+ if (ok) {
+ response_->mutable_message()->append(request_.message());
+ num_msgs_read_++;
+ StartRead(&request_);
+ } else {
+ gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_);
+
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ // Let OnCancel recover this
+ return;
+ }
+ if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
+ FinishOnce(Status::OK);
+ }
+ }
+
+ private:
+ void FinishOnce(const Status& s) {
+ std::lock_guard<std::mutex> l(finish_mu_);
+ if (!finished_) {
+ Finish(s);
+ finished_ = true;
+ }
+ }
+
+ ServerContext* ctx_;
+ EchoResponse* response_;
+ EchoRequest request_;
+ int num_msgs_read_{0};
+ int server_try_cancel_;
+ std::mutex finish_mu_;
+ bool finished_{false};
+ };
+
+ return new Reactor;
}
-int CallbackTestServiceImpl::GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value) {
- return GetIntValueFromMetadataHelper(key, metadata, default_value);
+// Return 'kNumResponseStreamMsgs' messages.
+// TODO(yangg) make it generic by adding a parameter into EchoRequest
+experimental::ServerWriteReactor<EchoRequest, EchoResponse>*
+CallbackTestServiceImpl::ResponseStream() {
+ class Reactor
+ : public ::grpc::experimental::ServerWriteReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ Reactor() {}
+ void OnStarted(ServerContext* context,
+ const EchoRequest* request) override {
+ ctx_ = context;
+ request_ = request;
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the
+ // value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
+ // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
+ // is cancelled while the server is reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ server_try_cancel_ = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ server_coalescing_api_ = GetIntValueFromMetadata(
+ kServerUseCoalescingApi, context->client_metadata(), 0);
+ server_responses_to_send_ = GetIntValueFromMetadata(
+ kServerResponseStreamsToSend, context->client_metadata(),
+ kServerDefaultResponseStreamsToSend);
+ if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
+
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ ctx_->TryCancel();
+ }
+ if (num_msgs_sent_ < server_responses_to_send_) {
+ NextWrite();
+ }
+ }
+ void OnDone() override { delete this; }
+ void OnCancel() override { FinishOnce(Status::CANCELLED); }
+ void OnWriteDone(bool ok) override {
+ if (num_msgs_sent_ < server_responses_to_send_) {
+ NextWrite();
+ } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ // Let OnCancel recover this
+ } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ } else {
+ FinishOnce(Status::OK);
+ }
+ }
+
+ private:
+ void FinishOnce(const Status& s) {
+ std::lock_guard<std::mutex> l(finish_mu_);
+ if (!finished_) {
+ Finish(s);
+ finished_ = true;
+ }
+ }
+
+ void NextWrite() {
+ response_.set_message(request_->message() +
+ grpc::to_string(num_msgs_sent_));
+ if (num_msgs_sent_ == server_responses_to_send_ - 1 &&
+ server_coalescing_api_ != 0) {
+ num_msgs_sent_++;
+ StartWriteLast(&response_, WriteOptions());
+ } else {
+ num_msgs_sent_++;
+ StartWrite(&response_);
+ }
+ }
+ ServerContext* ctx_;
+ const EchoRequest* request_;
+ EchoResponse response_;
+ int num_msgs_sent_{0};
+ int server_try_cancel_;
+ int server_coalescing_api_;
+ int server_responses_to_send_;
+ std::mutex finish_mu_;
+ bool finished_{false};
+ };
+ return new Reactor;
}
-void TestServiceImpl::ServerTryCancel(ServerContext* context) {
- EXPECT_FALSE(context->IsCancelled());
- context->TryCancel();
- gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
- // Now wait until it's really canceled
- while (!context->IsCancelled()) {
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(1000, GPR_TIMESPAN)));
- }
+experimental::ServerBidiReactor<EchoRequest, EchoResponse>*
+CallbackTestServiceImpl::BidiStream() {
+ class Reactor : public ::grpc::experimental::ServerBidiReactor<EchoRequest,
+ EchoResponse> {
+ public:
+ Reactor() {}
+ void OnStarted(ServerContext* context) override {
+ ctx_ = context;
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the
+ // value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
+ // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
+ // is cancelled while the server is reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ server_try_cancel_ = GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ server_write_last_ = GetIntValueFromMetadata(
+ kServerFinishAfterNReads, context->client_metadata(), 0);
+ if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ return;
+ }
+
+ if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ ctx_->TryCancel();
+ }
+
+ StartRead(&request_);
+ }
+ void OnDone() override { delete this; }
+ void OnCancel() override { FinishOnce(Status::CANCELLED); }
+ void OnReadDone(bool ok) override {
+ if (ok) {
+ num_msgs_read_++;
+ gpr_log(GPR_INFO, "recv msg %s", request_.message().c_str());
+ response_.set_message(request_.message());
+ if (num_msgs_read_ == server_write_last_) {
+ StartWriteLast(&response_, WriteOptions());
+ } else {
+ StartWrite(&response_);
+ }
+ } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
+ // Let OnCancel handle this
+ } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
+ ServerTryCancelNonblocking(ctx_);
+ } else {
+ FinishOnce(Status::OK);
+ }
+ }
+ void OnWriteDone(bool ok) override { StartRead(&request_); }
+
+ private:
+ void FinishOnce(const Status& s) {
+ std::lock_guard<std::mutex> l(finish_mu_);
+ if (!finished_) {
+ Finish(s);
+ finished_ = true;
+ }
+ }
+
+ ServerContext* ctx_;
+ EchoRequest request_;
+ EchoResponse response_;
+ int num_msgs_read_{0};
+ int server_try_cancel_;
+ int server_write_last_;
+ std::mutex finish_mu_;
+ bool finished_{false};
+ };
+
+ return new Reactor;
}
} // namespace testing