diff options
Diffstat (limited to 'src/cpp')
35 files changed, 486 insertions, 125 deletions
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 17f31c22cb..8bf2e4687e 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -31,29 +31,26 @@ * */ -#include "src/cpp/client/channel.h" +#include <grpc++/channel.h> #include <memory> #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> - -#include "src/core/profiling/timers.h" -#include <grpc++/channel_arguments.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> -#include <grpc++/config.h> #include <grpc++/credentials.h> #include <grpc++/impl/call.h> #include <grpc++/impl/rpc_method.h> -#include <grpc++/status.h> -#include <grpc++/time.h> +#include <grpc++/support/channel_arguments.h> +#include <grpc++/support/config.h> +#include <grpc++/support/status.h> +#include <grpc++/support/time.h> +#include "src/core/profiling/timers.h" namespace grpc { -Channel::Channel(grpc_channel* channel) : c_channel_(channel) {} - Channel::Channel(const grpc::string& host, grpc_channel* channel) : host_(host), c_channel_(channel) {} diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc index da6602e7af..50422d06c9 100644 --- a/src/cpp/client/channel_arguments.cc +++ b/src/cpp/client/channel_arguments.cc @@ -31,10 +31,9 @@ * */ -#include <grpc++/channel_arguments.h> +#include <grpc++/support/channel_arguments.h> #include <grpc/support/log.h> - #include "src/core/channel/channel_args.h" namespace grpc { diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index b8caa1eae4..c4d7cf2e51 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -38,7 +38,7 @@ #include <grpc/support/string_util.h> #include <grpc++/credentials.h> #include <grpc++/server_context.h> -#include <grpc++/time.h> +#include <grpc++/support/time.h> #include "src/core/channel/compress_filter.h" #include "src/cpp/common/create_auth_context.h" @@ -71,7 +71,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key, } void ClientContext::set_call(grpc_call* call, - const std::shared_ptr<ChannelInterface>& channel) { + const std::shared_ptr<Channel>& channel) { GPR_ASSERT(call_ == nullptr); call_ = call; channel_ = channel; diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc index 5ae772f096..8c571cbbaa 100644 --- a/src/cpp/client/create_channel.cc +++ b/src/cpp/client/create_channel.cc @@ -34,15 +34,16 @@ #include <memory> #include <sstream> -#include "src/cpp/client/channel.h" -#include <grpc++/channel_interface.h> -#include <grpc++/channel_arguments.h> +#include <grpc++/channel.h> #include <grpc++/create_channel.h> +#include <grpc++/support/channel_arguments.h> + +#include "src/cpp/client/create_channel_internal.h" namespace grpc { class ChannelArguments; -std::shared_ptr<ChannelInterface> CreateChannel( +std::shared_ptr<Channel> CreateChannel( const grpc::string& target, const std::shared_ptr<Credentials>& creds, const ChannelArguments& args) { ChannelArguments cp_args = args; @@ -50,10 +51,10 @@ std::shared_ptr<ChannelInterface> CreateChannel( user_agent_prefix << "grpc-c++/" << grpc_version_string(); cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING, user_agent_prefix.str()); - return creds ? creds->CreateChannel(target, cp_args) - : std::shared_ptr<ChannelInterface>( - new Channel(grpc_lame_client_channel_create( - NULL, GRPC_STATUS_INVALID_ARGUMENT, - "Invalid credentials."))); + return creds + ? creds->CreateChannel(target, cp_args) + : CreateChannelInternal("", grpc_lame_client_channel_create( + NULL, GRPC_STATUS_INVALID_ARGUMENT, + "Invalid credentials.")); } } // namespace grpc diff --git a/src/cpp/client/internal_stub.cc b/src/cpp/client/create_channel_internal.cc index 91724a4837..9c5ab038cf 100644 --- a/src/cpp/client/internal_stub.cc +++ b/src/cpp/client/create_channel_internal.cc @@ -31,6 +31,16 @@ * */ -#include <grpc++/impl/internal_stub.h> +#include <memory> -namespace grpc {} // namespace grpc +#include <grpc++/channel.h> + +struct grpc_channel; + +namespace grpc { + +std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host, + grpc_channel* c_channel) { + return std::shared_ptr<Channel>(new Channel(host, c_channel)); +} +} // namespace grpc diff --git a/src/cpp/client/create_channel_internal.h b/src/cpp/client/create_channel_internal.h new file mode 100644 index 0000000000..4385ec701e --- /dev/null +++ b/src/cpp/client/create_channel_internal.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H +#define GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H + +#include <memory> + +#include <grpc++/support/config.h> + +struct grpc_channel; + +namespace grpc { +class Channel; + +std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host, + grpc_channel* c_channel); + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_CLIENT_CREATE_CHANNEL_INTERNAL_H diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 0c90578ae5..7a2fdf941c 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -31,7 +31,7 @@ * */ -#include <grpc++/generic_stub.h> +#include <grpc++/generic/generic_stub.h> #include <grpc++/impl/rpc_method.h> @@ -44,8 +44,7 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( return std::unique_ptr<GenericClientAsyncReaderWriter>( new GenericClientAsyncReaderWriter( channel_.get(), cq, - RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING, nullptr), - context, tag)); + RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); } } // namespace grpc diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc index 2f9357b568..4a4d2cb97d 100644 --- a/src/cpp/client/insecure_credentials.cc +++ b/src/cpp/client/insecure_credentials.cc @@ -31,25 +31,27 @@ * */ +#include <grpc++/credentials.h> + #include <grpc/grpc.h> #include <grpc/support/log.h> - -#include <grpc++/channel_arguments.h> -#include <grpc++/config.h> -#include <grpc++/credentials.h> -#include "src/cpp/client/channel.h" +#include <grpc++/channel.h> +#include <grpc++/support/channel_arguments.h> +#include <grpc++/support/config.h> +#include "src/cpp/client/create_channel_internal.h" namespace grpc { namespace { class InsecureCredentialsImpl GRPC_FINAL : public Credentials { public: - std::shared_ptr<grpc::ChannelInterface> CreateChannel( + std::shared_ptr<grpc::Channel> CreateChannel( const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); - return std::shared_ptr<ChannelInterface>(new Channel( - grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr))); + return CreateChannelInternal( + "", + grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr)); } // InsecureCredentials should not be applied to a call. diff --git a/src/cpp/client/secure_channel_arguments.cc b/src/cpp/client/secure_channel_arguments.cc index d89df999ad..e17d3b58b0 100644 --- a/src/cpp/client/secure_channel_arguments.cc +++ b/src/cpp/client/secure_channel_arguments.cc @@ -31,9 +31,9 @@ * */ -#include <grpc++/channel_arguments.h> -#include <grpc/grpc_security.h> +#include <grpc++/support/channel_arguments.h> +#include <grpc/grpc_security.h> #include "src/core/channel/channel_args.h" namespace grpc { diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 6cd6b77fcf..f368f2590a 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -32,21 +32,21 @@ */ #include <grpc/support/log.h> - -#include <grpc++/channel_arguments.h> +#include <grpc++/channel.h> #include <grpc++/impl/grpc_library.h> -#include "src/cpp/client/channel.h" +#include <grpc++/support/channel_arguments.h> +#include "src/cpp/client/create_channel_internal.h" #include "src/cpp/client/secure_credentials.h" namespace grpc { -std::shared_ptr<grpc::ChannelInterface> SecureCredentials::CreateChannel( +std::shared_ptr<grpc::Channel> SecureCredentials::CreateChannel( const string& target, const grpc::ChannelArguments& args) { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); - return std::shared_ptr<ChannelInterface>(new Channel( + return CreateChannelInternal( args.GetSslTargetNameOverride(), - grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args))); + grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)); } bool SecureCredentials::ApplyToCall(grpc_call* call) { diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h index c2b8d43a15..62d3185477 100644 --- a/src/cpp/client/secure_credentials.h +++ b/src/cpp/client/secure_credentials.h @@ -36,7 +36,7 @@ #include <grpc/grpc_security.h> -#include <grpc++/config.h> +#include <grpc++/support/config.h> #include <grpc++/credentials.h> namespace grpc { @@ -48,7 +48,7 @@ class SecureCredentials GRPC_FINAL : public Credentials { grpc_credentials* GetRawCreds() { return c_creds_; } bool ApplyToCall(grpc_call* call) GRPC_OVERRIDE; - std::shared_ptr<grpc::ChannelInterface> CreateChannel( + std::shared_ptr<grpc::Channel> CreateChannel( const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE; SecureCredentials* AsSecureCredentials() GRPC_OVERRIDE { return this; } diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc index d3bfd5cb6b..5ccf8cf72c 100644 --- a/src/cpp/common/auth_property_iterator.cc +++ b/src/cpp/common/auth_property_iterator.cc @@ -31,7 +31,7 @@ * */ -#include <grpc++/auth_context.h> +#include <grpc++/support/auth_context.h> #include <grpc/grpc_security.h> diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 0a5c976e01..16aa2c9fb9 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -34,10 +34,9 @@ #include <grpc++/impl/call.h> #include <grpc/support/alloc.h> -#include <grpc++/byte_buffer.h> +#include <grpc++/channel.h> #include <grpc++/client_context.h> -#include <grpc++/channel_interface.h> - +#include <grpc++/support/byte_buffer.h> #include "src/core/profiling/timers.h" namespace grpc { diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index fca33f8f54..a175beb452 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -36,7 +36,7 @@ #include <grpc/grpc.h> #include <grpc/support/log.h> -#include <grpc++/time.h> +#include <grpc++/support/time.h> namespace grpc { diff --git a/src/cpp/common/create_auth_context.h b/src/cpp/common/create_auth_context.h index 9082a90c6d..b4962bae4e 100644 --- a/src/cpp/common/create_auth_context.h +++ b/src/cpp/common/create_auth_context.h @@ -33,7 +33,7 @@ #include <memory> #include <grpc/grpc.h> -#include <grpc++/auth_context.h> +#include <grpc++/support/auth_context.h> namespace grpc { diff --git a/src/cpp/common/insecure_create_auth_context.cc b/src/cpp/common/insecure_create_auth_context.cc index 07fc0bd549..fe80c1a80c 100644 --- a/src/cpp/common/insecure_create_auth_context.cc +++ b/src/cpp/common/insecure_create_auth_context.cc @@ -33,7 +33,7 @@ #include <memory> #include <grpc/grpc.h> -#include <grpc++/auth_context.h> +#include <grpc++/support/auth_context.h> namespace grpc { diff --git a/src/cpp/common/secure_auth_context.h b/src/cpp/common/secure_auth_context.h index 264ed620a3..01b7126189 100644 --- a/src/cpp/common/secure_auth_context.h +++ b/src/cpp/common/secure_auth_context.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CPP_COMMON_SECURE_AUTH_CONTEXT_H #define GRPC_INTERNAL_CPP_COMMON_SECURE_AUTH_CONTEXT_H -#include <grpc++/auth_context.h> +#include <grpc++/support/auth_context.h> struct grpc_auth_context; diff --git a/src/cpp/common/secure_create_auth_context.cc b/src/cpp/common/secure_create_auth_context.cc index d81f4bbc4a..f13d25a1dd 100644 --- a/src/cpp/common/secure_create_auth_context.cc +++ b/src/cpp/common/secure_create_auth_context.cc @@ -34,7 +34,7 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> -#include <grpc++/auth_context.h> +#include <grpc++/support/auth_context.h> #include "src/cpp/common/secure_auth_context.h" namespace grpc { diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 05470ec627..be84c222a0 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -32,7 +32,6 @@ */ #include <grpc++/impl/proto_utils.h> -#include <grpc++/config.h> #include <grpc/grpc.h> #include <grpc/byte_buffer.h> @@ -40,6 +39,7 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> #include <grpc/support/port_platform.h> +#include <grpc++/support/config.h> const int kMaxBufferLength = 8192; diff --git a/src/cpp/server/async_generic_service.cc b/src/cpp/server/async_generic_service.cc index 2e99afcb5f..6b9ea532b6 100644 --- a/src/cpp/server/async_generic_service.cc +++ b/src/cpp/server/async_generic_service.cc @@ -31,7 +31,7 @@ * */ -#include <grpc++/async_generic_service.h> +#include <grpc++/generic/async_generic_service.h> #include <grpc++/server.h> diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index 9f59d254f1..f3b07ec8ce 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -32,7 +32,8 @@ */ #include <grpc/support/cpu.h> -#include <grpc++/dynamic_thread_pool.h> + +#include "src/cpp/server/dynamic_thread_pool.h" #ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index b475f43b1d..4b226c2992 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -33,7 +33,8 @@ #include <grpc++/impl/sync.h> #include <grpc++/impl/thd.h> -#include <grpc++/dynamic_thread_pool.h> + +#include "src/cpp/server/dynamic_thread_pool.h" namespace grpc { DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) diff --git a/src/cpp/client/channel.h b/src/cpp/server/dynamic_thread_pool.h index 7e406ad788..5ba7533c05 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/server/dynamic_thread_pool.h @@ -31,50 +31,53 @@ * */ -#ifndef GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H -#define GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H +#ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H +#define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H +#include <list> #include <memory> +#include <queue> -#include <grpc++/channel_interface.h> -#include <grpc++/config.h> -#include <grpc++/impl/grpc_library.h> +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc++/support/config.h> -struct grpc_channel; +#include "src/cpp/server/thread_pool_interface.h" namespace grpc { -class Call; -class CallOpSetInterface; -class ChannelArguments; -class CompletionQueue; -class Credentials; -class StreamContextInterface; -class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface { +class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { public: - explicit Channel(grpc_channel* c_channel); - Channel(const grpc::string& host, grpc_channel* c_channel); - ~Channel() GRPC_OVERRIDE; + explicit DynamicThreadPool(int reserve_threads); + ~DynamicThreadPool(); - void* RegisterMethod(const char* method) GRPC_OVERRIDE; - Call CreateCall(const RpcMethod& method, ClientContext* context, - CompletionQueue* cq) GRPC_OVERRIDE; - void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; - - grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE; + void Add(const std::function<void()>& callback) GRPC_OVERRIDE; private: - void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline, CompletionQueue* cq, - void* tag) GRPC_OVERRIDE; + class DynamicThread { + public: + DynamicThread(DynamicThreadPool* pool); + ~DynamicThread(); - bool WaitForStateChangeImpl(grpc_connectivity_state last_observed, - gpr_timespec deadline) GRPC_OVERRIDE; + private: + DynamicThreadPool* pool_; + std::unique_ptr<grpc::thread> thd_; + void ThreadFunc(); + }; + grpc::mutex mu_; + grpc::condition_variable cv_; + grpc::condition_variable shutdown_cv_; + bool shutdown_; + std::queue<std::function<void()>> callbacks_; + int reserve_threads_; + int nthreads_; + int threads_waiting_; + std::list<DynamicThread*> dead_threads_; - const grpc::string host_; - grpc_channel* const c_channel_; // owned + void ThreadFunc(); + static void ReapThreads(std::list<DynamicThread*>* tlist); }; } // namespace grpc -#endif // GRPC_INTERNAL_CPP_CLIENT_CHANNEL_H +#endif // GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H diff --git a/src/cpp/server/fixed_size_thread_pool.cc b/src/cpp/server/fixed_size_thread_pool.cc index bafbc5802a..2bdc44be2e 100644 --- a/src/cpp/server/fixed_size_thread_pool.cc +++ b/src/cpp/server/fixed_size_thread_pool.cc @@ -33,7 +33,7 @@ #include <grpc++/impl/sync.h> #include <grpc++/impl/thd.h> -#include <grpc++/fixed_size_thread_pool.h> +#include "src/cpp/server/fixed_size_thread_pool.h" namespace grpc { diff --git a/src/cpp/server/fixed_size_thread_pool.h b/src/cpp/server/fixed_size_thread_pool.h new file mode 100644 index 0000000000..394ae5821e --- /dev/null +++ b/src/cpp/server/fixed_size_thread_pool.h @@ -0,0 +1,67 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H +#define GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H + +#include <queue> +#include <vector> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc++/support/config.h> + +#include "src/cpp/server/thread_pool_interface.h" + +namespace grpc { + +class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface { + public: + explicit FixedSizeThreadPool(int num_threads); + ~FixedSizeThreadPool(); + + void Add(const std::function<void()>& callback) GRPC_OVERRIDE; + + private: + grpc::mutex mu_; + grpc::condition_variable cv_; + bool shutdown_; + std::queue<std::function<void()>> callbacks_; + std::vector<grpc::thread*> threads_; + + void ThreadFunc(); +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h index b9803f107e..d3d37b188d 100644 --- a/src/cpp/server/secure_server_credentials.h +++ b/src/cpp/server/secure_server_credentials.h @@ -34,10 +34,10 @@ #ifndef GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H #define GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H -#include <grpc/grpc_security.h> - #include <grpc++/server_credentials.h> +#include <grpc/grpc_security.h> + namespace grpc { class SecureServerCredentials GRPC_FINAL : public ServerCredentials { diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index e039c07374..66cd27cc33 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -32,24 +32,71 @@ */ #include <grpc++/server.h> + #include <utility> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc++/completion_queue.h> -#include <grpc++/async_generic_service.h> +#include <grpc++/generic/async_generic_service.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/service_type.h> #include <grpc++/server_context.h> #include <grpc++/server_credentials.h> -#include <grpc++/thread_pool_interface.h> -#include <grpc++/time.h> +#include <grpc++/support/time.h> #include "src/core/profiling/timers.h" +#include "src/cpp/server/thread_pool_interface.h" namespace grpc { +class Server::UnimplementedAsyncRequestContext { + protected: + UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} + + GenericServerContext server_context_; + GenericServerAsyncReaderWriter generic_stream_; +}; + +class Server::UnimplementedAsyncRequest GRPC_FINAL + : public UnimplementedAsyncRequestContext, + public GenericAsyncRequest { + public: + UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) + : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, + NULL, false), + server_(server), + cq_(cq) {} + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + ServerContext* context() { return &server_context_; } + GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + + private: + Server* const server_; + ServerCompletionQueue* const cq_; +}; + +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> + UnimplementedAsyncResponseOp; +class Server::UnimplementedAsyncResponse GRPC_FINAL + : public UnimplementedAsyncResponseOp { + public: + UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); + ~UnimplementedAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return r; + } + + private: + UnimplementedAsyncRequest* const request_; +}; + class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) { @@ -297,18 +344,23 @@ int Server::AddListeningPort(const grpc::string& addr, return creds->AddPortToServer(addr, server_); } -bool Server::Start() { +bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { GPR_ASSERT(!started_); started_ = true; grpc_server_start(server_); if (!has_generic_service_) { - unknown_method_.reset(new RpcServiceMethod( - "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); - // Use of emplace_back with just constructor arguments is not accepted here - // by gcc-4.4 because it can't match the anonymous nullptr with a proper - // constructor implicitly. Construct the object and use push_back. - sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + if (!sync_methods_->empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + for (size_t i = 0; i < num_cqs; i++) { + new UnimplementedAsyncRequest(this, cqs[i]); + } } // Start processing rpcs. if (!sync_methods_->empty()) { @@ -370,12 +422,14 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { Server::BaseAsyncRequest::BaseAsyncRequest( Server* server, ServerContext* context, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) : server_(server), context_(context), stream_(stream), call_cq_(call_cq), tag_(tag), + delete_on_finalize_(delete_on_finalize), call_(nullptr) { memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); } @@ -402,14 +456,16 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { // just the pointers inside call are copied here stream_->BindCall(&call); *tag = tag_; - delete this; + if (delete_on_finalize_) { + delete this; + } return true; } Server::RegisteredAsyncRequest::RegisteredAsyncRequest( Server* server, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) {} + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} void Server::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -423,8 +479,9 @@ void Server::RegisteredAsyncRequest::IssueRequest( Server::GenericAsyncRequest::GenericAsyncRequest( Server* server, GenericServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : BaseAsyncRequest(server, context, stream, call_cq, tag) { + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, tag, + delete_on_finalize) { grpc_call_details_init(&call_details_); GPR_ASSERT(notification_cq); GPR_ASSERT(call_cq); @@ -445,6 +502,25 @@ bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { return BaseAsyncRequest::FinalizeResult(tag, status); } +bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } + return false; +} + +Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( + UnimplementedAsyncRequest* request) + : request_(request) { + Status status(StatusCode::UNIMPLEMENTED, ""); + UnknownMethodHandler::FillOps(request_->context(), this); + request_->stream()->call_.PerformOps(this); +} + void Server::ScheduleCallback() { { grpc::unique_lock<grpc::mutex> lock(mu_); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 0b11d86173..b739cbfe62 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -37,8 +37,8 @@ #include <grpc/support/log.h> #include <grpc++/impl/service_type.h> #include <grpc++/server.h> -#include <grpc++/thread_pool_interface.h> -#include <grpc++/fixed_size_thread_pool.h> +#include "src/cpp/server/thread_pool_interface.h" +#include "src/cpp/server/fixed_size_thread_pool.h" namespace grpc { @@ -89,10 +89,6 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr, ports_.push_back(port); } -void ServerBuilder::SetThreadPool(ThreadPoolInterface* thread_pool) { - thread_pool_ = thread_pool; -} - std::unique_ptr<Server> ServerBuilder::BuildAndStart() { bool thread_pool_owned = false; if (!async_services_.empty() && !services_.empty()) { @@ -103,12 +99,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } - // Async services only, create a thread pool to handle requests to unknown - // services. - if (!thread_pool_ && !generic_service_ && !async_services_.empty()) { - thread_pool_ = new FixedSizeThreadPool(1); - thread_pool_owned = true; - } std::unique_ptr<Server> server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { @@ -138,7 +128,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { *port->selected_port = r; } } - if (!server->Start()) { + if (!server->Start(&cqs_[0], cqs_.size())) { return nullptr; } return server; diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 03461ddda5..acc163d6b5 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -38,7 +38,7 @@ #include <grpc/support/log.h> #include <grpc++/impl/call.h> #include <grpc++/impl/sync.h> -#include <grpc++/time.h> +#include <grpc++/support/time.h> #include "src/core/channel/compress_filter.h" #include "src/cpp/common/create_auth_context.h" diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h new file mode 100644 index 0000000000..1ebe30fe2a --- /dev/null +++ b/src/cpp/server/thread_pool_interface.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H +#define GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H + +#include <functional> + +namespace grpc { + +// A thread pool interface for running callbacks. +class ThreadPoolInterface { + public: + virtual ~ThreadPoolInterface() {} + + // Schedule the given callback for execution. + virtual void Add(const std::function<void()>& callback) = 0; +}; + +ThreadPoolInterface* CreateDefaultThreadPool(); + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index a66c92c3e1..e46e656beb 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -32,7 +32,7 @@ */ #include <grpc/byte_buffer_reader.h> -#include <grpc++/byte_buffer.h> +#include <grpc++/support/byte_buffer.h> namespace grpc { diff --git a/src/cpp/util/slice.cc b/src/cpp/util/slice.cc index 57370dabc6..7e88423b6c 100644 --- a/src/cpp/util/slice.cc +++ b/src/cpp/util/slice.cc @@ -31,7 +31,7 @@ * */ -#include <grpc++/slice.h> +#include <grpc++/support/slice.h> namespace grpc { diff --git a/src/cpp/util/status.cc b/src/cpp/util/status.cc index 5bb9eda3d9..ad9850cf07 100644 --- a/src/cpp/util/status.cc +++ b/src/cpp/util/status.cc @@ -31,7 +31,7 @@ * */ -#include <grpc++/status.h> +#include <grpc++/support/status.h> namespace grpc { diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc new file mode 100644 index 0000000000..8483e8c2ee --- /dev/null +++ b/src/cpp/util/string_ref.cc @@ -0,0 +1,111 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/support/string_ref.h> + +#include <string.h> + +#include <algorithm> + +namespace grpc { + +constexpr size_t string_ref::npos; + +string_ref& string_ref::operator=(const string_ref& rhs) { + data_ = rhs.data_; + length_ = rhs.length_; + return *this; +} + +string_ref::string_ref(const char* s) : data_(s), length_(strlen(s)) {} + +string_ref string_ref::substr(size_t pos, size_t n) const { + if (pos > length_) pos = length_; + if (n > (length_ - pos)) n = length_ - pos; + return string_ref(data_ + pos, n); +} + +int string_ref::compare(string_ref x) const { + size_t min_size = length_ < x.length_ ? length_ : x.length_; + int r = memcmp(data_, x.data_, min_size); + if (r < 0) return -1; + if (r > 0) return 1; + if (length_ < x.length_) return -1; + if (length_ > x.length_) return 1; + return 0; +} + +bool string_ref::starts_with(string_ref x) const { + return length_ >= x.length_ && (memcmp(data_, x.data_, x.length_) == 0); +} + +bool string_ref::ends_with(string_ref x) const { + return length_ >= x.length_ && + (memcmp(data_ + (length_ - x.length_), x.data_, x.length_) == 0); +} + +size_t string_ref::find(string_ref s) const { + auto it = std::search(cbegin(), cend(), s.cbegin(), s.cend()); + return it == cend() ? npos : std::distance(cbegin(), it); +} + +size_t string_ref::find(char c) const { + auto it = std::find_if(cbegin(), cend(), [c](char cc) { return cc == c; }); + return it == cend() ? npos : std::distance(cbegin(), it); +} + +bool operator==(string_ref x, string_ref y) { + return x.compare(y) == 0; +} + +bool operator!=(string_ref x, string_ref y) { + return x.compare(y) != 0; +} + +bool operator<(string_ref x, string_ref y) { + return x.compare(y) < 0; +} + +bool operator<=(string_ref x, string_ref y) { + return x.compare(y) <= 0; +} + +bool operator>(string_ref x, string_ref y) { + return x.compare(y) > 0; +} + +bool operator>=(string_ref x, string_ref y) { + return x.compare(y) >= 0; +} + +} // namespace grpc diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc index 799c597e0b..b3401eb26b 100644 --- a/src/cpp/util/time.cc +++ b/src/cpp/util/time.cc @@ -31,12 +31,12 @@ * */ -#include <grpc++/config.h> +#include <grpc++/support/config.h> #ifndef GRPC_CXX0X_NO_CHRONO #include <grpc/support/time.h> -#include <grpc++/time.h> +#include <grpc++/support/time.h> using std::chrono::duration_cast; using std::chrono::nanoseconds; |