diff options
Diffstat (limited to 'third_party/grpc/src/cpp/server')
14 files changed, 1814 insertions, 0 deletions
diff --git a/third_party/grpc/src/cpp/server/async_generic_service.cc b/third_party/grpc/src/cpp/server/async_generic_service.cc new file mode 100644 index 0000000000..6b9ea532b6 --- /dev/null +++ b/third_party/grpc/src/cpp/server/async_generic_service.cc @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/generic/async_generic_service.h> + +#include <grpc++/server.h> + +namespace grpc { + +void AsyncGenericService::RequestCall( + GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer, + CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, + void* tag) { + server_->RequestAsyncGenericCall(ctx, reader_writer, call_cq, notification_cq, + tag); +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/create_default_thread_pool.cc b/third_party/grpc/src/cpp/server/create_default_thread_pool.cc new file mode 100644 index 0000000000..f3b07ec8ce --- /dev/null +++ b/third_party/grpc/src/cpp/server/create_default_thread_pool.cc @@ -0,0 +1,50 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/cpu.h> + +#include "src/cpp/server/dynamic_thread_pool.h" + +#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL + +namespace grpc { + +ThreadPoolInterface* CreateDefaultThreadPool() { + int cores = gpr_cpu_num_cores(); + if (!cores) cores = 4; + return new DynamicThreadPool(cores); +} + +} // namespace grpc + +#endif // !GRPC_CUSTOM_DEFAULT_THREAD_POOL diff --git a/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc b/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc new file mode 100644 index 0000000000..4b226c2992 --- /dev/null +++ b/third_party/grpc/src/cpp/server/dynamic_thread_pool.cc @@ -0,0 +1,134 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> + +#include "src/cpp/server/dynamic_thread_pool.h" + +namespace grpc { +DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) + : pool_(pool), + thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, + this)) {} +DynamicThreadPool::DynamicThread::~DynamicThread() { + thd_->join(); + thd_.reset(); +} + +void DynamicThreadPool::DynamicThread::ThreadFunc() { + pool_->ThreadFunc(); + // Now that we have killed ourselves, we should reduce the thread count + grpc::unique_lock<grpc::mutex> lock(pool_->mu_); + pool_->nthreads_--; + // Move ourselves to dead list + pool_->dead_threads_.push_back(this); + + if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { + pool_->shutdown_cv_.notify_one(); + } +} + +void DynamicThreadPool::ThreadFunc() { + for (;;) { + // Wait until work is available or we are shutting down. + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && callbacks_.empty()) { + // If there are too many threads waiting, then quit this thread + if (threads_waiting_ >= reserve_threads_) { + break; + } + threads_waiting_++; + cv_.wait(lock); + threads_waiting_--; + } + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + break; + } + } +} + +DynamicThreadPool::DynamicThreadPool(int reserve_threads) + : shutdown_(false), + reserve_threads_(reserve_threads), + nthreads_(0), + threads_waiting_(0) { + for (int i = 0; i < reserve_threads_; i++) { + grpc::lock_guard<grpc::mutex> lock(mu_); + nthreads_++; + new DynamicThread(this); + } +} + +void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) { + for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) { + delete *t; + } +} + +DynamicThreadPool::~DynamicThreadPool() { + grpc::unique_lock<grpc::mutex> lock(mu_); + shutdown_ = true; + cv_.notify_all(); + while (nthreads_ != 0) { + shutdown_cv_.wait(lock); + } + ReapThreads(&dead_threads_); +} + +void DynamicThreadPool::Add(const std::function<void()>& callback) { + grpc::lock_guard<grpc::mutex> lock(mu_); + // Add works to the callbacks list + callbacks_.push(callback); + // Increase pool size or notify as needed + if (threads_waiting_ == 0) { + // Kick off a new thread + nthreads_++; + new DynamicThread(this); + } else { + cv_.notify_one(); + } + // Also use this chance to harvest dead threads + if (!dead_threads_.empty()) { + ReapThreads(&dead_threads_); + } +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/dynamic_thread_pool.h b/third_party/grpc/src/cpp/server/dynamic_thread_pool.h new file mode 100644 index 0000000000..5ba7533c05 --- /dev/null +++ b/third_party/grpc/src/cpp/server/dynamic_thread_pool.h @@ -0,0 +1,83 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H +#define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H + +#include <list> +#include <memory> +#include <queue> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc++/support/config.h> + +#include "src/cpp/server/thread_pool_interface.h" + +namespace grpc { + +class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { + public: + explicit DynamicThreadPool(int reserve_threads); + ~DynamicThreadPool(); + + void Add(const std::function<void()>& callback) GRPC_OVERRIDE; + + private: + class DynamicThread { + public: + DynamicThread(DynamicThreadPool* pool); + ~DynamicThread(); + + private: + DynamicThreadPool* pool_; + std::unique_ptr<grpc::thread> thd_; + void ThreadFunc(); + }; + grpc::mutex mu_; + grpc::condition_variable cv_; + grpc::condition_variable shutdown_cv_; + bool shutdown_; + std::queue<std::function<void()>> callbacks_; + int reserve_threads_; + int nthreads_; + int threads_waiting_; + std::list<DynamicThread*> dead_threads_; + + void ThreadFunc(); + static void ReapThreads(std::list<DynamicThread*>* tlist); +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H diff --git a/third_party/grpc/src/cpp/server/fixed_size_thread_pool.cc b/third_party/grpc/src/cpp/server/fixed_size_thread_pool.cc new file mode 100644 index 0000000000..2bdc44be2e --- /dev/null +++ b/third_party/grpc/src/cpp/server/fixed_size_thread_pool.cc @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include "src/cpp/server/fixed_size_thread_pool.h" + +namespace grpc { + +void FixedSizeThreadPool::ThreadFunc() { + for (;;) { + // Wait until work is available or we are shutting down. + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_ && callbacks_.empty()) { + cv_.wait(lock); + } + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = callbacks_.front(); + callbacks_.pop(); + lock.unlock(); + cb(); + } else if (shutdown_) { + return; + } + } +} + +FixedSizeThreadPool::FixedSizeThreadPool(int num_threads) : shutdown_(false) { + for (int i = 0; i < num_threads; i++) { + threads_.push_back( + new grpc::thread(&FixedSizeThreadPool::ThreadFunc, this)); + } +} + +FixedSizeThreadPool::~FixedSizeThreadPool() { + { + grpc::lock_guard<grpc::mutex> lock(mu_); + shutdown_ = true; + cv_.notify_all(); + } + for (auto t = threads_.begin(); t != threads_.end(); t++) { + (*t)->join(); + delete *t; + } +} + +void FixedSizeThreadPool::Add(const std::function<void()>& callback) { + grpc::lock_guard<grpc::mutex> lock(mu_); + callbacks_.push(callback); + cv_.notify_one(); +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/fixed_size_thread_pool.h b/third_party/grpc/src/cpp/server/fixed_size_thread_pool.h new file mode 100644 index 0000000000..394ae5821e --- /dev/null +++ b/third_party/grpc/src/cpp/server/fixed_size_thread_pool.h @@ -0,0 +1,67 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H +#define GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H + +#include <queue> +#include <vector> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc++/support/config.h> + +#include "src/cpp/server/thread_pool_interface.h" + +namespace grpc { + +class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface { + public: + explicit FixedSizeThreadPool(int num_threads); + ~FixedSizeThreadPool(); + + void Add(const std::function<void()>& callback) GRPC_OVERRIDE; + + private: + grpc::mutex mu_; + grpc::condition_variable cv_; + bool shutdown_; + std::queue<std::function<void()>> callbacks_; + std::vector<grpc::thread*> threads_; + + void ThreadFunc(); +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_FIXED_SIZE_THREAD_POOL_H diff --git a/third_party/grpc/src/cpp/server/insecure_server_credentials.cc b/third_party/grpc/src/cpp/server/insecure_server_credentials.cc new file mode 100644 index 0000000000..ef3cae5fd7 --- /dev/null +++ b/third_party/grpc/src/cpp/server/insecure_server_credentials.cc @@ -0,0 +1,60 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/security/server_credentials.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> + +namespace grpc { +namespace { +class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials { + public: + int AddPortToServer(const grpc::string& addr, + grpc_server* server) GRPC_OVERRIDE { + return grpc_server_add_insecure_http2_port(server, addr.c_str()); + } + void SetAuthMetadataProcessor( + const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE { + (void)processor; + GPR_ASSERT(0); // Should not be called on InsecureServerCredentials. + } +}; +} // namespace + +std::shared_ptr<ServerCredentials> InsecureServerCredentials() { + return std::shared_ptr<ServerCredentials>( + new InsecureServerCredentialsImpl()); +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/secure_server_credentials.cc b/third_party/grpc/src/cpp/server/secure_server_credentials.cc new file mode 100644 index 0000000000..d472667a7e --- /dev/null +++ b/third_party/grpc/src/cpp/server/secure_server_credentials.cc @@ -0,0 +1,141 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <functional> +#include <map> +#include <memory> + +#include "src/cpp/common/secure_auth_context.h" +#include "src/cpp/server/secure_server_credentials.h" + +#include <grpc++/security/auth_metadata_processor.h> + +namespace grpc { + +void AuthMetadataProcessorAyncWrapper::Destroy(void* wrapper) { + auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper); + delete w; +} + +void AuthMetadataProcessorAyncWrapper::Process( + void* wrapper, grpc_auth_context* context, const grpc_metadata* md, + size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) { + auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper); + if (!w->processor_) { + // Early exit. + cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr); + return; + } + if (w->processor_->IsBlocking()) { + w->thread_pool_->Add( + std::bind(&AuthMetadataProcessorAyncWrapper::InvokeProcessor, w, + context, md, num_md, cb, user_data)); + } else { + // invoke directly. + w->InvokeProcessor(context, md, num_md, cb, user_data); + } +} + +void AuthMetadataProcessorAyncWrapper::InvokeProcessor( + grpc_auth_context* ctx, const grpc_metadata* md, size_t num_md, + grpc_process_auth_metadata_done_cb cb, void* user_data) { + AuthMetadataProcessor::InputMetadata metadata; + for (size_t i = 0; i < num_md; i++) { + metadata.insert(std::make_pair( + md[i].key, grpc::string_ref(md[i].value, md[i].value_length))); + } + SecureAuthContext context(ctx, false); + AuthMetadataProcessor::OutputMetadata consumed_metadata; + AuthMetadataProcessor::OutputMetadata response_metadata; + + Status status = processor_->Process(metadata, &context, &consumed_metadata, + &response_metadata); + + std::vector<grpc_metadata> consumed_md; + for (auto it = consumed_metadata.begin(); it != consumed_metadata.end(); + ++it) { + grpc_metadata md_entry; + md_entry.key = it->first.c_str(); + md_entry.value = it->second.data(); + md_entry.value_length = it->second.size(); + md_entry.flags = 0; + consumed_md.push_back(md_entry); + } + std::vector<grpc_metadata> response_md; + for (auto it = response_metadata.begin(); it != response_metadata.end(); + ++it) { + grpc_metadata md_entry; + md_entry.key = it->first.c_str(); + md_entry.value = it->second.data(); + md_entry.value_length = it->second.size(); + md_entry.flags = 0; + response_md.push_back(md_entry); + } + auto consumed_md_data = consumed_md.empty() ? nullptr : &consumed_md[0]; + auto response_md_data = response_md.empty() ? nullptr : &response_md[0]; + cb(user_data, consumed_md_data, consumed_md.size(), response_md_data, + response_md.size(), static_cast<grpc_status_code>(status.error_code()), + status.error_message().c_str()); +} + +int SecureServerCredentials::AddPortToServer(const grpc::string& addr, + grpc_server* server) { + return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_); +} + +void SecureServerCredentials::SetAuthMetadataProcessor( + const std::shared_ptr<AuthMetadataProcessor>& processor) { + auto* wrapper = new AuthMetadataProcessorAyncWrapper(processor); + grpc_server_credentials_set_auth_metadata_processor( + creds_, {AuthMetadataProcessorAyncWrapper::Process, + AuthMetadataProcessorAyncWrapper::Destroy, wrapper}); +} + +std::shared_ptr<ServerCredentials> SslServerCredentials( + const SslServerCredentialsOptions& options) { + std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs; + for (auto key_cert_pair = options.pem_key_cert_pairs.begin(); + key_cert_pair != options.pem_key_cert_pairs.end(); key_cert_pair++) { + grpc_ssl_pem_key_cert_pair p = {key_cert_pair->private_key.c_str(), + key_cert_pair->cert_chain.c_str()}; + pem_key_cert_pairs.push_back(p); + } + grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create( + options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(), + pem_key_cert_pairs.empty() ? nullptr : &pem_key_cert_pairs[0], + pem_key_cert_pairs.size(), options.force_client_auth, nullptr); + return std::shared_ptr<ServerCredentials>( + new SecureServerCredentials(c_creds)); +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/secure_server_credentials.h b/third_party/grpc/src/cpp/server/secure_server_credentials.h new file mode 100644 index 0000000000..5460f4a02c --- /dev/null +++ b/third_party/grpc/src/cpp/server/secure_server_credentials.h @@ -0,0 +1,88 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H +#define GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H + +#include <memory> + +#include <grpc++/security/server_credentials.h> + +#include <grpc/grpc_security.h> + +#include "src/cpp/server/thread_pool_interface.h" + +namespace grpc { + +class AuthMetadataProcessorAyncWrapper GRPC_FINAL { + public: + static void Destroy(void* wrapper); + + static void Process(void* wrapper, grpc_auth_context* context, + const grpc_metadata* md, size_t num_md, + grpc_process_auth_metadata_done_cb cb, void* user_data); + + AuthMetadataProcessorAyncWrapper( + const std::shared_ptr<AuthMetadataProcessor>& processor) + : thread_pool_(CreateDefaultThreadPool()), processor_(processor) {} + + private: + void InvokeProcessor(grpc_auth_context* context, const grpc_metadata* md, + size_t num_md, grpc_process_auth_metadata_done_cb cb, + void* user_data); + std::unique_ptr<ThreadPoolInterface> thread_pool_; + std::shared_ptr<AuthMetadataProcessor> processor_; +}; + +class SecureServerCredentials GRPC_FINAL : public ServerCredentials { + public: + explicit SecureServerCredentials(grpc_server_credentials* creds) + : creds_(creds) {} + ~SecureServerCredentials() GRPC_OVERRIDE { + grpc_server_credentials_release(creds_); + } + + int AddPortToServer(const grpc::string& addr, + grpc_server* server) GRPC_OVERRIDE; + + void SetAuthMetadataProcessor( + const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE; + + private: + grpc_server_credentials* creds_; + std::unique_ptr<AuthMetadataProcessorAyncWrapper> processor_; +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H diff --git a/third_party/grpc/src/cpp/server/server.cc b/third_party/grpc/src/cpp/server/server.cc new file mode 100644 index 0000000000..0d31140924 --- /dev/null +++ b/third_party/grpc/src/cpp/server/server.cc @@ -0,0 +1,588 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server.h> + +#include <utility> + +#include <grpc++/completion_queue.h> +#include <grpc++/generic/async_generic_service.h> +#include <grpc++/impl/codegen/completion_queue_tag.h> +#include <grpc++/impl/grpc_library.h> +#include <grpc++/impl/method_handler_impl.h> +#include <grpc++/impl/rpc_service_method.h> +#include <grpc++/impl/service_type.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server_context.h> +#include <grpc++/support/time.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/profiling/timers.h" +#include "src/cpp/server/thread_pool_interface.h" + +namespace grpc { + +class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks { + public: + ~DefaultGlobalCallbacks() GRPC_OVERRIDE {} + void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} + void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} +}; + +static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; +static gpr_once g_once_init_callbacks = GPR_ONCE_INIT; + +static void InitGlobalCallbacks() { + if (g_callbacks == nullptr) { + g_callbacks.reset(new DefaultGlobalCallbacks()); + } +} + +class Server::UnimplementedAsyncRequestContext { + protected: + UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {} + + GenericServerContext server_context_; + GenericServerAsyncReaderWriter generic_stream_; +}; + +class Server::UnimplementedAsyncRequest GRPC_FINAL + : public UnimplementedAsyncRequestContext, + public GenericAsyncRequest { + public: + UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq) + : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq, + NULL, false), + server_(server), + cq_(cq) {} + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + ServerContext* context() { return &server_context_; } + GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } + + private: + Server* const server_; + ServerCompletionQueue* const cq_; +}; + +typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> + UnimplementedAsyncResponseOp; +class Server::UnimplementedAsyncResponse GRPC_FINAL + : public UnimplementedAsyncResponseOp { + public: + UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); + ~UnimplementedAsyncResponse() { delete request_; } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); + delete this; + return r; + } + + private: + UnimplementedAsyncRequest* const request_; +}; + +class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { + delete this; + return false; + } +}; + +class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { + public: + SyncRequest(RpcServiceMethod* method, void* tag) + : method_(method), + tag_(tag), + in_flight_(false), + has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || + method->method_type() == + RpcMethod::SERVER_STREAMING), + call_details_(nullptr), + cq_(nullptr) { + grpc_metadata_array_init(&request_metadata_); + } + + ~SyncRequest() { + if (call_details_) { + delete call_details_; + } + grpc_metadata_array_destroy(&request_metadata_); + } + + static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { + void* tag = nullptr; + *ok = false; + if (!cq->Next(&tag, ok)) { + return nullptr; + } + auto* mrd = static_cast<SyncRequest*>(tag); + GPR_ASSERT(mrd->in_flight_); + return mrd; + } + + static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, + gpr_timespec deadline) { + void* tag = nullptr; + *ok = false; + switch (cq->AsyncNext(&tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + *req = nullptr; + return true; + case CompletionQueue::SHUTDOWN: + *req = nullptr; + return false; + case CompletionQueue::GOT_EVENT: + *req = static_cast<SyncRequest*>(tag); + GPR_ASSERT((*req)->in_flight_); + return true; + } + GPR_UNREACHABLE_CODE(return false); + } + + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } + + void TeardownRequest() { + grpc_completion_queue_destroy(cq_); + cq_ = nullptr; + } + + void Request(grpc_server* server, grpc_completion_queue* notify_cq) { + GPR_ASSERT(cq_ && !in_flight_); + in_flight_ = true; + if (tag_) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)); + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, this)); + } + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + if (!*status) { + grpc_completion_queue_destroy(cq_); + } + if (call_details_) { + deadline_ = call_details_->deadline; + grpc_call_details_destroy(call_details_); + grpc_call_details_init(call_details_); + } + return true; + } + + class CallData GRPC_FINAL { + public: + explicit CallData(Server* server, SyncRequest* mrd) + : cq_(mrd->cq_), + call_(mrd->call_, server, &cq_, server->max_message_size_), + ctx_(mrd->deadline_, mrd->request_metadata_.metadata, + mrd->request_metadata_.count), + has_request_payload_(mrd->has_request_payload_), + request_payload_(mrd->request_payload_), + method_(mrd->method_) { + ctx_.set_call(mrd->call_); + ctx_.cq_ = &cq_; + GPR_ASSERT(mrd->in_flight_); + mrd->in_flight_ = false; + mrd->request_metadata_.count = 0; + } + + ~CallData() { + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); + } + } + + void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { + ctx_.BeginCompletionOp(&call_); + global_callbacks->PreSynchronousRequest(&ctx_); + method_->handler()->RunHandler(MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_, call_.max_message_size())); + global_callbacks->PostSynchronousRequest(&ctx_); + request_payload_ = nullptr; + void* ignored_tag; + bool ignored_ok; + cq_.Shutdown(); + GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false); + } + + private: + CompletionQueue cq_; + Call call_; + ServerContext ctx_; + const bool has_request_payload_; + grpc_byte_buffer* request_payload_; + RpcServiceMethod* const method_; + }; + + private: + RpcServiceMethod* const method_; + void* const tag_; + bool in_flight_; + const bool has_request_payload_; + grpc_call* call_; + grpc_call_details* call_details_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; + grpc_byte_buffer* request_payload_; + grpc_completion_queue* cq_; +}; + +static grpc_server* CreateServer(const ChannelArguments& args) { + grpc_channel_args channel_args; + args.SetChannelArgs(&channel_args); + return grpc_server_create(&channel_args, nullptr); +} + +static internal::GrpcLibraryInitializer g_gli_initializer; +Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + int max_message_size, const ChannelArguments& args) + : max_message_size_(max_message_size), + started_(false), + shutdown_(false), + num_running_cb_(0), + sync_methods_(new std::list<SyncRequest>), + has_generic_service_(false), + server_(CreateServer(args)), + thread_pool_(thread_pool), + thread_pool_owned_(thread_pool_owned) { + g_gli_initializer.summon(); + gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); + global_callbacks_ = g_callbacks; + grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); +} + +Server::~Server() { + { + grpc::unique_lock<grpc::mutex> lock(mu_); + if (started_ && !shutdown_) { + lock.unlock(); + Shutdown(); + } else if (!started_) { + cq_.Shutdown(); + } + } + void* got_tag; + bool ok; + GPR_ASSERT(!cq_.Next(&got_tag, &ok)); + grpc_server_destroy(server_); + if (thread_pool_owned_) { + delete thread_pool_; + } + delete sync_methods_; +} + +void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { + GPR_ASSERT(g_callbacks == nullptr); + GPR_ASSERT(callbacks != nullptr); + g_callbacks.reset(callbacks); +} + +bool Server::RegisterService(const grpc::string* host, Service* service) { + bool has_async_methods = service->has_async_methods(); + if (has_async_methods) { + GPR_ASSERT(service->server_ == nullptr && + "Can only register an asynchronous service against one server."); + service->server_ = this; + } + for (auto it = service->methods_.begin(); it != service->methods_.end(); + ++it) { + if (it->get() == nullptr) { // Handled by generic service if any. + continue; + } + RpcServiceMethod* method = it->get(); + void* tag = grpc_server_register_method(server_, method->name(), + host ? host->c_str() : nullptr); + if (tag == nullptr) { + gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", + method->name()); + return false; + } + if (method->handler() == nullptr) { + method->set_server_tag(tag); + } else { + sync_methods_->emplace_back(method, tag); + } + } + return true; +} + +void Server::RegisterAsyncGenericService(AsyncGenericService* service) { + GPR_ASSERT(service->server_ == nullptr && + "Can only register an async generic service against one server."); + service->server_ = this; + has_generic_service_ = true; +} + +int Server::AddListeningPort(const grpc::string& addr, + ServerCredentials* creds) { + GPR_ASSERT(!started_); + return creds->AddPortToServer(addr, server_); +} + +bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { + GPR_ASSERT(!started_); + started_ = true; + grpc_server_start(server_); + + if (!has_generic_service_) { + if (!sync_methods_->empty()) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted + // here by gcc-4.4 because it can't match the anonymous nullptr with a + // proper constructor implicitly. Construct the object and use push_back. + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + } + for (size_t i = 0; i < num_cqs; i++) { + new UnimplementedAsyncRequest(this, cqs[i]); + } + } + // Start processing rpcs. + if (!sync_methods_->empty()) { + for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { + m->SetupRequest(); + m->Request(server_, cq_.cq()); + } + + ScheduleCallback(); + } + + return true; +} + +void Server::ShutdownInternal(gpr_timespec deadline) { + grpc::unique_lock<grpc::mutex> lock(mu_); + if (started_ && !shutdown_) { + shutdown_ = true; + grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); + cq_.Shutdown(); + lock.unlock(); + // Spin, eating requests until the completion queue is completely shutdown. + // If the deadline expires then cancel anything that's pending and keep + // spinning forever until the work is actually drained. + // Since nothing else needs to touch state guarded by mu_, holding it + // through this loop is fine. + SyncRequest* request; + bool ok; + while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { + if (request == NULL) { // deadline expired + grpc_server_cancel_all_calls(server_); + deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } else if (ok) { + SyncRequest::CallData call_data(this, request); + } + } + lock.lock(); + + // Wait for running callbacks to finish. + while (num_running_cb_ != 0) { + callback_cv_.wait(lock); + } + } +} + +void Server::Wait() { + grpc::unique_lock<grpc::mutex> lock(mu_); + while (num_running_cb_ != 0) { + callback_cv_.wait(lock); + } +} + +void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { + static const size_t MAX_OPS = 8; + size_t nops = 0; + grpc_op cops[MAX_OPS]; + ops->FillOps(cops, &nops); + auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); + GPR_ASSERT(GRPC_CALL_OK == result); +} + +ServerInterface::BaseAsyncRequest::BaseAsyncRequest( + ServerInterface* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag, + bool delete_on_finalize) + : server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + tag_(tag), + delete_on_finalize_(delete_on_finalize), + call_(nullptr) { + memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); +} + +bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (*status) { + for (size_t i = 0; i < initial_metadata_array_.count; i++) { + context_->client_metadata_.insert( + std::pair<grpc::string_ref, grpc::string_ref>( + initial_metadata_array_.metadata[i].key, + grpc::string_ref( + initial_metadata_array_.metadata[i].value, + initial_metadata_array_.metadata[i].value_length))); + } + } + grpc_metadata_array_destroy(&initial_metadata_array_); + context_->set_call(call_); + context_->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size()); + if (*status && call_) { + context_->BeginCompletionOp(&call); + } + // just the pointers inside call are copied here + stream_->BindCall(&call); + *tag = tag_; + if (delete_on_finalize_) { + delete this; + } + return true; +} + +ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( + ServerInterface* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {} + +void ServerInterface::RegisteredAsyncRequest::IssueRequest( + void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq) { + grpc_server_request_registered_call( + server_->server(), registered_method, &call_, &context_->deadline_, + &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(), + this); +} + +ServerInterface::GenericAsyncRequest::GenericAsyncRequest( + ServerInterface* server, GenericServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize) + : BaseAsyncRequest(server, context, stream, call_cq, tag, + delete_on_finalize) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + grpc_server_request_call(server->server(), &call_, &call_details_, + &initial_metadata_array_, call_cq->cq(), + notification_cq->cq(), this); +} + +bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, + bool* status) { + // TODO(yangg) remove the copy here. + if (*status) { + static_cast<GenericServerContext*>(context_)->method_ = + call_details_.method; + static_cast<GenericServerContext*>(context_)->host_ = call_details_.host; + } + gpr_free(call_details_.method); + gpr_free(call_details_.host); + return BaseAsyncRequest::FinalizeResult(tag, status); +} + +bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag, + bool* status) { + if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) { + new UnimplementedAsyncRequest(server_, cq_); + new UnimplementedAsyncResponse(this); + } else { + delete this; + } + return false; +} + +Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( + UnimplementedAsyncRequest* request) + : request_(request) { + Status status(StatusCode::UNIMPLEMENTED, ""); + UnknownMethodHandler::FillOps(request_->context(), this); + request_->stream()->call_.PerformOps(this); +} + +void Server::ScheduleCallback() { + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_running_cb_++; + } + thread_pool_->Add(std::bind(&Server::RunRpc, this)); +} + +void Server::RunRpc() { + // Wait for one more incoming rpc. + bool ok; + GPR_TIMER_SCOPE("Server::RunRpc", 0); + auto* mrd = SyncRequest::Wait(&cq_, &ok); + if (mrd) { + ScheduleCallback(); + if (ok) { + SyncRequest::CallData cd(this, mrd); + { + mrd->SetupRequest(); + grpc::unique_lock<grpc::mutex> lock(mu_); + if (!shutdown_) { + mrd->Request(server_, cq_.cq()); + } else { + // destroy the structure that was created + mrd->TeardownRequest(); + } + } + GPR_TIMER_SCOPE("cd.Run()", 0); + cd.Run(global_callbacks_); + } + } + + { + grpc::unique_lock<grpc::mutex> lock(mu_); + num_running_cb_--; + if (shutdown_) { + callback_cv_.notify_all(); + } + } +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/server_builder.cc b/third_party/grpc/src/cpp/server/server_builder.cc new file mode 100644 index 0000000000..a8c188e5a5 --- /dev/null +++ b/third_party/grpc/src/cpp/server/server_builder.cc @@ -0,0 +1,144 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server_builder.h> + +#include <grpc/support/cpu.h> +#include <grpc/support/log.h> +#include <grpc++/impl/service_type.h> +#include <grpc++/server.h> +#include "src/cpp/server/thread_pool_interface.h" +#include "src/cpp/server/fixed_size_thread_pool.h" + +namespace grpc { + +ServerBuilder::ServerBuilder() + : max_message_size_(-1), generic_service_(nullptr) { + grpc_compression_options_init(&compression_options_); +} + +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { + ServerCompletionQueue* cq = new ServerCompletionQueue(); + cqs_.push_back(cq); + return std::unique_ptr<ServerCompletionQueue>(cq); +} + +void ServerBuilder::RegisterService(Service* service) { + services_.emplace_back(new NamedService(service)); +} + +void ServerBuilder::RegisterService(const grpc::string& addr, + Service* service) { + services_.emplace_back(new NamedService(addr, service)); +} + +void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) { + if (generic_service_) { + gpr_log(GPR_ERROR, + "Adding multiple AsyncGenericService is unsupported for now. " + "Dropping the service %p", + service); + return; + } + generic_service_ = service; +} + +void ServerBuilder::SetOption(std::unique_ptr<ServerBuilderOption> option) { + options_.push_back(std::move(option)); +} + +void ServerBuilder::AddListeningPort(const grpc::string& addr, + std::shared_ptr<ServerCredentials> creds, + int* selected_port) { + Port port = {addr, creds, selected_port}; + ports_.push_back(port); +} + +std::unique_ptr<Server> ServerBuilder::BuildAndStart() { + std::unique_ptr<ThreadPoolInterface> thread_pool; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_synchronous_methods()) { + if (thread_pool == nullptr) { + thread_pool.reset(CreateDefaultThreadPool()); + break; + } + } + } + ChannelArguments args; + for (auto option = options_.begin(); option != options_.end(); ++option) { + (*option)->UpdateArguments(&args); + } + if (max_message_size_ > 0) { + args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_); + } + args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, + compression_options_.enabled_algorithms_bitset); + std::unique_ptr<Server> server( + new Server(thread_pool.release(), true, max_message_size_, args)); + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); + } + for (auto service = services_.begin(); service != services_.end(); + service++) { + if (!server->RegisterService((*service)->host.get(), (*service)->service)) { + return nullptr; + } + } + if (generic_service_) { + server->RegisterAsyncGenericService(generic_service_); + } else { + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_generic_methods()) { + gpr_log(GPR_ERROR, + "Some methods were marked generic but there is no " + "generic service registered."); + return nullptr; + } + } + } + for (auto port = ports_.begin(); port != ports_.end(); port++) { + int r = server->AddListeningPort(port->addr, port->creds.get()); + if (!r) return nullptr; + if (port->selected_port != nullptr) { + *port->selected_port = r; + } + } + auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0]; + if (!server->Start(cqs_data, cqs_.size())) { + return nullptr; + } + return server; +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/server_context.cc b/third_party/grpc/src/cpp/server/server_context.cc new file mode 100644 index 0000000000..e205a1969b --- /dev/null +++ b/third_party/grpc/src/cpp/server/server_context.cc @@ -0,0 +1,232 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/server_context.h> + +#include <grpc++/completion_queue.h> +#include <grpc++/impl/call.h> +#include <grpc++/impl/sync.h> +#include <grpc++/support/time.h> +#include <grpc/compression.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/channel/compress_filter.h" +#include "src/cpp/common/create_auth_context.h" + +namespace grpc { + +// CompletionOp + +class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { + public: + // initial refs: one in the server context, one in the cq + CompletionOp() + : has_tag_(false), + tag_(nullptr), + refs_(2), + finalized_(false), + cancelled_(0) {} + + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + bool CheckCancelled(CompletionQueue* cq); + + void set_tag(void* tag) { + has_tag_ = true; + tag_ = tag; + } + + void Unref(); + + private: + bool has_tag_; + void* tag_; + grpc::mutex mu_; + int refs_; + bool finalized_; + int cancelled_; +}; + +void ServerContext::CompletionOp::Unref() { + grpc::unique_lock<grpc::mutex> lock(mu_); + if (--refs_ == 0) { + lock.unlock(); + delete this; + } +} + +bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + grpc::lock_guard<grpc::mutex> g(mu_); + return finalized_ ? cancelled_ != 0 : false; +} + +void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { + ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops->data.recv_close_on_server.cancelled = &cancelled_; + ops->flags = 0; + ops->reserved = NULL; + *nops = 1; +} + +bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { + grpc::unique_lock<grpc::mutex> lock(mu_); + finalized_ = true; + bool ret = false; + if (has_tag_) { + *tag = tag_; + ret = true; + } + if (!*status) cancelled_ = 1; + if (--refs_ == 0) { + lock.unlock(); + delete this; + } + return ret; +} + +// ServerContext body + +ServerContext::ServerContext() + : completion_op_(nullptr), + has_notify_when_done_tag_(false), + async_notify_when_done_tag_(nullptr), + deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), + call_(nullptr), + cq_(nullptr), + sent_initial_metadata_(false) {} + +ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata* metadata, + size_t metadata_count) + : completion_op_(nullptr), + has_notify_when_done_tag_(false), + async_notify_when_done_tag_(nullptr), + deadline_(deadline), + call_(nullptr), + cq_(nullptr), + sent_initial_metadata_(false) { + for (size_t i = 0; i < metadata_count; i++) { + client_metadata_.insert(std::pair<grpc::string_ref, grpc::string_ref>( + metadata[i].key, + grpc::string_ref(metadata[i].value, metadata[i].value_length))); + } +} + +ServerContext::~ServerContext() { + if (call_) { + grpc_call_destroy(call_); + } + if (completion_op_) { + completion_op_->Unref(); + } +} + +void ServerContext::BeginCompletionOp(Call* call) { + GPR_ASSERT(!completion_op_); + completion_op_ = new CompletionOp(); + if (has_notify_when_done_tag_) { + completion_op_->set_tag(async_notify_when_done_tag_); + } + call->PerformOps(completion_op_); +} + +void ServerContext::AddInitialMetadata(const grpc::string& key, + const grpc::string& value) { + initial_metadata_.insert(std::make_pair(key, value)); +} + +void ServerContext::AddTrailingMetadata(const grpc::string& key, + const grpc::string& value) { + trailing_metadata_.insert(std::make_pair(key, value)); +} + +void ServerContext::TryCancel() const { + grpc_call_error err = grpc_call_cancel_with_status( + call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", NULL); + if (err != GRPC_CALL_OK) { + gpr_log(GPR_ERROR, "TryCancel failed with: %d", err); + } +} + +bool ServerContext::IsCancelled() const { + return completion_op_ && completion_op_->CheckCancelled(cq_); +} + +void ServerContext::set_compression_level(grpc_compression_level level) { + const grpc_compression_algorithm algorithm_for_level = + grpc_compression_algorithm_for_level(level); + set_compression_algorithm(algorithm_for_level); +} + +void ServerContext::set_compression_algorithm( + grpc_compression_algorithm algorithm) { + char* algorithm_name = NULL; + if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { + gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", + algorithm); + abort(); + } + GPR_ASSERT(algorithm_name != NULL); + AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); +} + +void ServerContext::set_call(grpc_call* call) { + call_ = call; + auth_context_ = CreateAuthContext(call); +} + +std::shared_ptr<const AuthContext> ServerContext::auth_context() const { + if (auth_context_.get() == nullptr) { + auth_context_ = CreateAuthContext(call_); + } + return auth_context_; +} + +grpc::string ServerContext::peer() const { + grpc::string peer; + if (call_) { + char* c_peer = grpc_call_get_peer(call_); + peer = c_peer; + gpr_free(c_peer); + } + return peer; +} + +const struct census_context* ServerContext::census_context() const { + return grpc_census_call_get_context(call_); +} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/server_credentials.cc b/third_party/grpc/src/cpp/server/server_credentials.cc new file mode 100644 index 0000000000..8495916178 --- /dev/null +++ b/third_party/grpc/src/cpp/server/server_credentials.cc @@ -0,0 +1,40 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/security/server_credentials.h> + +namespace grpc { + +ServerCredentials::~ServerCredentials() {} + +} // namespace grpc diff --git a/third_party/grpc/src/cpp/server/thread_pool_interface.h b/third_party/grpc/src/cpp/server/thread_pool_interface.h new file mode 100644 index 0000000000..1ebe30fe2a --- /dev/null +++ b/third_party/grpc/src/cpp/server/thread_pool_interface.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H +#define GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H + +#include <functional> + +namespace grpc { + +// A thread pool interface for running callbacks. +class ThreadPoolInterface { + public: + virtual ~ThreadPoolInterface() {} + + // Schedule the given callback for execution. + virtual void Add(const std::function<void()>& callback) = 0; +}; + +ThreadPoolInterface* CreateDefaultThreadPool(); + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_THREAD_POOL_INTERFACE_H |