aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-25 19:50:10 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-25 19:50:10 -0700
commit3896dabb85bd4f7314fbe9f850c4af4294157019 (patch)
tree73155467b35d4b969b2360656cbc718ca3d79e2d /src/cpp/server
parenta094b7b3127ffcb607e11486a64fc905e92a2565 (diff)
parent111f47437638ce0c00144199b08272b35a99acd7 (diff)
Merge branch 'master' into interceptors
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/channelz/channelz_service.cc67
-rw-r--r--src/cpp/server/health/default_health_check_service.cc38
-rw-r--r--src/cpp/server/health/default_health_check_service.h4
-rw-r--r--src/cpp/server/health/health.pb.c23
-rw-r--r--src/cpp/server/health/health.pb.h73
-rw-r--r--src/cpp/server/server_context.cc37
6 files changed, 86 insertions, 156 deletions
diff --git a/src/cpp/server/channelz/channelz_service.cc b/src/cpp/server/channelz/channelz_service.cc
index 79ed9102e5..428893f277 100644
--- a/src/cpp/server/channelz/channelz_service.cc
+++ b/src/cpp/server/channelz/channelz_service.cc
@@ -20,9 +20,6 @@
#include "src/cpp/server/channelz/channelz_service.h"
-#include <google/protobuf/text_format.h>
-#include <google/protobuf/util/json_util.h>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
@@ -33,13 +30,14 @@ Status ChannelzService::GetTopChannels(
channelz::v1::GetTopChannelsResponse* response) {
char* json_str = grpc_channelz_get_top_channels(request->start_channel_id());
if (json_str == nullptr) {
- return Status(INTERNAL, "grpc_channelz_get_top_channels returned null");
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_top_channels returned null");
}
- google::protobuf::util::Status s =
- google::protobuf::util::JsonStringToMessage(json_str, response);
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
gpr_free(json_str);
- if (s != google::protobuf::util::Status::OK) {
- return Status(INTERNAL, s.ToString());
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
}
return Status::OK;
}
@@ -49,13 +47,14 @@ Status ChannelzService::GetServers(
channelz::v1::GetServersResponse* response) {
char* json_str = grpc_channelz_get_servers(request->start_server_id());
if (json_str == nullptr) {
- return Status(INTERNAL, "grpc_channelz_get_servers returned null");
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_servers returned null");
}
- google::protobuf::util::Status s =
- google::protobuf::util::JsonStringToMessage(json_str, response);
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
gpr_free(json_str);
- if (s != google::protobuf::util::Status::OK) {
- return Status(INTERNAL, s.ToString());
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
}
return Status::OK;
}
@@ -66,13 +65,14 @@ Status ChannelzService::GetServerSockets(
char* json_str = grpc_channelz_get_server_sockets(request->server_id(),
request->start_socket_id());
if (json_str == nullptr) {
- return Status(INTERNAL, "grpc_channelz_get_server_sockets returned null");
+ return Status(StatusCode::INTERNAL,
+ "grpc_channelz_get_server_sockets returned null");
}
- google::protobuf::util::Status s =
- google::protobuf::util::JsonStringToMessage(json_str, response);
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
gpr_free(json_str);
- if (s != google::protobuf::util::Status::OK) {
- return Status(INTERNAL, s.ToString());
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
}
return Status::OK;
}
@@ -82,13 +82,13 @@ Status ChannelzService::GetChannel(
channelz::v1::GetChannelResponse* response) {
char* json_str = grpc_channelz_get_channel(request->channel_id());
if (json_str == nullptr) {
- return Status(NOT_FOUND, "No object found for that ChannelId");
+ return Status(StatusCode::NOT_FOUND, "No object found for that ChannelId");
}
- google::protobuf::util::Status s =
- google::protobuf::util::JsonStringToMessage(json_str, response);
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
gpr_free(json_str);
- if (s != google::protobuf::util::Status::OK) {
- return Status(INTERNAL, s.ToString());
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
}
return Status::OK;
}
@@ -98,13 +98,14 @@ Status ChannelzService::GetSubchannel(
channelz::v1::GetSubchannelResponse* response) {
char* json_str = grpc_channelz_get_subchannel(request->subchannel_id());
if (json_str == nullptr) {
- return Status(NOT_FOUND, "No object found for that SubchannelId");
+ return Status(StatusCode::NOT_FOUND,
+ "No object found for that SubchannelId");
}
- google::protobuf::util::Status s =
- google::protobuf::util::JsonStringToMessage(json_str, response);
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
gpr_free(json_str);
- if (s != google::protobuf::util::Status::OK) {
- return Status(INTERNAL, s.ToString());
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
}
return Status::OK;
}
@@ -114,13 +115,13 @@ Status ChannelzService::GetSocket(ServerContext* unused,
channelz::v1::GetSocketResponse* response) {
char* json_str = grpc_channelz_get_socket(request->socket_id());
if (json_str == nullptr) {
- return Status(NOT_FOUND, "No object found for that SocketId");
+ return Status(StatusCode::NOT_FOUND, "No object found for that SocketId");
}
- google::protobuf::util::Status s =
- google::protobuf::util::JsonStringToMessage(json_str, response);
+ grpc::protobuf::util::Status s =
+ grpc::protobuf::json::JsonStringToMessage(json_str, response);
gpr_free(json_str);
- if (s != google::protobuf::util::Status::OK) {
- return Status(INTERNAL, s.ToString());
+ if (!s.ok()) {
+ return Status(StatusCode::INTERNAL, s.ToString());
}
return Status::OK;
}
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 0c03fdf17a..c951c69d51 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -26,8 +26,8 @@
#include "pb_decode.h"
#include "pb_encode.h"
+#include "src/core/ext/filters/client_channel/health/health.pb.h"
#include "src/cpp/server/health/default_health_check_service.h"
-#include "src/cpp/server/health/health.pb.h"
namespace grpc {
@@ -78,12 +78,12 @@ void DefaultHealthCheckService::RegisterCallHandler(
void DefaultHealthCheckService::UnregisterCallHandler(
const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
std::unique_lock<std::mutex> lock(mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return;
ServiceData& service_data = it->second;
- service_data.RemoveCallHandler(std::move(handler));
+ service_data.RemoveCallHandler(handler);
if (service_data.Unused()) {
services_map_.erase(it);
}
@@ -115,7 +115,7 @@ void DefaultHealthCheckService::ServiceData::AddCallHandler(
}
void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
call_handlers_.erase(handler);
}
@@ -184,16 +184,13 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
std::vector<Slice> slices;
if (!request.Dump(&slices).ok()) return false;
uint8_t* request_bytes = nullptr;
- bool request_bytes_owned = false;
size_t request_size = 0;
grpc_health_v1_HealthCheckRequest request_struct;
- if (slices.empty()) {
- request_struct.has_service = false;
- } else if (slices.size() == 1) {
+ request_struct.has_service = false;
+ if (slices.size() == 1) {
request_bytes = const_cast<uint8_t*>(slices[0].begin());
request_size = slices[0].size();
- } else {
- request_bytes_owned = true;
+ } else if (slices.size() > 1) {
request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
uint8_t* copy_to = request_bytes;
for (size_t i = 0; i < slices.size(); i++) {
@@ -201,15 +198,13 @@ bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
copy_to += slices[i].size();
}
}
- if (request_bytes != nullptr) {
- pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
- bool decode_status = pb_decode(
- &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
- if (request_bytes_owned) {
- gpr_free(request_bytes);
- }
- if (!decode_status) return false;
+ pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
+ bool decode_status = pb_decode(
+ &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
+ if (slices.size() > 1) {
+ gpr_free(request_bytes);
}
+ if (!decode_status) return false;
*service_name = request_struct.has_service ? request_struct.service : "";
return true;
}
@@ -318,6 +313,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
service_, this);
}
+ self.reset(); // To appease clang-tidy.
}
//
@@ -442,7 +438,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
if (finish_called_) return;
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
- if (!service_->shutdown_) return;
+ if (service_->shutdown_) return;
SendFinishLocked(std::move(self), status);
}
@@ -464,6 +460,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
"handler: %p).",
service_, service_name_.c_str(), this);
}
+ self.reset(); // To appease clang-tidy.
}
// TODO(roth): This method currently assumes that there will be only one
@@ -473,9 +470,10 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
GPR_ASSERT(ok);
gpr_log(GPR_DEBUG,
- "[HCS %p] Healt watch call is notified done (handler: %p, "
+ "[HCS %p] Health watch call is notified done (handler: %p, "
"is_cancelled: %d).",
service_, this, static_cast<int>(ctx_.IsCancelled()));
+ database_->UnregisterCallHandler(service_name_, self);
SendFinish(std::move(self), Status::CANCELLED);
}
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 3bab76b6b0..450bd543f5 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -252,7 +252,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
void AddCallHandler(
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
void RemoveCallHandler(
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
bool Unused() const {
return call_handlers_.empty() && status_ == NOT_FOUND;
}
@@ -269,7 +269,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
void UnregisterCallHandler(
const grpc::string& service_name,
- std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
+ const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable std::mutex mu_;
std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
diff --git a/src/cpp/server/health/health.pb.c b/src/cpp/server/health/health.pb.c
deleted file mode 100644
index 5c214c7160..0000000000
--- a/src/cpp/server/health/health.pb.c
+++ /dev/null
@@ -1,23 +0,0 @@
-/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.7-dev */
-
-#include "src/cpp/server/health/health.pb.h"
-/* @@protoc_insertion_point(includes) */
-#if PB_PROTO_HEADER_VERSION != 30
-#error Regenerate this file with the current version of nanopb generator.
-#endif
-
-
-
-const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckRequest, service, service, 0),
- PB_LAST_FIELD
-};
-
-const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2] = {
- PB_FIELD( 1, UENUM , OPTIONAL, STATIC , FIRST, grpc_health_v1_HealthCheckResponse, status, status, 0),
- PB_LAST_FIELD
-};
-
-
-/* @@protoc_insertion_point(eof) */
diff --git a/src/cpp/server/health/health.pb.h b/src/cpp/server/health/health.pb.h
deleted file mode 100644
index 9d54ccd618..0000000000
--- a/src/cpp/server/health/health.pb.h
+++ /dev/null
@@ -1,73 +0,0 @@
-/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.7-dev */
-
-#ifndef PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED
-#define PB_GRPC_HEALTH_V1_HEALTH_PB_H_INCLUDED
-#include "pb.h"
-/* @@protoc_insertion_point(includes) */
-#if PB_PROTO_HEADER_VERSION != 30
-#error Regenerate this file with the current version of nanopb generator.
-#endif
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/* Enum definitions */
-typedef enum _grpc_health_v1_HealthCheckResponse_ServingStatus {
- grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN = 0,
- grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING = 1,
- grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING = 2,
- grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN = 3
-} grpc_health_v1_HealthCheckResponse_ServingStatus;
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MIN grpc_health_v1_HealthCheckResponse_ServingStatus_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_MAX grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
-#define _grpc_health_v1_HealthCheckResponse_ServingStatus_ARRAYSIZE ((grpc_health_v1_HealthCheckResponse_ServingStatus)(grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN+1))
-
-/* Struct definitions */
-typedef struct _grpc_health_v1_HealthCheckRequest {
- bool has_service;
- char service[200];
-/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckRequest) */
-} grpc_health_v1_HealthCheckRequest;
-
-typedef struct _grpc_health_v1_HealthCheckResponse {
- bool has_status;
- grpc_health_v1_HealthCheckResponse_ServingStatus status;
-/* @@protoc_insertion_point(struct:grpc_health_v1_HealthCheckResponse) */
-} grpc_health_v1_HealthCheckResponse;
-
-/* Default values for struct fields */
-
-/* Initializer values for message structs */
-#define grpc_health_v1_HealthCheckRequest_init_default {false, ""}
-#define grpc_health_v1_HealthCheckResponse_init_default {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0}
-#define grpc_health_v1_HealthCheckRequest_init_zero {false, ""}
-#define grpc_health_v1_HealthCheckResponse_init_zero {false, (grpc_health_v1_HealthCheckResponse_ServingStatus)0}
-
-/* Field tags (for use in manual encoding/decoding) */
-#define grpc_health_v1_HealthCheckRequest_service_tag 1
-#define grpc_health_v1_HealthCheckResponse_status_tag 1
-
-/* Struct field encoding specification for nanopb */
-extern const pb_field_t grpc_health_v1_HealthCheckRequest_fields[2];
-extern const pb_field_t grpc_health_v1_HealthCheckResponse_fields[2];
-
-/* Maximum encoded size of messages (where known) */
-#define grpc_health_v1_HealthCheckRequest_size 203
-#define grpc_health_v1_HealthCheckResponse_size 2
-
-/* Message IDs (where set with "msgid" option) */
-#ifdef PB_MSGID
-
-#define HEALTH_MESSAGES \
-
-
-#endif
-
-#ifdef __cplusplus
-} /* extern "C" */
-#endif
-/* @@protoc_insertion_point(eof) */
-
-#endif
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 44ee0846b6..995e787785 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -40,8 +40,10 @@ namespace grpc {
class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp()
- : has_tag_(false),
+ // must ref the call before calling constructor and after deleting this
+ CompletionOp(internal::Call* call)
+ : call_(*call),
+ has_tag_(false),
tag_(nullptr),
refs_(2),
finalized_(false),
@@ -55,6 +57,21 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
void FillOps(internal::Call* call) override;
+
+ // This should always be arena allocated in the call, so override delete.
+ // But this class is not trivially destructible, so must actually call delete
+ // before allowing the arena to be freed
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(CompletionOp));
+ }
+
+ // This operator should never be called as the memory should be freed as part
+ // of the arena destruction. It only exists to provide a matching operator
+ // delete to the operator new so that some compilers will not complain (see
+ // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
+ // there are no tests catching the compiler warning.
+ static void operator delete(void*, void*) { assert(0); }
+
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq) {
@@ -92,7 +109,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
std::unique_lock<std::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
+ grpc_call* call = call_.call();
delete this;
+ grpc_call_unref(call);
}
return;
}
@@ -108,6 +127,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
return finalized_ ? (cancelled_ != 0) : false;
}
+ internal::Call call_;
bool has_tag_;
void* tag_;
std::mutex mu_;
@@ -115,7 +135,6 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
bool finalized_;
int cancelled_;
bool done_intercepting_;
- internal::Call call_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
@@ -123,7 +142,9 @@ void ServerContext::CompletionOp::Unref() {
std::unique_lock<std::mutex> lock(mu_);
if (--refs_ == 0) {
lock.unlock();
+ grpc_call* call = call_.call();
delete this;
+ grpc_call_unref(call);
}
}
@@ -133,7 +154,6 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
ops.data.recv_close_on_server.cancelled = &cancelled_;
ops.flags = 0;
ops.reserved = nullptr;
- call_ = *call;
interceptor_methods_.SetCall(&call_);
interceptor_methods_.SetReverse();
interceptor_methods_.SetCallOpSetInterface(this);
@@ -153,7 +173,9 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
}
if (--refs_ == 0) {
lock.unlock();
+ grpc_call* call = call_.call();
delete this;
+ grpc_call_unref(call);
}
return ret;
}
@@ -175,7 +197,9 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
lock.lock();
if (--refs_ == 0) {
lock.unlock();
+ grpc_call* call = call_.call();
delete this;
+ grpc_call_unref(call);
}
return ret;
}
@@ -226,7 +250,10 @@ void ServerContext::BeginCompletionOp(internal::Call* call) {
if (rpc_info_) {
rpc_info_->Ref();
}
- completion_op_ = new CompletionOp();
+ grpc_call_ref(call->call());
+ completion_op_ =
+ new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
+ CompletionOp(call);
if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}