diff options
author | Hongyu Chen <hongyu@google.com> | 2015-08-25 14:44:15 -0700 |
---|---|---|
committer | Hongyu Chen <hongyu@google.com> | 2015-08-25 14:44:15 -0700 |
commit | 011ea49592e71e1db3ef43a094aa9b452ff21e67 (patch) | |
tree | 70017b24ed2d3736552025680d4a0f138adf456f /src/cpp/server/server.cc | |
parent | a96ce800a8df5c62ffd264317836ecf3433c4344 (diff) | |
parent | 1b481b64be43bd4c7655b25422344a17b2f198d9 (diff) |
Merge remote-tracking branch 'upstream/master' into timespec
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r-- | src/cpp/server/server.cc | 106 |
1 files changed, 91 insertions, 15 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index e039c07374..66cd27cc33 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -32,24 +32,71 @@ */ #include <grpc++/server.h> + #include <utility> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc++/completion_queue.h> -#include <grpc++/async_generic_service.h> +#include <grpc++/generic/async_generic_service.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/service_type.h> #include <grpc++/server_context.h> #include <grpc++/server_credentials.h> -#include <grpc++/thread_pool_interface.h> -#include <grpc++/time.h> +#include <grpc++/support/time.h> #include "src/core/profiling/timers.h" +#include "src/cpp/server/thread_pool_interface.h" namespace grpc { +class Server::UnimplementedAsyncRequestContext { + protected: + UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} + + GenericServerContext server_context_; + GenericServerAsyncReaderWriter generic_stream_; +}; + +class Server::UnimplementedAsyncRequest GRPC_FINAL + : public UnimplementedAsyncRequestContext, + public GenericAsyncRequest { + public: + UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) + : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, + NULL, false), + server_(server), + cq_(cq) {} + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + ServerContext* context() { return &server_context_; } + GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + + private: + Server* const server_; + ServerCompletionQueue* const cq_; +}; + +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> + UnimplementedAsyncResponseOp; +class Server::UnimplementedAsyncResponse GRPC_FINAL + : public UnimplementedAsyncResponseOp { + public: + UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); + ~UnimplementedAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return r; + } + + private: + UnimplementedAsyncRequest* const request_; +}; + class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -297,18 +344,23 @@ int Server::AddListeningPort(const grpc::string& addr, return creds->AddPortToServer(addr, server_); } -bool Server::Start() { +bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); started_ = true; grpc_server_start(server_); if (!has_generic_service_) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted here - // by gcc-4.4 because it can't match the anonymous nullptr with a proper - // constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + if (!sync_methods_->empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + for (size_t i = 0; i < num_cqs; i++) { + new UnimplementedAsyncRequest(this, cqs[i]); + } } // Start processing rpcs. if (!sync_methods_->empty()) { @@ -370,12 +422,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { Server::BaseAsyncRequest::BaseAsyncRequest( Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) : server_(server), context_(context), stream_(stream), call_cq_(call_cq), tag_(tag), + delete_on_finalize_(delete_on_finalize), call_(nullptr) { memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } @@ -402,14 +456,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { // just the pointers inside call are copied here stream_->BindCall(&call); *tag = tag_; - delete this; + if (delete_on_finalize_) { + delete this; + } return true; } Server::RegisteredAsyncRequest::RegisteredAsyncRequest( Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) {} + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void Server::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -423,8 +479,9 @@ void Server::RegisteredAsyncRequest::IssueRequest( Server::GenericAsyncRequest::GenericAsyncRequest( Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) { + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, tag, + delete_on_finalize) { grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); @@ -445,6 +502,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { return BaseAsyncRequest::FinalizeResult(tag, status); } +bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } + return false; +} + +Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( + UnimplementedAsyncRequest* request) + : request_(request) { + Status status(StatusCode::UNIMPLEMENTED, ""); + UnknownMethodHandler::FillOps(request_->context(), this); + request_->stream()->call_.PerformOps(this); +} + void Server::ScheduleCallback() { { grpc::unique_lock<grpc::mutex> lock(mu_); |