aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-10 17:39:54 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-10 17:39:54 -0800
commitcbd04850884ed6978fa6ae19ec8d04c3773d8ac4 (patch)
tree50dca5c86839c6a90ba61e281fd33682a5d3b23d
parent36d18a089e66aff8fc542ec2d98623b56a2e78f1 (diff)
Simplify server ready for async path
-rw-r--r--include/grpc++/server.h29
-rw-r--r--src/cpp/server/server.cc58
2 files changed, 44 insertions, 43 deletions
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 670ffa7815..eefd4457f9 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -35,7 +35,7 @@
#define __GRPCPP_SERVER_H__
#include <condition_variable>
-#include <map>
+#include <list>
#include <memory>
#include <mutex>
@@ -69,6 +69,25 @@ class Server {
private:
friend class ServerBuilder;
+ class MethodRequestData {
+ public:
+ MethodRequestData(RpcServiceMethod* method, void* tag) : method_(method), tag_(tag) {}
+ static MethodRequestData *Wait(CompletionQueue *cq);
+
+ void Request(CompletionQueue* cq);
+
+ class CallData {
+ public:
+ explicit CallData(MethodRequestData *mrd);
+
+ void Run();
+ };
+
+ private:
+ RpcServiceMethod *const method_;
+ void *const tag_;
+ };
+
// ServerBuilder use only
Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
Server();
@@ -85,7 +104,8 @@ class Server {
void ScheduleCallback();
// Completion queue.
- CompletionQueue cq_;
+ std::unique_ptr<CompletionQueue> cq_sync_;
+ std::unique_ptr<CompletionQueue> cq_async_;
// Sever status
std::mutex mu_;
@@ -95,12 +115,11 @@ class Server {
int num_running_cb_;
std::condition_variable callback_cv_;
+ std::list<MethodRequestData> methods_;
+
// Pointer to the c grpc server.
grpc_server* server_;
- // A map for all method information.
- std::map<grpc::string, RpcServiceMethod*> method_map_;
-
ThreadPoolInterface* thread_pool_;
// Whether the thread pool is created and owned by the server.
bool thread_pool_owned_;
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 44c5276b54..f5bbfdc6f7 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -54,9 +54,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerC
secure_(creds != nullptr) {
if (creds) {
server_ =
- grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
+ grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr);
} else {
- server_ = grpc_server_create(cq_.cq(), nullptr);
+ server_ = grpc_server_create(nullptr, nullptr);
}
}
@@ -80,13 +80,17 @@ Server::~Server() {
}
bool Server::RegisterService(RpcService *service) {
+ if (!cq_sync_) {
+ cq_sync_.reset(new CompletionQueue);
+ }
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i);
- if (method_map_.find(method->name()) != method_map_.end()) {
+ void *tag = grpc_server_register_method(server_, method->name(), nullptr);
+ if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name());
return false;
}
- method_map_.insert(std::make_pair(method->name(), method));
+ methods_.emplace_back(method, tag);
}
return true;
}
@@ -106,7 +110,11 @@ bool Server::Start() {
grpc_server_start(server_);
// Start processing rpcs.
- if (thread_pool_) {
+ if (cq_sync_) {
+ for (auto& m : methods_) {
+ m.Request(cq_sync_.get());
+ }
+
ScheduleCallback();
}
@@ -126,12 +134,6 @@ void Server::Shutdown() {
}
}
}
-
- // Shutdown the completion queue.
- cq_.Shutdown();
- void *tag = nullptr;
- bool ok = false;
- GPR_ASSERT(false == cq_.Next(&tag, &ok));
}
void Server::ScheduleCallback() {
@@ -144,35 +146,15 @@ void Server::ScheduleCallback() {
void Server::RunRpc() {
// Wait for one more incoming rpc.
- void *tag = nullptr;
- GPR_ASSERT(started_);
- grpc_call *c_call = NULL;
- grpc_call_details call_details;
- grpc_call_details_init(&call_details);
- grpc_metadata_array initial_metadata;
- grpc_metadata_array_init(&initial_metadata);
- CompletionQueue cq;
- grpc_call_error err = grpc_server_request_call(server_, &c_call, &call_details, &initial_metadata, cq.cq(), cq.cq(), nullptr);
- GPR_ASSERT(err == GRPC_CALL_OK);
- bool ok = false;
- GPR_ASSERT(cq_.Next(&tag, &ok));
- if (ok) {
- ServerContext context;
- Call call(c_call, nullptr, &cq);
+ auto* mrd = MethodRequestData::Wait(cq_sync_.get());
+ if (mrd) {
+ MethodRequestData::CallData cd(mrd);
+
+ mrd->Request(cq_sync_.get());
ScheduleCallback();
- RpcServiceMethod *method = nullptr;
- auto iter = method_map_.find(call_details.method);
- if (iter != method_map_.end()) {
- method = iter->second;
- }
- // TODO(ctiller): allocate only if necessary
- std::unique_ptr<google::protobuf::Message> request(method->AllocateRequestProto());
- std::unique_ptr<google::protobuf::Message> response(method->AllocateResponseProto());
- method->handler()->RunHandler(MethodHandler::HandlerParameter(
- &call, &context, request.get(), response.get()));
+
+ cd.Run();
}
- grpc_call_details_destroy(&call_details);
- grpc_metadata_array_destroy(&initial_metadata);
{
std::unique_lock<std::mutex> lock(mu_);