diff options
Diffstat (limited to 'include/grpc++/stream.h')
-rw-r--r-- | include/grpc++/stream.h | 408 |
1 files changed, 325 insertions, 83 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index b8982f4d93..c30825a7a5 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -34,7 +34,9 @@ #ifndef __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__ -#include <grpc++/stream_context_interface.h> +#include <grpc++/call.h> +#include <grpc++/channel_interface.h> +#include <grpc++/completion_queue.h> #include <grpc++/status.h> #include <grpc/support/log.h> @@ -45,16 +47,12 @@ class ClientStreamingInterface { public: virtual ~ClientStreamingInterface() {} - // Try to cancel the stream. Wait() still needs to be called to get the final - // status. Cancelling after the stream has finished has no effects. - virtual void Cancel() = 0; - // Wait until the stream finishes, and return the final status. When the // client side declares it has no more message to send, either implicitly or // by calling WritesDone, it needs to make sure there is no more message to // be received from the server, either implicitly or by getting a false from // a Read(). Otherwise, this implicitly cancels the stream. - virtual const Status& Wait() = 0; + virtual Status Finish() = 0; }; // An interface that yields a sequence of R messages. @@ -82,147 +80,391 @@ class WriterInterface { }; template <class R> -class ClientReader : public ClientStreamingInterface, - public ReaderInterface<R> { +class ClientReader final : public ClientStreamingInterface, + public ReaderInterface<R> { public: // Blocking create a stream and write the first request out. - explicit ClientReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Write(context_->request(), true); + ClientReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request) + : call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(&buf); + cq_.Pluck(&buf); } - ~ClientReader() { delete context_; } + virtual bool Read(R *msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual void Cancel() { context_->Cancel(); } + private: + CompletionQueue cq_; + Call call_; +}; - virtual const Status& Wait() { return context_->Wait(); } +template <class W> +class ClientWriter final : public ClientStreamingInterface, + public WriterInterface<W> { + public: + // Blocking create a stream. + ClientWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } + + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } + + // Read the final response and wait for the final status. + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddRecvMessage(response_); + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; }; -template <class W> -class ClientWriter : public ClientStreamingInterface, - public WriterInterface<W> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientReaderWriter final : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { public: // Blocking create a stream. - explicit ClientWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + ClientReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual bool Read(R *msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - ~ClientWriter() { delete context_; } + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual void Cancel() { context_->Cancel(); } + private: + CompletionQueue cq_; + Call call_; +}; - // Read the final response and wait for the final status. - virtual const Status& Wait() { - bool success = context_->Read(context_->response()); - if (!success) { - Cancel(); - } else { - success = context_->Read(nullptr); - if (success) { - Cancel(); - } - } - return context_->Wait(); +template <class R> +class ServerReader final : public ReaderInterface<R> { + public: + explicit ServerReader(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } private: - StreamContextInterface* const context_; + Call* call_; }; -// Client-side interface for bi-directional streaming. +template <class W> +class ServerWriter final : public WriterInterface<W> { + public: + explicit ServerWriter(Call* call) : call_(call) {} + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + Call* call_; +}; + +// Server-side interface for bi-directional streaming. template <class W, class R> -class ClientReaderWriter : public ClientStreamingInterface, - public WriterInterface<W>, +class ServerReaderWriter final : public WriterInterface<W>, public ReaderInterface<R> { public: - // Blocking create a stream. - explicit ClientReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + explicit ServerReaderWriter(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } - ~ClientReaderWriter() { delete context_; } + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + CompletionQueue* cq_; + Call* call_; +}; + +// Async interfaces +// Common interface for all client side streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + virtual void Finish(Status* status, void* tag) = 0; +}; + +// An interface that yields a sequence of R messages. +template <class R> +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} + + virtual void Read(R* msg, void* tag) = 0; +}; - virtual bool Read(R* msg) { return context_->Read(msg); } +// An interface that can be fed a sequence of W messages. +template <class W> +class AsyncWriterInterface { + public: + virtual ~AsyncWriterInterface() {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) = 0; +}; + +template <class R> +class ClientAsyncReader final : public ClientAsyncStreamingInterface, + public AsyncReaderInterface<R> { + public: + // Blocking create a stream and write the first request out. + ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, void* tag) + : call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual void Read(R *msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } - virtual void Cancel() { context_->Cancel(); } + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } - virtual const Status& Wait() { return context_->Wait(); } + private: + CompletionQueue cq_; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; +}; + +template <class W> +class ClientAsyncWriter final : public ClientAsyncStreamingInterface, + public WriterInterface<W> { + public: + // Blocking create a stream. + ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); + } + + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddRecvMessage(response_); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; -template <class R> -class ServerReader : public ReaderInterface<R> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + ClientAsyncReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Read(R *msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } + + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; +// TODO(yangg) Move out of stream.h template <class W> -class ServerWriter : public WriterInterface<W> { +class ServerAsyncResponseWriter final { public: - explicit ServerWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + explicit ServerAsyncResponseWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + private: + Call* call_; +}; + +template <class R> +class ServerAsyncReader : public AsyncReaderInterface<R> { + public: + explicit ServerAsyncReader(Call* call) : call_(call) {} + + virtual void Read(R* msg, void* tag) { + // TODO + } + + private: + Call* call_; +}; + +template <class W> +class ServerAsyncWriter : public AsyncWriterInterface<W> { + public: + explicit ServerAsyncWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; // Server-side interface for bi-directional streaming. template <class W, class R> -class ServerReaderWriter : public WriterInterface<W>, - public ReaderInterface<R> { +class ServerAsyncReaderWriter : public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - } + explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Read(R* msg, void* tag) { + // TODO + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; } // namespace grpc |