aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-09-21 10:45:33 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-09-21 10:45:33 -0700
commit4028d2c11b6561ad7aea71e7bc465dc56865d40d (patch)
tree003358672b4d573650a1c4d527ed16e1a9d5035a /src/cpp
parent4306eeee397760e11b416f43e881e7dfb87f88b0 (diff)
More fixes
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/server/server.cc31
-rw-r--r--src/cpp/server/server_builder.cc16
2 files changed, 32 insertions, 15 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 21debcc748..89854f9493 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -118,6 +118,7 @@ class Server::UnimplementedAsyncResponse GRPC_FINAL
UnimplementedAsyncRequest* const request_;
};
+// TODO (sreek) - This might no longer be needed
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -126,6 +127,13 @@ class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
}
};
+class ShutdownTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool *status) {
+ return false;
+ }
+};
+
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -147,6 +155,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
+ // TODO (Sreek) This function is probably no longer needed
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
*ok = false;
@@ -158,6 +167,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ // TODO (sreek) - This function is probably no longer needed
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
@@ -177,6 +187,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
GPR_UNREACHABLE_CODE(return false);
}
+ // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest
+ // functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -184,6 +196,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
cq_ = nullptr;
}
+ void ResetRequest() {
+ in_flight_ = false;
+ }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@@ -326,10 +342,12 @@ class Server::SyncRequestManager : public GrpcRpcManager {
GPR_TIMER_SCOPE("cd.Run()", 0);
cd.Run(global_callbacks_);
} else {
+ sync_req->ResetRequest();
// ok is false. For some reason, the tag was returned but event was not
// successful. In this case, request again unless we are shutting down
if (!IsShutdown()) {
- sync_req->Request(server_->c_server(), server_cq_->cq());
+ // TODO (sreek) Remove this
+ // sync_req->Request(server_->c_server(), server_cq_->cq());
}
}
}
@@ -371,7 +389,8 @@ class Server::SyncRequestManager : public GrpcRpcManager {
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(
- std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs,
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
int max_message_size, ChannelArguments* args, int min_pollers,
int max_pollers)
: max_message_size_(max_message_size),
@@ -390,7 +409,7 @@ Server::Server(
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
sync_req_mgrs_.emplace_back(new SyncRequestManager(
- this, &(*it), global_callbacks_, min_pollers, max_pollers));
+ this, (*it).get(), global_callbacks_, min_pollers, max_pollers));
}
grpc_channel_args channel_args;
@@ -411,7 +430,7 @@ Server::~Server() {
// destructor
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
- (*it).Shutdown();
+ (*it)->Shutdown();
}
// TODO (sreek) Delete this
@@ -552,7 +571,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
if (started_ && !shutdown_) {
shutdown_ = true;
- int shutdown_tag = 0; // Dummy shutdown tag
+ ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq_.cq(), &shutdown_tag);
// Shutdown all RpcManagers. This will try to gracefully stop all the
@@ -587,7 +606,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// destructor)
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
- (*it).Shutdown();
+ (*it)->Shutdown();
}
/*
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 786195ed6c..8c70ac8f99 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -93,7 +93,7 @@ ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- (void *) service);
+ (void*)service);
} else {
generic_service_ = service;
}
@@ -163,8 +163,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// This is different from the completion queues added to the server via
// ServerBuilder's AddCompletionQueue() method (those completion queues
// are in 'cqs_' member variable of ServerBuilder object)
- std::shared_ptr<std::vector<ServerCompletionQueue>> sync_server_cqs(
- new std::vector<ServerCompletionQueue>());
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs(
+ new std::vector<std::unique_ptr<ServerCompletionQueue>>());
if (has_sync_methods) {
// If the server has synchronous methods, it will need completion queues to
@@ -177,11 +178,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
num_cqs = GPR_MAX(num_cqs, 4);
for (int i = 0; i < num_cqs; i++) {
- // emplace_back() would have been ideal here but doesn't work since the
- // ServerCompletionQueue's constructor is private. With emplace_back, the
- // constructor is called from somewhere within the library; so making
- // ServerBuilder class a friend to ServerCompletion queue won't help.
- sync_server_cqs->push_back(ServerCompletionQueue());
+ sync_server_cqs->emplace_back(new ServerCompletionQueue());
}
}
@@ -222,7 +219,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
int num_frequently_polled_cqs = sync_server_cqs->size();
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
- grpc_server_register_completion_queue(server->server_, it->cq(), nullptr);
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
}
// cqs_ contains the completion queue added by calling the ServerBuilder's