diff options
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/server.cc | 64 |
1 files changed, 15 insertions, 49 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index d8ea375636..eb2a984993 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -90,26 +90,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, - gpr_timespec deadline) { - void* tag = nullptr; - *ok = false; - switch (cq->AsyncNext(&tag, ok, deadline)) { - case CompletionQueue::TIMEOUT: - *req = nullptr; - return true; - case CompletionQueue::SHUTDOWN: - *req = nullptr; - return false; - case CompletionQueue::GOT_EVENT: - *req = static_cast<SyncRequest*>(tag); - GPR_ASSERT((*req)->in_flight_); - return true; - } - gpr_log(GPR_ERROR, "Should never reach here"); - abort(); - } - void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -207,21 +187,22 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { static grpc_server* CreateServer( int max_message_size, const grpc_compression_options& compression_options) { + grpc_arg args[2]; + size_t args_idx = 0; if (max_message_size > 0) { - grpc_arg args[2]; - args[0].type = GRPC_ARG_INTEGER; - args[0].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); - args[0].value.integer = max_message_size; - - args[1].type = GRPC_ARG_INTEGER; - args[1].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); - args[1].value.integer = compression_options.enabled_algorithms_bitset; - - grpc_channel_args channel_args = {2, args}; - return grpc_server_create(&channel_args, NULL); - } else { - return grpc_server_create(nullptr, nullptr); + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); + args[args_idx].value.integer = max_message_size; + args_idx++; } + + args[args_idx].type = GRPC_ARG_INTEGER; + args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG); + args[args_idx].value.integer = compression_options.enabled_algorithms_bitset; + args_idx++; + + grpc_channel_args channel_args = {args_idx, args}; + return grpc_server_create(&channel_args, nullptr); } Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, @@ -330,27 +311,12 @@ bool Server::Start() { return true; } -void Server::ShutdownInternal(gpr_timespec deadline) { +void Server::Shutdown() { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); - // Spin, eating requests until the completion queue is completely shutdown. - // If the deadline expires then cancel anything that's pending and keep - // spinning forever until the work is actually drained. - // Since nothing else needs to touch state guarded by mu_, holding it - // through this loop is fine. - SyncRequest* request; - bool ok; - while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { - if (request == NULL) { // deadline expired - grpc_server_cancel_all_calls(server_); - deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - } else if (ok) { - SyncRequest::CallData call_data(this, request); - } - } // Wait for running callbacks to finish. while (num_running_cb_ != 0) { |