diff options
author | yang-g <yangg@google.com> | 2015-07-16 21:03:49 -0700 |
---|---|---|
committer | yang-g <yangg@google.com> | 2015-07-16 21:03:49 -0700 |
commit | fb58d239ef360b4fcdeeda358e46bb107ceceaa4 (patch) | |
tree | 5768a672fc92eacf5c4a573b3d52c1b3723bc738 /include | |
parent | 7ef7232dd9eba4be7d81b59a6bb59c4fd634ea08 (diff) | |
parent | 65ef0fffaecf8396cb93b3a76416d099f7b07438 (diff) |
Merge with head
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/fixed_size_thread_pool.h | 67 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 141 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 6 | ||||
-rw-r--r-- | include/grpc++/stream.h | 26 | ||||
-rw-r--r-- | include/grpc++/thread_pool_interface.h | 2 |
5 files changed, 218 insertions, 24 deletions
diff --git a/include/grpc++/fixed_size_thread_pool.h b/include/grpc++/fixed_size_thread_pool.h new file mode 100644 index 0000000000..9f0cbfbae9 --- /dev/null +++ b/include/grpc++/fixed_size_thread_pool.h @@ -0,0 +1,67 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H +#define GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H + +#include <grpc++/config.h> + +#include <grpc++/impl/sync.h> +#include <grpc++/impl/thd.h> +#include <grpc++/thread_pool_interface.h> + +#include <queue> +#include <vector> + +namespace grpc { + +class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface { + public: + explicit FixedSizeThreadPool(int num_threads); + ~FixedSizeThreadPool(); + + void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE; + + private: + grpc::mutex mu_; + grpc::condition_variable cv_; + bool shutdown_; + std::queue<std::function<void()>> callbacks_; + std::vector<grpc::thread*> threads_; + + void ThreadFunc(); +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 64fa5d6efb..1fa4490779 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -60,6 +60,93 @@ void FillMetadataMap(grpc_metadata_array* arr, grpc_metadata* FillMetadataArray( const std::multimap<grpc::string, grpc::string>& metadata); +/// Per-message write options. +class WriteOptions { + public: + WriteOptions() : flags_(0) {} + WriteOptions(const WriteOptions& other) : flags_(other.flags_) {} + + /// Clear all flags. + inline void Clear() { + flags_ = 0; + } + + /// Returns raw flags bitset. + inline gpr_uint32 flags() const { + return flags_; + } + + /// Sets flag for the disabling of compression for the next message write. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline WriteOptions& set_no_compression() { + SetBit(GRPC_WRITE_NO_COMPRESS); + return *this; + } + + /// Clears flag for the disabling of compression for the next message write. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline WriteOptions& clear_no_compression() { + ClearBit(GRPC_WRITE_NO_COMPRESS); + return *this; + } + + /// Get value for the flag indicating whether compression for the next + /// message write is forcefully disabled. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline bool get_no_compression() const { + return GetBit(GRPC_WRITE_NO_COMPRESS); + } + + /// Sets flag indicating that the write may be buffered and need not go out on + /// the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline WriteOptions& set_buffer_hint() { + SetBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + /// Clears flag indicating that the write may be buffered and need not go out + /// on the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline WriteOptions& clear_buffer_hint() { + ClearBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + /// Get value for the flag indicating that the write may be buffered and need + /// not go out on the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline bool get_buffer_hint() const { + return GetBit(GRPC_WRITE_BUFFER_HINT); + } + + WriteOptions& operator=(const WriteOptions& rhs) { + flags_ = rhs.flags_; + return *this; + } + + private: + void SetBit(const gpr_int32 mask) { + flags_ |= mask; + } + + void ClearBit(const gpr_int32 mask) { + flags_ &= ~mask; + } + + bool GetBit(const gpr_int32 mask) const { + return flags_ & mask; + } + + gpr_uint32 flags_; +}; + /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> @@ -104,6 +191,12 @@ class CallOpSendMessage { public: CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + /// Send \a message using \a options for the write. The \a options are cleared + /// after use. + template <class M> + Status SendMessage(const M& message, + const WriteOptions& options) GRPC_MUST_USE_RESULT; + template <class M> Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; @@ -112,8 +205,10 @@ class CallOpSendMessage { if (send_buf_ == nullptr) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; - op->flags = 0; + op->flags = write_options_.flags(); op->data.send_message = send_buf_; + // Flags are per-message: clear them after use. + write_options_.Clear(); } void FinishOp(bool* status, int max_message_size) { if (own_buf_) grpc_byte_buffer_destroy(send_buf_); @@ -122,14 +217,22 @@ class CallOpSendMessage { private: grpc_byte_buffer* send_buf_; + WriteOptions write_options_; bool own_buf_; }; template <class M> -Status CallOpSendMessage::SendMessage(const M& message) { +Status CallOpSendMessage::SendMessage(const M& message, + const WriteOptions& options) { + write_options_ = options; return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); } +template <class M> +Status CallOpSendMessage::SendMessage(const M& message) { + return SendMessage(message, WriteOptions()); +} + template <class R> class CallOpRecvMessage { public: @@ -172,17 +275,34 @@ class CallOpRecvMessage { grpc_byte_buffer* recv_buf_; }; +namespace CallOpGenericRecvMessageHelper { +class DeserializeFunc { + public: + virtual Status Deserialize(grpc_byte_buffer* buf, int max_message_size) = 0; +}; + +template <class R> +class DeserializeFuncType GRPC_FINAL : public DeserializeFunc { + public: + DeserializeFuncType(R* message) : message_(message) {} + Status Deserialize(grpc_byte_buffer* buf, + int max_message_size) GRPC_OVERRIDE { + return SerializationTraits<R>::Deserialize(buf, message_, max_message_size); + } + + private: + R* message_; // Not a managed pointer because management is external to this +}; +} // namespace CallOpGenericRecvMessageHelper + class CallOpGenericRecvMessage { public: CallOpGenericRecvMessage() : got_message(false) {} template <class R> void RecvMessage(R* message) { - deserialize_ = [message](grpc_byte_buffer* buf, - int max_message_size) -> Status { - return SerializationTraits<R>::Deserialize(buf, message, - max_message_size); - }; + deserialize_.reset( + new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message)); } bool got_message; @@ -201,7 +321,7 @@ class CallOpGenericRecvMessage { if (recv_buf_) { if (*status) { got_message = true; - *status = deserialize_(recv_buf_, max_message_size).ok(); + *status = deserialize_->Deserialize(recv_buf_, max_message_size).ok(); } else { got_message = false; grpc_byte_buffer_destroy(recv_buf_); @@ -210,12 +330,11 @@ class CallOpGenericRecvMessage { got_message = false; *status = false; } - deserialize_ = DeserializeFunc(); + deserialize_.reset(); } private: - typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc; - DeserializeFunc deserialize_; + std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_; grpc_byte_buffer* recv_buf_; }; diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index a4ee986df1..2123d03291 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -99,9 +99,7 @@ class ServerContext { return client_metadata_; } - std::shared_ptr<const AuthContext> auth_context() const { - return auth_context_; - } + std::shared_ptr<const AuthContext> auth_context() const; private: friend class ::grpc::Server; @@ -147,7 +145,7 @@ class ServerContext { grpc_call* call_; CompletionQueue* cq_; bool sent_initial_metadata_; - std::shared_ptr<const AuthContext> auth_context_; + mutable std::shared_ptr<const AuthContext> auth_context_; std::multimap<grpc::string, grpc::string> client_metadata_; std::multimap<grpc::string, grpc::string> initial_metadata_; std::multimap<grpc::string, grpc::string> trailing_metadata_; diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index dd5e52d6d3..3903f2ec06 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -79,7 +79,11 @@ class WriterInterface { // Blocking write msg to the stream. Returns true on success. // Returns false when the stream has been closed. - virtual bool Write(const W& msg) = 0; + virtual bool Write(const W& msg, const WriteOptions& options) = 0; + + inline bool Write(const W& msg) { + return Write(msg, WriteOptions()); + } }; template <class R> @@ -168,9 +172,10 @@ class ClientWriter : public ClientWriterInterface<W> { cq_.Pluck(&ops); } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) { + if (!ops.SendMessage(msg, options).ok()) { return false; } call_.PerformOps(&ops); @@ -246,9 +251,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { return cq_.Pluck(&ops) && ops.got_message; } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) return false; + if (!ops.SendMessage(msg, options).ok()) return false; call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -317,9 +323,10 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { call_->cq()->Pluck(&ops); } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) { + if (!ops.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { @@ -359,9 +366,10 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, return call_->cq()->Pluck(&ops) && ops.got_message; } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) { + if (!ops.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { diff --git a/include/grpc++/thread_pool_interface.h b/include/grpc++/thread_pool_interface.h index ead307f6a2..ac4458d530 100644 --- a/include/grpc++/thread_pool_interface.h +++ b/include/grpc++/thread_pool_interface.h @@ -47,6 +47,8 @@ class ThreadPoolInterface { virtual void ScheduleCallback(const std::function<void()>& callback) = 0; }; +ThreadPoolInterface* CreateDefaultThreadPool(); + } // namespace grpc #endif // GRPCXX_THREAD_POOL_INTERFACE_H |