diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 16:12:36 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 16:12:36 -0800 |
commit | 8de8c646c5c231796e45ecb8e4302f1a3fc4004c (patch) | |
tree | db89c170dc64dd03748bef64457ee7e1246ad1fa /include | |
parent | b470169252c36a64749f6b7c60da36845c7fee1d (diff) | |
parent | 75ec2b191cbd94ff0eb3f1247d66e3991aa97000 (diff) |
Merge pull request #3 from yang-g/c++api
more implementation and all async signatures
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/stream.h | 236 |
1 files changed, 184 insertions, 52 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 30af678c69..34604191e5 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -177,6 +177,7 @@ class ClientWriter final : public ClientStreamingInterface, virtual Status Finish() override { CallOpBuffer buf; Status status; + buf.AddRecvMessage(response_); buf.AddClientRecvStatus(&status); call_.PerformOps(&buf, (void *)4); GPR_ASSERT(cq_.Pluck((void *)4)); @@ -252,110 +253,241 @@ class ServerReader final : public ReaderInterface<R> { }; template <class W> -class ServerWriter : public WriterInterface<W> { +class ServerWriter final : public WriterInterface<W> { public: - explicit ServerWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); - } + explicit ServerWriter(Call* call) : call_(call) {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf, (void *)2); + return call_->cq()->Pluck((void *)2); } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; // Server-side interface for bi-directional streaming. template <class W, class R> -class ServerReaderWriter : public WriterInterface<W>, +class ServerReaderWriter final : public WriterInterface<W>, public ReaderInterface<R> { public: - explicit ServerReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + explicit ServerReaderWriter(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf, (void *)2); + return call_->cq()->Pluck((void *)2); + } + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf, (void *)3); + return call_->cq()->Pluck((void *)3); } - virtual bool Read(R* msg) { return context_->Read(msg); } + private: + CompletionQueue* cq_; + Call* call_; +}; + +// Async interfaces +// Common interface for all client side streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + virtual void Finish(Status* status, void* tag) = 0; +}; + +// An interface that yields a sequence of R messages. +template <class R> +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Read(R* msg, void* tag) = 0; +}; + +// An interface that can be fed a sequence of W messages. +template <class W> +class AsyncWriterInterface { + public: + virtual ~Async WriterInterface() {} + + virtual void Write(const W& msg, void* tag) = 0; +}; + +template <class R> +class ClientAsyncReader final : public ClientAsyncStreamingInterface, + public AsyncReaderInterface<R> { + public: + // Blocking create a stream and write the first request out. + ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, void* tag) + : call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(&buf, tag); + } + + virtual void Read(R *msg, void* tag) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf, tag); + } + + virtual void Finish(Status* status, void* tag) override { + CallOpBuffer buf; + buf.AddClientRecvStatus(status); + call_.PerformOps(&buf, tag); } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; }; template <class W> -class ServerAsyncResponseWriter { +class ClientWriter final : public ClientAsyncStreamingInterface, + public WriterInterface<W> { public: - explicit ServerAsyncResponseWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + // Blocking create a stream. + ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf, tag); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void WritesDone(void* tag) { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf, tag); + } + + virtual void Finish(Status* status, void* tag) override { + CallOpBuffer buf; + buf.AddRecvMessage(response_); + buf.AddClientRecvStatus(status); + call_.PerformOps(&buf, tag); } private: - StreamContextInterface* const context_; // not owned + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; }; -template <class R> -class ServerAsyncReader : public ReaderInterface<R> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerAsyncReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + ClientAsyncReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Read(R *msg, void* tag) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf, tag); + } + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf, tag); + } + + virtual bool WritesDone(void* tag) { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf, tag); } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Finish(Status* status, void* tag) override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(status); + call_.PerformOps(&buf, tag); + } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; }; +// TODO(yangg) Move out of stream.h template <class W> -class ServerAsyncWriter : public WriterInterface<W> { +class ServerAsyncResponseWriter final { public: - explicit ServerAsyncWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + explicit ServerAsyncResponseWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf, tag); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + private: + Call* call_; +}; + +template <class R> +class ServerAsyncReader : public AsyncReaderInterface<R> { + public: + explicit ServerAsyncReader(Call* call) : call_(call) {} + + virtual void Read(R* msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; +}; + +template <class W> +class ServerAsyncWriter : public AsyncWriterInterface<W> { + public: + explicit ServerAsyncWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) { + // TODO + } + + private: + Call* call_; }; // Server-side interface for bi-directional streaming. template <class W, class R> -class ServerAsyncReaderWriter : public WriterInterface<W>, - public ReaderInterface<R> { +class ServerAsyncReaderWriter : public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerAsyncReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - } + explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Read(R* msg, void* tag) { + // TODO + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; } // namespace grpc |