diff options
Diffstat (limited to 'include/grpc++/support/async_stream.h')
-rw-r--r-- | include/grpc++/support/async_stream.h | 426 |
1 files changed, 1 insertions, 425 deletions
diff --git a/include/grpc++/support/async_stream.h b/include/grpc++/support/async_stream.h index 0c96352ccd..88147ea9d6 100644 --- a/include/grpc++/support/async_stream.h +++ b/include/grpc++/support/async_stream.h @@ -34,430 +34,6 @@ #ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H #define GRPCXX_SUPPORT_ASYNC_STREAM_H -#include <grpc/support/log.h> -#include <grpc++/channel.h> -#include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/impl/call.h> -#include <grpc++/impl/service_type.h> -#include <grpc++/server_context.h> -#include <grpc++/support/status.h> - -namespace grpc { - -/// Common interface for all client side asynchronous streaming. -class ClientAsyncStreamingInterface { - public: - virtual ~ClientAsyncStreamingInterface() {} - - /// Request notification of the reading of the initial metadata. Completion - /// will be notified by \a tag on the associated completion queue. - /// - /// \param[in] tag Tag identifying this request. - virtual void ReadInitialMetadata(void* tag) = 0; - - /// Request notification completion. - /// - /// \param[out] status To be updated with the operation status. - /// \param[in] tag Tag identifying this request. - virtual void Finish(Status* status, void* tag) = 0; -}; - -/// An interface that yields a sequence of messages of type \a R. -template <class R> -class AsyncReaderInterface { - public: - virtual ~AsyncReaderInterface() {} - - /// Read a message of type \a R into \a msg. Completion will be notified by \a - /// tag on the associated completion queue. - /// - /// \param[out] msg Where to eventually store the read message. - /// \param[in] tag The tag identifying the operation. - virtual void Read(R* msg, void* tag) = 0; -}; - -/// An interface that can be fed a sequence of messages of type \a W. -template <class W> -class AsyncWriterInterface { - public: - virtual ~AsyncWriterInterface() {} - - /// Request the writing of \a msg with identifying tag \a tag. - /// - /// Only one write may be outstanding at any given time. This means that - /// after calling Write, one must wait to receive \a tag from the completion - /// queue BEFORE calling Write again. - /// - /// \param[in] msg The message to be written. - /// \param[in] tag The tag identifying the operation. - virtual void Write(const W& msg, void* tag) = 0; -}; - -template <class R> -class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface<R> {}; - -template <class R> -class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { - public: - /// Create a stream and write the first request out. - template <class W> - ClientAsyncReader(Channel* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - // TODO(ctiller): don't assert - GPR_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> - init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; -}; - -/// Common interface for client side asynchronous writing. -template <class W> -class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template <class W> -class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { - public: - template <class R> - ClientAsyncWriter(Channel* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> finish_ops_; -}; - -/// Client-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - /// Signal the client is done with the writes. - /// - /// \param[in] tag The tag identifying the operation. - virtual void WritesDone(void* tag) = 0; -}; - -template <class W, class R> -class ClientAsyncReaderWriter GRPC_FINAL - : public ClientAsyncReaderWriterInterface<W, R> { - public: - ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); - } - - void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); - - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); - } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); - } - - void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); - if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); - } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); - } - - private: - ClientContext* context_; - Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; -}; - -template <class W, class R> -class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncReaderInterface<R> { - public: - explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Finish(const W& msg, const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); - } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - } - call_.PerformOps(&finish_ops_); - } - - void FinishWithError(const Status& status, void* tag) { - GPR_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_ops_; -}; - -template <class W> -class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W> { - public: - explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; -}; - -/// Server-side interface for asynchronous bi-directional streaming. -template <class W, class R> -class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, - public AsyncWriterInterface<W>, - public AsyncReaderInterface<R> { - public: - explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); - } - - void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); - } - - void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); - } - - void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); - if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); - } - - private: - friend class ::grpc::Server; - - void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; -}; - -} // namespace grpc +#include <grpc++/impl/codegen/async_stream.h> #endif // GRPCXX_SUPPORT_ASYNC_STREAM_H |