aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/impl/codegen/server_interface.h7
-rw-r--r--include/grpc++/server.h4
-rw-r--r--src/cpp/server/health/default_health_check_service.cc17
-rw-r--r--src/cpp/server/health/default_health_check_service.h11
-rw-r--r--src/cpp/server/server_cc.cc140
-rw-r--r--test/cpp/end2end/health_service_end2end_test.cc11
6 files changed, 28 insertions, 162 deletions
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index 785568f19a..bd1b36e883 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -158,8 +158,7 @@ class ServerInterface : public CallHook {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize);
+ CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
@@ -175,7 +174,7 @@ class ServerInterface : public CallHook {
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
- : RegisteredAsyncRequest(server, context, stream, call_cq, tag, true) {
+ : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
IssueRequest(registered_method, nullptr, notification_cq);
}
@@ -191,7 +190,7 @@ class ServerInterface : public CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
- : RegisteredAsyncRequest(server, context, stream, call_cq, tag, true),
+ : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
request_(request) {
IssueRequest(registered_method, &payload_, notification_cq);
}
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 3f205625ee..e3e9174c9c 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -119,10 +119,6 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
- class HealthCheckAsyncRequestContext;
- class HealthCheckAsyncRequest;
- class HealthCheckAsyncResponse;
-
/// Server constructors. To be used by \a ServerBuilder only.
///
/// \param max_message_size Maximum message length that the channel can
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 9743bd5775..46def70e8a 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -49,14 +49,11 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
} // namespace
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
- DefaultHealthCheckService* service, bool sync)
- : service_(service), method_(nullptr), sync_(sync) {
- MethodHandler* handler = nullptr;
- if (sync_) {
- handler =
- new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
- std::mem_fn(&HealthCheckServiceImpl::Check), this);
- }
+ DefaultHealthCheckService* service)
+ : service_(service), method_(nullptr) {
+ MethodHandler* handler =
+ new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
+ std::mem_fn(&HealthCheckServiceImpl::Check), this);
method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
handler);
AddMethod(method_);
@@ -160,9 +157,9 @@ DefaultHealthCheckService::GetServingStatus(
}
DefaultHealthCheckService::HealthCheckServiceImpl*
-DefaultHealthCheckService::GetHealthCheckService(bool sync) {
+DefaultHealthCheckService::GetHealthCheckService() {
GPR_ASSERT(impl_ == nullptr);
- impl_.reset(new HealthCheckServiceImpl(this, sync));
+ impl_.reset(new HealthCheckServiceImpl(this));
return impl_.get();
}
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 1ecb0a2ba9..5c0e230342 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -49,21 +49,14 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// The service impl to register with the server.
class HealthCheckServiceImpl : public Service {
public:
- HealthCheckServiceImpl(DefaultHealthCheckService* service, bool sync);
+ explicit HealthCheckServiceImpl(DefaultHealthCheckService* service);
Status Check(ServerContext* context, const ByteBuffer* request,
ByteBuffer* response);
- bool sync() { return sync_; }
-
- // This is only useful for the async mode. It should be called after
- // RegisterService returns.
- void* server_tag() const { return method_->server_tag(); }
-
private:
const DefaultHealthCheckService* const service_;
RpcServiceMethod* method_;
- const bool sync_;
};
DefaultHealthCheckService();
@@ -72,7 +65,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
void SetServingStatus(bool serving) override;
enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
ServingStatus GetServingStatus(const grpc::string& service_name) const;
- HealthCheckServiceImpl* GetHealthCheckService(bool sync);
+ HealthCheckServiceImpl* GetHealthCheckService();
private:
mutable std::mutex mu_;
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 69e491dc96..c377297ec0 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -119,79 +119,6 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
-// This is a dummy implementation of the interface so that
-// HealthCheckAsyncRequest can get Call from RegisteredAsyncRequest. It does not
-// do any reading or writing.
-class HealthCheckAsyncResponseWriter final
- : public ServerAsyncStreamingInterface {
- public:
- HealthCheckAsyncResponseWriter() : call_(nullptr, nullptr, nullptr) {}
- void SendInitialMetadata(void* tag) override {
- abort(); // should not be called.
- }
- void BindCall(Call* call) override { call_ = *call; }
- Call* call() { return &call_; }
-
- private:
- Call call_;
-};
-
-class Server::HealthCheckAsyncRequestContext {
- protected:
- ServerContext server_context_;
- HealthCheckAsyncResponseWriter rpc_;
-};
-
-class Server::HealthCheckAsyncRequest final
- : public HealthCheckAsyncRequestContext,
- public RegisteredAsyncRequest {
- public:
- HealthCheckAsyncRequest(
- DefaultHealthCheckService::HealthCheckServiceImpl* service,
- Server* server, ServerCompletionQueue* cq)
- : RegisteredAsyncRequest(server, &server_context_, &rpc_, cq, this,
- false),
- service_(service),
- server_(server),
- cq_(cq) {
- IssueRequest(service->server_tag(), &payload_, cq);
- }
-
- bool FinalizeResult(void** tag, bool* status) override;
- Call* call() { return rpc_.call(); }
- ByteBuffer* response() { return &response_; }
- Status* status() { return &status_; }
- ServerContext* server_context() { return &server_context_; }
-
- private:
- DefaultHealthCheckService::HealthCheckServiceImpl* service_;
- Server* const server_;
- ServerCompletionQueue* const cq_;
- grpc_byte_buffer* payload_;
- ByteBuffer request_;
- ByteBuffer response_;
- Status status_;
-};
-
-typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
- HealthCheckAsyncResponseOp;
-class Server::HealthCheckAsyncResponse final
- : public HealthCheckAsyncResponseOp {
- public:
- HealthCheckAsyncResponse(HealthCheckAsyncRequest* request);
- ~HealthCheckAsyncResponse() { delete request_; }
-
- bool FinalizeResult(void** tag, bool* status) override {
- HealthCheckAsyncResponseOp::FinalizeResult(tag, status);
- delete this;
- return false;
- }
-
- private:
- HealthCheckAsyncRequest* const request_;
-};
-
class ShutdownTag : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
@@ -572,14 +499,16 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
// Only create default health check service when user did not provide an
// explicit one.
- DefaultHealthCheckService::HealthCheckServiceImpl* health_service = nullptr;
if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
DefaultHealthCheckServiceEnabled()) {
- auto* default_hc_service = new DefaultHealthCheckService;
- health_check_service_.reset(default_hc_service);
- health_service =
- default_hc_service->GetHealthCheckService(!sync_server_cqs_->empty());
- RegisterService(nullptr, health_service);
+ if (sync_server_cqs_->empty()) {
+ gpr_log(GPR_ERROR,
+ "Default health check service disabled at async-only server.");
+ } else {
+ auto* default_hc_service = new DefaultHealthCheckService;
+ health_check_service_.reset(default_hc_service);
+ RegisterService(nullptr, default_hc_service->GetHealthCheckService());
+ }
}
grpc_server_start(server_);
@@ -596,14 +525,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
}
- if (health_service && !health_service->sync()) {
- for (size_t i = 0; i < num_cqs; i++) {
- if (cqs[i]->IsFrequentlyPolled()) {
- new HealthCheckAsyncRequest(health_service, this, cqs[i]);
- }
- }
- }
-
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
@@ -715,10 +636,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize)
- : BaseAsyncRequest(server, context, stream, call_cq, tag,
- delete_on_finalize) {}
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@@ -776,45 +695,6 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
request_->stream()->call_.PerformOps(this);
}
-bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) {
- bool serialization_status =
- *status && payload_ &&
- SerializationTraits<ByteBuffer>::Deserialize(payload_, &request_).ok();
- RegisteredAsyncRequest::FinalizeResult(tag, status);
- *status = serialization_status && *status;
- if (*status) {
- new HealthCheckAsyncRequest(service_, server_, cq_);
- status_ = service_->Check(&server_context_, &request_, &response_);
- new HealthCheckAsyncResponse(this);
- return false;
- } else {
- delete this;
- return false;
- }
-}
-
-Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse(
- HealthCheckAsyncRequest* request)
- : request_(request) {
- ServerContext* context = request_->server_context();
- if (!context->sent_initial_metadata_) {
- SendInitialMetadata(context->initial_metadata_,
- context->initial_metadata_flags());
- if (context->compression_level_set()) {
- set_compression_level(context->compression_level());
- }
- context->sent_initial_metadata_ = true;
- }
- Status* status = request_->status();
- if (status->ok()) {
- ServerSendStatus(context->trailing_metadata_,
- SendMessage(*request_->response()));
- } else {
- ServerSendStatus(context->trailing_metadata_, *status);
- }
- request_->call()->PerformOps(this);
-}
-
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
} // namespace grpc
diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc
index 8a6a9886c9..3d51007857 100644
--- a/test/cpp/end2end/health_service_end2end_test.cc
+++ b/test/cpp/end2end/health_service_end2end_test.cc
@@ -273,12 +273,13 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) {
SetUpServer(false, true, false, nullptr);
cq_thread_ = std::thread(LoopCompletionQueue, cq_.get());
- VerifyHealthCheckService();
+ HealthCheckServiceInterface* default_service =
+ server_->GetHealthCheckService();
+ EXPECT_TRUE(default_service == nullptr);
- // The default service has a size limit of the service name.
- const grpc::string kTooLongServiceName(201, 'x');
- SendHealthCheckRpc(kTooLongServiceName,
- Status(StatusCode::INVALID_ARGUMENT, ""));
+ ResetStubs();
+
+ SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, ""));
}
// Provide an empty service to disable the default service.