aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/channel.cc23
-rw-r--r--src/cpp/client/channel_arguments.cc3
-rw-r--r--src/cpp/client/client_context.cc4
-rw-r--r--src/cpp/client/create_channel.cc17
-rw-r--r--src/cpp/client/create_channel_internal.cc (renamed from src/cpp/client/internal_stub.cc)14
-rw-r--r--src/cpp/client/create_channel_internal.h51
-rw-r--r--src/cpp/client/generic_stub.cc5
-rw-r--r--src/cpp/client/insecure_credentials.cc18
-rw-r--r--src/cpp/client/secure_channel_arguments.cc4
-rw-r--r--src/cpp/client/secure_credentials.cc12
-rw-r--r--src/cpp/client/secure_credentials.h5
-rw-r--r--src/cpp/common/auth_property_iterator.cc8
-rw-r--r--src/cpp/common/call.cc5
-rw-r--r--src/cpp/common/completion_queue.cc2
-rw-r--r--src/cpp/common/create_auth_context.h2
-rw-r--r--src/cpp/common/insecure_create_auth_context.cc2
-rw-r--r--src/cpp/common/secure_auth_context.h2
-rw-r--r--src/cpp/common/secure_create_auth_context.cc2
-rw-r--r--src/cpp/proto/proto_utils.cc15
-rw-r--r--src/cpp/server/async_generic_service.cc2
-rw-r--r--src/cpp/server/create_default_thread_pool.cc9
-rw-r--r--src/cpp/server/dynamic_thread_pool.cc27
-rw-r--r--src/cpp/server/dynamic_thread_pool.h (renamed from src/cpp/client/channel.h)64
-rw-r--r--src/cpp/server/fixed_size_thread_pool.cc2
-rw-r--r--src/cpp/server/fixed_size_thread_pool.h67
-rw-r--r--src/cpp/server/secure_server_credentials.cc11
-rw-r--r--src/cpp/server/secure_server_credentials.h5
-rw-r--r--src/cpp/server/server.cc149
-rw-r--r--src/cpp/server/server_builder.cc37
-rw-r--r--src/cpp/server/server_context.cc9
-rw-r--r--src/cpp/server/thread_pool_interface.h54
-rw-r--r--src/cpp/util/byte_buffer.cc2
-rw-r--r--src/cpp/util/slice.cc2
-rw-r--r--src/cpp/util/status.cc2
-rw-r--r--src/cpp/util/string_ref.cc111
-rw-r--r--src/cpp/util/time.cc4
36 files changed, 579 insertions, 172 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 0582b59a6d..8bf2e4687e 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -31,29 +31,26 @@
*
*/
-#include "src/cpp/client/channel.h"
+#include <grpc++/channel.h>
#include <memory>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
-
-#include "src/core/profiling/timers.h"
-#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
-#include <grpc++/config.h>
#include <grpc++/credentials.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/rpc_method.h>
-#include <grpc++/status.h>
-#include <grpc++/time.h>
+#include <grpc++/support/channel_arguments.h>
+#include <grpc++/support/config.h>
+#include <grpc++/support/status.h>
+#include <grpc++/support/time.h>
+#include "src/core/profiling/timers.h"
namespace grpc {
-Channel::Channel(grpc_channel* channel) : c_channel_(channel) {}
-
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {}
@@ -71,7 +68,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
} else {
const char* host_str = NULL;
if (!context->authority().empty()) {
- host_str = context->authority().c_str();
+ host_str = context->authority_.c_str();
} else if (!host_.empty()) {
host_str = host_.c_str();
}
@@ -98,9 +95,8 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
}
void* Channel::RegisterMethod(const char* method) {
- return grpc_channel_register_call(c_channel_, method,
- host_.empty() ? NULL : host_.c_str(),
- nullptr);
+ return grpc_channel_register_call(
+ c_channel_, method, host_.empty() ? NULL : host_.c_str(), nullptr);
}
grpc_connectivity_state Channel::GetState(bool try_to_connect) {
@@ -117,6 +113,7 @@ class TagSaver GRPC_FINAL : public CompletionQueueTag {
delete this;
return true;
}
+
private:
void* tag_;
};
diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc
index da6602e7af..50422d06c9 100644
--- a/src/cpp/client/channel_arguments.cc
+++ b/src/cpp/client/channel_arguments.cc
@@ -31,10 +31,9 @@
*
*/
-#include <grpc++/channel_arguments.h>
+#include <grpc++/support/channel_arguments.h>
#include <grpc/support/log.h>
-
#include "src/core/channel/channel_args.h"
namespace grpc {
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index b8caa1eae4..c4d7cf2e51 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -38,7 +38,7 @@
#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
#include <grpc++/server_context.h>
-#include <grpc++/time.h>
+#include <grpc++/support/time.h>
#include "src/core/channel/compress_filter.h"
#include "src/cpp/common/create_auth_context.h"
@@ -71,7 +71,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key,
}
void ClientContext::set_call(grpc_call* call,
- const std::shared_ptr<ChannelInterface>& channel) {
+ const std::shared_ptr<Channel>& channel) {
GPR_ASSERT(call_ == nullptr);
call_ = call;
channel_ = channel;
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index 21d01b739d..8c571cbbaa 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -34,15 +34,16 @@
#include <memory>
#include <sstream>
-#include "src/cpp/client/channel.h"
-#include <grpc++/channel_interface.h>
-#include <grpc++/channel_arguments.h>
+#include <grpc++/channel.h>
#include <grpc++/create_channel.h>
+#include <grpc++/support/channel_arguments.h>
+
+#include "src/cpp/client/create_channel_internal.h"
namespace grpc {
class ChannelArguments;
-std::shared_ptr<ChannelInterface> CreateChannel(
+std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args) {
ChannelArguments cp_args = args;
@@ -50,8 +51,10 @@ std::shared_ptr<ChannelInterface> CreateChannel(
user_agent_prefix << "grpc-c++/" << grpc_version_string();
cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING,
user_agent_prefix.str());
- return creds ? creds->CreateChannel(target, cp_args)
- : std::shared_ptr<ChannelInterface>(
- new Channel(grpc_lame_client_channel_create(NULL)));
+ return creds
+ ? creds->CreateChannel(target, cp_args)
+ : CreateChannelInternal("", grpc_lame_client_channel_create(
+ NULL, GRPC_STATUS_INVALID_ARGUMENT,
+ "Invalid credentials."));
}
} // namespace grpc
diff --git a/src/cpp/client/internal_stub.cc b/src/cpp/client/create_channel_internal.cc
index 91724a4837..9c5ab038cf 100644
--- a/src/cpp/client/internal_stub.cc
+++ b/src/cpp/client/create_channel_internal.cc
@@ -31,6 +31,16 @@
*
*/
-#include <grpc++/impl/internal_stub.h>
+#include <memory>
-namespace grpc {} // namespace grpc
+#include <grpc++/channel.h>
+
+struct grpc_channel;
+
+namespace grpc {
+
+std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host,
+ grpc_channel* c_channel) {
+ return std::shared_ptr<Channel>(new Channel(host, c_channel));
+}
+} // namespace grpc
diff --git a/src/cpp/client/create_channel_internal.h b/src/cpp/client/create_channel_internal.h
new file mode 100644
index 0000000000..4385ec701e
--- /dev/null
+++ b/src/cpp/client/create_channel_internal.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H
+#define GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H
+
+#include <memory>
+
+#include <grpc++/support/config.h>
+
+struct grpc_channel;
+
+namespace grpc {
+class Channel;
+
+std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host,
+ grpc_channel* c_channel);
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 0c90578ae5..7a2fdf941c 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/generic_stub.h>
+#include <grpc++/generic/generic_stub.h>
#include <grpc++/impl/rpc_method.h>
@@ -44,8 +44,7 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call(
return std::unique_ptr<GenericClientAsyncReaderWriter>(
new GenericClientAsyncReaderWriter(
channel_.get(), cq,
- RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING, nullptr),
- context, tag));
+ RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag));
}
} // namespace grpc
diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc
index 2f9357b568..4a4d2cb97d 100644
--- a/src/cpp/client/insecure_credentials.cc
+++ b/src/cpp/client/insecure_credentials.cc
@@ -31,25 +31,27 @@
*
*/
+#include <grpc++/credentials.h>
+
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-
-#include <grpc++/channel_arguments.h>
-#include <grpc++/config.h>
-#include <grpc++/credentials.h>
-#include "src/cpp/client/channel.h"
+#include <grpc++/channel.h>
+#include <grpc++/support/channel_arguments.h>
+#include <grpc++/support/config.h>
+#include "src/cpp/client/create_channel_internal.h"
namespace grpc {
namespace {
class InsecureCredentialsImpl GRPC_FINAL : public Credentials {
public:
- std::shared_ptr<grpc::ChannelInterface> CreateChannel(
+ std::shared_ptr<grpc::Channel> CreateChannel(
const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
- return std::shared_ptr<ChannelInterface>(new Channel(
- grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr)));
+ return CreateChannelInternal(
+ "",
+ grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr));
}
// InsecureCredentials should not be applied to a call.
diff --git a/src/cpp/client/secure_channel_arguments.cc b/src/cpp/client/secure_channel_arguments.cc
index d89df999ad..e17d3b58b0 100644
--- a/src/cpp/client/secure_channel_arguments.cc
+++ b/src/cpp/client/secure_channel_arguments.cc
@@ -31,9 +31,9 @@
*
*/
-#include <grpc++/channel_arguments.h>
-#include <grpc/grpc_security.h>
+#include <grpc++/support/channel_arguments.h>
+#include <grpc/grpc_security.h>
#include "src/core/channel/channel_args.h"
namespace grpc {
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 6cd6b77fcf..f368f2590a 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -32,21 +32,21 @@
*/
#include <grpc/support/log.h>
-
-#include <grpc++/channel_arguments.h>
+#include <grpc++/channel.h>
#include <grpc++/impl/grpc_library.h>
-#include "src/cpp/client/channel.h"
+#include <grpc++/support/channel_arguments.h>
+#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/client/secure_credentials.h"
namespace grpc {
-std::shared_ptr<grpc::ChannelInterface> SecureCredentials::CreateChannel(
+std::shared_ptr<grpc::Channel> SecureCredentials::CreateChannel(
const string& target, const grpc::ChannelArguments& args) {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
- return std::shared_ptr<ChannelInterface>(new Channel(
+ return CreateChannelInternal(
args.GetSslTargetNameOverride(),
- grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)));
+ grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args));
}
bool SecureCredentials::ApplyToCall(grpc_call* call) {
diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h
index ddf69911b5..62d3185477 100644
--- a/src/cpp/client/secure_credentials.h
+++ b/src/cpp/client/secure_credentials.h
@@ -36,7 +36,7 @@
#include <grpc/grpc_security.h>
-#include <grpc++/config.h>
+#include <grpc++/support/config.h>
#include <grpc++/credentials.h>
namespace grpc {
@@ -48,7 +48,7 @@ class SecureCredentials GRPC_FINAL : public Credentials {
grpc_credentials* GetRawCreds() { return c_creds_; }
bool ApplyToCall(grpc_call* call) GRPC_OVERRIDE;
- std::shared_ptr<grpc::ChannelInterface> CreateChannel(
+ std::shared_ptr<grpc::Channel> CreateChannel(
const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE;
SecureCredentials* AsSecureCredentials() GRPC_OVERRIDE { return this; }
@@ -59,4 +59,3 @@ class SecureCredentials GRPC_FINAL : public Credentials {
} // namespace grpc
#endif // GRPC_INTERNAL_CPP_CLIENT_SECURE_CREDENTIALS_H
-
diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc
index ba88983515..5ccf8cf72c 100644
--- a/src/cpp/common/auth_property_iterator.cc
+++ b/src/cpp/common/auth_property_iterator.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/auth_context.h>
+#include <grpc++/support/auth_context.h>
#include <grpc/grpc_security.h>
@@ -64,8 +64,7 @@ AuthPropertyIterator AuthPropertyIterator::operator++(int) {
return tmp;
}
-bool AuthPropertyIterator::operator==(
- const AuthPropertyIterator& rhs) const {
+bool AuthPropertyIterator::operator==(const AuthPropertyIterator& rhs) const {
if (property_ == nullptr || rhs.property_ == nullptr) {
return property_ == rhs.property_;
} else {
@@ -73,8 +72,7 @@ bool AuthPropertyIterator::operator==(
}
}
-bool AuthPropertyIterator::operator!=(
- const AuthPropertyIterator& rhs) const {
+bool AuthPropertyIterator::operator!=(const AuthPropertyIterator& rhs) const {
return !operator==(rhs);
}
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 0a5c976e01..16aa2c9fb9 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -34,10 +34,9 @@
#include <grpc++/impl/call.h>
#include <grpc/support/alloc.h>
-#include <grpc++/byte_buffer.h>
+#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/channel_interface.h>
-
+#include <grpc++/support/byte_buffer.h>
#include "src/core/profiling/timers.h"
namespace grpc {
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index fca33f8f54..a175beb452 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -36,7 +36,7 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-#include <grpc++/time.h>
+#include <grpc++/support/time.h>
namespace grpc {
diff --git a/src/cpp/common/create_auth_context.h b/src/cpp/common/create_auth_context.h
index 9082a90c6d..b4962bae4e 100644
--- a/src/cpp/common/create_auth_context.h
+++ b/src/cpp/common/create_auth_context.h
@@ -33,7 +33,7 @@
#include <memory>
#include <grpc/grpc.h>
-#include <grpc++/auth_context.h>
+#include <grpc++/support/auth_context.h>
namespace grpc {
diff --git a/src/cpp/common/insecure_create_auth_context.cc b/src/cpp/common/insecure_create_auth_context.cc
index 07fc0bd549..fe80c1a80c 100644
--- a/src/cpp/common/insecure_create_auth_context.cc
+++ b/src/cpp/common/insecure_create_auth_context.cc
@@ -33,7 +33,7 @@
#include <memory>
#include <grpc/grpc.h>
-#include <grpc++/auth_context.h>
+#include <grpc++/support/auth_context.h>
namespace grpc {
diff --git a/src/cpp/common/secure_auth_context.h b/src/cpp/common/secure_auth_context.h
index 1b27bf5c32..8a866eaaa9 100644
--- a/src/cpp/common/secure_auth_context.h
+++ b/src/cpp/common/secure_auth_context.h
@@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CPP_COMMON_SECURE_AUTH_CONTEXT_H
#define GRPC_INTERNAL_CPP_COMMON_SECURE_AUTH_CONTEXT_H
-#include <grpc++/auth_context.h>
+#include <grpc++/support/auth_context.h>
struct grpc_auth_context;
diff --git a/src/cpp/common/secure_create_auth_context.cc b/src/cpp/common/secure_create_auth_context.cc
index d81f4bbc4a..f13d25a1dd 100644
--- a/src/cpp/common/secure_create_auth_context.cc
+++ b/src/cpp/common/secure_create_auth_context.cc
@@ -34,7 +34,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
-#include <grpc++/auth_context.h>
+#include <grpc++/support/auth_context.h>
#include "src/cpp/common/secure_auth_context.h"
namespace grpc {
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 63f4a3a0bc..be84c222a0 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -32,7 +32,6 @@
*/
#include <grpc++/impl/proto_utils.h>
-#include <grpc++/config.h>
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
@@ -40,6 +39,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/port_platform.h>
+#include <grpc++/support/config.h>
const int kMaxBufferLength = 8192;
@@ -154,18 +154,18 @@ class GrpcBufferReader GRPC_FINAL
namespace grpc {
-Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
+Status SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
return msg.SerializeToZeroCopyStream(&writer)
? Status::OK
- : Status(StatusCode::INVALID_ARGUMENT,
- "Failed to serialize message");
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
}
Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
if (!buffer) {
- return Status(StatusCode::INVALID_ARGUMENT, "No payload");
+ return Status(StatusCode::INTERNAL, "No payload");
}
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
@@ -173,11 +173,10 @@ Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
if (!msg->ParseFromCodedStream(&decoder)) {
- return Status(StatusCode::INVALID_ARGUMENT,
- msg->InitializationErrorString());
+ return Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
- return Status(StatusCode::INVALID_ARGUMENT, "Did not read entire message");
+ return Status(StatusCode::INTERNAL, "Did not read entire message");
}
return Status::OK;
}
diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc
index 2e99afcb5f..6b9ea532b6 100644
--- a/src/cpp/server/async_generic_service.cc
+++ b/src/cpp/server/async_generic_service.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/async_generic_service.h>
+#include <grpc++/generic/async_generic_service.h>
#include <grpc++/server.h>
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc
index 81c84474d8..f3b07ec8ce 100644
--- a/src/cpp/server/create_default_thread_pool.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -32,16 +32,17 @@
*/
#include <grpc/support/cpu.h>
-#include <grpc++/dynamic_thread_pool.h>
+
+#include "src/cpp/server/dynamic_thread_pool.h"
#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL
namespace grpc {
ThreadPoolInterface* CreateDefaultThreadPool() {
- int cores = gpr_cpu_num_cores();
- if (!cores) cores = 4;
- return new DynamicThreadPool(cores);
+ int cores = gpr_cpu_num_cores();
+ if (!cores) cores = 4;
+ return new DynamicThreadPool(cores);
}
} // namespace grpc
diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
index f58d0420df..4b226c2992 100644
--- a/src/cpp/server/dynamic_thread_pool.cc
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -33,13 +33,14 @@
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
-#include <grpc++/dynamic_thread_pool.h>
+
+#include "src/cpp/server/dynamic_thread_pool.h"
namespace grpc {
-DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool *pool):
- pool_(pool),
- thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)) {
-}
+DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
+ : pool_(pool),
+ thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
+ this)) {}
DynamicThreadPool::DynamicThread::~DynamicThread() {
thd_->join();
thd_.reset();
@@ -57,7 +58,7 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->shutdown_cv_.notify_one();
}
}
-
+
void DynamicThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
@@ -65,7 +66,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
- break;
+ break;
}
threads_waiting_++;
cv_.wait(lock);
@@ -84,9 +85,11 @@ void DynamicThreadPool::ThreadFunc() {
}
}
-DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
- shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0),
- threads_waiting_(0) {
+DynamicThreadPool::DynamicThreadPool(int reserve_threads)
+ : shutdown_(false),
+ reserve_threads_(reserve_threads),
+ nthreads_(0),
+ threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
grpc::lock_guard<grpc::mutex> lock(mu_);
nthreads_++;
@@ -96,10 +99,10 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) :
void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
- delete *t;
+ delete *t;
}
}
-
+
DynamicThreadPool::~DynamicThreadPool() {
grpc::unique_lock<grpc::mutex> lock(mu_);
shutdown_ = true;
diff --git a/src/cpp/client/channel.h b/src/cpp/server/dynamic_thread_pool.h
index cb8e8d98d2..5ba7533c05 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/server/dynamic_thread_pool.h
@@ -31,51 +31,53 @@
*
*/
-#ifndef GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H
-#define GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H
+#ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
+#define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
+#include <list>
#include <memory>
+#include <queue>
-#include <grpc++/channel_interface.h>
-#include <grpc++/config.h>
-#include <grpc++/impl/grpc_library.h>
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc++/support/config.h>
-struct grpc_channel;
+#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
-class Call;
-class CallOpSetInterface;
-class ChannelArguments;
-class CompletionQueue;
-class Credentials;
-class StreamContextInterface;
-class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
+class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface {
public:
- explicit Channel(grpc_channel* c_channel);
- Channel(const grpc::string& host, grpc_channel* c_channel);
- ~Channel() GRPC_OVERRIDE;
+ explicit DynamicThreadPool(int reserve_threads);
+ ~DynamicThreadPool();
- void* RegisterMethod(const char* method) GRPC_OVERRIDE;
- Call CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) GRPC_OVERRIDE;
- void PerformOpsOnCall(CallOpSetInterface* ops,
- Call* call) GRPC_OVERRIDE;
-
- grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
+ void Add(const std::function<void()>& callback) GRPC_OVERRIDE;
private:
- void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
- gpr_timespec deadline, CompletionQueue* cq,
- void* tag) GRPC_OVERRIDE;
+ class DynamicThread {
+ public:
+ DynamicThread(DynamicThreadPool* pool);
+ ~DynamicThread();
- bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
- gpr_timespec deadline) GRPC_OVERRIDE;
+ private:
+ DynamicThreadPool* pool_;
+ std::unique_ptr<grpc::thread> thd_;
+ void ThreadFunc();
+ };
+ grpc::mutex mu_;
+ grpc::condition_variable cv_;
+ grpc::condition_variable shutdown_cv_;
+ bool shutdown_;
+ std::queue<std::function<void()>> callbacks_;
+ int reserve_threads_;
+ int nthreads_;
+ int threads_waiting_;
+ std::list<DynamicThread*> dead_threads_;
- const grpc::string host_;
- grpc_channel* const c_channel_; // owned
+ void ThreadFunc();
+ static void ReapThreads(std::list<DynamicThread*>* tlist);
};
} // namespace grpc
-#endif // GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H
+#endif // GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H
diff --git a/src/cpp/server/fixed_size_thread_pool.cc b/src/cpp/server/fixed_size_thread_pool.cc
index bafbc5802a..2bdc44be2e 100644
--- a/src/cpp/server/fixed_size_thread_pool.cc
+++ b/src/cpp/server/fixed_size_thread_pool.cc
@@ -33,7 +33,7 @@
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
-#include <grpc++/fixed_size_thread_pool.h>
+#include "src/cpp/server/fixed_size_thread_pool.h"
namespace grpc {
diff --git a/src/cpp/server/fixed_size_thread_pool.h b/src/cpp/server/fixed_size_thread_pool.h
new file mode 100644
index 0000000000..394ae5821e
--- /dev/null
+++ b/src/cpp/server/fixed_size_thread_pool.h
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
+#define GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
+
+#include <queue>
+#include <vector>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc++/support/config.h>
+
+#include "src/cpp/server/thread_pool_interface.h"
+
+namespace grpc {
+
+class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface {
+ public:
+ explicit FixedSizeThreadPool(int num_threads);
+ ~FixedSizeThreadPool();
+
+ void Add(const std::function<void()>& callback) GRPC_OVERRIDE;
+
+ private:
+ grpc::mutex mu_;
+ grpc::condition_variable cv_;
+ bool shutdown_;
+ std::queue<std::function<void()>> callbacks_;
+ std::vector<grpc::thread*> threads_;
+
+ void ThreadFunc();
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index fab538f1c4..a7d11856a0 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -49,7 +49,7 @@ void AuthMetadataProcessorAyncWrapper::Process(
auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper);
if (w->processor_ == nullptr) {
// Early exit.
- cb(user_data, NULL, 0, 1);
+ cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr);
return;
}
if (w->processor_->IsBlocking()) {
@@ -83,14 +83,15 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
0,
{{nullptr, nullptr, nullptr, nullptr}}});
}
- cb(user_data, &consumed_md[0], consumed_md.size(), 1);
+ cb(user_data, &consumed_md[0], consumed_md.size(), nullptr, 0,
+ GRPC_STATUS_OK, nullptr);
} else {
- cb(user_data, nullptr, 0, 0);
+ cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_UNAUTHENTICATED, nullptr);
}
}
-int SecureServerCredentials::AddPortToServer(
- const grpc::string& addr, grpc_server* server) {
+int SecureServerCredentials::AddPortToServer(const grpc::string& addr,
+ grpc_server* server) {
return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_);
}
diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h
index 07d0f9374a..e427280a37 100644
--- a/src/cpp/server/secure_server_credentials.h
+++ b/src/cpp/server/secure_server_credentials.h
@@ -36,10 +36,11 @@
#include <memory>
+#include <grpc++/server_credentials.h>
+
#include <grpc/grpc_security.h>
-#include <grpc++/server_credentials.h>
-#include <grpc++/thread_pool_interface.h>
+#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 90f3854a72..66cd27cc33 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -32,24 +32,71 @@
*/
#include <grpc++/server.h>
+
#include <utility>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
-#include <grpc++/async_generic_service.h>
+#include <grpc++/generic/async_generic_service.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
-#include <grpc++/thread_pool_interface.h>
-#include <grpc++/time.h>
+#include <grpc++/support/time.h>
#include "src/core/profiling/timers.h"
+#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
+class Server::UnimplementedAsyncRequestContext {
+ protected:
+ UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+ GenericServerContext server_context_;
+ GenericServerAsyncReaderWriter generic_stream_;
+};
+
+class Server::UnimplementedAsyncRequest GRPC_FINAL
+ : public UnimplementedAsyncRequestContext,
+ public GenericAsyncRequest {
+ public:
+ UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+ : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+ NULL, false),
+ server_(server),
+ cq_(cq) {}
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+ ServerContext* context() { return &server_context_; }
+ GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+ Server* const server_;
+ ServerCompletionQueue* const cq_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+ UnimplementedAsyncResponseOp;
+class Server::UnimplementedAsyncResponse GRPC_FINAL
+ : public UnimplementedAsyncResponseOp {
+ public:
+ UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+ ~UnimplementedAsyncResponse() { delete request_; }
+
+ bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+ bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
+ delete this;
+ return r;
+ }
+
+ private:
+ UnimplementedAsyncRequest* const request_;
+};
+
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -90,6 +137,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
+ gpr_timespec deadline) {
+ void* tag = nullptr;
+ *ok = false;
+ switch (cq->AsyncNext(&tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ *req = nullptr;
+ return true;
+ case CompletionQueue::SHUTDOWN:
+ *req = nullptr;
+ return false;
+ case CompletionQueue::GOT_EVENT:
+ *req = static_cast<SyncRequest*>(tag);
+ GPR_ASSERT((*req)->in_flight_);
+ return true;
+ }
+ gpr_log(GPR_ERROR, "Should never reach here");
+ abort();
+ }
+
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -230,18 +297,17 @@ Server::~Server() {
delete sync_methods_;
}
-bool Server::RegisterService(const grpc::string *host, RpcService* service) {
+bool Server::RegisterService(const grpc::string* host, RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
- void* tag = grpc_server_register_method(
- server_, method->name(), host ? host->c_str() : nullptr);
+ void* tag = grpc_server_register_method(server_, method->name(),
+ host ? host->c_str() : nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- SyncRequest request(method, tag);
- sync_methods_->emplace_back(request);
+ sync_methods_->emplace_back(method, tag);
}
return true;
}
@@ -278,15 +344,23 @@ int Server::AddListeningPort(const grpc::string& addr,
return creds->AddPortToServer(addr, server_);
}
-bool Server::Start() {
+bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
GPR_ASSERT(!started_);
started_ = true;
grpc_server_start(server_);
if (!has_generic_service_) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- sync_methods_->emplace_back(unknown_method_.get(), nullptr);
+ if (!sync_methods_->empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ for (size_t i = 0; i < num_cqs; i++) {
+ new UnimplementedAsyncRequest(this, cqs[i]);
+ }
}
// Start processing rpcs.
if (!sync_methods_->empty()) {
@@ -301,12 +375,27 @@ bool Server::Start() {
return true;
}
-void Server::Shutdown() {
+void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
+ // Spin, eating requests until the completion queue is completely shutdown.
+ // If the deadline expires then cancel anything that's pending and keep
+ // spinning forever until the work is actually drained.
+ // Since nothing else needs to touch state guarded by mu_, holding it
+ // through this loop is fine.
+ SyncRequest* request;
+ bool ok;
+ while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
+ if (request == NULL) { // deadline expired
+ grpc_server_cancel_all_calls(server_);
+ deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ } else if (ok) {
+ SyncRequest::CallData call_data(this, request);
+ }
+ }
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {
@@ -333,12 +422,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
Server::BaseAsyncRequest::BaseAsyncRequest(
Server* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+ bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
call_cq_(call_cq),
tag_(tag),
+ delete_on_finalize_(delete_on_finalize),
call_(nullptr) {
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
@@ -365,14 +456,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
// just the pointers inside call are copied here
stream_->BindCall(&call);
*tag = tag_;
- delete this;
+ if (delete_on_finalize_) {
+ delete this;
+ }
return true;
}
Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+ : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void Server::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
@@ -386,8 +479,9 @@ void Server::RegisteredAsyncRequest::IssueRequest(
Server::GenericAsyncRequest::GenericAsyncRequest(
Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+ ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag,
+ delete_on_finalize) {
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
@@ -408,6 +502,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
return BaseAsyncRequest::FinalizeResult(tag, status);
}
+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
+ if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
+ new UnimplementedAsyncRequest(server_, cq_);
+ new UnimplementedAsyncResponse(this);
+ } else {
+ delete this;
+ }
+ return false;
+}
+
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+ UnimplementedAsyncRequest* request)
+ : request_(request) {
+ Status status(StatusCode::UNIMPLEMENTED, "");
+ UnknownMethodHandler::FillOps(request_->context(), this);
+ request_->stream()->call_.PerformOps(this);
+}
+
void Server::ScheduleCallback() {
{
grpc::unique_lock<grpc::mutex> lock(mu_);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 09118879f4..b739cbfe62 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -37,8 +37,8 @@
#include <grpc/support/log.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
-#include <grpc++/thread_pool_interface.h>
-#include <grpc++/fixed_size_thread_pool.h>
+#include "src/cpp/server/thread_pool_interface.h"
+#include "src/cpp/server/fixed_size_thread_pool.h"
namespace grpc {
@@ -59,14 +59,16 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
async_services_.emplace_back(new NamedService<AsynchronousService>(service));
}
-void ServerBuilder::RegisterService(
- const grpc::string& addr, SynchronousService* service) {
- services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
+void ServerBuilder::RegisterService(const grpc::string& addr,
+ SynchronousService* service) {
+ services_.emplace_back(
+ new NamedService<RpcService>(addr, service->service()));
}
-void ServerBuilder::RegisterAsyncService(
- const grpc::string& addr, AsynchronousService* service) {
- async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service));
+void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
+ AsynchronousService* service) {
+ async_services_.emplace_back(
+ new NamedService<AsynchronousService>(addr, service));
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -87,10 +89,6 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
ports_.push_back(port);
}
-void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) {
- thread_pool_ = thread_pool;
-}
-
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
bool thread_pool_owned = false;
if (!async_services_.empty() && !services_.empty()) {
@@ -101,12 +99,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
- // Async services only, create a thread pool to handle requests to unknown
- // services.
- if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
- thread_pool_ = new FixedSizeThreadPool(1);
- thread_pool_owned = true;
- }
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@@ -119,9 +111,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
- for (auto service = async_services_.begin();
- service != async_services_.end(); service++) {
- if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) {
+ for (auto service = async_services_.begin(); service != async_services_.end();
+ service++) {
+ if (!server->RegisterAsyncService((*service)->host.get(),
+ (*service)->service)) {
return nullptr;
}
}
@@ -135,7 +128,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
*port->selected_port = r;
}
}
- if (!server->Start()) {
+ if (!server->Start(&cqs_[0], cqs_.size())) {
return nullptr;
}
return server;
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index bb34040a2f..acc163d6b5 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -38,7 +38,7 @@
#include <grpc/support/log.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/sync.h>
-#include <grpc++/time.h>
+#include <grpc++/support/time.h>
#include "src/core/channel/compress_filter.h"
#include "src/cpp/common/create_auth_context.h"
@@ -50,7 +50,12 @@ namespace grpc {
class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : has_tag_(false), tag_(nullptr), refs_(2), finalized_(false), cancelled_(0) {}
+ CompletionOp()
+ : has_tag_(false),
+ tag_(nullptr),
+ refs_(2),
+ finalized_(false),
+ cancelled_(0) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h
new file mode 100644
index 0000000000..1ebe30fe2a
--- /dev/null
+++ b/src/cpp/server/thread_pool_interface.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H
+#define GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H
+
+#include <functional>
+
+namespace grpc {
+
+// A thread pool interface for running callbacks.
+class ThreadPoolInterface {
+ public:
+ virtual ~ThreadPoolInterface() {}
+
+ // Schedule the given callback for execution.
+ virtual void Add(const std::function<void()>& callback) = 0;
+};
+
+ThreadPoolInterface* CreateDefaultThreadPool();
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index a66c92c3e1..e46e656beb 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -32,7 +32,7 @@
*/
#include <grpc/byte_buffer_reader.h>
-#include <grpc++/byte_buffer.h>
+#include <grpc++/support/byte_buffer.h>
namespace grpc {
diff --git a/src/cpp/util/slice.cc b/src/cpp/util/slice.cc
index 57370dabc6..7e88423b6c 100644
--- a/src/cpp/util/slice.cc
+++ b/src/cpp/util/slice.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/slice.h>
+#include <grpc++/support/slice.h>
namespace grpc {
diff --git a/src/cpp/util/status.cc b/src/cpp/util/status.cc
index 5bb9eda3d9..ad9850cf07 100644
--- a/src/cpp/util/status.cc
+++ b/src/cpp/util/status.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/status.h>
+#include <grpc++/support/status.h>
namespace grpc {
diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc
new file mode 100644
index 0000000000..8483e8c2ee
--- /dev/null
+++ b/src/cpp/util/string_ref.cc
@@ -0,0 +1,111 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/support/string_ref.h>
+
+#include <string.h>
+
+#include <algorithm>
+
+namespace grpc {
+
+constexpr size_t string_ref::npos;
+
+string_ref& string_ref::operator=(const string_ref& rhs) {
+ data_ = rhs.data_;
+ length_ = rhs.length_;
+ return *this;
+}
+
+string_ref::string_ref(const char* s) : data_(s), length_(strlen(s)) {}
+
+string_ref string_ref::substr(size_t pos, size_t n) const {
+ if (pos > length_) pos = length_;
+ if (n > (length_ - pos)) n = length_ - pos;
+ return string_ref(data_ + pos, n);
+}
+
+int string_ref::compare(string_ref x) const {
+ size_t min_size = length_ < x.length_ ? length_ : x.length_;
+ int r = memcmp(data_, x.data_, min_size);
+ if (r < 0) return -1;
+ if (r > 0) return 1;
+ if (length_ < x.length_) return -1;
+ if (length_ > x.length_) return 1;
+ return 0;
+}
+
+bool string_ref::starts_with(string_ref x) const {
+ return length_ >= x.length_ && (memcmp(data_, x.data_, x.length_) == 0);
+}
+
+bool string_ref::ends_with(string_ref x) const {
+ return length_ >= x.length_ &&
+ (memcmp(data_ + (length_ - x.length_), x.data_, x.length_) == 0);
+}
+
+size_t string_ref::find(string_ref s) const {
+ auto it = std::search(cbegin(), cend(), s.cbegin(), s.cend());
+ return it == cend() ? npos : std::distance(cbegin(), it);
+}
+
+size_t string_ref::find(char c) const {
+ auto it = std::find_if(cbegin(), cend(), [c](char cc) { return cc == c; });
+ return it == cend() ? npos : std::distance(cbegin(), it);
+}
+
+bool operator==(string_ref x, string_ref y) {
+ return x.compare(y) == 0;
+}
+
+bool operator!=(string_ref x, string_ref y) {
+ return x.compare(y) != 0;
+}
+
+bool operator<(string_ref x, string_ref y) {
+ return x.compare(y) < 0;
+}
+
+bool operator<=(string_ref x, string_ref y) {
+ return x.compare(y) <= 0;
+}
+
+bool operator>(string_ref x, string_ref y) {
+ return x.compare(y) > 0;
+}
+
+bool operator>=(string_ref x, string_ref y) {
+ return x.compare(y) >= 0;
+}
+
+} // namespace grpc
diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc
index 799c597e0b..b3401eb26b 100644
--- a/src/cpp/util/time.cc
+++ b/src/cpp/util/time.cc
@@ -31,12 +31,12 @@
*
*/
-#include <grpc++/config.h>
+#include <grpc++/support/config.h>
#ifndef GRPC_CXX0X_NO_CHRONO
#include <grpc/support/time.h>
-#include <grpc++/time.h>
+#include <grpc++/support/time.h>
using std::chrono::duration_cast;
using std::chrono::nanoseconds;