diff options
Diffstat (limited to 'include/grpc++/impl/codegen/sync_stream.h')
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 106 |
1 files changed, 80 insertions, 26 deletions
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 4d9b074e95..ae3b8e441d 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -100,22 +100,40 @@ class WriterInterface { public: virtual ~WriterInterface() {} - /// Blocking write \a msg to the stream with options. + /// Blocking write \a msg to the stream with WriteOptions \a options. /// This is thread-safe with respect to \a Read /// /// \param msg The message to be written to the stream. - /// \param options Options affecting the write operation. + /// \param options The WriteOptions affecting the write operation. /// /// \return \a true on success, \a false when the stream has been closed. - virtual bool Write(const W& msg, const WriteOptions& options) = 0; + virtual bool Write(const W& msg, WriteOptions options) = 0; - /// Blocking write \a msg to the stream with default options. + /// Blocking write \a msg to the stream with default write options. /// This is thread-safe with respect to \a Read /// /// \param msg The message to be written to the stream. /// /// \return \a true on success, \a false when the stream has been closed. inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } + + /// Write \a msg and coalesce it with the writing of trailing metadata, using + /// WriteOptions \a options. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. \a msg and trailing metadata are coalesced and sent on wire + /// by calling this function. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until the service handler returns, where \a msg and trailing metadata are + /// coalesced and sent on wire. Note that WriteLast can only buffer \a msg up + /// to the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// \param[in] msg The message to be written to the stream. + /// \param[in] options The WriteOptions to be used to write this message. + void WriteLast(const W& msg, WriteOptions options) { + Write(msg, options.set_last_message()); + } }; /// Client-side interface for streaming reads of message of type \a R. @@ -213,11 +231,13 @@ class ClientWriter : public ClientWriterInterface<W> { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } } void WaitForInitialMetadata() { @@ -230,11 +250,24 @@ class ClientWriter : public ClientWriterInterface<W> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { - CallOpSet<CallOpSendMessage> ops; + bool Write(const W& msg, WriteOptions options) override { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } if (!ops.SendMessage(msg, options).ok()) { return false; } + call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -293,11 +326,13 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpSet<CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&ops); - cq_.Pluck(&ops); + if (!context_->initial_metadata_corked_) { + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&ops); + cq_.Pluck(&ops); + } } void WaitForInitialMetadata() override { @@ -325,9 +360,24 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { - CallOpSet<CallOpSendMessage> ops; - if (!ops.SendMessage(msg, options).ok()) return false; + bool Write(const W& msg, WriteOptions options) override { + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + ops; + + if (options.is_last_message()) { + options.set_buffer_hint(); + ops.ClientSendClose(); + } + if (context_->initial_metadata_corked_) { + ops.SendInitialMetadata(context_->send_initial_metadata_, + context_->initial_metadata_flags()); + context_->set_initial_metadata_corked(false); + } + if (!ops.SendMessage(msg, options).ok()) { + return false; + } + call_.PerformOps(&ops); return cq_.Pluck(&ops); } @@ -423,7 +473,10 @@ class ServerWriter final : public ServerWriterInterface<W> { } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { + bool Write(const W& msg, WriteOptions options) override { + if (options.is_last_message()) { + options.set_buffer_hint(); + } CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -485,7 +538,10 @@ class ServerReaderWriterBody final { return call_->cq()->Pluck(&ops) && ops.got_message; } - bool Write(const W& msg, const WriteOptions& options) { + bool Write(const W& msg, WriteOptions options) { + if (options.is_last_message()) { + options.set_buffer_hint(); + } CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -523,7 +579,7 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { bool Read(R* msg) override { return body_.Read(msg); } using WriterInterface<W>::Write; - bool Write(const W& msg, const WriteOptions& options) override { + bool Write(const W& msg, WriteOptions options) override { return body_.Write(msg, options); } @@ -562,8 +618,7 @@ class ServerUnaryStreamer final } using WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) override { + bool Write(const ResponseType& response, WriteOptions options) override { if (write_done_ || !read_done_) { return false; } @@ -604,8 +659,7 @@ class ServerSplitStreamer final } using WriterInterface<ResponseType>::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) override { + bool Write(const ResponseType& response, WriteOptions options) override { return read_done_ && body_.Write(response, options); } |