diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-10-25 19:50:10 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-10-25 19:50:10 -0700 |
commit | 3896dabb85bd4f7314fbe9f850c4af4294157019 (patch) | |
tree | 73155467b35d4b969b2360656cbc718ca3d79e2d /src/cpp/server | |
parent | a094b7b3127ffcb607e11486a64fc905e92a2565 (diff) | |
parent | 111f47437638ce0c00144199b08272b35a99acd7 (diff) |
Merge branch 'master' into interceptors
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/channelz/channelz_service.cc | 67 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.cc | 38 | ||||
-rw-r--r-- | src/cpp/server/health/default_health_check_service.h | 4 | ||||
-rw-r--r-- | src/cpp/server/health/health.pb.c | 23 | ||||
-rw-r--r-- | src/cpp/server/health/health.pb.h | 73 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 37 |
6 files changed, 86 insertions, 156 deletions
diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc index 79ed9102e5..428893f277 100644 --- a/src/cpp/server/channelz/channelz_service.cc +++ b/src/cpp/server/channelz/channelz_service.cc @@ -20,9 +20,6 @@ #include "src/cpp/server/channelz/channelz_service.h" -#include <google/protobuf/text_format.h> -#include <google/protobuf/util/json_util.h> - #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -33,13 +30,14 @@ Status ChannelzService::GetTopChannels( channelz::v1::GetTopChannelsResponse* response) { char* json_str = grpc_channelz_get_top_channels(request->start_channel_id()); if (json_str == nullptr) { - return Status(INTERNAL, "grpc_channelz_get_top_channels returned null"); + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_top_channels returned null"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -49,13 +47,14 @@ Status ChannelzService::GetServers( channelz::v1::GetServersResponse* response) { char* json_str = grpc_channelz_get_servers(request->start_server_id()); if (json_str == nullptr) { - return Status(INTERNAL, "grpc_channelz_get_servers returned null"); + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_servers returned null"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -66,13 +65,14 @@ Status ChannelzService::GetServerSockets( char* json_str = grpc_channelz_get_server_sockets(request->server_id(), request->start_socket_id()); if (json_str == nullptr) { - return Status(INTERNAL, "grpc_channelz_get_server_sockets returned null"); + return Status(StatusCode::INTERNAL, + "grpc_channelz_get_server_sockets returned null"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -82,13 +82,13 @@ Status ChannelzService::GetChannel( channelz::v1::GetChannelResponse* response) { char* json_str = grpc_channelz_get_channel(request->channel_id()); if (json_str == nullptr) { - return Status(NOT_FOUND, "No object found for that ChannelId"); + return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -98,13 +98,14 @@ Status ChannelzService::GetSubchannel( channelz::v1::GetSubchannelResponse* response) { char* json_str = grpc_channelz_get_subchannel(request->subchannel_id()); if (json_str == nullptr) { - return Status(NOT_FOUND, "No object found for that SubchannelId"); + return Status(StatusCode::NOT_FOUND, + "No object found for that SubchannelId"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } @@ -114,13 +115,13 @@ Status ChannelzService::GetSocket(ServerContext* unused, channelz::v1::GetSocketResponse* response) { char* json_str = grpc_channelz_get_socket(request->socket_id()); if (json_str == nullptr) { - return Status(NOT_FOUND, "No object found for that SocketId"); + return Status(StatusCode::NOT_FOUND, "No object found for that SocketId"); } - google::protobuf::util::Status s = - google::protobuf::util::JsonStringToMessage(json_str, response); + grpc::protobuf::util::Status s = + grpc::protobuf::json::JsonStringToMessage(json_str, response); gpr_free(json_str); - if (s != google::protobuf::util::Status::OK) { - return Status(INTERNAL, s.ToString()); + if (!s.ok()) { + return Status(StatusCode::INTERNAL, s.ToString()); } return Status::OK; } diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 0c03fdf17a..c951c69d51 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -26,8 +26,8 @@ #include "pb_decode.h" #include "pb_encode.h" +#include "src/core/ext/filters/client_channel/health/health.pb.h" #include "src/cpp/server/health/default_health_check_service.h" -#include "src/cpp/server/health/health.pb.h" namespace grpc { @@ -78,12 +78,12 @@ void DefaultHealthCheckService::RegisterCallHandler( void DefaultHealthCheckService::UnregisterCallHandler( const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { std::unique_lock<std::mutex> lock(mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) return; ServiceData& service_data = it->second; - service_data.RemoveCallHandler(std::move(handler)); + service_data.RemoveCallHandler(handler); if (service_data.Unused()) { services_map_.erase(it); } @@ -115,7 +115,7 @@ void DefaultHealthCheckService::ServiceData::AddCallHandler( } void DefaultHealthCheckService::ServiceData::RemoveCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { call_handlers_.erase(handler); } @@ -184,16 +184,13 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( std::vector<Slice> slices; if (!request.Dump(&slices).ok()) return false; uint8_t* request_bytes = nullptr; - bool request_bytes_owned = false; size_t request_size = 0; grpc_health_v1_HealthCheckRequest request_struct; - if (slices.empty()) { - request_struct.has_service = false; - } else if (slices.size() == 1) { + request_struct.has_service = false; + if (slices.size() == 1) { request_bytes = const_cast<uint8_t*>(slices[0].begin()); request_size = slices[0].size(); - } else { - request_bytes_owned = true; + } else if (slices.size() > 1) { request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length())); uint8_t* copy_to = request_bytes; for (size_t i = 0; i < slices.size(); i++) { @@ -201,15 +198,13 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest( copy_to += slices[i].size(); } } - if (request_bytes != nullptr) { - pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); - bool decode_status = pb_decode( - &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct); - if (request_bytes_owned) { - gpr_free(request_bytes); - } - if (!decode_status) return false; + pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size); + bool decode_status = pb_decode( + &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct); + if (slices.size() > 1) { + gpr_free(request_bytes); } + if (!decode_status) return false; *service_name = request_struct.has_service ? request_struct.service : ""; return true; } @@ -318,6 +313,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p", service_, this); } + self.reset(); // To appease clang-tidy. } // @@ -442,7 +438,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { if (finish_called_) return; std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_); - if (!service_->shutdown_) return; + if (service_->shutdown_) return; SendFinishLocked(std::move(self), status); } @@ -464,6 +460,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: "handler: %p).", service_, service_name_.c_str(), this); } + self.reset(); // To appease clang-tidy. } // TODO(roth): This method currently assumes that there will be only one @@ -473,9 +470,10 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) { GPR_ASSERT(ok); gpr_log(GPR_DEBUG, - "[HCS %p] Healt watch call is notified done (handler: %p, " + "[HCS %p] Health watch call is notified done (handler: %p, " "is_cancelled: %d).", service_, this, static_cast<int>(ctx_.IsCancelled())); + database_->UnregisterCallHandler(service_name_, self); SendFinish(std::move(self), Status::CANCELLED); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 3bab76b6b0..450bd543f5 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -252,7 +252,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { void AddCallHandler( std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); void RemoveCallHandler( - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); bool Unused() const { return call_handlers_.empty() && status_ == NOT_FOUND; } @@ -269,7 +269,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { void UnregisterCallHandler( const grpc::string& service_name, - std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); + const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); mutable std::mutex mu_; std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c deleted file mode 100644 index 5c214c7160..0000000000 --- a/src/cpp/server/health/health.pb.c +++ /dev/null @@ -1,23 +0,0 @@ -/* Automatically generated nanopb constant definitions */ -/* Generated by nanopb-0.3.7-dev */ - -#include "src/cpp/server/health/health.pb.h" -/* @@protoc_insertion_point(includes) */ -#if PB_PROTO_HEADER_VERSION != 30 -#error Regenerate this file with the current version of nanopb generator. -#endif - - - -const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2] = { - PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckRequest, service, service, 0), - PB_LAST_FIELD -}; - -const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2] = { - PB_FIELD( 1, UENUM , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckResponse, status, status, 0), - PB_LAST_FIELD -}; - - -/* @@protoc_insertion_point(eof) */ diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h deleted file mode 100644 index 9d54ccd618..0000000000 --- a/src/cpp/server/health/health.pb.h +++ /dev/null @@ -1,73 +0,0 @@ -/* Automatically generated nanopb header */ -/* Generated by nanopb-0.3.7-dev */ - -#ifndef PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED -#define PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED -#include "pb.h" -/* @@protoc_insertion_point(includes) */ -#if PB_PROTO_HEADER_VERSION != 30 -#error Regenerate this file with the current version of nanopb generator. -#endif - -#ifdef __cplusplus -extern "C" { -#endif - -/* Enum definitions */ -typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus { - grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0, - grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1, - grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2, - grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3 -} grpc_health_v1_HealthCheckResponse_ServingStatus; -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN -#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1)) - -/* Struct definitions */ -typedef struct _grpc_health_v1_HealthCheckRequest { - bool has_service; - char service[200]; -/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckRequest) */ -} grpc_health_v1_HealthCheckRequest; - -typedef struct _grpc_health_v1_HealthCheckResponse { - bool has_status; - grpc_health_v1_HealthCheckResponse_ServingStatus status; -/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckResponse) */ -} grpc_health_v1_HealthCheckResponse; - -/* Default values for struct fields */ - -/* Initializer values for message structs */ -#define grpc_health_v1_HealthCheckRequest_init_default {false, ""} -#define grpc_health_v1_HealthCheckResponse_init_default {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0} -#define grpc_health_v1_HealthCheckRequest_init_zero {false, ""} -#define grpc_health_v1_HealthCheckResponse_init_zero {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0} - -/* Field tags (for use in manual encoding/decoding) */ -#define grpc_health_v1_HealthCheckRequest_service_tag 1 -#define grpc_health_v1_HealthCheckResponse_status_tag 1 - -/* Struct field encoding specification for nanopb */ -extern const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2]; -extern const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2]; - -/* Maximum encoded size of messages (where known) */ -#define grpc_health_v1_HealthCheckRequest_size 203 -#define grpc_health_v1_HealthCheckResponse_size 2 - -/* Message IDs (where set with "msgid" option) */ -#ifdef PB_MSGID - -#define HEALTH_MESSAGES \ - - -#endif - -#ifdef __cplusplus -} /* extern "C" */ -#endif -/* @@protoc_insertion_point(eof) */ - -#endif diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 44ee0846b6..995e787785 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -40,8 +40,10 @@ namespace grpc { class ServerContext::CompletionOp final : public internal::CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() - : has_tag_(false), + // must ref the call before calling constructor and after deleting this + CompletionOp(internal::Call* call) + : call_(*call), + has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), @@ -55,6 +57,21 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } void FillOps(internal::Call* call) override; + + // This should always be arena allocated in the call, so override delete. + // But this class is not trivially destructible, so must actually call delete + // before allowing the arena to be freed + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(CompletionOp)); + } + + // This operator should never be called as the memory should be freed as part + // of the arena destruction. It only exists to provide a matching operator + // delete to the operator new so that some compilers will not complain (see + // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this + // there are no tests catching the compiler warning. + static void operator delete(void*, void*) { assert(0); } + bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -92,7 +109,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { std::unique_lock<std::mutex> lock(mu_); if (--refs_ == 0) { lock.unlock(); + grpc_call* call = call_.call(); delete this; + grpc_call_unref(call); } return; } @@ -108,6 +127,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { return finalized_ ? (cancelled_ != 0) : false; } + internal::Call call_; bool has_tag_; void* tag_; std::mutex mu_; @@ -115,7 +135,6 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { bool finalized_; int cancelled_; bool done_intercepting_; - internal::Call call_; internal::InterceptorBatchMethodsImpl interceptor_methods_; }; @@ -123,7 +142,9 @@ void ServerContext::CompletionOp::Unref() { std::unique_lock<std::mutex> lock(mu_); if (--refs_ == 0) { lock.unlock(); + grpc_call* call = call_.call(); delete this; + grpc_call_unref(call); } } @@ -133,7 +154,6 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { ops.data.recv_close_on_server.cancelled = &cancelled_; ops.flags = 0; ops.reserved = nullptr; - call_ = *call; interceptor_methods_.SetCall(&call_); interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); @@ -153,7 +173,9 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { } if (--refs_ == 0) { lock.unlock(); + grpc_call* call = call_.call(); delete this; + grpc_call_unref(call); } return ret; } @@ -175,7 +197,9 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { lock.lock(); if (--refs_ == 0) { lock.unlock(); + grpc_call* call = call_.call(); delete this; + grpc_call_unref(call); } return ret; } @@ -226,7 +250,10 @@ void ServerContext::BeginCompletionOp(internal::Call* call) { if (rpc_info_) { rpc_info_->Ref(); } - completion_op_ = new CompletionOp(); + grpc_call_ref(call->call()); + completion_op_ = + new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) + CompletionOp(call); if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } |