From 4028d2c11b6561ad7aea71e7bc465dc56865d40d Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 21 Sep 2016 10:45:33 -0700 Subject: More fixes --- include/grpc++/server.h | 6 ++++-- src/cpp/server/server.cc | 31 +++++++++++++++++++++++++------ src/cpp/server/server_builder.cc | 16 +++++++--------- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 0c8b22184b..af0a15d7bd 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -135,7 +135,8 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { /// \param max_pollers The maximum number of polling threads per server /// completion queue (in param sync_server_cqs) to use for listening to /// incoming requests (used only in case of sync server) - Server(std::shared_ptr> sync_server_cqs, + Server(std::shared_ptr>> + sync_server_cqs, int max_message_size, ChannelArguments* args, int min_pollers, int max_pollers); @@ -193,7 +194,8 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen { /// The following completion queues are ONLY used in case of Sync API i.e if /// the server has any services with sync methods. The server uses these /// completion queues to poll for new RPCs - std::shared_ptr> sync_server_cqs_; + std::shared_ptr>> + sync_server_cqs_; /// List of GrpcRpcManager instances (one for each cq in the sync_server_cqs) std::vector> sync_req_mgrs_; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 21debcc748..89854f9493 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL UnimplementedAsyncRequest* const request_; }; +// TODO (sreek) - This might no longer be needed class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -126,6 +127,13 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { } }; +class ShutdownTag : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool *status) { + return false; + } +}; + class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) @@ -147,6 +155,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } + // TODO (Sreek) This function is probably no longer needed static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; *ok = false; @@ -158,6 +167,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } + // TODO (sreek) - This function is probably no longer needed static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, gpr_timespec deadline) { void* tag = nullptr; @@ -177,6 +187,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { GPR_UNREACHABLE_CODE(return false); } + // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest + // functions void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -184,6 +196,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { cq_ = nullptr; } + void ResetRequest() { + in_flight_ = false; + } + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; @@ -326,10 +342,12 @@ class Server::SyncRequestManager : public GrpcRpcManager { GPR_TIMER_SCOPE("cd.Run()", 0); cd.Run(global_callbacks_); } else { + sync_req->ResetRequest(); // ok is false. For some reason, the tag was returned but event was not // successful. In this case, request again unless we are shutting down if (!IsShutdown()) { - sync_req->Request(server_->c_server(), server_cq_->cq()); + // TODO (sreek) Remove this + // sync_req->Request(server_->c_server(), server_cq_->cq()); } } } @@ -371,7 +389,8 @@ class Server::SyncRequestManager : public GrpcRpcManager { static internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( - std::shared_ptr> sync_server_cqs, + std::shared_ptr>> + sync_server_cqs, int max_message_size, ChannelArguments* args, int min_pollers, int max_pollers) : max_message_size_(max_message_size), @@ -390,7 +409,7 @@ Server::Server( for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { sync_req_mgrs_.emplace_back(new SyncRequestManager( - this, &(*it), global_callbacks_, min_pollers, max_pollers)); + this, (*it).get(), global_callbacks_, min_pollers, max_pollers)); } grpc_channel_args channel_args; @@ -411,7 +430,7 @@ Server::~Server() { // destructor for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { - (*it).Shutdown(); + (*it)->Shutdown(); } // TODO (sreek) Delete this @@ -552,7 +571,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { if (started_ && !shutdown_) { shutdown_ = true; - int shutdown_tag = 0; // Dummy shutdown tag + ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag); // Shutdown all RpcManagers. This will try to gracefully stop all the @@ -587,7 +606,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // destructor) for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { - (*it).Shutdown(); + (*it)->Shutdown(); } /* diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 786195ed6c..8c70ac8f99 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -93,7 +93,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - (void *) service); + (void*)service); } else { generic_service_ = service; } @@ -163,8 +163,9 @@ std::unique_ptr ServerBuilder::BuildAndStart() { // This is different from the completion queues added to the server via // ServerBuilder's AddCompletionQueue() method (those completion queues // are in 'cqs_' member variable of ServerBuilder object) - std::shared_ptr> sync_server_cqs( - new std::vector()); + std::shared_ptr>> + sync_server_cqs( + new std::vector>()); if (has_sync_methods) { // If the server has synchronous methods, it will need completion queues to @@ -177,11 +178,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { num_cqs = GPR_MAX(num_cqs, 4); for (int i = 0; i < num_cqs; i++) { - // emplace_back() would have been ideal here but doesn't work since the - // ServerCompletionQueue's constructor is private. With emplace_back, the - // constructor is called from somewhere within the library; so making - // ServerBuilder class a friend to ServerCompletion queue won't help. - sync_server_cqs->push_back(ServerCompletionQueue()); + sync_server_cqs->emplace_back(new ServerCompletionQueue()); } } @@ -222,7 +219,8 @@ std::unique_ptr ServerBuilder::BuildAndStart() { int num_frequently_polled_cqs = sync_server_cqs->size(); for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { - grpc_server_register_completion_queue(server->server_, it->cq(), nullptr); + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); } // cqs_ contains the completion queue added by calling the ServerBuilder's -- cgit v1.2.3