diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/common/channel_filter.cc | 9 | ||||
-rw-r--r-- | src/cpp/common/channel_filter.h | 7 | ||||
-rw-r--r-- | src/cpp/common/resource_quota_cc.cc | 4 | ||||
-rw-r--r-- | src/cpp/common/version_cc.cc | 2 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/grpc_context.cc | 38 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/grpc_plugin.cc | 6 | ||||
-rw-r--r-- | src/cpp/ext/filters/census/grpc_plugin.h | 4 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.cc | 484 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 242 | ||||
-rw-r--r-- | src/cpp/server/health/health.pb.c | 1 | ||||
-rw-r--r-- | src/cpp/server/health/health.pb.h | 7 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 2 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 84 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.cc | 81 | ||||
-rw-r--r-- | src/cpp/thread_manager/thread_manager.h | 54 |
15 files changed, 847 insertions, 178 deletions
diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 0634b0416f..422e7bb65e 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -78,13 +78,8 @@ bool MaybeAddFilter(grpc_channel_stack_builder* builder, void* arg) { grpc_channel_stack_builder_get_channel_arguments(builder); if (!filter.include_filter(*args)) return true; } - if (filter.prepend) { - return grpc_channel_stack_builder_prepend_filter(builder, &filter.filter, - nullptr, nullptr); - } else { - return grpc_channel_stack_builder_append_filter(builder, &filter.filter, - nullptr, nullptr); - } + return grpc_channel_stack_builder_prepend_filter(builder, &filter.filter, + nullptr, nullptr); } } // namespace diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 359c72737c..5e569c97e6 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -36,8 +36,7 @@ /// \c ChannelData. Then register the filter using something like this: /// \code{.cpp} /// RegisterChannelFilter<MyChannelDataSubclass, MyCallDataSubclass>( -/// "name-of-filter", GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_LOW, -/// true, nullptr); +/// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); /// \endcode namespace grpc { @@ -352,7 +351,6 @@ class ChannelFilter final { struct FilterRecord { grpc_channel_stack_type stack_type; int priority; - bool prepend; std::function<bool(const grpc_channel_args&)> include_filter; grpc_channel_filter filter; }; @@ -365,14 +363,12 @@ void ChannelFilterPluginShutdown(); /// Registers a new filter. /// Must be called by only one thread at a time. -/// The \a prepend argument decides whether to prepend or append the filter. /// The \a include_filter argument specifies a function that will be called /// to determine at run-time whether or not to add the filter. If the /// value is nullptr, the filter will be added unconditionally. template <typename ChannelDataType, typename CallDataType> void RegisterChannelFilter( const char* name, grpc_channel_stack_type stack_type, int priority, - bool prepend, std::function<bool(const grpc_channel_args&)> include_filter) { // If we haven't been called before, initialize channel_filters and // call grpc_register_plugin(). @@ -387,7 +383,6 @@ void RegisterChannelFilter( internal::FilterRecord filter_record = { stack_type, priority, - prepend, include_filter, {FilterType::StartTransportStreamOpBatch, FilterType::StartTransportOp, FilterType::call_data_size, FilterType::InitCallElement, diff --git a/src/cpp/common/resource_quota_cc.cc b/src/cpp/common/resource_quota_cc.cc index daeb0ba171..276e5f7954 100644 --- a/src/cpp/common/resource_quota_cc.cc +++ b/src/cpp/common/resource_quota_cc.cc @@ -33,4 +33,8 @@ ResourceQuota& ResourceQuota::Resize(size_t new_size) { return *this; } +ResourceQuota& ResourceQuota::SetMaxThreads(int new_max_threads) { + grpc_resource_quota_set_max_threads(impl_, new_max_threads); + return *this; +} } // namespace grpc diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc index b8fca9a6ee..cc797f1546 100644 --- a/src/cpp/common/version_cc.cc +++ b/src/cpp/common/version_cc.cc @@ -22,5 +22,5 @@ #include <grpcpp/grpcpp.h> namespace grpc { -grpc::string Version() { return "1.15.0-dev"; } +grpc::string Version() { return "1.16.0-dev"; } } // namespace grpc diff --git a/src/cpp/ext/filters/census/grpc_context.cc b/src/cpp/ext/filters/census/grpc_context.cc deleted file mode 100644 index 599a798dda..0000000000 --- a/src/cpp/ext/filters/census/grpc_context.cc +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include <grpc/census.h> -#include <grpc/grpc.h> -#include "src/core/lib/surface/api_trace.h" -#include "src/core/lib/surface/call.h" - -void grpc_census_call_set_context(grpc_call* call, census_context* context) { - GRPC_API_TRACE("grpc_census_call_set_context(call=%p, census_context=%p)", 2, - (call, context)); - if (context != nullptr) { - grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, nullptr); - } -} - -census_context* grpc_census_call_get_context(grpc_call* call) { - GRPC_API_TRACE("grpc_census_call_get_context(call=%p)", 1, (call)); - return static_cast<census_context*>( - grpc_call_context_get(call, GRPC_CONTEXT_TRACING)); -} diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc index f79e0e0e96..f978ed3bf5 100644 --- a/src/cpp/ext/filters/census/grpc_plugin.cc +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -32,12 +32,10 @@ namespace grpc { void RegisterOpenCensusPlugin() { RegisterChannelFilter<CensusChannelData, CensusClientCallData>( - "opencensus_client", GRPC_CLIENT_CHANNEL, - GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH, true /* prepend */, + "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, nullptr /* condition function */); RegisterChannelFilter<CensusChannelData, CensusServerCallData>( - "opencensus_server", GRPC_SERVER_CHANNEL, - GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH, true /* prepend */, + "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */, nullptr /* condition function */); // Access measures to ensure they are initialized. Otherwise, creating a view diff --git a/src/cpp/ext/filters/census/grpc_plugin.h b/src/cpp/ext/filters/census/grpc_plugin.h index 7ff2e7a8b8..9e319cb994 100644 --- a/src/cpp/ext/filters/census/grpc_plugin.h +++ b/src/cpp/ext/filters/census/grpc_plugin.h @@ -24,15 +24,11 @@ #include "absl/strings/string_view.h" #include "include/grpcpp/opencensus.h" #include "opencensus/stats/stats.h" -#include "opencensus/trace/span.h" namespace grpc { class ServerContext; -// Returns the tracing Span for the current RPC. -::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context); - // The tag keys set when recording RPC stats. ::opencensus::stats::TagKey ClientMethodTagKey(); ::opencensus::stats::TagKey ClientStatusTagKey(); diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index bfda67d086..670da63a4a 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -30,29 +30,162 @@ #include "src/cpp/server/health/health.pb.h" namespace grpc { + +// +// DefaultHealthCheckService +// + +DefaultHealthCheckService::DefaultHealthCheckService() { + services_map_[""].SetServingStatus(SERVING); +} + +void DefaultHealthCheckService::SetServingStatus( + const grpc::string& service_name, bool serving) { + std::unique_lock<std::mutex> lock(mu_); + services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING); +} + +void DefaultHealthCheckService::SetServingStatus(bool serving) { + const ServingStatus status = serving ? SERVING : NOT_SERVING; + std::unique_lock<std::mutex> lock(mu_); + for (auto& p : services_map_) { + ServiceData& service_data = p.second; + service_data.SetServingStatus(status); + } +} + +DefaultHealthCheckService::ServingStatus +DefaultHealthCheckService::GetServingStatus( + const grpc::string& service_name) const { + std::lock_guard<std::mutex> lock(mu_); + auto it = services_map_.find(service_name); + if (it == services_map_.end()) { + return NOT_FOUND; + } + const ServiceData& service_data = it->second; + return service_data.GetServingStatus(); +} + +void DefaultHealthCheckService::RegisterCallHandler( + const grpc::string& service_name, + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + std::unique_lock<std::mutex> lock(mu_); + ServiceData& service_data = services_map_[service_name]; + service_data.AddCallHandler(handler /* copies ref */); + handler->SendHealth(std::move(handler), service_data.GetServingStatus()); +} + +void DefaultHealthCheckService::UnregisterCallHandler( + const grpc::string& service_name, + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + std::unique_lock<std::mutex> lock(mu_); + auto it = services_map_.find(service_name); + if (it == services_map_.end()) return; + ServiceData& service_data = it->second; + service_data.RemoveCallHandler(std::move(handler)); + if (service_data.Unused()) { + services_map_.erase(it); + } +} + +DefaultHealthCheckService::HealthCheckServiceImpl* +DefaultHealthCheckService::GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq) { + GPR_ASSERT(impl_ == nullptr); + impl_.reset(new HealthCheckServiceImpl(this, std::move(cq))); + return impl_.get(); +} + +// +// DefaultHealthCheckService::ServiceData +// + +void DefaultHealthCheckService::ServiceData::SetServingStatus( + ServingStatus status) { + status_ = status; + for (auto& call_handler : call_handlers_) { + call_handler->SendHealth(call_handler /* copies ref */, status); + } +} + +void DefaultHealthCheckService::ServiceData::AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + call_handlers_.insert(std::move(handler)); +} + +void DefaultHealthCheckService::ServiceData::RemoveCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + call_handlers_.erase(std::move(handler)); +} + +// +// DefaultHealthCheckService::HealthCheckServiceImpl +// + namespace { const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check"; +const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch"; } // namespace DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl( - DefaultHealthCheckService* service) - : service_(service), method_(nullptr) { - internal::MethodHandler* handler = - new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, - ByteBuffer>( - std::mem_fn(&HealthCheckServiceImpl::Check), this); - method_ = new internal::RpcServiceMethod( - kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler); - AddMethod(method_); -} - -Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( - ServerContext* context, const ByteBuffer* request, ByteBuffer* response) { - // Decode request. - std::vector<Slice> slices; - if (!request->Dump(&slices).ok()) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq) + : database_(database), cq_(std::move(cq)) { + // Add Check() method. + check_method_ = new internal::RpcServiceMethod( + kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr); + AddMethod(check_method_); + // Add Watch() method. + watch_method_ = new internal::RpcServiceMethod( + kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr); + AddMethod(watch_method_); + // Create serving thread. + thread_ = std::unique_ptr<::grpc_core::Thread>( + new ::grpc_core::Thread("grpc_health_check_service", Serve, this)); +} + +DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { + // We will reach here after the server starts shutting down. + shutdown_ = true; + { + std::unique_lock<std::mutex> lock(cq_shutdown_mu_); + cq_->Shutdown(); + } + thread_->Join(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() { + thread_->Start(); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) { + HealthCheckServiceImpl* service = + reinterpret_cast<HealthCheckServiceImpl*>(arg); + // TODO(juanlishen): This is a workaround to wait for the cq to be ready. + // Need to figure out why cq is not ready after service starts. + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(1, GPR_TIMESPAN))); + CheckCallHandler::CreateAndStart(service->cq_.get(), service->database_, + service); + WatchCallHandler::CreateAndStart(service->cq_.get(), service->database_, + service); + void* tag; + bool ok; + while (true) { + if (!service->cq_->Next(&tag, &ok)) { + // The completion queue is shutting down. + GPR_ASSERT(service->shutdown_); + break; + } + auto* next_step = static_cast<CallableTag*>(tag); + next_step->Run(ok); } +} + +bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( + const ByteBuffer& request, grpc::string* service_name) { + std::vector<Slice> slices; + if (!request.Dump(&slices).ok()) return false; uint8_t* request_bytes = nullptr; bool request_bytes_owned = false; size_t request_size = 0; @@ -64,14 +197,13 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( request_size = slices[0].size(); } else { request_bytes_owned = true; - request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length())); + request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { memcpy(copy_to, slices[i].begin(), slices[i].size()); copy_to += slices[i].size(); } } - if (request_bytes != nullptr) { pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); bool decode_status = pb_decode( @@ -79,26 +211,22 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( if (request_bytes_owned) { gpr_free(request_bytes); } - if (!decode_status) { - return Status(StatusCode::INVALID_ARGUMENT, ""); - } - } - - // Check status from the associated default health checking service. - DefaultHealthCheckService::ServingStatus serving_status = - service_->GetServingStatus( - request_struct.has_service ? request_struct.service : ""); - if (serving_status == DefaultHealthCheckService::NOT_FOUND) { - return Status(StatusCode::NOT_FOUND, ""); + if (!decode_status) return false; } + *service_name = request_struct.has_service ? request_struct.service : ""; + return true; +} - // Encode response +bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse( + ServingStatus status, ByteBuffer* response) { grpc_health_v1_HealthCheckResponse response_struct; response_struct.has_status = true; response_struct.status = - serving_status == DefaultHealthCheckService::SERVING - ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING - : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; + status == NOT_FOUND + ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN + : status == SERVING + ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING + : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING; pb_ostream_t ostream; memset(&ostream, 0, sizeof(ostream)); pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields, @@ -108,48 +236,282 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check( GRPC_SLICE_LENGTH(response_slice)); bool encode_status = pb_encode( &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct); - if (!encode_status) { - return Status(StatusCode::INTERNAL, "Failed to encode response."); - } + if (!encode_status) return false; Slice encoded_response(response_slice, Slice::STEAL_REF); ByteBuffer response_buffer(&encoded_response, 1); response->Swap(&response_buffer); - return Status::OK; + return true; } -DefaultHealthCheckService::DefaultHealthCheckService() { - services_map_.emplace("", true); +// +// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr<CallHandler> self = + std::make_shared<CheckCallHandler>(cq, database, service); + CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); + { + std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); + if (service->shutdown_) return; + // Request a Check() call. + handler->next_ = + CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_, + &handler->writer_, cq, cq, &handler->next_); + } } -void DefaultHealthCheckService::SetServingStatus( - const grpc::string& service_name, bool serving) { - std::lock_guard<std::mutex> lock(mu_); - services_map_[service_name] = serving; +DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), database_(database), service_(service), writer_(&ctx_) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok) { + // The value of ok being false means that the server is shutting down. + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Process request. + gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_, + this); + grpc::string service_name; + grpc::Status status = Status::OK; + ByteBuffer response; + if (!service_->DecodeRequest(request_, &service_name)) { + status = Status(INVALID_ARGUMENT, ""); + } else { + ServingStatus serving_status = database_->GetServingStatus(service_name); + if (serving_status == NOT_FOUND) { + status = Status(StatusCode::NOT_FOUND, "service name unknown"); + } else if (!service_->EncodeResponse(serving_status, &response)) { + status = Status(INTERNAL, ""); + } + } + // Send response. + { + std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); + if (!service_->shutdown_) { + next_ = + CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + if (status.ok()) { + writer_.Finish(response, status, &next_); + } else { + writer_.FinishWithError(status, &next_); + } + } + } } -void DefaultHealthCheckService::SetServingStatus(bool serving) { - std::lock_guard<std::mutex> lock(mu_); - for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) { - iter->second = serving; +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: + OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", + service_, this); } } -DefaultHealthCheckService::ServingStatus -DefaultHealthCheckService::GetServingStatus( - const grpc::string& service_name) const { - std::lock_guard<std::mutex> lock(mu_); - const auto& iter = services_map_.find(service_name); - if (iter == services_map_.end()) { - return NOT_FOUND; +// +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler +// + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) { + std::shared_ptr<CallHandler> self = + std::make_shared<WatchCallHandler>(cq, database, service); + WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); + { + std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); + if (service->shutdown_) return; + // Request AsyncNotifyWhenDone(). + handler->on_done_notified_ = + CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler, + std::placeholders::_1, std::placeholders::_2), + self /* copies ref */); + handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_); + // Request a Watch() call. + handler->next_ = + CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_, + &handler->stream_, cq, cq, + &handler->next_); } - return iter->second ? SERVING : NOT_SERVING; } -DefaultHealthCheckService::HealthCheckServiceImpl* -DefaultHealthCheckService::GetHealthCheckService() { - GPR_ASSERT(impl_ == nullptr); - impl_.reset(new HealthCheckServiceImpl(this)); - return impl_.get(); +DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service) + : cq_(cq), + database_(database), + service_(service), + stream_(&ctx_), + call_state_(WAITING_FOR_CALL) {} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + call_state_ = CALL_RECEIVED; + } else { + // AsyncNotifyWhenDone() needs to be called before the call starts, but the + // tag will not pop out if the call never starts ( + // https://github.com/grpc/grpc/issues/10136). So we need to manually + // release the ownership of the handler in this case. + GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); + } + if (!ok || shutdown_) { + // The value of ok being false means that the server is shutting down. + Shutdown(std::move(self), "OnCallReceived"); + return; + } + // Spawn a new handler instance to serve the next new client. Every handler + // instance will deallocate itself when it's done. + CreateAndStart(cq_, database_, service_); + // Parse request. + if (!service_->DecodeRequest(request_, &service_name_)) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(Status(INVALID_ARGUMENT, ""), &on_finish_done_); + call_state_ = FINISH_CALLED; + return; + } + // Register the call for updates to the service. + gpr_log(GPR_DEBUG, + "[HCS %p] Health check watch started for service \"%s\" " + "(handler: %p)", + service_, service_name_.c_str(), this); + database_->RegisterCallHandler(service_name_, std::move(self)); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { + std::unique_lock<std::mutex> lock(mu_); + // If there's already a send in flight, cache the new status, and + // we'll start a new send for it when the one in flight completes. + if (send_in_flight_) { + pending_status_ = status; + return; + } + // Start a send. + SendHealthLocked(std::move(self), status); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) { + std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_); + if (service_->shutdown_) { + cq_lock.release()->unlock(); + Shutdown(std::move(self), "SendHealthLocked"); + return; + } + send_in_flight_ = true; + call_state_ = SEND_MESSAGE_PENDING; + // Construct response. + ByteBuffer response; + if (!service_->EncodeResponse(status, &response)) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Finish(Status(INTERNAL, ""), &on_finish_done_); + return; + } + next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + stream_.Write(response, &next_); +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) { + if (!ok || shutdown_) { + Shutdown(std::move(self), "OnSendHealthDone"); + return; + } + call_state_ = CALL_RECEIVED; + { + std::unique_lock<std::mutex> lock(mu_); + send_in_flight_ = false; + // If we got a new status since we started the last send, start a + // new send for it. + if (pending_status_ != NOT_FOUND) { + auto status = pending_status_; + pending_status_ = NOT_FOUND; + SendHealthLocked(std::move(self), status); + } + } +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { + GPR_ASSERT(ok); + done_notified_ = true; + if (ctx_.IsCancelled()) { + is_cancelled_ = true; + } + gpr_log(GPR_DEBUG, + "[HCS %p] Healt check call is notified done (handler: %p, " + "is_cancelled: %d).", + service_, this, static_cast<int>(is_cancelled_)); + Shutdown(std::move(self), "OnDoneNotified"); +} + +// TODO(roth): This method currently assumes that there will be only one +// thread polling the cq and invoking the corresponding callbacks. If +// that changes, we will need to add synchronization here. +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + Shutdown(std::shared_ptr<CallHandler> self, const char* reason) { + if (!shutdown_) { + gpr_log(GPR_DEBUG, + "[HCS %p] Shutting down the handler (service_name: \"%s\", " + "handler: %p, reason: %s).", + service_, service_name_.c_str(), this, reason); + shutdown_ = true; + } + // OnCallReceived() may be called after OnDoneNotified(), so we need to + // try to Finish() every time we are in Shutdown(). + if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) { + std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); + if (!service_->shutdown_) { + on_finish_done_ = + CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this, + std::placeholders::_1, std::placeholders::_2), + std::move(self)); + // TODO(juanlishen): Maybe add a message proto for the client to + // explicitly cancel the stream so that we can return OK status in such + // cases. + stream_.Finish(Status::CANCELLED, &on_finish_done_); + call_state_ = FINISH_CALLED; + } + } +} + +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: + OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) { + if (ok) { + gpr_log(GPR_DEBUG, + "[HCS %p] Health check call finished (service_name: \"%s\", " + "handler: %p).", + service_, service_name_.c_str(), this); + } } } // namespace grpc diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index a1ce5aa64e..edad594936 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -19,42 +19,268 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H +#include <atomic> #include <mutex> +#include <set> +#include <grpc/support/log.h> +#include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> +#include <grpcpp/impl/codegen/async_generic_service.h> +#include <grpcpp/impl/codegen/async_unary_call.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/support/byte_buffer.h> +#include "src/core/lib/gprpp/thd.h" + namespace grpc { // Default implementation of HealthCheckServiceInterface. Server will create and // own it. class DefaultHealthCheckService final : public HealthCheckServiceInterface { public: + enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + // The service impl to register with the server. class HealthCheckServiceImpl : public Service { public: - explicit HealthCheckServiceImpl(DefaultHealthCheckService* service); + // Base class for call handlers. + class CallHandler { + public: + virtual ~CallHandler() = default; + virtual void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) = 0; + }; - Status Check(ServerContext* context, const ByteBuffer* request, - ByteBuffer* response); + HealthCheckServiceImpl(DefaultHealthCheckService* database, + std::unique_ptr<ServerCompletionQueue> cq); + + ~HealthCheckServiceImpl(); + + void StartServingThread(); private: - const DefaultHealthCheckService* const service_; - internal::RpcServiceMethod* method_; + // A tag that can be called with a bool argument. It's tailored for + // CallHandler's use. Before being used, it should be constructed with a + // method of CallHandler and a shared pointer to the handler. The + // shared pointer will be moved to the invoked function and the function + // can only be invoked once. That makes ref counting of the handler easier, + // because the shared pointer is not bound to the function and can be gone + // once the invoked function returns (if not used any more). + class CallableTag { + public: + using HandlerFunction = + std::function<void(std::shared_ptr<CallHandler>, bool)>; + + CallableTag() {} + + CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) + : handler_function_(std::move(func)), handler_(std::move(handler)) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + } + + // Runs the tag. This should be called only once. The handler is no + // longer owned by this tag after this method is invoked. + void Run(bool ok) { + GPR_ASSERT(handler_function_ != nullptr); + GPR_ASSERT(handler_ != nullptr); + handler_function_(std::move(handler_), ok); + } + + // Releases and returns the shared pointer to the handler. + std::shared_ptr<CallHandler> ReleaseHandler() { + return std::move(handler_); + } + + private: + HandlerFunction handler_function_ = nullptr; + std::shared_ptr<CallHandler> handler_; + }; + + // Call handler for Check method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class CheckCallHandler : public CallHandler { + public: + // Instantiates a CheckCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + CheckCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // Not used for Check. + void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) override {} + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + GenericServerAsyncResponseWriter writer_; + ServerContext ctx_; + + CallableTag next_; + }; + + // Call handler for Watch method. + // Each handler takes care of one call. It contains per-call data and it + // will access the members of the parent class (i.e., + // DefaultHealthCheckService) for per-service health data. + class WatchCallHandler : public CallHandler { + public: + // Instantiates a WatchCallHandler and requests the next health check + // call. The handler object will manage its own lifetime, so no action is + // needed from the caller any more regarding that object. + static void CreateAndStart(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + // This ctor is public because we want to use std::make_shared<> in + // CreateAndStart(). This ctor shouldn't be used elsewhere. + WatchCallHandler(ServerCompletionQueue* cq, + DefaultHealthCheckService* database, + HealthCheckServiceImpl* service); + + void SendHealth(std::shared_ptr<CallHandler> self, + ServingStatus status) override; + + private: + // Called when we receive a call. + // Spawns a new handler so that we can keep servicing future calls. + void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); + + // Requires holding mu_. + void SendHealthLocked(std::shared_ptr<CallHandler> self, + ServingStatus status); + + // When sending a health result finishes. + void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); + + // Called when Finish() is done. + void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); + + // Called when AsyncNotifyWhenDone() notifies us. + void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); + + void Shutdown(std::shared_ptr<CallHandler> self, const char* reason); + + // The members passed down from HealthCheckServiceImpl. + ServerCompletionQueue* cq_; + DefaultHealthCheckService* database_; + HealthCheckServiceImpl* service_; + + ByteBuffer request_; + grpc::string service_name_; + GenericServerAsyncWriter stream_; + ServerContext ctx_; + + std::mutex mu_; + bool send_in_flight_ = false; // Guarded by mu_. + ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. + + // The state of the RPC progress. + enum CallState { + WAITING_FOR_CALL, + CALL_RECEIVED, + SEND_MESSAGE_PENDING, + FINISH_CALLED + } call_state_; + + bool shutdown_ = false; + bool done_notified_ = false; + bool is_cancelled_ = false; + CallableTag next_; + CallableTag on_done_notified_; + CallableTag on_finish_done_; + }; + + // Handles the incoming requests and drives the completion queue in a loop. + static void Serve(void* arg); + + // Returns true on success. + static bool DecodeRequest(const ByteBuffer& request, + grpc::string* service_name); + static bool EncodeResponse(ServingStatus status, ByteBuffer* response); + + // Needed to appease Windows compilers, which don't seem to allow + // nested classes to access protected members in the parent's + // superclass. + using Service::RequestAsyncServerStreaming; + using Service::RequestAsyncUnary; + + DefaultHealthCheckService* database_; + std::unique_ptr<ServerCompletionQueue> cq_; + internal::RpcServiceMethod* check_method_; + internal::RpcServiceMethod* watch_method_; + + // To synchronize the operations related to shutdown state of cq_, so that + // we don't enqueue new tags into cq_ after it is already shut down. + std::mutex cq_shutdown_mu_; + std::atomic_bool shutdown_{false}; + std::unique_ptr<::grpc_core::Thread> thread_; }; DefaultHealthCheckService(); + void SetServingStatus(const grpc::string& service_name, bool serving) override; void SetServingStatus(bool serving) override; - enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; + ServingStatus GetServingStatus(const grpc::string& service_name) const; - HealthCheckServiceImpl* GetHealthCheckService(); + + HealthCheckServiceImpl* GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue> cq); private: + // Stores the current serving status of a service and any call + // handlers registered for updates when the service's status changes. + class ServiceData { + public: + void SetServingStatus(ServingStatus status); + ServingStatus GetServingStatus() const { return status_; } + void AddCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + void RemoveCallHandler( + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + bool Unused() const { + return call_handlers_.empty() && status_ == NOT_FOUND; + } + + private: + ServingStatus status_ = NOT_FOUND; + std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> + call_handlers_; + }; + + void RegisterCallHandler( + const grpc::string& service_name, + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + + void UnregisterCallHandler( + const grpc::string& service_name, + std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + mutable std::mutex mu_; - std::map<grpc::string, bool> services_map_; + std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. std::unique_ptr<HealthCheckServiceImpl> impl_; }; diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c index 09bd98a3d9..5c214c7160 100644 --- a/src/cpp/server/health/health.pb.c +++ b/src/cpp/server/health/health.pb.c @@ -2,7 +2,6 @@ /* Generated by nanopb-0.3.7-dev */ #include "src/cpp/server/health/health.pb.h" - /* @@protoc_insertion_point(includes) */ #if PB_PROTO_HEADER_VERSION != 30 #error Regenerate this file with the current version of nanopb generator. diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h index 29e1f3bacb..9d54ccd618 100644 --- a/src/cpp/server/health/health.pb.h +++ b/src/cpp/server/health/health.pb.h @@ -17,11 +17,12 @@ extern "C" { typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus { grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0, grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1, - grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2 + grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2, + grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3 } grpc_health_v1_HealthCheckResponse_ServingStatus; #define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING+1)) +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN +#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1)) /* Struct definitions */ typedef struct _grpc_health_v1_HealthCheckRequest { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index e0b9b7a62b..8417c45e64 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -263,7 +263,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> server(new Server( max_receive_message_size_, &args, sync_server_cqs, sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec)); + sync_server_settings_.cq_timeout_msec, resource_quota_)); if (has_sync_methods) { // This is a Sync server diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 0d77510e29..3cadf65c80 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -47,6 +47,13 @@ namespace grpc { namespace { +// The default value for maximum number of threads that can be created in the +// sync server. This value of INT_MAX is chosen to match the default behavior if +// no ResourceQuota is set. To modify the max number of threads in a sync +// server, pass a custom ResourceQuota object (with the desired number of +// max-threads set) to the server builder. +#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -204,8 +211,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { call_(mrd->call_, server, &cq_, server->max_receive_message_size()), ctx_(mrd->deadline_, &mrd->request_metadata_), has_request_payload_(mrd->has_request_payload_), - request_payload_(mrd->request_payload_), - method_(mrd->method_) { + request_payload_(has_request_payload_ ? mrd->request_payload_ + : nullptr), + method_(mrd->method_), + server_(server) { ctx_.set_call(mrd->call_); ctx_.cq_ = &cq_; GPR_ASSERT(mrd->in_flight_); @@ -219,10 +228,13 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } } - void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks) { + void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks, + bool resources) { ctx_.BeginCompletionOp(&call_); global_callbacks->PreSynchronousRequest(&ctx_); - method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter( + auto* handler = resources ? method_->handler() + : server_->resource_exhausted_handler_.get(); + handler->RunHandler(internal::MethodHandler::HandlerParameter( &call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; @@ -244,6 +256,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { const bool has_request_payload_; grpc_byte_buffer* request_payload_; internal::RpcServiceMethod* const method_; + Server* server_; }; private: @@ -266,9 +279,9 @@ class Server::SyncRequestThreadManager : public ThreadManager { public: SyncRequestThreadManager(Server* server, CompletionQueue* server_cq, std::shared_ptr<GlobalCallbacks> global_callbacks, - int min_pollers, int max_pollers, - int cq_timeout_msec) - : ThreadManager(min_pollers, max_pollers), + grpc_resource_quota* rq, int min_pollers, + int max_pollers, int cq_timeout_msec) + : ThreadManager("SyncServer", rq, min_pollers, max_pollers), server_(server), server_cq_(server_cq), cq_timeout_msec_(cq_timeout_msec), @@ -294,7 +307,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok) override { + void DoWork(void* tag, bool ok, bool resources) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -314,7 +327,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(global_callbacks_); + cd.Run(global_callbacks_, resources); } // TODO (sreek) If ok is false here (which it isn't in case of // grpc_request_registered_call), we should still re-queue the request @@ -376,7 +389,8 @@ Server::Server( int max_receive_message_size, ChannelArguments* args, std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs, - int min_pollers, int max_pollers, int sync_cq_timeout_msec) + int min_pollers, int max_pollers, int sync_cq_timeout_msec, + grpc_resource_quota* server_rq) : max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), @@ -392,10 +406,22 @@ Server::Server( global_callbacks_->UpdateArguments(args); if (sync_server_cqs_ != nullptr) { + bool default_rq_created = false; + if (server_rq == nullptr) { + server_rq = grpc_resource_quota_create("SyncServer-default-rq"); + grpc_resource_quota_set_max_threads(server_rq, + DEFAULT_MAX_SYNC_SERVER_THREADS); + default_rq_created = true; + } + for (const auto& it : *sync_server_cqs_) { sync_req_mgrs_.emplace_back(new SyncRequestThreadManager( - this, it.get(), global_callbacks_, min_pollers, max_pollers, - sync_cq_timeout_msec)); + this, it.get(), global_callbacks_, server_rq, min_pollers, + max_pollers, sync_cq_timeout_msec)); + } + + if (default_rq_created) { + grpc_resource_quota_unref(server_rq); } } @@ -533,16 +559,20 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // Only create default health check service when user did not provide an // explicit one. + ServerCompletionQueue* health_check_cq = nullptr; + DefaultHealthCheckService::HealthCheckServiceImpl* + default_health_check_service_impl = nullptr; if (health_check_service_ == nullptr && !health_check_service_disabled_ && DefaultHealthCheckServiceEnabled()) { - if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) { - gpr_log(GPR_INFO, - "Default health check service disabled at async-only server."); - } else { - auto* default_hc_service = new DefaultHealthCheckService; - health_check_service_.reset(default_hc_service); - RegisterService(nullptr, default_hc_service->GetHealthCheckService()); - } + auto* default_hc_service = new DefaultHealthCheckService; + health_check_service_.reset(default_hc_service); + health_check_cq = new ServerCompletionQueue(GRPC_CQ_DEFAULT_POLLING); + grpc_server_register_completion_queue(server_, health_check_cq->cq(), + nullptr); + default_health_check_service_impl = + default_hc_service->GetHealthCheckService( + std::unique_ptr<ServerCompletionQueue>(health_check_cq)); + RegisterService(nullptr, default_health_check_service_impl); } grpc_server_start(server_); @@ -557,11 +587,25 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { new UnimplementedAsyncRequest(this, cqs[i]); } } + if (health_check_cq != nullptr) { + new UnimplementedAsyncRequest(this, health_check_cq); + } + } + + // If this server has any support for synchronous methods (has any sync + // server CQs), make sure that we have a ResourceExhausted handler + // to deal with the case of thread exhaustion + if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) { + resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler); } for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { (*it)->Start(); } + + if (default_health_check_service_impl != nullptr) { + default_health_check_service_impl->StartServingThread(); + } } void Server::ShutdownInternal(gpr_timespec deadline) { diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 02ac56a3fd..3e8606a76f 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -22,8 +22,8 @@ #include <mutex> #include <grpc/support/log.h> - #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc { @@ -48,12 +48,17 @@ ThreadManager::WorkerThread::~WorkerThread() { thd_.Join(); } -ThreadManager::ThreadManager(int min_pollers, int max_pollers) +ThreadManager::ThreadManager(const char* name, + grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers) : shutdown_(false), num_pollers_(0), min_pollers_(min_pollers), max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers), - num_threads_(0) {} + num_threads_(0), + max_active_threads_sofar_(0) { + resource_user_ = grpc_resource_user_create(resource_quota, name); +} ThreadManager::~ThreadManager() { { @@ -61,6 +66,8 @@ ThreadManager::~ThreadManager() { GPR_ASSERT(num_threads_ == 0); } + grpc_core::ExecCtx exec_ctx; // grpc_resource_user_unref needs an exec_ctx + grpc_resource_user_unref(resource_user_); CleanupCompletedThreads(); } @@ -81,17 +88,27 @@ bool ThreadManager::IsShutdown() { return shutdown_; } +int ThreadManager::GetMaxActiveThreadsSoFar() { + std::lock_guard<std::mutex> list_lock(list_mu_); + return max_active_threads_sofar_; +} + void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { std::lock_guard<std::mutex> list_lock(list_mu_); completed_threads_.push_back(thd); } - std::lock_guard<std::mutex> lock(mu_); - num_threads_--; - if (num_threads_ == 0) { - shutdown_cv_.notify_one(); + { + std::lock_guard<std::mutex> lock(mu_); + num_threads_--; + if (num_threads_ == 0) { + shutdown_cv_.notify_one(); + } } + + // Give a thread back to the resource quota + grpc_resource_user_free_threads(resource_user_, 1); } void ThreadManager::CleanupCompletedThreads() { @@ -106,14 +123,22 @@ void ThreadManager::CleanupCompletedThreads() { } void ThreadManager::Initialize() { + if (!grpc_resource_user_allocate_threads(resource_user_, min_pollers_)) { + gpr_log(GPR_ERROR, + "No thread quota available to even create the minimum required " + "polling threads (i.e %d). Unable to start the thread manager", + min_pollers_); + abort(); + } + { std::unique_lock<std::mutex> lock(mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; + max_active_threads_sofar_ = min_pollers_; } for (int i = 0; i < min_pollers_; i++) { - // Create a new thread (which ends up calling the MainWorkLoop() function new WorkerThread(this); } } @@ -139,20 +164,40 @@ void ThreadManager::MainWorkLoop() { done = true; break; case WORK_FOUND: - // If we got work and there are now insufficient pollers, start a new - // one + // If we got work and there are now insufficient pollers and there is + // quota available to create a new thread, start a new poller thread + bool resource_exhausted = false; if (!shutdown_ && num_pollers_ < min_pollers_) { - num_pollers_++; - num_threads_++; - // Drop lock before spawning thread to avoid contention - lock.unlock(); - new WorkerThread(this); + if (grpc_resource_user_allocate_threads(resource_user_, 1)) { + // We can allocate a new poller thread + num_pollers_++; + num_threads_++; + if (num_threads_ > max_active_threads_sofar_) { + max_active_threads_sofar_ = num_threads_; + } + // Drop lock before spawning thread to avoid contention + lock.unlock(); + new WorkerThread(this); + } else if (num_pollers_ > 0) { + // There is still at least some thread polling, so we can go on + // even though we are below the number of pollers that we would + // like to have (min_pollers_) + lock.unlock(); + } else { + // There are no pollers to spare and we couldn't allocate + // a new thread, so resources are exhausted! + lock.unlock(); + resource_exhausted = true; + } } else { - // Drop lock for consistency with above branch + // There are a sufficient number of pollers available so we can do + // the work and continue polling with our existing poller threads lock.unlock(); } // Lock is always released at this point - do the application work - DoWork(tag, ok); + // or return resource exhausted if there is new work but we couldn't + // get a thread in which to do it. + DoWork(tag, ok, !resource_exhausted); // Take the lock again to check post conditions lock.lock(); // If we're shutdown, we should finish at this point. @@ -196,6 +241,8 @@ void ThreadManager::MainWorkLoop() { } }; + // This thread is exiting. Do some cleanup work i.e delete already completed + // worker threads CleanupCompletedThreads(); // If we are here, either ThreadManager is shutting down or it already has diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 5a40f2de47..6f0bd17c5f 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -27,12 +27,14 @@ #include <grpcpp/support/config.h> #include "src/core/lib/gprpp/thd.h" +#include "src/core/lib/iomgr/resource_quota.h" namespace grpc { class ThreadManager { public: - explicit ThreadManager(int min_pollers, int max_pollers); + explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota, + int min_pollers, int max_pollers); virtual ~ThreadManager(); // Initializes and Starts the Rpc Manager threads @@ -65,12 +67,14 @@ class ThreadManager { // The implementation of DoWork() is supposed to perform the work found by // PollForWork(). The tag and ok parameters are the same as returned by - // PollForWork() + // PollForWork(). The resources parameter indicates that the call actually + // has the resources available for performing the RPC's work. If it doesn't, + // the implementation should fail it appropriately. // // The implementation of DoWork() should also do any setup needed to ensure // that the next call to PollForWork() (not necessarily by the current thread) // actually finds some work - virtual void DoWork(void* tag, bool ok) = 0; + virtual void DoWork(void* tag, bool ok, bool resources) = 0; // Mark the ThreadManager as shutdown and begin draining the work. This is a // non-blocking call and the caller should call Wait(), a blocking call which @@ -84,6 +88,11 @@ class ThreadManager { // all the threads have drained all the outstanding work virtual void Wait(); + // Max number of concurrent threads that were ever active in this thread + // manager so far. This is useful for debugging purposes (and in unit tests) + // to check if resource_quota is properly being enforced. + int GetMaxActiveThreadsSoFar(); + private: // Helper wrapper class around grpc_core::Thread. Takes a ThreadManager object // and starts a new grpc_core::Thread to calls the Run() function. @@ -91,6 +100,24 @@ class ThreadManager { // The Run() function calls ThreadManager::MainWorkLoop() function and once // that completes, it marks the WorkerThread completed by calling // ThreadManager::MarkAsCompleted() + // + // WHY IS THIS NEEDED?: + // When a thread terminates, some other thread *must* call Join() on that + // thread so that the resources are released. Having a WorkerThread wrapper + // will make this easier. Once Run() completes, each thread calls the + // following two functions: + // ThreadManager::CleanupCompletedThreads() + // ThreadManager::MarkAsCompleted() + // + // - MarkAsCompleted() puts the WorkerThread object in the ThreadManger's + // completed_threads_ list + // - CleanupCompletedThreads() calls "Join()" on the threads that are already + // in the completed_threads_ list (since a thread cannot call Join() on + // itself, it calls CleanupCompletedThreads() *before* calling + // MarkAsCompleted()) + // + // TODO(sreek): Consider creating the threads 'detached' so that Join() need + // not be called (and the need for this WorkerThread class is eliminated) class WorkerThread { public: WorkerThread(ThreadManager* thd_mgr); @@ -111,13 +138,21 @@ class ThreadManager { void MarkAsCompleted(WorkerThread* thd); void CleanupCompletedThreads(); - // Protects shutdown_, num_pollers_ and num_threads_ - // TODO: sreek - Change num_pollers and num_threads_ to atomics + // Protects shutdown_, num_pollers_, num_threads_ and + // max_active_threads_sofar_ std::mutex mu_; bool shutdown_; std::condition_variable shutdown_cv_; + // The resource user object to use when requesting quota to create threads + // + // Note: The user of this ThreadManager object must create grpc_resource_quota + // object (that contains the actual max thread quota) and a grpc_resource_user + // object through which quota is requested whenver new threads need to be + // created + grpc_resource_user* resource_user_; + // Number of threads doing polling int num_pollers_; @@ -125,10 +160,15 @@ class ThreadManager { int min_pollers_; int max_pollers_; - // The total number of threads (includes threads includes the threads that are - // currently polling i.e num_pollers_) + // The total number of threads currently active (includes threads includes the + // threads that are currently polling i.e num_pollers_) int num_threads_; + // See GetMaxActiveThreadsSoFar()'s description. + // To be more specific, this variable tracks the max value num_threads_ was + // ever set so far + int max_active_threads_sofar_; + std::mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; |