diff options
Diffstat (limited to 'include/grpc++/impl/codegen/async_stream.h')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 247 |
1 files changed, 192 insertions, 55 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 1a5cbbd45d..8f529895ca 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -101,6 +101,39 @@ class AsyncWriterInterface { /// \param[in] msg The message to be written. /// \param[in] tag The tag identifying the operation. virtual void Write(const W& msg, void* tag) = 0; + + /// Request the writing of \a msg using WriteOptions \a options with + /// identifying tag \a tag. + /// + /// Only one write may be outstanding at any given time. This means that + /// after calling Write, one must wait to receive \a tag from the completion + /// queue BEFORE calling Write again. + /// WriteOptions \a options is used to set the write options of this message. + /// This is thread-safe with respect to \a Read + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + virtual void Write(const W& msg, WriteOptions options, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with the writing + /// of trailing metadata, using WriteOptions \a options with identifying tag + /// \a tag. + /// + /// For client, WriteLast is equivalent of performing Write and WritesDone in + /// a single step. + /// For server, WriteLast buffers the \a msg. The writing of \a msg is held + /// until Finish is called, where \a msg and trailing metadata are coalesced + /// and write is initiated. Note that WriteLast can only buffer \a msg up to + /// the flow control window size. If \a msg size is larger than the window + /// size, it will be sent on wire without buffering. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] tag The tag identifying the operation. + void WriteLast(const W& msg, WriteOptions options, void* tag) { + Write(msg, options.set_last_message(), tag); + } }; template <class R> @@ -183,11 +216,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { : context_(context), call_(channel->CreateCall(method, context, cq)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&init_ops_); + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + if (context_->initial_metadata_corked_) { + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } } void ReadInitialMetadata(void* tag) override { @@ -205,10 +244,21 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + void WritesDone(void* tag) override { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); } void Finish(Status* status, void* tag) override { @@ -223,10 +273,9 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; @@ -253,10 +302,17 @@ class ClientAsyncReaderWriter final const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&init_ops_); + if (context_->initial_metadata_corked_) { + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } } void ReadInitialMetadata(void* tag) override { @@ -283,10 +339,21 @@ class ClientAsyncReaderWriter final call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + write_ops_.ClientSendClose(); + } + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + void WritesDone(void* tag) override { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + write_ops_.set_output_tag(tag); + write_ops_.ClientSendClose(); + call_.PerformOps(&write_ops_); } void Finish(Status* status, void* tag) override { @@ -301,11 +368,10 @@ class ClientAsyncReaderWriter final private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + write_ops_; CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; @@ -395,6 +461,20 @@ class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with identifying tag \a + /// tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; template <class W> @@ -418,29 +498,37 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { void Write(const W& msg, void* tag) override { write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + + EnsureInitialMetadataSent(&write_ops_); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, + void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&finish_ops_); finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -448,10 +536,24 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { private: void BindCall(Call* call) override { call_ = *call; } + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + Call call_; ServerContext* ctx_; CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; @@ -462,6 +564,20 @@ class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: virtual void Finish(const Status& status, void* tag) = 0; + + /// Request the writing of \a msg and coalesce it with trailing metadata which + /// contains \a status, using WriteOptions options with identifying tag \a + /// tag. + /// + /// WriteAndFinish is equivalent of performing WriteLast and Finish in a + /// single step. + /// + /// \param[in] msg The message to be written. + /// \param[in] options The WriteOptions to be used to write this message. + /// \param[in] status The Status that server returns to client. + /// \param[in] tag The tag identifying the operation. + virtual void WriteAndFinish(const W& msg, WriteOptions options, + const Status& status, void* tag) = 0; }; template <class W, class R> @@ -492,29 +608,36 @@ class ServerAsyncReaderWriter final void Write(const W& msg, void* tag) override { write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - write_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&write_ops_); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } + void Write(const W& msg, WriteOptions options, void* tag) override { + write_ops_.set_output_tag(tag); + if (options.is_last_message()) { + options.set_buffer_hint(); + } + EnsureInitialMetadataSent(&write_ops_); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + call_.PerformOps(&write_ops_); + } + + void WriteAndFinish(const W& msg, WriteOptions options, const Status& status, + void* tag) override { + write_ops_.set_output_tag(tag); + EnsureInitialMetadataSent(&write_ops_); + options.set_buffer_hint(); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); + write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&write_ops_); + } + void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - finish_ops_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - } + EnsureInitialMetadataSent(&finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -524,11 +647,25 @@ class ServerAsyncReaderWriter final void BindCall(Call* call) override { call_ = *call; } + template <class T> + void EnsureInitialMetadataSent(T* ops) { + if (!ctx_->sent_initial_metadata_) { + ops->SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + ops->set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + } + Call call_; ServerContext* ctx_; CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + write_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; |