diff options
Diffstat (limited to 'src/cpp/server/server_cc.cc')
-rw-r--r-- | src/cpp/server/server_cc.cc | 48 |
1 files changed, 37 insertions, 11 deletions
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index c031528a8f..0a51cf5626 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -58,6 +58,9 @@ namespace { // max-threads set) to the server builder. #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX +// How many callback requests of each method should we pre-register at start +#define DEFAULT_CALLBACK_REQS_PER_METHOD 32 + class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: ~DefaultGlobalCallbacks() override {} @@ -81,10 +84,7 @@ class ShutdownTag : public internal::CompletionQueueTag { class DummyTag : public internal::CompletionQueueTag { public: - bool FinalizeResult(void** tag, bool* status) { - *status = true; - return true; - } + bool FinalizeResult(void** tag, bool* status) { return true; } }; class UnimplementedAsyncRequestContext { @@ -199,9 +199,21 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { } } + void PostShutdownCleanup() { + if (call_) { + grpc_call_unref(call_); + call_ = nullptr; + } + if (cq_) { + grpc_completion_queue_destroy(cq_); + cq_ = nullptr; + } + } + bool FinalizeResult(void** tag, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); + cq_ = nullptr; } if (call_details_) { deadline_ = call_details_->deadline; @@ -586,7 +598,17 @@ class Server::SyncRequestThreadManager : public ThreadManager { void* tag; bool ok; while (server_cq_->Next(&tag, &ok)) { - // Do nothing + if (ok) { + // If a request was pulled off the queue, it means that the thread + // handling the request added it to the completion queue after shutdown + // was called - because the thread had already started and checked the + // shutdown flag before shutdown was called. In this case, we simply + // clean it up here, *after* calling wait on all the worker threads, at + // which point we are certain no in-flight requests will add more to the + // queue. This fixes an intermittent memory leak on shutdown. + SyncRequest* sync_req = static_cast<SyncRequest*>(tag); + sync_req->PostShutdownCleanup(); + } } } @@ -707,14 +729,15 @@ std::shared_ptr<Channel> Server::InProcessChannel( grpc_channel_args channel_args = args.c_channel_args(); return CreateChannelInternal( "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr), - nullptr); + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>()); } std::shared_ptr<Channel> Server::experimental_type::InProcessChannelWithInterceptors( const ChannelArguments& args, - std::unique_ptr<std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>> + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> interceptor_creators) { grpc_channel_args channel_args = args.c_channel_args(); return CreateChannelInternal( @@ -769,9 +792,12 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { (*it)->AddSyncMethod(method, method_registration_tag); } } else { - // a callback method - auto* req = new CallbackRequest(this, method, method_registration_tag); - callback_reqs_.emplace_back(req); + // a callback method. Register at least some callback requests + // TODO(vjpai): Register these dynamically based on need + for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) { + auto* req = new CallbackRequest(this, method, method_registration_tag); + callback_reqs_.emplace_back(req); + } // Enqueue it so that it will be Request'ed later once // all request matchers are created at core server startup } |