aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc31
1 files changed, 25 insertions, 6 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 21debcc748..89854f9493 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL
UnimplementedAsyncRequest* const request_;
};
+// TODO (sreek) - This might no longer be needed
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -126,6 +127,13 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
}
};
+class ShutdownTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool *status) {
+ return false;
+ }
+};
+
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -147,6 +155,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
+ // TODO (Sreek) This function is probably no longer needed
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
*ok = false;
@@ -158,6 +167,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ // TODO (sreek) - This function is probably no longer needed
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
@@ -177,6 +187,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
GPR_UNREACHABLE_CODE(return false);
}
+ // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest
+ // functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -184,6 +196,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
+ void ResetRequest() {
+ in_flight_ = false;
+ }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@@ -326,10 +342,12 @@ class Server::SyncRequestManager : public GrpcRpcManager {
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
} else {
+ sync_req->ResetRequest();
// ok is false. For some reason, the tag was returned but event was not
// successful. In this case, request again unless we are shutting down
if (!IsShutdown()) {
- sync_req->Request(server_->c_server(), server_cq_->cq());
+ // TODO (sreek) Remove this
+ // sync_req->Request(server_->c_server(), server_cq_->cq());
}
}
}
@@ -371,7 +389,8 @@ class Server::SyncRequestManager : public GrpcRpcManager {
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(
- std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs,
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
int max_message_size, ChannelArguments* args, int min_pollers,
int max_pollers)
: max_message_size_(max_message_size),
@@ -390,7 +409,7 @@ Server::Server(
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
sync_req_mgrs_.emplace_back(new SyncRequestManager(
- this, &(*it), global_callbacks_, min_pollers, max_pollers));
+ this, (*it).get(), global_callbacks_, min_pollers, max_pollers));
}
grpc_channel_args channel_args;
@@ -411,7 +430,7 @@ Server::~Server() {
// destructor
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
- (*it).Shutdown();
+ (*it)->Shutdown();
}
// TODO (sreek) Delete this
@@ -552,7 +571,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
if (started_ && !shutdown_) {
shutdown_ = true;
- int shutdown_tag = 0; // Dummy shutdown tag
+ ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
// Shutdown all RpcManagers. This will try to gracefully stop all the
@@ -587,7 +606,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// destructor)
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
- (*it).Shutdown();
+ (*it)->Shutdown();
}
/*