diff options
author | Craig Tiller <ctiller@google.com> | 2017-03-31 07:50:01 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-03-31 07:50:01 -0700 |
commit | 547631eaf89709ed2dcb30e9f8fdb617617c7aff (patch) | |
tree | 3ca8547943c909d13d0c89da906d5fb2a78d3595 /include/grpc++ | |
parent | 4cbf4d803013d658919d08c341747fb005ff6337 (diff) | |
parent | 66c76d653bf93fb94b108993f3bea916675147bc (diff) |
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 247 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 48 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/client_context.h | 15 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/grpc_library.h | 26 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_context.h | 1 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 106 | ||||
-rw-r--r-- | include/grpc++/support/channel_arguments.h | 6 |
7 files changed, 349 insertions, 100 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 1a5cbbd45d..8f529895ca 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -101,6 +101,39 @@ class AsyncWriterInterface { /// \param[in] msg The message to be written. /// \param[in] tag The tag identifying the operation. virtual void Write(const W& msg, void* tag) = 0; + + /// Request the writing of \a msg using WriteOptions \a options with + /// identifying tag \a tag. + /// + /// Only one write may be outstanding at any given time. This means that + /// after calling Write, one must wait to receive \a tag from the completion + /// queue BEFORE calling Write again. + /// WriteOptions \a options is used to set the write options of this message. + /// This is thread-safe with respect to \a Read + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + virtual void Write(const W& msg, WriteOptions options, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with the writing + /// of trailing metadata, using WriteOptions \a options with identifying tag + /// \a tag. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until Finish is called, where \a msg and trailing metadata are coalesced + /// and write is initiated. Note that WriteLast can only buffer \a msg up to + /// the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + void WriteLast(const W& msg, WriteOptions options, void* tag) { + Write(msg, options.set_last_message(), tag); + } }; template <class R> @@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { : context_(context), call_(channel->CreateCall(method, context, cq)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&init_ops_); + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + if (context_->initial_metadata_corked_) { + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } } void ReadInitialMetadata(void* tag) override { @@ -205,10 +244,21 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + void WritesDone(void* tag) override { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); } void Finish(Status* status, void* tag) override { @@ -223,10 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; @@ -253,10 +302,17 @@ class ClientAsyncReaderWriter final const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&init_ops_); + if (context_->initial_metadata_corked_) { + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } } void ReadInitialMetadata(void* tag) override { @@ -283,10 +339,21 @@ class ClientAsyncReaderWriter final call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + void WritesDone(void* tag) override { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); } void Finish(Status* status, void* tag) override { @@ -301,11 +368,10 @@ class ClientAsyncReaderWriter final private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; @@ -395,6 +461,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with identifying tag \a + /// tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; template <class W> @@ -418,29 +498,37 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { void Write(const W& msg, void* tag) override { write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + EnsureInitialMetadataSent(&write_ops_); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, + void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&finish_ops_); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -448,10 +536,24 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { private: void BindCall(Call* call) override { call_ = *call; } + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + Call call_; ServerContext* ctx_; CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; @@ -462,6 +564,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with identifying tag \a + /// tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; template <class W, class R> @@ -492,29 +608,36 @@ class ServerAsyncReaderWriter final void Write(const W& msg, void* tag) override { write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + EnsureInitialMetadataSent(&write_ops_); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, + void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -524,11 +647,25 @@ class ServerAsyncReaderWriter final void BindCall(Call* call) override { call_ = *call; } + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + Call call_; ServerContext* ctx_; CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 04296dbe15..9c8611a116 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -86,8 +86,9 @@ inline grpc_metadata* FillMetadataArray( /// Per-message write options. class WriteOptions { public: - WriteOptions() : flags_(0) {} - WriteOptions(const WriteOptions& other) : flags_(other.flags_) {} + WriteOptions() : flags_(0), last_message_(false) {} + WriteOptions(const WriteOptions& other) + : flags_(other.flags_), last_message_(other.last_message_) {} /// Clear all flags. inline void Clear() { flags_ = 0; } @@ -143,6 +144,43 @@ class WriteOptions { /// \sa GRPC_WRITE_BUFFER_HINT inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } + /// corked bit: aliases set_buffer_hint currently, with the intent that + /// set_buffer_hint will be removed in the future + inline WriteOptions& set_corked() { + SetBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + inline WriteOptions& clear_corked() { + ClearBit(GRPC_WRITE_BUFFER_HINT); + return *this; + } + + inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } + + /// last-message bit: indicates this is the last message in a stream + /// client-side: makes Write the equivalent of performing Write, WritesDone + /// in a single step + /// server-side: hold the Write until the service handler returns (sync api) + /// or until Finish is called (async api) + inline WriteOptions& set_last_message() { + last_message_ = true; + return *this; + } + + /// Clears flag indicating that this is the last message in a stream, + /// disabling coalescing. + inline WriteOptions& clear_last_messsage() { + last_message_ = false; + return *this; + } + + /// Get value for the flag indicating that this is the last message, and + /// should be coalesced with trailing metadata. + /// + /// \sa GRPC_WRITE_LAST_MESSAGE + bool is_last_message() const { return last_message_; } + WriteOptions& operator=(const WriteOptions& rhs) { flags_ = rhs.flags_; return *this; @@ -156,6 +194,7 @@ class WriteOptions { bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; } uint32_t flags_; + bool last_message_; }; /// Default argument for CallOpSet. I is unused by the class, but can be @@ -226,7 +265,7 @@ class CallOpSendMessage { /// after use. template <class M> Status SendMessage(const M& message, - const WriteOptions& options) GRPC_MUST_USE_RESULT; + WriteOptions options) GRPC_MUST_USE_RESULT; template <class M> Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; @@ -254,8 +293,7 @@ class CallOpSendMessage { }; template <class M> -Status CallOpSendMessage::SendMessage(const M& message, - const WriteOptions& options) { +Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); } diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index b91c7f65d4..3c50e6ba9d 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -281,6 +281,17 @@ class ClientContext { /// \param algorithm The compression algorithm used for the client call. void set_compression_algorithm(grpc_compression_algorithm algorithm); + /// Flag whether the initial metadata should be \a corked + /// + /// If \a corked is true, then the initial metadata will be colasced with the + /// write of first message in the stream. + /// + /// \param corked The flag indicating whether the initial metadata is to be + /// corked or not. + void set_initial_metadata_corked(bool corked) { + initial_metadata_corked_ = corked; + } + /// Return the peer uri in a string. /// /// \warning This value is never authenticated or subject to any security @@ -357,7 +368,8 @@ class ClientContext { (cacheable_ ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST : 0) | (wait_for_ready_explicitly_set_ ? GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET - : 0); + : 0) | + (initial_metadata_corked_ ? GRPC_INITIAL_METADATA_CORKED : 0); } grpc::string authority() { return authority_; } @@ -384,6 +396,7 @@ class ClientContext { PropagationOptions propagation_options_; grpc_compression_algorithm compression_algorithm_; + bool initial_metadata_corked_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/grpc_library.h b/include/grpc++/impl/codegen/grpc_library.h index 2b11aff214..3735d04e8c 100644 --- a/include/grpc++/impl/codegen/grpc_library.h +++ b/include/grpc++/impl/codegen/grpc_library.h @@ -51,18 +51,26 @@ extern GrpcLibraryInterface* g_glip; /// Classes that require gRPC to be initialized should inherit from this class. class GrpcLibraryCodegen { public: - GrpcLibraryCodegen() { - GPR_CODEGEN_ASSERT(g_glip && - "gRPC library not initialized. See " - "grpc::internal::GrpcLibraryInitializer."); - g_glip->init(); + GrpcLibraryCodegen(bool call_grpc_init = true) : grpc_init_called_(false) { + if (call_grpc_init) { + GPR_CODEGEN_ASSERT(g_glip && + "gRPC library not initialized. See " + "grpc::internal::GrpcLibraryInitializer."); + g_glip->init(); + grpc_init_called_ = true; + } } virtual ~GrpcLibraryCodegen() { - GPR_CODEGEN_ASSERT(g_glip && - "gRPC library not initialized. See " - "grpc::internal::GrpcLibraryInitializer."); - g_glip->shutdown(); + if (grpc_init_called_) { + GPR_CODEGEN_ASSERT(g_glip && + "gRPC library not initialized. See " + "grpc::internal::GrpcLibraryInitializer."); + g_glip->shutdown(); + } } + + private: + bool grpc_init_called_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index bf9a9b6f1a..91f0be06e7 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -39,7 +39,6 @@ #include <vector> #include <grpc/impl/codegen/compression_types.h> -#include <grpc/load_reporting.h> #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/create_auth_context.h> diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 4d9b074e95..ae3b8e441d 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -100,22 +100,40 @@ class WriterInterface { public: virtual ~WriterInterface() {} - /// Blocking write \a msg to the stream with options. + /// Blocking write \a msg to the stream with WriteOptions \a options. /// This is thread-safe with respect to \a Read /// /// \param msg The message to be written to the stream. - /// \param options Options affecting the write operation. + /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - virtual bool Write(const W& msg, const WriteOptions& options) = 0; + virtual bool Write(const W& msg, WriteOptions options) = 0; - /// Blocking write \a msg to the stream with default options. + /// Blocking write \a msg to the stream with default write options. /// This is thread-safe with respect to \a Read /// /// \param msg The message to be written to the stream. /// /// \return \a true on success, \a false when the stream has been closed. inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } + + /// Write \a msg and coalesce it with the writing of trailing metadata, using + /// WriteOptions \a options. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. \a msg and trailing metadata are coalesced and sent on wire + /// by calling this function. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until the service handler returns, where \a msg and trailing metadata are + /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up + /// to the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// \param[in] msg The message to be written to the stream. + /// \param[in] options The WriteOptions to be used to write this message. + void WriteLast(const W& msg, WriteOptions options) { + Write(msg, options.set_last_message()); + } }; /// Client-side interface for streaming reads of message of type \a R. @@ -213,11 +231,13 @@ class ClientWriter : public ClientWriterInterface<W> { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } } void WaitForInitialMetadata() { @@ -230,11 +250,24 @@ class ClientWriter : public ClientWriterInterface<W> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { - CallOpSet<CallOpSendMessage> ops; + bool Write(const W& msg, WriteOptions options) override { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } if (!ops.SendMessage(msg, options).ok()) { return false; } + call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -293,11 +326,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } } void WaitForInitialMetadata() override { @@ -325,9 +360,24 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) return false; + bool Write(const W& msg, WriteOptions options) override { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -423,7 +473,10 @@ class ServerWriter final : public ServerWriterInterface<W> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { + bool Write(const W& msg, WriteOptions options) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + } CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -485,7 +538,10 @@ class ServerReaderWriterBody final { return call_->cq()->Pluck(&ops) && ops.got_message; } - bool Write(const W& msg, const WriteOptions& options) { + bool Write(const W& msg, WriteOptions options) { + if (options.is_last_message()) { + options.set_buffer_hint(); + } CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -523,7 +579,7 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { bool Read(R* msg) override { return body_.Read(msg); } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { + bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } @@ -562,8 +618,7 @@ class ServerUnaryStreamer final } using WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) override { + bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; } @@ -604,8 +659,7 @@ class ServerSplitStreamer final } using WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) override { + bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h index efdf7772ad..61307d6194 100644 --- a/include/grpc++/support/channel_arguments.h +++ b/include/grpc++/support/channel_arguments.h @@ -54,7 +54,7 @@ class ResourceQuota; class ChannelArguments { public: ChannelArguments(); - ~ChannelArguments() {} + ~ChannelArguments(); ChannelArguments(const ChannelArguments& other); ChannelArguments& operator=(ChannelArguments other) { @@ -117,10 +117,10 @@ class ChannelArguments { /// Return (by value) a c grpc_channel_args structure which points to /// arguments owned by this ChannelArguments instance - grpc_channel_args c_channel_args() { + grpc_channel_args c_channel_args() const { grpc_channel_args out; out.num_args = args_.size(); - out.args = args_.empty() ? NULL : &args_[0]; + out.args = args_.empty() ? NULL : const_cast<grpc_arg*>(&args_[0]); return out; } |