From 8e708b12cb1c88f5cb6e3984d887c3c2b9bee54f Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 29 Dec 2016 11:44:36 -0800 Subject: WIP --- src/cpp/server/server_cc.cc | 103 +++++++++++++++++++++++++++++++++----------- 1 file changed, 77 insertions(+), 26 deletions(-) (limited to 'src/cpp/server/server_cc.cc') diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 20641aeea8..43f0947095 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -119,11 +119,24 @@ class Server::UnimplementedAsyncResponse final UnimplementedAsyncRequest* const request_; }; +class HealthCheckAsyncResponseWriter final + : public ServerAsyncStreamingInterface { + public: + HealthCheckAsyncResponseWriter() : call_(nullptr, nullptr, nullptr) {} + void SendInitialMetadata(void* tag) override { + abort(); // should not be called. + } + void BindCall(Call* call) override { call_ = *call; } + Call* call() { return &call_; } + + private: + Call call_; +}; + class Server::HealthCheckAsyncRequestContext { protected: - HealthCheckAsyncRequestContext() : rpc_(&server_context_) {} ServerContext server_context_; - ServerAsyncResponseWriter rpc_; + HealthCheckAsyncResponseWriter rpc_; }; class Server::HealthCheckAsyncRequest final @@ -137,49 +150,86 @@ class Server::HealthCheckAsyncRequest final false), service_(service), server_(server), - cq_(cq), - had_request_(false) { + cq_(cq) { IssueRequest(service->method()->server_tag(), &payload_, cq); } bool FinalizeResult(void** tag, bool* status) override; + Call* call() { return rpc_.call(); } + ByteBuffer* response() { return &response_; } + Status* status() { return &status_; } + ServerContext* server_context() { return &server_context_; } private: DefaultHealthCheckService::AsyncHealthCheckServiceImpl* service_; Server* const server_; ServerCompletionQueue* const cq_; grpc_byte_buffer* payload_; - bool had_request_; ByteBuffer request_; ByteBuffer response_; + Status status_; +}; + +typedef SneakyCallOpSet + HealthCheckAsyncResponseOp; +class Server::HealthCheckAsyncResponse final + : public HealthCheckAsyncResponseOp { + public: + HealthCheckAsyncResponse(HealthCheckAsyncRequest* request); + ~HealthCheckAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) override { + HealthCheckAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return false; + } + + private: + HealthCheckAsyncRequest* const request_; }; bool Server::HealthCheckAsyncRequest::FinalizeResult(void** tag, bool* status) { - if (!had_request_) { - had_request_ = true; - bool serialization_status = - *status && payload_ && - SerializationTraits::Deserialize( - payload_, &request_, server_->max_receive_message_size()) - .ok(); - RegisteredAsyncRequest::FinalizeResult(tag, status); - *status = serialization_status && *status; - if (*status) { - new HealthCheckAsyncRequest(service_, server_, cq_); - Status s = service_->Check(&server_context_, &request_, &response_); - rpc_.Finish(response_, s, this); - return false; - } else { - // TODO what to do here - delete this; - return false; - } + bool serialization_status = + *status && payload_ && + SerializationTraits::Deserialize( + payload_, &request_, server_->max_receive_message_size()) + .ok(); + RegisteredAsyncRequest::FinalizeResult(tag, status); + *status = serialization_status && *status; + if (*status) { + new HealthCheckAsyncRequest(service_, server_, cq_); + status_ = service_->Check(&server_context_, &request_, &response_); + new HealthCheckAsyncResponse(this); + return false; } else { delete this; return false; } } +Server::HealthCheckAsyncResponse::HealthCheckAsyncResponse( + HealthCheckAsyncRequest* request) + : request_(request) { + ServerContext* context = request_->server_context(); + if (!context->sent_initial_metadata_) { + SendInitialMetadata(context->initial_metadata_, + context->initial_metadata_flags()); + if (context->compression_level_set()) { + set_compression_level(context->compression_level()); + } + context->sent_initial_metadata_ = true; + } + Status* status = request_->status(); + if (status->ok()) { + ServerSendStatus(context->trailing_metadata_, + SendMessage(*request_->response())); + } else { + ServerSendStatus(context->trailing_metadata_, *status); + } + request_->call()->PerformOps(this); +} + class ShutdownTag : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { return false; } @@ -567,9 +617,10 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { auto* default_hc_service = new DefaultHealthCheckService; health_check_service_.reset(default_hc_service); if (!sync_server_cqs_->empty()) { // Has sync methods. + gpr_log(GPR_ERROR, "register sync"); // XXX RegisterService(nullptr, default_hc_service->GetSyncHealthCheckService()); - } - if (sync_server_cqs_->empty()) { // No sync methods. + } else { + gpr_log(GPR_ERROR, "register async"); // XXX async_health_service = default_hc_service->GetAsyncHealthCheckService(); RegisterService(nullptr, async_health_service); } -- cgit v1.2.3