diff options
author | 2015-02-12 11:38:36 -0800 | |
---|---|---|
committer | 2015-02-12 11:38:36 -0800 | |
commit | 8c8d0aa1d881fbcf393a73f99b86ed29a866f8ff (patch) | |
tree | e26e0eb8c3f6fec55a917f1449883cc2da0a2153 /src/cpp | |
parent | bc8e3db73eecec79e5592c1e1723f6b69095e84a (diff) |
Async API progress
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/server/server.cc | 75 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 22 |
2 files changed, 61 insertions, 36 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 90a2863b0c..20dd135a86 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -39,6 +39,7 @@ #include <grpc/support/log.h> #include <grpc++/completion_queue.h> #include <grpc++/impl/rpc_service_method.h> +#include <grpc++/impl/service_type.h> #include <grpc++/server_context.h> #include <grpc++/server_credentials.h> #include <grpc++/thread_pool_interface.h> @@ -47,8 +48,8 @@ namespace grpc { -Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, - ServerCredentials *creds) +Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + ServerCredentials* creds) : started_(false), shutdown_(false), num_running_cb_(0), @@ -56,7 +57,8 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, thread_pool_owned_(thread_pool_owned), secure_(creds != nullptr) { if (creds) { - server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); + server_ = + grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); } else { server_ = grpc_server_create(cq_.cq(), nullptr); } @@ -81,10 +83,11 @@ Server::~Server() { } } -bool Server::RegisterService(RpcService *service) { +bool Server::RegisterService(RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { - RpcServiceMethod *method = service->GetMethod(i); - void *tag = grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); + RpcServiceMethod* method = service->GetMethod(i); + void* tag = + grpc_server_register_method(server_, method->name(), nullptr, cq_.cq()); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); @@ -95,7 +98,24 @@ bool Server::RegisterService(RpcService *service) { return true; } -int Server::AddPort(const grpc::string &addr) { +bool Server::RegisterAsyncService(AsynchronousService* service) { + GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); + service->server_ = this; + service->request_args_.reserve(service->method_count_); + for (size_t i = 0; i < service->method_count_; ++i) { + void* tag = grpc_server_register_method(server_, service->method_names_[i], nullptr, + service->completion_queue()->cq()); + if (!tag) { + gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", + service->method_names_[i]); + return false; + } + service->request_args_.push_back(tag); + } + return true; +} + +int Server::AddPort(const grpc::string& addr) { GPR_ASSERT(!started_); if (secure_) { return grpc_server_add_secure_http2_port(server_, addr.c_str()); @@ -106,7 +126,7 @@ int Server::AddPort(const grpc::string &addr) { class Server::MethodRequestData final : public CompletionQueueTag { public: - MethodRequestData(RpcServiceMethod *method, void *tag) + MethodRequestData(RpcServiceMethod* method, void* tag) : method_(method), tag_(tag), has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || @@ -118,33 +138,33 @@ class Server::MethodRequestData final : public CompletionQueueTag { grpc_metadata_array_init(&request_metadata_); } - static MethodRequestData *Wait(CompletionQueue *cq, bool *ok) { - void *tag = nullptr; + static MethodRequestData* Wait(CompletionQueue* cq, bool* ok) { + void* tag = nullptr; *ok = false; if (!cq->Next(&tag, ok)) { return nullptr; } - auto *mrd = static_cast<MethodRequestData *>(tag); + auto* mrd = static_cast<MethodRequestData*>(tag); GPR_ASSERT(mrd->in_flight_); return mrd; } - void Request(grpc_server *server) { + void Request(grpc_server* server) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, - cq_, this)); + has_request_payload_ ? &request_payload_ : nullptr, cq_, + this)); } - void FinalizeResult(void **tag, bool *status) override {} + void FinalizeResult(void** tag, bool* status) override {} class CallData { public: - explicit CallData(Server *server, MethodRequestData *mrd) + explicit CallData(Server* server, MethodRequestData* mrd) : cq_(mrd->cq_), call_(mrd->call_, server, &cq_), ctx_(mrd->deadline_, mrd->request_metadata_.metadata, @@ -196,21 +216,21 @@ class Server::MethodRequestData final : public CompletionQueueTag { ServerContext ctx_; const bool has_request_payload_; const bool has_response_payload_; - grpc_byte_buffer *request_payload_; - RpcServiceMethod *const method_; + grpc_byte_buffer* request_payload_; + RpcServiceMethod* const method_; }; private: - RpcServiceMethod *const method_; - void *const tag_; + RpcServiceMethod* const method_; + void* const tag_; bool in_flight_ = false; const bool has_request_payload_; const bool has_response_payload_; - grpc_call *call_; + grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; - grpc_byte_buffer *request_payload_; - grpc_completion_queue *cq_; + grpc_byte_buffer* request_payload_; + grpc_completion_queue* cq_; }; bool Server::Start() { @@ -220,7 +240,7 @@ bool Server::Start() { // Start processing rpcs. if (!methods_.empty()) { - for (auto &m : methods_) { + for (auto& m : methods_) { m.Request(server_); } @@ -246,14 +266,13 @@ void Server::Shutdown() { } } -void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) { +void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { static const size_t MAX_OPS = 8; size_t nops = MAX_OPS; grpc_op ops[MAX_OPS]; buf->FillOps(ops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, - buf)); + grpc_call_start_batch(call->call(), ops, nops, buf)); } void Server::ScheduleCallback() { @@ -267,7 +286,7 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; - auto *mrd = MethodRequestData::Wait(&cq_, &ok); + auto* mrd = MethodRequestData::Wait(&cq_, &ok); if (mrd) { ScheduleCallback(); if (ok) { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index d6bcb9313a..dd23e929b1 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -43,25 +43,25 @@ namespace grpc { ServerBuilder::ServerBuilder() {} -void ServerBuilder::RegisterService(SynchronousService *service) { +void ServerBuilder::RegisterService(SynchronousService* service) { services_.push_back(service->service()); } -void ServerBuilder::RegisterAsyncService(AsynchronousService *service) { +void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { async_services_.push_back(service); } -void ServerBuilder::AddPort(const grpc::string &addr) { +void ServerBuilder::AddPort(const grpc::string& addr) { ports_.push_back(addr); } void ServerBuilder::SetCredentials( - const std::shared_ptr<ServerCredentials> &creds) { + const std::shared_ptr<ServerCredentials>& creds) { GPR_ASSERT(!creds_); creds_ = creds; } -void ServerBuilder::SetThreadPool(ThreadPoolInterface *thread_pool) { +void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { thread_pool_ = thread_pool; } @@ -77,13 +77,19 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = new ThreadPool(cores); thread_pool_owned = true; } - std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, creds_.get())); - for (auto *service : services_) { + std::unique_ptr<Server> server( + new Server(thread_pool_, thread_pool_owned, creds_.get())); + for (auto* service : services_) { if (!server->RegisterService(service)) { return nullptr; } } - for (auto &port : ports_) { + for (auto* service : async_services_) { + if (!server->RegisterAsyncService(service)) { + return nullptr; + } + } + for (auto& port : ports_) { if (!server->AddPort(port)) { return nullptr; } |