diff options
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r-- | src/cpp/server/server.cc | 152 |
1 files changed, 121 insertions, 31 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index ab87b22f5f..dbe60e50de 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -50,6 +50,52 @@ namespace grpc { +class Server::UnimplementedAsyncRequestContext { + protected: + UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} + + GenericServerContext server_context_; + GenericServerAsyncReaderWriter generic_stream_; +}; + +class Server::UnimplementedAsyncRequest GRPC_FINAL + : public UnimplementedAsyncRequestContext, + public GenericAsyncRequest { + public: + UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) + : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, + NULL, false), + server_(server), + cq_(cq) {} + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + ServerContext* context() { return &server_context_; } + GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + + private: + Server* const server_; + ServerCompletionQueue* const cq_; +}; + +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> + UnimplementedAsyncResponseOp; +class Server::UnimplementedAsyncResponse GRPC_FINAL + : public UnimplementedAsyncResponseOp { + public: + UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); + ~UnimplementedAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return r; + } + + private: + UnimplementedAsyncRequest* const request_; +}; + class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -67,11 +113,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == RpcMethod::SERVER_STREAMING), + call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); } - ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } + ~SyncRequest() { + if (call_details_) { + delete call_details_; + } + grpc_metadata_array_destroy(&request_metadata_); + } static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; @@ -84,7 +136,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void SetupRequest() { cq_ = grpc_completion_queue_create(); } + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { grpc_completion_queue_destroy(cq_); @@ -94,17 +146,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (tag_) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)); + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, this)); + } } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { if (!*status) { grpc_completion_queue_destroy(cq_); } + if (call_details_) { + deadline_ = call_details_->deadline; + grpc_call_details_destroy(call_details_); + grpc_call_details_init(call_details_); + } return true; } @@ -157,6 +224,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { bool in_flight_; const bool has_request_payload_; grpc_call* call_; + grpc_call_details* call_details_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc_byte_buffer* request_payload_; @@ -170,9 +238,9 @@ static grpc_server* CreateServer(int max_message_size) { arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.value.integer = max_message_size; grpc_channel_args args = {1, &arg}; - return grpc_server_create(&args); + return grpc_server_create(&args, nullptr); } else { - return grpc_server_create(nullptr); + return grpc_server_create(nullptr, nullptr); } } @@ -183,10 +251,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), + has_generic_service_(false), server_(CreateServer(max_message_size)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { - grpc_server_register_completion_queue(server_, cq_.cq()); + grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); } Server::~Server() { @@ -207,23 +276,23 @@ Server::~Server() { delete sync_methods_; } -bool Server::RegisterService(const grpc::string *host, RpcService* service) { +bool Server::RegisterService(const grpc::string* host, RpcService* service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod* method = service->GetMethod(i); - void* tag = grpc_server_register_method( - server_, method->name(), host ? host->c_str() : nullptr); + void* tag = grpc_server_register_method(server_, method->name(), + host ? host->c_str() : nullptr); if (!tag) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - SyncRequest request(method, tag); - sync_methods_->emplace_back(request); + sync_methods_->emplace_back(method, tag); } return true; } -bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { +bool Server::RegisterAsyncService(const grpc::string* host, + AsynchronousService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); service->server_ = this; @@ -245,6 +314,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; + has_generic_service_ = true; } int Server::AddListeningPort(const grpc::string& addr, @@ -253,19 +323,15 @@ int Server::AddListeningPort(const grpc::string& addr, return creds->AddPortToServer(addr, server_); } -bool Server::Start() { +bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); started_ = true; grpc_server_start(server_); - // Start processing rpcs. - if (!sync_methods_->empty()) { - for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { - m->SetupRequest(); - m->Request(server_, cq_.cq()); + if (!has_generic_service_) { + for (size_t i = 0; i < num_cqs; i++) { + new UnimplementedAsyncRequest(this, cqs[i]); } - - ScheduleCallback(); } return true; @@ -297,18 +363,20 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops)); + auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); + GPR_ASSERT(GRPC_CALL_OK == result); } Server::BaseAsyncRequest::BaseAsyncRequest( Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) : server_(server), context_(context), stream_(stream), call_cq_(call_cq), tag_(tag), + delete_on_finalize_(delete_on_finalize), call_(nullptr) { memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } @@ -335,14 +403,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { // just the pointers inside call are copied here stream_->BindCall(&call); *tag = tag_; - delete this; + if (delete_on_finalize_) { + delete this; + } return true; } Server::RegisteredAsyncRequest::RegisteredAsyncRequest( Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) {} + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void Server::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -356,8 +426,9 @@ void Server::RegisteredAsyncRequest::IssueRequest( Server::GenericAsyncRequest::GenericAsyncRequest( Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) { + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, tag, + delete_on_finalize) { grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); @@ -378,6 +449,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { return BaseAsyncRequest::FinalizeResult(tag, status); } +bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } + return false; +} + +Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( + UnimplementedAsyncRequest* request) + : request_(request) { + Status status(StatusCode::UNIMPLEMENTED, ""); + UnknownMethodHandler::FillOps(request_->context(), this); + request_->stream()->call_.PerformOps(this); +} + void Server::ScheduleCallback() { { grpc::unique_lock<grpc::mutex> lock(mu_); |