diff options
author | 2016-09-21 10:45:33 -0700 | |
---|---|---|
committer | 2016-09-21 10:45:33 -0700 | |
commit | 4028d2c11b6561ad7aea71e7bc465dc56865d40d (patch) | |
tree | 003358672b4d573650a1c4d527ed16e1a9d5035a /src/cpp | |
parent | 4306eeee397760e11b416f43e881e7dfb87f88b0 (diff) |
More fixes
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/server/server.cc | 31 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 16 |
2 files changed, 32 insertions, 15 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 21debcc748..89854f9493 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL UnimplementedAsyncRequest* const request_; }; +// TODO (sreek) - This might no longer be needed class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -126,6 +127,13 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { } }; +class ShutdownTag : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool *status) { + return false; + } +}; + class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) @@ -147,6 +155,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } + // TODO (Sreek) This function is probably no longer needed static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; *ok = false; @@ -158,6 +167,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } + // TODO (sreek) - This function is probably no longer needed static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, gpr_timespec deadline) { void* tag = nullptr; @@ -177,6 +187,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { GPR_UNREACHABLE_CODE(return false); } + // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest + // functions void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -184,6 +196,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { cq_ = nullptr; } + void ResetRequest() { + in_flight_ = false; + } + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; @@ -326,10 +342,12 @@ class Server::SyncRequestManager : public GrpcRpcManager { GPR_TIMER_SCOPE("cd.Run()", 0); cd.Run(global_callbacks_); } else { + sync_req->ResetRequest(); // ok is false. For some reason, the tag was returned but event was not // successful. In this case, request again unless we are shutting down if (!IsShutdown()) { - sync_req->Request(server_->c_server(), server_cq_->cq()); + // TODO (sreek) Remove this + // sync_req->Request(server_->c_server(), server_cq_->cq()); } } } @@ -371,7 +389,8 @@ class Server::SyncRequestManager : public GrpcRpcManager { static internal::GrpcLibraryInitializer g_gli_initializer; Server::Server( - std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs, + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs, int max_message_size, ChannelArguments* args, int min_pollers, int max_pollers) : max_message_size_(max_message_size), @@ -390,7 +409,7 @@ Server::Server( for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { sync_req_mgrs_.emplace_back(new SyncRequestManager( - this, &(*it), global_callbacks_, min_pollers, max_pollers)); + this, (*it).get(), global_callbacks_, min_pollers, max_pollers)); } grpc_channel_args channel_args; @@ -411,7 +430,7 @@ Server::~Server() { // destructor for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { - (*it).Shutdown(); + (*it)->Shutdown(); } // TODO (sreek) Delete this @@ -552,7 +571,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { if (started_ && !shutdown_) { shutdown_ = true; - int shutdown_tag = 0; // Dummy shutdown tag + ShutdownTag shutdown_tag; // Dummy shutdown tag grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag); // Shutdown all RpcManagers. This will try to gracefully stop all the @@ -587,7 +606,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // destructor) for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end(); it++) { - (*it).Shutdown(); + (*it)->Shutdown(); } /* diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 786195ed6c..8c70ac8f99 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -93,7 +93,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService( gpr_log(GPR_ERROR, "Adding multiple AsyncGenericService is unsupported for now. " "Dropping the service %p", - (void *) service); + (void*)service); } else { generic_service_ = service; } @@ -163,8 +163,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // This is different from the completion queues added to the server via // ServerBuilder's AddCompletionQueue() method (those completion queues // are in 'cqs_' member variable of ServerBuilder object) - std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs( - new std::vector<ServerCompletionQueue>()); + std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> + sync_server_cqs( + new std::vector<std::unique_ptr<ServerCompletionQueue>>()); if (has_sync_methods) { // If the server has synchronous methods, it will need completion queues to @@ -177,11 +178,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { num_cqs = GPR_MAX(num_cqs, 4); for (int i = 0; i < num_cqs; i++) { - // emplace_back() would have been ideal here but doesn't work since the - // ServerCompletionQueue's constructor is private. With emplace_back, the - // constructor is called from somewhere within the library; so making - // ServerBuilder class a friend to ServerCompletion queue won't help. - sync_server_cqs->push_back(ServerCompletionQueue()); + sync_server_cqs->emplace_back(new ServerCompletionQueue()); } } @@ -222,7 +219,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { int num_frequently_polled_cqs = sync_server_cqs->size(); for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { - grpc_server_register_completion_queue(server->server_, it->cq(), nullptr); + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); } // cqs_ contains the completion queue added by calling the ServerBuilder's |