aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel.cc8
-rw-r--r--src/cpp/client/channel.h5
-rw-r--r--src/cpp/client/create_channel.cc4
-rw-r--r--src/cpp/client/secure_credentials.h1
-rw-r--r--src/cpp/common/auth_property_iterator.cc6
-rw-r--r--src/cpp/proto/proto_utils.cc13
-rw-r--r--src/cpp/server/create_default_thread_pool.cc6
-rw-r--r--src/cpp/server/dynamic_thread_pool.cc24
-rw-r--r--src/cpp/server/secure_server_credentials.cc4
-rw-r--r--src/cpp/server/server.cc51
-rw-r--r--src/cpp/server/server_builder.cc21
-rw-r--r--src/cpp/server/server_context.cc7
12 files changed, 97 insertions, 53 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 0582b59a6d..17f31c22cb 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
} else {
const char* host_str = NULL;
if (!context->authority().empty()) {
- host_str = context->authority().c_str();
+ host_str = context->authority_.c_str();
} else if (!host_.empty()) {
host_str = host_.c_str();
}
@@ -98,9 +98,8 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
}
void* Channel::RegisterMethod(const char* method) {
- return grpc_channel_register_call(c_channel_, method,
- host_.empty() ? NULL : host_.c_str(),
- nullptr);
+ return grpc_channel_register_call(
+ c_channel_, method, host_.empty() ? NULL : host_.c_str(), nullptr);
}
grpc_connectivity_state Channel::GetState(bool try_to_connect) {
@@ -117,6 +116,7 @@ class TagSaver GRPC_FINAL : public CompletionQueueTag {
delete this;
return true;
}
+
private:
void* tag_;
};
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cb8e8d98d2..7e406ad788 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -58,9 +58,8 @@ class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
void* RegisterMethod(const char* method) GRPC_OVERRIDE;
Call CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) GRPC_OVERRIDE;
- void PerformOpsOnCall(CallOpSetInterface* ops,
- Call* call) GRPC_OVERRIDE;
+ CompletionQueue* cq) GRPC_OVERRIDE;
+ void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index 21d01b739d..5ae772f096 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -52,6 +52,8 @@ std::shared_ptr<ChannelInterface> CreateChannel(
user_agent_prefix.str());
return creds ? creds->CreateChannel(target, cp_args)
: std::shared_ptr<ChannelInterface>(
- new Channel(grpc_lame_client_channel_create(NULL)));
+ new Channel(grpc_lame_client_channel_create(
+ NULL, GRPC_STATUS_INVALID_ARGUMENT,
+ "Invalid credentials.")));
}
} // namespace grpc
diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h
index ddf69911b5..c2b8d43a15 100644
--- a/src/cpp/client/secure_credentials.h
+++ b/src/cpp/client/secure_credentials.h
@@ -59,4 +59,3 @@ class SecureCredentials GRPC_FINAL : public Credentials {
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_CLIENT_SECURE_CREDENTIALS_H
-
diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc
index ba88983515..d3bfd5cb6b 100644
--- a/src/cpp/common/auth_property_iterator.cc
+++ b/src/cpp/common/auth_property_iterator.cc
@@ -64,8 +64,7 @@ AuthPropertyIterator AuthPropertyIterator::operator++(int) {
return tmp;
}
-bool AuthPropertyIterator::operator==(
- const AuthPropertyIterator& rhs) const {
+bool AuthPropertyIterator::operator==(const AuthPropertyIterator& rhs) const {
if (property_ == nullptr || rhs.property_ == nullptr) {
return property_ == rhs.property_;
} else {
@@ -73,8 +72,7 @@ bool AuthPropertyIterator::operator==(
}
}
-bool AuthPropertyIterator::operator!=(
- const AuthPropertyIterator& rhs) const {
+bool AuthPropertyIterator::operator!=(const AuthPropertyIterator& rhs) const {
return !operator==(rhs);
}
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 63f4a3a0bc..05470ec627 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -154,18 +154,18 @@ class GrpcBufferReader GRPC_FINAL
namespace grpc {
-Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
+Status SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer)
? Status::OK
- : Status(StatusCode::INVALID_ARGUMENT,
- "Failed to serialize message");
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
}
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
if (!buffer) {
- return Status(StatusCode::INVALID_ARGUMENT, "No payload");
+ return Status(StatusCode::INTERNAL, "No payload");
}
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
@@ -173,11 +173,10 @@ Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
if (!msg->ParseFromCodedStream(&decoder)) {
- return Status(StatusCode::INVALID_ARGUMENT,
- msg->InitializationErrorString());
+ return Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
- return Status(StatusCode::INVALID_ARGUMENT, "Did not read entire message");
+ return Status(StatusCode::INTERNAL, "Did not read entire message");
}
return Status::OK;
}
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc
index 81c84474d8..9f59d254f1 100644
--- a/src/cpp/server/create_default_thread_pool.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -39,9 +39,9 @@
namespace grpc {
ThreadPoolInterface* CreateDefaultThreadPool() {
- int cores = gpr_cpu_num_cores();
- if (!cores) cores = 4;
- return new DynamicThreadPool(cores);
+ int cores = gpr_cpu_num_cores();
+ if (!cores) cores = 4;
+ return new DynamicThreadPool(cores);
}
} // namespace grpc
diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
index f58d0420df..b475f43b1d 100644
--- a/src/cpp/server/dynamic_thread_pool.cc
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -36,10 +36,10 @@
#include <grpc++/dynamic_thread_pool.h>
namespace grpc {
-DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool):
- pool_(pool),
- thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)) {
-}
+DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
+ : pool_(pool),
+ thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
+ this)) {}
DynamicThreadPool::DynamicThread::~DynamicThread() {
thd_->join();
thd_.reset();
@@ -57,7 +57,7 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->shutdown_cv_.notify_one();
}
}
-
+
void DynamicThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
@@ -65,7 +65,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
- break;
+ break;
}
threads_waiting_++;
cv_.wait(lock);
@@ -84,9 +84,11 @@ void DynamicThreadPool::ThreadFunc() {
}
}
-DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
- shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0),
- threads_waiting_(0) {
+DynamicThreadPool::DynamicThreadPool(int reserve_threads)
+ : shutdown_(false),
+ reserve_threads_(reserve_threads),
+ nthreads_(0),
+ threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
grpc::lock_guard<grpc::mutex> lock(mu_);
nthreads_++;
@@ -96,10 +98,10 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
- delete *t;
+ delete *t;
}
}
-
+
DynamicThreadPool::~DynamicThreadPool() {
grpc::unique_lock<grpc::mutex> lock(mu_);
shutdown_ = true;
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 32c45e2280..f203cf7f49 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -35,8 +35,8 @@
namespace grpc {
-int SecureServerCredentials::AddPortToServer(
- const grpc::string& addr, grpc_server* server) {
+int SecureServerCredentials::AddPortToServer(const grpc::string& addr,
+ grpc_server* server) {
return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_);
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 90f3854a72..e039c07374 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -90,6 +90,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
+ gpr_timespec deadline) {
+ void* tag = nullptr;
+ *ok = false;
+ switch (cq->AsyncNext(&tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ *req = nullptr;
+ return true;
+ case CompletionQueue::SHUTDOWN:
+ *req = nullptr;
+ return false;
+ case CompletionQueue::GOT_EVENT:
+ *req = static_cast<SyncRequest*>(tag);
+ GPR_ASSERT((*req)->in_flight_);
+ return true;
+ }
+ gpr_log(GPR_ERROR, "Should never reach here");
+ abort();
+ }
+
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -230,18 +250,17 @@ Server::~Server() {
delete sync_methods_;
}
-bool Server::RegisterService(const grpc::string *host, RpcService* service) {
+bool Server::RegisterService(const grpc::string* host, RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
- void* tag = grpc_server_register_method(
- server_, method->name(), host ? host->c_str() : nullptr);
+ void* tag = grpc_server_register_method(server_, method->name(),
+ host ? host->c_str() : nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- SyncRequest request(method, tag);
- sync_methods_->emplace_back(request);
+ sync_methods_->emplace_back(method, tag);
}
return true;
}
@@ -286,7 +305,10 @@ bool Server::Start() {
if (!has_generic_service_) {
unknown_method_.reset(new RpcServiceMethod(
"unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- sync_methods_->emplace_back(unknown_method_.get(), nullptr);
+ // Use of emplace_back with just constructor arguments is not accepted here
+ // by gcc-4.4 because it can't match the anonymous nullptr with a proper
+ // constructor implicitly. Construct the object and use push_back.
+ sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
}
// Start processing rpcs.
if (!sync_methods_->empty()) {
@@ -301,12 +323,27 @@ bool Server::Start() {
return true;
}
-void Server::Shutdown() {
+void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
+ // Spin, eating requests until the completion queue is completely shutdown.
+ // If the deadline expires then cancel anything that's pending and keep
+ // spinning forever until the work is actually drained.
+ // Since nothing else needs to touch state guarded by mu_, holding it
+ // through this loop is fine.
+ SyncRequest* request;
+ bool ok;
+ while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
+ if (request == NULL) { // deadline expired
+ grpc_server_cancel_all_calls(server_);
+ deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ } else if (ok) {
+ SyncRequest::CallData call_data(this, request);
+ }
+ }
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 09118879f4..0b11d86173 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -59,14 +59,16 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
async_services_.emplace_back(new NamedService<AsynchronousService>(service));
}
-void ServerBuilder::RegisterService(
- const grpc::string& addr, SynchronousService* service) {
- services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
+void ServerBuilder::RegisterService(const grpc::string& addr,
+ SynchronousService* service) {
+ services_.emplace_back(
+ new NamedService<RpcService>(addr, service->service()));
}
-void ServerBuilder::RegisterAsyncService(
- const grpc::string& addr, AsynchronousService* service) {
- async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service));
+void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
+ AsynchronousService* service) {
+ async_services_.emplace_back(
+ new NamedService<AsynchronousService>(addr, service));
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -119,9 +121,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
- for (auto service = async_services_.begin();
- service != async_services_.end(); service++) {
- if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) {
+ for (auto service = async_services_.begin(); service != async_services_.end();
+ service++) {
+ if (!server->RegisterAsyncService((*service)->host.get(),
+ (*service)->service)) {
return nullptr;
}
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index bb34040a2f..03461ddda5 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -50,7 +50,12 @@ namespace grpc {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {}
+ CompletionOp()
+ : has_tag_(false),
+ tag_(nullptr),
+ refs_(2),
+ finalized_(false),
+ cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;