diff options
Diffstat (limited to 'include/grpc++/impl/call.h')
-rw-r--r-- | include/grpc++/impl/call.h | 141 |
1 files changed, 130 insertions, 11 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 64fa5d6efb..1fa4490779 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -60,6 +60,93 @@ void FillMetadataMap(grpc_metadata_array* arr, grpc_metadata* FillMetadataArray( const std::multimap<grpc::string, grpc::string>& metadata); +/// Per-message write options. +class WriteOptions { + public: + WriteOptions() : flags_(0) {} + WriteOptions(const WriteOptions& other) : flags_(other.flags_) {} + + /// Clear all flags. + inline void Clear() { + flags_ = 0; + } + + /// Returns raw flags bitset. + inline gpr_uint32 flags() const { + return flags_; + } + + /// Sets flag for the disabling of compression for the next message write. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline WriteOptions& set_no_compression() { + SetBit(GRPC_WRITE_NO_COMPRESS); + return *this; + } + + /// Clears flag for the disabling of compression for the next message write. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline WriteOptions& clear_no_compression() { + ClearBit(GRPC_WRITE_NO_COMPRESS); + return *this; + } + + /// Get value for the flag indicating whether compression for the next + /// message write is forcefully disabled. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline bool get_no_compression() const { + return GetBit(GRPC_WRITE_NO_COMPRESS); + } + + /// Sets flag indicating that the write may be buffered and need not go out on + /// the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline WriteOptions& set_buffer_hint() { + SetBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + /// Clears flag indicating that the write may be buffered and need not go out + /// on the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline WriteOptions& clear_buffer_hint() { + ClearBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + /// Get value for the flag indicating that the write may be buffered and need + /// not go out on the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline bool get_buffer_hint() const { + return GetBit(GRPC_WRITE_BUFFER_HINT); + } + + WriteOptions& operator=(const WriteOptions& rhs) { + flags_ = rhs.flags_; + return *this; + } + + private: + void SetBit(const gpr_int32 mask) { + flags_ |= mask; + } + + void ClearBit(const gpr_int32 mask) { + flags_ &= ~mask; + } + + bool GetBit(const gpr_int32 mask) const { + return flags_ & mask; + } + + gpr_uint32 flags_; +}; + /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> @@ -104,6 +191,12 @@ class CallOpSendMessage { public: CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + /// Send \a message using \a options for the write. The \a options are cleared + /// after use. + template <class M> + Status SendMessage(const M& message, + const WriteOptions& options) GRPC_MUST_USE_RESULT; + template <class M> Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; @@ -112,8 +205,10 @@ class CallOpSendMessage { if (send_buf_ == nullptr) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; - op->flags = 0; + op->flags = write_options_.flags(); op->data.send_message = send_buf_; + // Flags are per-message: clear them after use. + write_options_.Clear(); } void FinishOp(bool* status, int max_message_size) { if (own_buf_) grpc_byte_buffer_destroy(send_buf_); @@ -122,14 +217,22 @@ class CallOpSendMessage { private: grpc_byte_buffer* send_buf_; + WriteOptions write_options_; bool own_buf_; }; template <class M> -Status CallOpSendMessage::SendMessage(const M& message) { +Status CallOpSendMessage::SendMessage(const M& message, + const WriteOptions& options) { + write_options_ = options; return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); } +template <class M> +Status CallOpSendMessage::SendMessage(const M& message) { + return SendMessage(message, WriteOptions()); +} + template <class R> class CallOpRecvMessage { public: @@ -172,17 +275,34 @@ class CallOpRecvMessage { grpc_byte_buffer* recv_buf_; }; +namespace CallOpGenericRecvMessageHelper { +class DeserializeFunc { + public: + virtual Status Deserialize(grpc_byte_buffer* buf, int max_message_size) = 0; +}; + +template <class R> +class DeserializeFuncType GRPC_FINAL : public DeserializeFunc { + public: + DeserializeFuncType(R* message) : message_(message) {} + Status Deserialize(grpc_byte_buffer* buf, + int max_message_size) GRPC_OVERRIDE { + return SerializationTraits<R>::Deserialize(buf, message_, max_message_size); + } + + private: + R* message_; // Not a managed pointer because management is external to this +}; +} // namespace CallOpGenericRecvMessageHelper + class CallOpGenericRecvMessage { public: CallOpGenericRecvMessage() : got_message(false) {} template <class R> void RecvMessage(R* message) { - deserialize_ = [message](grpc_byte_buffer* buf, - int max_message_size) -> Status { - return SerializationTraits<R>::Deserialize(buf, message, - max_message_size); - }; + deserialize_.reset( + new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message)); } bool got_message; @@ -201,7 +321,7 @@ class CallOpGenericRecvMessage { if (recv_buf_) { if (*status) { got_message = true; - *status = deserialize_(recv_buf_, max_message_size).ok(); + *status = deserialize_->Deserialize(recv_buf_, max_message_size).ok(); } else { got_message = false; grpc_byte_buffer_destroy(recv_buf_); @@ -210,12 +330,11 @@ class CallOpGenericRecvMessage { got_message = false; *status = false; } - deserialize_ = DeserializeFunc(); + deserialize_.reset(); } private: - typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc; - DeserializeFunc deserialize_; + std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_; grpc_byte_buffer* recv_buf_; }; |