Diffstat (limited to 'src/cpp')
-rw-r--r--  src/cpp/client/channel_cc.cc                2
-rw-r--r--  src/cpp/client/client_context.cc            2
-rw-r--r--  src/cpp/client/generic_stub.cc              2
-rw-r--r--  src/cpp/common/core_codegen.cc             25
-rw-r--r--  src/cpp/common/version_cc.cc                2
-rw-r--r--  src/cpp/server/server_builder.cc           37
-rw-r--r--  src/cpp/server/server_cc.cc                40
-rw-r--r--  src/cpp/server/server_context.cc           23
-rw-r--r--  src/cpp/thread_manager/thread_manager.cc  114
-rw-r--r--  src/cpp/thread_manager/thread_manager.h    12
10 files changed, 143 insertions, 116 deletions
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index c985183ae7..fac1ba9d43 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -131,7 +131,7 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
- ops->FillOps(cops, &nops);
+ ops->FillOps(call->call(), cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
}
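
The FillOps change above (mirrored in server_cc.cc and server_context.cc below) threads the grpc_call through to the op set so per-operation storage can come from the call's arena. A minimal sketch of the idea, assuming the 1.4-era core API; the filler function is hypothetical, not the actual CallOpSetInterface implementation:

    #include <stddef.h>
    #include <grpc/grpc.h>

    // Hypothetical op filler: scratch space comes from the call's arena via
    // grpc_call_arena_alloc(), so it is freed with the call's last unref and
    // needs no matching free.
    void FillOpsSketch(grpc_call* call, grpc_op* ops, size_t* nops) {
      grpc_op* op = &ops[0];
      op->op = GRPC_OP_SEND_INITIAL_METADATA;
      op->data.send_initial_metadata.count = 0;
      op->flags = 0;
      op->reserved = nullptr;
      void* scratch = grpc_call_arena_alloc(call, 64);  // arena-backed scratch
      (void)scratch;
      *nops = 1;
    }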
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 3d884cf62e..2516232840 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -74,7 +74,7 @@ ClientContext::ClientContext()
ClientContext::~ClientContext() {
if (call_) {
- grpc_call_destroy(call_);
+ grpc_call_unref(call_);
}
g_client_callbacks->Destructor(this);
}
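
grpc_call_destroy() is gone in the 1.4-era core: calls are reference counted (see the new grpc_call_ref/grpc_call_unref codegen hooks below), and the last unref frees the call together with its arena. A minimal end-to-end sketch, assuming the 1.4-era C core signatures:

    #include <grpc/grpc.h>
    #include <grpc/slice.h>
    #include <grpc/support/time.h>

    int main() {
      grpc_init();
      grpc_channel* channel =
          grpc_insecure_channel_create("localhost:50051", nullptr, nullptr);
      grpc_completion_queue* cq =
          grpc_completion_queue_create_for_next(nullptr);
      grpc_slice method = grpc_slice_from_static_string("/foo/Bar");
      grpc_call* call = grpc_channel_create_call(
          channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, method,
          nullptr /* host */, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
      grpc_call_unref(call);  // was grpc_call_destroy(call)
      grpc_channel_destroy(channel);
      grpc_completion_queue_shutdown(cq);
      grpc_completion_queue_destroy(cq);
      grpc_shutdown();
    }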
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 7a2fdf941c..4adb2a5359 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -42,7 +42,7 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
ClientContext* context, const grpc::string& method, CompletionQueue* cq,
void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
- new GenericClientAsyncReaderWriter(
+ GenericClientAsyncReaderWriter::Create(
channel_.get(), cq,
RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag));
}
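
The switch from new to a static Create() also reflects the arena work: the reader-writer can be placement-new'd into the call's own arena and released with the call, rather than requiring a separate delete. A sketch of the pattern with a hypothetical class (the real GenericClientAsyncReaderWriter::Create has a richer signature):

    #include <new>
    #include <grpc/grpc.h>

    class ArenaObject {
     public:
      // Factory instead of a public constructor: the object lives in the
      // call's arena and is freed when the call's last ref is dropped.
      static ArenaObject* Create(grpc_call* call) {
        void* mem = grpc_call_arena_alloc(call, sizeof(ArenaObject));
        return new (mem) ArenaObject();
      }

     private:
      ArenaObject() {}
    };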
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 43dffe7a2a..f679a33945 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -54,9 +54,26 @@ struct grpc_byte_buffer;
namespace grpc {
+const grpc_completion_queue_factory*
+CoreCodegen::grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes* attributes) {
+ return ::grpc_completion_queue_factory_lookup(attributes);
+}
+
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attributes, void* reserved) {
+ return ::grpc_completion_queue_create(factory, attributes, reserved);
+}
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
- return ::grpc_completion_queue_create(reserved);
+ return ::grpc_completion_queue_create_for_next(reserved);
+}
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
+ void* reserved) {
+ return ::grpc_completion_queue_create_for_pluck(reserved);
}
void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
@@ -91,6 +108,12 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
+void CoreCodegen::grpc_call_ref(grpc_call* call) { ::grpc_call_ref(call); }
+void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); }
+void* CoreCodegen::grpc_call_arena_alloc(grpc_call* call, size_t length) {
+ return ::grpc_call_arena_alloc(call, length);
+}
+
int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
return ::grpc_byte_buffer_reader_init(reader, buffer);
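
Completion-queue construction is split by usage in the 1.4-era core: dedicated helpers cover the common Next()/Pluck() cases, while the generic grpc_completion_queue_create() now takes a factory plus attributes. A minimal sketch, assuming the 1.4-era attribute layout (version, completion type, polling type):

    #include <grpc/grpc.h>

    int main() {
      grpc_init();

      // Common cases: one-liners for Next()- and Pluck()-style queues.
      grpc_completion_queue* next_cq =
          grpc_completion_queue_create_for_next(nullptr);
      grpc_completion_queue* pluck_cq =
          grpc_completion_queue_create_for_pluck(nullptr);

      // General case: look up a factory for explicit attributes, e.g. a
      // non-listening queue as ServerBuilder now uses for hybrid servers.
      grpc_completion_queue_attributes attr = {
          GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_NON_LISTENING};
      const grpc_completion_queue_factory* factory =
          grpc_completion_queue_factory_lookup(&attr);
      grpc_completion_queue* custom_cq =
          grpc_completion_queue_create(factory, &attr, nullptr);

      grpc_completion_queue_shutdown(next_cq);
      grpc_completion_queue_destroy(next_cq);
      grpc_completion_queue_shutdown(pluck_cq);
      grpc_completion_queue_destroy(pluck_cq);
      grpc_completion_queue_shutdown(custom_cq);
      grpc_completion_queue_destroy(custom_cq);
      grpc_shutdown();
    }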
diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc
index 16a295de2e..72a4c4cf94 100644
--- a/src/cpp/common/version_cc.cc
+++ b/src/cpp/common/version_cc.cc
@@ -37,5 +37,5 @@
#include <grpc++/grpc++.h>
namespace grpc {
-grpc::string Version() { return "1.3.0-pre1"; }
+grpc::string Version() { return "1.4.0-dev"; }
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 4eb4b5a1b2..2ead048a1f 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -83,7 +83,8 @@ ServerBuilder::~ServerBuilder() {
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
bool is_frequently_polled) {
- ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
+ ServerCompletionQueue* cq = new ServerCompletionQueue(
+ is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING);
cqs_.push_back(cq);
return std::unique_ptr<ServerCompletionQueue>(cq);
}
@@ -242,6 +243,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_cqs(std::make_shared<
std::vector<std::unique_ptr<ServerCompletionQueue>>>());
+ int num_frequently_polled_cqs = 0;
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ num_frequently_polled_cqs++;
+ }
+ }
+
+ const bool is_hybrid_server =
+ has_sync_methods && num_frequently_polled_cqs > 0;
+
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
@@ -251,9 +262,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec);
+ grpc_cq_polling_type polling_type =
+ is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
+
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
- sync_server_cqs->emplace_back(new ServerCompletionQueue());
+ sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type));
}
}
@@ -269,12 +283,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
- // All sync cqs (if any) are frequently polled by ThreadManager
- int num_frequently_polled_cqs = sync_server_cqs->size();
-
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
+ num_frequently_polled_cqs++;
}
// cqs_ contains the completion queue added by calling the ServerBuilder's
@@ -283,14 +295,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// listening to incoming channels. Such completion queues must be registered
// as non-listening queues
for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
- if ((*it)->IsFrequentlyPolled()) {
- grpc_server_register_completion_queue(server->server_, (*it)->cq(),
- nullptr);
- num_frequently_polled_cqs++;
- } else {
- grpc_server_register_non_listening_completion_queue(server->server_,
- (*it)->cq(), nullptr);
- }
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
}
if (num_frequently_polled_cqs == 0) {
@@ -337,10 +343,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
- if (!server->Start(cqs_data, cqs_.size())) {
- if (added_port) server->Shutdown();
- return nullptr;
- }
+ server->Start(cqs_data, cqs_.size());
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer);
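
Two behaviors change here. First, in a hybrid server (sync methods plus user-added frequently polled cqs) the internal sync cqs are created GRPC_CQ_NON_POLLING so the ThreadManager threads do not compete with the application's async polling threads. Second, "non-listening" is now an attribute of the cq itself, set at construction, so every user cq registers through the single grpc_server_register_completion_queue() call and the frequently-polled count is taken up front. A builder-facing sketch, assuming grpc++ 1.4 (address and port arbitrary):

    #include <memory>

    #include <grpc++/security/server_credentials.h>
    #include <grpc++/server.h>
    #include <grpc++/server_builder.h>

    int main() {
      grpc::ServerBuilder builder;
      builder.AddListeningPort("0.0.0.0:50051",
                               grpc::InsecureServerCredentials());
      // is_frequently_polled now selects the cq's polling attribute
      // (GRPC_CQ_DEFAULT_POLLING vs GRPC_CQ_NON_LISTENING) at construction;
      // BuildAndStart() still requires at least one frequently polled cq.
      auto cq = builder.AddCompletionQueue(/*is_frequently_polled=*/true);
      std::unique_ptr<grpc::Server> server = builder.BuildAndStart();

      server->Shutdown();
      cq->Shutdown();
      void* tag;
      bool ok;
      while (cq->Next(&tag, &ok)) {
        // Drain residual events before destroying the queue.
      }
    }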
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index ce173a1ee2..b4d6961db8 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -124,6 +124,14 @@ class ShutdownTag : public CompletionQueueTag {
bool FinalizeResult(void** tag, bool* status) { return false; }
};
+class DummyTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) {
+ *status = true;
+ return true;
+ }
+};
+
class Server::SyncRequest final : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -145,7 +153,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
- void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
+ void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -213,10 +221,15 @@ class Server::SyncRequest final : public CompletionQueueTag {
MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
- void* ignored_tag;
- bool ignored_ok;
+
cq_.Shutdown();
- GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
+
+ CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
+ DummyTag ignored_tag;
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
}
private:
@@ -315,14 +328,18 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
}
- void ShutdownAndDrainCompletionQueue() {
+ void Shutdown() override {
server_cq_->Shutdown();
+ ThreadManager::Shutdown();
+ }
+ void Wait() override {
+ ThreadManager::Wait();
// Drain any pending items from the queue
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
- // Nothing to be done here
+ // Do nothing
}
}
@@ -402,7 +419,7 @@ Server::~Server() {
} else if (!started_) {
// Shutdown the completion queues
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
- (*it)->ShutdownAndDrainCompletionQueue();
+ (*it)->Shutdown();
}
}
}
@@ -490,11 +507,11 @@ int Server::AddListeningPort(const grpc::string& addr,
ServerCredentials* creds) {
GPR_ASSERT(!started_);
int port = creds->AddPortToServer(addr, server_);
- global_callbacks_->AddPort(this, port);
+ global_callbacks_->AddPort(this, addr, creds, port);
return port;
}
-bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
+void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
global_callbacks_->PreServerStart(this);
started_ = true;
@@ -530,8 +547,6 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Start();
}
-
- return true;
}
void Server::ShutdownInternal(gpr_timespec deadline) {
@@ -568,7 +583,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
- (*it)->ShutdownAndDrainCompletionQueue();
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
@@ -593,7 +607,7 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
- ops->FillOps(cops, &nops);
+ ops->FillOps(call->call(), cops, &nops);
auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
GPR_ASSERT(GRPC_CALL_OK == result);
}
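
The sync path above now plucks the completion op's tag explicitly, then uses DummyTag to assert nothing else remains. The invariant it leans on: after shutdown, a completion queue keeps delivering pending events and only then reports shutdown. A minimal core-level sketch:

    #include <grpc/grpc.h>
    #include <grpc/support/log.h>
    #include <grpc/support/time.h>

    int main() {
      grpc_init();
      grpc_completion_queue* cq =
          grpc_completion_queue_create_for_pluck(nullptr);
      grpc_completion_queue_shutdown(cq);
      // With no pending tags, shutdown is observed immediately; with pending
      // tags, pluck would deliver those first.
      grpc_event ev = grpc_completion_queue_pluck(
          cq, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
      GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
      grpc_completion_queue_destroy(cq);
      grpc_shutdown();
    }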
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 3a408eb23e..923556413e 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -62,7 +62,7 @@ class ServerContext::CompletionOp final : public CallOpSetInterface {
finalized_(false),
cancelled_(0) {}
- void FillOps(grpc_op* ops, size_t* nops) override;
+ void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override;
bool FinalizeResult(void** tag, bool* status) override;
bool CheckCancelled(CompletionQueue* cq) {
@@ -100,7 +100,8 @@ void ServerContext::CompletionOp::Unref() {
}
}
-void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
+void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops,
+ size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
ops->flags = 0;
@@ -151,7 +152,7 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
ServerContext::~ServerContext() {
if (call_) {
- grpc_call_destroy(call_);
+ grpc_call_unref(call_);
}
if (completion_op_) {
completion_op_->Unref();
@@ -167,6 +168,10 @@ void ServerContext::BeginCompletionOp(Call* call) {
call->PerformOps(completion_op_);
}
+CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<CompletionQueueTag*>(completion_op_);
+}
+
void ServerContext::AddInitialMetadata(const grpc::string& key,
const grpc::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
@@ -225,17 +230,9 @@ const struct census_context* ServerContext::census_context() const {
void ServerContext::SetLoadReportingCosts(
const std::vector<grpc::string>& cost_data) {
if (call_ == nullptr) return;
- grpc_load_reporting_cost_context* cost_ctx =
- static_cast<grpc_load_reporting_cost_context*>(
- gpr_malloc(sizeof(*cost_ctx)));
- cost_ctx->values_count = cost_data.size();
- cost_ctx->values = static_cast<grpc_slice*>(
- gpr_malloc(sizeof(*cost_ctx->values) * cost_ctx->values_count));
- for (size_t i = 0; i < cost_ctx->values_count; ++i) {
- cost_ctx->values[i] =
- grpc_slice_from_copied_buffer(cost_data[i].data(), cost_data[i].size());
+ for (const auto& cost_datum : cost_data) {
+ AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
}
- grpc_call_set_load_reporting_cost_context(call_, cost_ctx);
}
} // namespace grpc
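
The load-reporting rewrite replaces the manually managed grpc_load_reporting_cost_context (gpr_malloc'd slices handed to the core) with plain trailing metadata under GRPC_LB_COST_MD_KEY. A sketch of the resulting reporting path; the header location for GRPC_LB_COST_MD_KEY is an assumption for this era:

    #include <string>
    #include <vector>

    #include <grpc++/server_context.h>
    #include <grpc/load_reporting.h>  // assumed home of GRPC_LB_COST_MD_KEY

    // Each cost value rides as one trailing-metadata entry; the core no
    // longer needs a dedicated cost context.
    void ReportCosts(grpc::ServerContext* ctx,
                     const std::vector<std::string>& cost_data) {
      for (const auto& datum : cost_data) {
        ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, datum);
      }
    }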
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
index 1450d009e4..a463a4388a 100644
--- a/src/cpp/thread_manager/thread_manager.cc
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -98,80 +98,78 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
}
void ThreadManager::CleanupCompletedThreads() {
- std::unique_lock<std::mutex> lock(list_mu_);
- for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
- thd = completed_threads_.erase(thd)) {
- delete *thd;
+ std::list<WorkerThread*> completed_threads;
+ {
+ // swap out the completed threads list: allows other threads to clean up
+ // more quickly
+ std::unique_lock<std::mutex> lock(list_mu_);
+ completed_threads.swap(completed_threads_);
}
+ for (auto thd : completed_threads) delete thd;
}
void ThreadManager::Initialize() {
- for (int i = 0; i < min_pollers_; i++) {
- MaybeCreatePoller();
- }
-}
-
-// If the number of pollers (i.e threads currently blocked in PollForWork()) is
-// less than max threshold (i.e max_pollers_) and the total number of threads is
-// below the maximum threshold, we can let the current thread continue as poller
-bool ThreadManager::MaybeContinueAsPoller() {
- std::unique_lock<std::mutex> lock(mu_);
- if (shutdown_ || num_pollers_ > max_pollers_) {
- return false;
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ num_pollers_ = min_pollers_;
+ num_threads_ = min_pollers_;
}
- num_pollers_++;
- return true;
-}
-
-// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
-// threads currently blocked in PollForWork()) is below the threshold (i.e
-// min_pollers_) and the total number of threads is below the maximum threshold
-void ThreadManager::MaybeCreatePoller() {
- std::unique_lock<std::mutex> lock(mu_);
- if (!shutdown_ && num_pollers_ < min_pollers_) {
- num_pollers_++;
- num_threads_++;
-
+ for (int i = 0; i < min_pollers_; i++) {
// Create a new thread (which ends up calling the MainWorkLoop() function
new WorkerThread(this);
}
}
void ThreadManager::MainWorkLoop() {
- void* tag;
- bool ok;
-
- /*
- 1. Poll for work (i.e PollForWork())
- 2. After returning from PollForWork, reduce the number of pollers by 1. If
- PollForWork() returned a TIMEOUT, then it may indicate that we have more
- polling threads than needed. Check if the number of pollers is greater
- than min_pollers and if so, terminate the thread.
- 3. Since we are short of one poller now, see if a new poller has to be
- created (i.e see MaybeCreatePoller() for more details)
- 4. Do the actual work (DoWork())
- 5. After doing the work, see it this thread can resume polling work (i.e
- see MaybeContinueAsPoller() for more details) */
- do {
+ while (true) {
+ void* tag;
+ bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);
- {
- std::unique_lock<std::mutex> lock(mu_);
- num_pollers_--;
-
- if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
+ std::unique_lock<std::mutex> lock(mu_);
+ // Reduce the number of pollers by 1 and check what happened with the poll
+ num_pollers_--;
+ bool done = false;
+ switch (work_status) {
+ case TIMEOUT:
+ // If we timed out and we have more pollers than we need (or we are
+ // shutdown), finish this thread
+ if (shutdown_ || num_pollers_ > max_pollers_) done = true;
+ break;
+ case SHUTDOWN:
+ // If the thread manager is shutdown, finish this thread
+ done = true;
+ break;
+ case WORK_FOUND:
+ // If we got work and there are now insufficient pollers, start a new
+ // one
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ num_pollers_++;
+ num_threads_++;
+ // Drop lock before spawning thread to avoid contention
+ lock.unlock();
+ new WorkerThread(this);
+ } else {
+ // Drop lock for consistency with above branch
+ lock.unlock();
+ }
+ // Lock is always released at this point - do the application work
+ DoWork(tag, ok);
+ // Take the lock again to check post conditions
+ lock.lock();
+ // If we're shutdown, we should finish at this point.
+ if (shutdown_) done = true;
break;
- }
- }
-
- // Note that MaybeCreatePoller does check for shutdown and creates a new
- // thread only if ThreadManager is not shutdown
- if (work_status == WORK_FOUND) {
- MaybeCreatePoller();
- DoWork(tag, ok);
}
- } while (MaybeContinueAsPoller());
+ // If we decided to finish the thread, break out of the while loop
+ if (done) break;
+ // ... otherwise increase poller count and continue
+ // There's a chance that we'll exceed the max poller count: that is
+ // explicitly ok - we'll decrease after one poll timeout, and prevent
+ // some thrashing starting up and shutting down threads
+ num_pollers_++;
+ };
CleanupCompletedThreads();
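
Two things stand out in the rewritten loop: MainWorkLoop() folds the old MaybeCreatePoller()/MaybeContinueAsPoller() pair into one explicit state machine under a single lock acquisition, and CleanupCompletedThreads() now swaps the shared list out under the lock and deletes outside it. The swap-then-delete idiom in isolation, as a self-contained sketch:

    #include <list>
    #include <mutex>

    template <typename T>
    void DrainAndDelete(std::mutex& mu, std::list<T*>& shared) {
      std::list<T*> local;
      {
        // Hold the lock only long enough to detach the list...
        std::lock_guard<std::mutex> lock(mu);
        local.swap(shared);
      }
      // ...so the potentially slow deletes run outside the critical section.
      for (T* item : local) delete item;
    }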
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
index 9c0569c62c..d1050f6ded 100644
--- a/src/cpp/thread_manager/thread_manager.h
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -89,14 +89,14 @@ class ThreadManager {
// Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which
// returns only once the shutdown is complete
- void Shutdown();
+ virtual void Shutdown();
// Has Shutdown() been called
bool IsShutdown();
// A blocking call that returns only after the ThreadManager has shutdown and
// all the threads have drained all the outstanding work
- void Wait();
+ virtual void Wait();
private:
// Helper wrapper class around std::thread. This takes a ThreadManager object
@@ -122,14 +122,6 @@ class ThreadManager {
// The main function in ThreadManager
void MainWorkLoop();
- // Create a new poller if the number of current pollers is less than the
- // minimum number of pollers needed (i.e min_pollers).
- void MaybeCreatePoller();
-
- // Returns true if the current thread can resume as a poller. i.e if the
- // current number of pollers is less than the max_pollers.
- bool MaybeContinueAsPoller();
-
void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();
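
Shutdown() and Wait() become virtual so subclasses can interleave their own teardown with the base class's, exactly as SyncRequestThreadManager does in server_cc.cc above. An illustrative subclass, hypothetical and written against this internal header, with the pure virtuals stubbed:

    #include "src/cpp/thread_manager/thread_manager.h"

    class DrainingThreadManager final : public grpc::ThreadManager {
     public:
      DrainingThreadManager()
          : grpc::ThreadManager(/*min_pollers=*/1, /*max_pollers=*/2) {}

      void Shutdown() override {
        // Shut down our own event source first (hypothetical step), then
        // let the base class begin draining worker threads.
        ThreadManager::Shutdown();
      }

      void Wait() override {
        ThreadManager::Wait();
        // All worker threads have exited; drain leftover events here.
      }

      WorkStatus PollForWork(void** tag, bool* ok) override {
        *tag = nullptr;
        *ok = false;
        return SHUTDOWN;  // stub; a real subclass polls its completion queue
      }

      void DoWork(void* tag, bool ok) override {}
    };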