diff options
-rw-r--r-- | include/grpc++/impl/call.h | 110 | ||||
-rw-r--r-- | include/grpc++/stream.h | 26 | ||||
-rw-r--r-- | test/cpp/end2end/mock_test.cc | 4 |
3 files changed, 128 insertions, 12 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 64fa5d6efb..325c64b20d 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -60,6 +60,96 @@ void FillMetadataMap(grpc_metadata_array* arr, grpc_metadata* FillMetadataArray( const std::multimap<grpc::string, grpc::string>& metadata); +/// Per-message write options. +class WriteOptions { + public: + WriteOptions() : flags_(0) {} + WriteOptions(const WriteOptions& other) : flags_(other.flags_) {} + + /// Clear all flags. + inline void Clear() { + flags_ = 0; + } + + /// Returns raw flags bitset. + inline gpr_uint32 flags() const { + return flags_; + } + + /// Sets flag for the disabling of compression for the next message write. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline WriteOptions& set_no_compression() { + SetBit(GRPC_WRITE_NO_COMPRESS); + return *this; + } + + /// Clears flag for the disabling of compression for the next message write. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline WriteOptions& clear_no_compression() { + ClearBit(GRPC_WRITE_NO_COMPRESS); + return *this; + } + + /// Get value for the flag indicating whether compression for the next + /// message write is forcefully disabled. + /// + /// \sa GRPC_WRITE_NO_COMPRESS + inline bool get_no_compression() const { + return GetBit(GRPC_WRITE_NO_COMPRESS); + } + + /// Sets flag indicating that the write may be buffered and need not go out on + /// the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline WriteOptions& set_buffer_hint() { + SetBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + /// Clears flag indicating that the write may be buffered and need not go out + /// on the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline WriteOptions& clear_buffer_hint() { + ClearBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + /// Get value for the flag indicating that the write may be buffered and need + /// not go out on the wire immediately. + /// + /// \sa GRPC_WRITE_BUFFER_HINT + inline bool get_buffer_hint() const { + return GetBit(GRPC_WRITE_BUFFER_HINT); + } + + WriteOptions& operator=(const WriteOptions& rhs) { + if (this == &rhs) { + return *this; + } + flags_ = rhs.flags_; + return *this; + } + + private: + void SetBit(const gpr_int32 mask) { + flags_ |= mask; + } + + void ClearBit(const gpr_int32 mask) { + flags_ &= ~mask; + } + + bool GetBit(const gpr_int32 mask) const { + return flags_ & mask; + } + + gpr_uint32 flags_; +}; + /// Default argument for CallOpSet. I is unused by the class, but can be /// used for generating multiple names for the same thing. template <int I> @@ -104,6 +194,12 @@ class CallOpSendMessage { public: CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + /// Send \a message using \a options for the write. The \a options are cleared + /// after use. + template <class M> + Status SendMessage(const M& message, + const WriteOptions& options) GRPC_MUST_USE_RESULT; + template <class M> Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; @@ -112,8 +208,10 @@ class CallOpSendMessage { if (send_buf_ == nullptr) return; grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_MESSAGE; - op->flags = 0; + op->flags = write_options_.flags(); op->data.send_message = send_buf_; + // Flags are per-message: clear them after use. + write_options_.Clear(); } void FinishOp(bool* status, int max_message_size) { if (own_buf_) grpc_byte_buffer_destroy(send_buf_); @@ -122,14 +220,22 @@ class CallOpSendMessage { private: grpc_byte_buffer* send_buf_; + WriteOptions write_options_; bool own_buf_; }; template <class M> -Status CallOpSendMessage::SendMessage(const M& message) { +Status CallOpSendMessage::SendMessage(const M& message, + const WriteOptions& options) { + write_options_ = options; return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); } +template <class M> +Status CallOpSendMessage::SendMessage(const M& message) { + return SendMessage(message, WriteOptions()); +} + template <class R> class CallOpRecvMessage { public: diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index dd5e52d6d3..3903f2ec06 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -79,7 +79,11 @@ class WriterInterface { // Blocking write msg to the stream. Returns true on success. // Returns false when the stream has been closed. - virtual bool Write(const W& msg) = 0; + virtual bool Write(const W& msg, const WriteOptions& options) = 0; + + inline bool Write(const W& msg) { + return Write(msg, WriteOptions()); + } }; template <class R> @@ -168,9 +172,10 @@ class ClientWriter : public ClientWriterInterface<W> { cq_.Pluck(&ops); } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) { + if (!ops.SendMessage(msg, options).ok()) { return false; } call_.PerformOps(&ops); @@ -246,9 +251,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { return cq_.Pluck(&ops) && ops.got_message; } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) return false; + if (!ops.SendMessage(msg, options).ok()) return false; call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -317,9 +323,10 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { call_->cq()->Pluck(&ops); } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) { + if (!ops.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { @@ -359,9 +366,10 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, return call_->cq()->Pluck(&ops) && ops.got_message; } - bool Write(const W& msg) GRPC_OVERRIDE { + using WriterInterface<W>::Write; + bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; - if (!ops.SendMessage(msg).ok()) { + if (!ops.SendMessage(msg, options).ok()) { return false; } if (!ctx_->sent_initial_metadata_) { diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 2809ab8d3c..59a8edf509 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -86,7 +86,9 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> GRPC_FINAL msg->set_message(last_message_); return true; } - bool Write(const EchoRequest& msg) GRPC_OVERRIDE { + + bool Write(const EchoRequest& msg, + const WriteOptions& options) GRPC_OVERRIDE { gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str()); last_message_ = msg.message(); return true; |