aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2018-08-14 15:04:35 -0700
committerGravatar Vijay Pai <vpai@google.com>2018-08-14 16:23:21 -0700
commit14ad82a76de99de39460d901cf44767308859ae0 (patch)
tree5b5651416c040123b77b65cfe978258a347472c3
parent8165c4c0aaa0fc020f1c74a67d4e50dff6c5e9b1 (diff)
Create a new method handler for resource exhaustion and tie into thread mgr
-rw-r--r--include/grpcpp/impl/codegen/byte_buffer.h4
-rw-r--r--include/grpcpp/impl/codegen/completion_queue.h6
-rw-r--r--include/grpcpp/impl/codegen/method_handler_impl.h17
-rw-r--r--include/grpcpp/impl/codegen/server_context.h6
-rw-r--r--include/grpcpp/server.h3
-rw-r--r--src/cpp/server/server_cc.cc22
-rw-r--r--src/cpp/thread_manager/thread_manager.cc35
-rw-r--r--src/cpp/thread_manager/thread_manager.h2
-rw-r--r--test/cpp/end2end/thread_stress_test.cc113
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc4
10 files changed, 125 insertions, 87 deletions
diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h
index 86c047ebe7..8cc5158115 100644
--- a/include/grpcpp/impl/codegen/byte_buffer.h
+++ b/include/grpcpp/impl/codegen/byte_buffer.h
@@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
+template <StatusCode code>
+class ErrorMethodHandler;
template <class R>
class DeserializeFuncType;
class GrpcByteBufferPeer;
@@ -144,6 +146,8 @@ class ByteBuffer final {
friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class internal::ServerStreamingHandler;
+ template <StatusCode code>
+ friend class internal::ErrorMethodHandler;
template <class R>
friend class internal::DeserializeFuncType;
friend class ProtoBufferReader;
diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h
index 272575dac2..3f7d4fb765 100644
--- a/include/grpcpp/impl/codegen/completion_queue.h
+++ b/include/grpcpp/impl/codegen/completion_queue.h
@@ -78,9 +78,10 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
-class UnknownMethodHandler;
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler;
+template <StatusCode code>
+class ErrorMethodHandler;
template <class InputMessage, class OutputMessage>
class BlockingUnaryCallImpl;
} // namespace internal
@@ -265,7 +266,8 @@ class CompletionQueue : private GrpcLibraryCodegen {
friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler;
- friend class ::grpc::internal::UnknownMethodHandler;
+ template <StatusCode code>
+ friend class ::grpc::internal::ErrorMethodHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
friend class ::grpc::ServerInterface;
diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h
index 851aa2a024..53117f941b 100644
--- a/include/grpcpp/impl/codegen/method_handler_impl.h
+++ b/include/grpcpp/impl/codegen/method_handler_impl.h
@@ -272,12 +272,14 @@ class SplitServerStreamingHandler
ServerSplitStreamer<RequestType, ResponseType>, false>(func) {}
};
-/// Handle unknown method by returning UNIMPLEMENTED error.
-class UnknownMethodHandler : public MethodHandler {
+/// General method handler class for errors that prevent real method use
+/// e.g., handle unknown method by returning UNIMPLEMENTED error.
+template <StatusCode code>
+class ErrorMethodHandler : public MethodHandler {
public:
template <class T>
static void FillOps(ServerContext* context, T* ops) {
- Status status(StatusCode::UNIMPLEMENTED, "");
+ Status status(code, "");
if (!context->sent_initial_metadata_) {
ops->SendInitialMetadata(context->initial_metadata_,
context->initial_metadata_flags());
@@ -294,9 +296,18 @@ class UnknownMethodHandler : public MethodHandler {
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
+ // We also have to destroy any request payload in the handler parameter
+ ByteBuffer* payload = param.request.bbuf_ptr();
+ if (payload != nullptr) {
+ payload->Clear();
+ }
}
};
+typedef ErrorMethodHandler<StatusCode::UNIMPLEMENTED> UnknownMethodHandler;
+typedef ErrorMethodHandler<StatusCode::RESOURCE_EXHAUSTED>
+ ResourceExhaustedHandler;
+
} // namespace internal
} // namespace grpc
diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h
index 153b404d9e..10372de129 100644
--- a/include/grpcpp/impl/codegen/server_context.h
+++ b/include/grpcpp/impl/codegen/server_context.h
@@ -63,9 +63,10 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
-class UnknownMethodHandler;
template <class Streamer, bool WriteNeeded>
class TemplatedBidiStreamingHandler;
+template <StatusCode code>
+class ErrorMethodHandler;
class Call;
} // namespace internal
@@ -262,7 +263,8 @@ class ServerContext {
friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
friend class ::grpc::internal::TemplatedBidiStreamingHandler;
- friend class ::grpc::internal::UnknownMethodHandler;
+ template <StatusCode code>
+ friend class internal::ErrorMethodHandler;
friend class ::grpc::ClientContext;
/// Prevent copying.
diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index 189d8bec22..72544c0f0b 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -223,6 +223,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
bool health_check_service_disabled_;
+
+ // A special handler for resource exhausted in sync case
+ std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
};
} // namespace grpc
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index d32d6b4904..66d432c910 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -210,8 +210,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
ctx_(mrd->deadline_, &mrd->request_metadata_),
has_request_payload_(mrd->has_request_payload_),
- request_payload_(mrd->request_payload_),
- method_(mrd->method_) {
+ request_payload_(has_request_payload_ ? mrd->request_payload_
+ : nullptr),
+ method_(mrd->method_),
+ server_(server) {
ctx_.set_call(mrd->call_);
ctx_.cq_ = &cq_;
GPR_ASSERT(mrd->in_flight_);
@@ -225,10 +227,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
}
}
- void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) {
+ void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
+ bool resources) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ auto* handler = resources ? method_->handler()
+ : server_->resource_exhausted_handler_.get();
+ handler->RunHandler(internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
@@ -250,6 +255,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
internal::RpcServiceMethod* const method_;
+ Server* server_;
};
private:
@@ -300,7 +306,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
GPR_UNREACHABLE_CODE(return TIMEOUT);
}
- void DoWork(void* tag, bool ok) override {
+ void DoWork(void* tag, bool ok, bool resources) override {
SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
if (!sync_req) {
@@ -320,7 +326,7 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
+ cd.Run(global_callbacks_, resources);
}
// TODO (sreek) If ok is false here (which it isn't in case of
// grpc_request_registered_call), we should still re-queue the request
@@ -578,6 +584,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
+ if (!sync_server_cqs_->empty()) {
+ resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
+ }
+
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index fa9eec5f9b..e48bf951ea 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -166,22 +166,37 @@ void ThreadManager::MainWorkLoop() {
case WORK_FOUND:
// If we got work and there are now insufficient pollers and there is
// quota available to create a new thread, start a new poller thread
- if (!shutdown_ && num_pollers_ < min_pollers_ &&
- grpc_resource_user_allocate_threads(resource_user_, 1)) {
- num_pollers_++;
- num_threads_++;
- if (num_threads_ > max_active_threads_sofar_) {
- max_active_threads_sofar_ = num_threads_;
+ bool got_thread;
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ if (grpc_resource_user_allocate_threads(resource_user_, 1)) {
+ num_pollers_++;
+ num_threads_++;
+ if (num_threads_ > max_active_threads_sofar_) {
+ max_active_threads_sofar_ = num_threads_;
+ }
+ // Drop lock before spawning thread to avoid contention
+ lock.unlock();
+ new WorkerThread(this);
+ got_thread = true;
+ } else if (num_pollers_ > 0) {
+ // There is still at least some thread polling, so we can go on
+ // even though we couldn't allocate a new thread
+ lock.unlock();
+ got_thread = true;
+ } else {
+ // There are no pollers to spare and we couldn't allocate
+ // a new thread, so resources are exhausted!
+ lock.unlock();
+ got_thread = false;
}
- // Drop lock before spawning thread to avoid contention
- lock.unlock();
- new WorkerThread(this);
} else {
// Drop lock for consistency with above branch
lock.unlock();
+ got_thread = true;
}
// Lock is always released at this point - do the application work
- DoWork(tag, ok);
+ // or return resource exhausted
+ DoWork(tag, ok, got_thread);
// Take the lock again to check post conditions
lock.lock();
// If we're shutdown, we should finish at this point.
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 01043edb31..352f80baf4 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -72,7 +72,7 @@ class ThreadManager {
// The implementation of DoWork() should also do any setup needed to ensure
// that the next call to PollForWork() (not necessarily by the current thread)
// actually finds some work
- virtual void DoWork(void* tag, bool ok) = 0;
+ virtual void DoWork(void* tag, bool ok, bool resources) = 0;
// Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index ccf8400a87..94ad684fe7 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -16,6 +16,7 @@
*
*/
+#include <cinttypes>
#include <mutex>
#include <thread>
@@ -24,6 +25,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
+#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
@@ -51,63 +53,13 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
- TestServiceImpl() : signal_client_(false) {}
+ TestServiceImpl() {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
return Status::OK;
}
-
- // Unimplemented is left unimplemented to test the returned error.
-
- Status RequestStream(ServerContext* context,
- ServerReader<EchoRequest>* reader,
- EchoResponse* response) override {
- EchoRequest request;
- response->set_message("");
- while (reader->Read(&request)) {
- response->mutable_message()->append(request.message());
- }
- return Status::OK;
- }
-
- // Return 3 messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
- Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) override {
- EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
-
- return Status::OK;
- }
-
- Status BidiStream(
- ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest request;
- EchoResponse response;
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- stream->Write(response);
- }
- return Status::OK;
- }
-
- bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
- return signal_client_;
- }
-
- private:
- bool signal_client_;
- std::mutex mu_;
};
template <class Service>
@@ -118,6 +70,7 @@ class CommonStressTest {
virtual void SetUp() = 0;
virtual void TearDown() = 0;
virtual void ResetStub() = 0;
+ virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
@@ -146,6 +99,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return false; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -161,7 +115,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_;
};
-template <class Service>
+template <class Service, bool allow_resource_exhaustion>
class CommonStressTestInproc : public CommonStressTest<Service> {
public:
void ResetStub() override {
@@ -169,6 +123,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -194,6 +149,26 @@ class CommonStressTestSyncServer : public BaseClass {
};
template <class BaseClass>
+class CommonStressTestSyncServerLowThreadCount : public BaseClass {
+ public:
+ void SetUp() override {
+ ServerBuilder builder;
+ ResourceQuota quota;
+ this->SetUpStart(&builder, &service_);
+ quota.SetMaxThreads(4);
+ builder.SetResourceQuota(quota);
+ this->SetUpEnd(&builder);
+ }
+ void TearDown() override {
+ this->TearDownStart();
+ this->TearDownEnd();
+ }
+
+ private:
+ TestServiceImpl service_;
+};
+
+template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass {
public:
CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
@@ -293,7 +268,8 @@ class End2endTest : public ::testing::Test {
Common common_;
};
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
+ bool allow_exhaustion, gpr_atm* errors) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@@ -301,34 +277,49 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
- EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok() || (allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
if (!s.ok()) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
+ if (!(allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
+ }
+ gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
+ } else {
+ EXPECT_EQ(response.message(), request.message());
}
- ASSERT_TRUE(s.ok());
}
}
typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
- CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
+ CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
+ CommonStressTestSyncServerLowThreadCount<
+ CommonStressTestInproc<TestServiceImpl, true>>,
CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
- CommonStressTestAsyncServer<
- CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
+ CommonStressTestAsyncServer<CommonStressTestInproc<
+ grpc::testing::EchoTestService::AsyncService, false>>>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread> threads;
+ gpr_atm errors;
+ gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; ++i) {
- threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
+ this->common_.AllowExhaustion(), &errors);
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i].join();
}
+ uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
+ if (error_cnt != 0) {
+ gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
+ }
}
template <class Common>
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 838f5f72ad..99de5a3e01 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -55,7 +55,7 @@ class ThreadManagerTest final : public grpc::ThreadManager {
num_work_found_(0) {}
grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
- void DoWork(void* tag, bool ok) override;
+ void DoWork(void* tag, bool ok, bool resources) override;
// Get number of times PollForWork() returned WORK_FOUND
int GetNumWorkFound();
@@ -102,7 +102,7 @@ grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
return WORK_FOUND;
}
-void ThreadManagerTest::DoWork(void* tag, bool ok) {
+void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) {
gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
SleepForMs(settings_.work_duration_ms); // Simulate work by sleeping
}