diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-12 14:57:17 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-12 14:57:17 -0800 |
commit | d4ebeeb7fbd61031c9b3db013e07195f31013e89 (patch) | |
tree | 51133d03c724fbae1d0cca1266a9f292592b7f5a /include | |
parent | 3d6ceb646178ce7a5b0de38c38ba75da448fae39 (diff) |
Async server streaming
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/stream.h | 58 |
1 files changed, 33 insertions, 25 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 2f37cc4075..23387e78ab 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -574,8 +574,8 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, template <class W> class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { public: - ServerAsyncResponseWriter(Call* call, ServerContext* ctx) - : call_(call), ctx_(ctx) {} + explicit ServerAsyncResponseWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) { GPR_ASSERT(!ctx_->sent_initial_metadata_); @@ -583,7 +583,7 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { meta_buf_.Reset(tag); meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&meta_buf_); + call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { @@ -599,7 +599,7 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { bool cancelled = false; finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_->PerformOps(&finish_buf_); + call_.PerformOps(&finish_buf_); } void FinishWithError(const Status& status, void* tag) { @@ -612,11 +612,13 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { bool cancelled = false; finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_->PerformOps(&finish_buf_); + call_.PerformOps(&finish_buf_); } private: - Call* call_; + void BindCall(Call *call) override { call_ = *call; } + + Call call_; ServerContext* ctx_; CallOpBuffer meta_buf_; CallOpBuffer finish_buf_; @@ -626,8 +628,8 @@ template <class R> class ServerAsyncReader : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: - ServerAsyncReader(Call* call, ServerContext* ctx) - : call_(call), ctx_(ctx) {} + explicit ServerAsyncReader(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) override { GPR_ASSERT(!ctx_->sent_initial_metadata_); @@ -635,13 +637,13 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface, meta_buf_.Reset(tag); meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&meta_buf_); + call_.PerformOps(&meta_buf_); } void Read(R* msg, void* tag) override { read_buf_.Reset(tag); read_buf_.AddRecvMessage(msg); - call_->PerformOps(&read_buf_); + call_.PerformOps(&read_buf_); } void Finish(const Status& status, void* tag) override { @@ -653,12 +655,14 @@ class ServerAsyncReader : public ServerAsyncStreamingInterface, bool cancelled = false; finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_->PerformOps(&finish_buf_); + call_.PerformOps(&finish_buf_); } private: - Call* call_; + void BindCall(Call *call) override { call_ = *call; } + + Call call_; ServerContext* ctx_; CallOpBuffer meta_buf_; CallOpBuffer read_buf_; @@ -669,8 +673,8 @@ template <class W> class ServerAsyncWriter : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: - ServerAsyncWriter(Call* call, ServerContext* ctx) - : call_(call), ctx_(ctx) {} + explicit ServerAsyncWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) override { GPR_ASSERT(!ctx_->sent_initial_metadata_); @@ -678,7 +682,7 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface, meta_buf_.Reset(tag); meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&meta_buf_); + call_.PerformOps(&meta_buf_); } void Write(const W& msg, void* tag) override { @@ -688,7 +692,7 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface, ctx_->sent_initial_metadata_ = true; } write_buf_.AddSendMessage(msg); - call_->PerformOps(&write_buf_); + call_.PerformOps(&write_buf_); } void Finish(const Status& status, void* tag) override { @@ -700,11 +704,13 @@ class ServerAsyncWriter : public ServerAsyncStreamingInterface, bool cancelled = false; finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_->PerformOps(&finish_buf_); + call_.PerformOps(&finish_buf_); } private: - Call* call_; + void BindCall(Call *call) override { call_ = *call; } + + Call call_; ServerContext* ctx_; CallOpBuffer meta_buf_; CallOpBuffer write_buf_; @@ -717,8 +723,8 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W>, public AsyncReaderInterface<R> { public: - ServerAsyncReaderWriter(Call* call, ServerContext* ctx) - : call_(call), ctx_(ctx) {} + explicit ServerAsyncReaderWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) override { GPR_ASSERT(!ctx_->sent_initial_metadata_); @@ -726,13 +732,13 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, meta_buf_.Reset(tag); meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&meta_buf_); + call_.PerformOps(&meta_buf_); } virtual void Read(R* msg, void* tag) override { read_buf_.Reset(tag); read_buf_.AddRecvMessage(msg); - call_->PerformOps(&read_buf_); + call_.PerformOps(&read_buf_); } virtual void Write(const W& msg, void* tag) override { @@ -742,7 +748,7 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, ctx_->sent_initial_metadata_ = true; } write_buf_.AddSendMessage(msg); - call_->PerformOps(&write_buf_); + call_.PerformOps(&write_buf_); } void Finish(const Status& status, void* tag) override { @@ -754,11 +760,13 @@ class ServerAsyncReaderWriter : public ServerAsyncStreamingInterface, bool cancelled = false; finish_buf_.AddServerRecvClose(&cancelled); finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_->PerformOps(&finish_buf_); + call_.PerformOps(&finish_buf_); } private: - Call* call_; + void BindCall(Call *call) override { call_ = *call; } + + Call call_; ServerContext* ctx_; CallOpBuffer meta_buf_; CallOpBuffer read_buf_; |