diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 22:13:44 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-09 22:13:44 -0800 |
commit | 549a74daa87c871b7bf47d11f6f0539f3235b631 (patch) | |
tree | a9fcaeaf2b4e31639ddba835e855a149e36425e4 /include/grpc++ | |
parent | 80e00a8c63bd801b697fbe0cd1d8e00b14a81c76 (diff) |
Rephrase async streaming methods
To ensure that the CallOpBuffers stay alive until completion.
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/call.h | 2 | ||||
-rw-r--r-- | include/grpc++/stream.h | 78 |
2 files changed, 46 insertions, 34 deletions
diff --git a/include/grpc++/call.h b/include/grpc++/call.h index 94215bfa98..de789febe6 100644 --- a/include/grpc++/call.h +++ b/include/grpc++/call.h @@ -57,6 +57,8 @@ class CallOpBuffer final : public CompletionQueueTag { public: CallOpBuffer() : return_tag_(this) {} + void Reset(void *next_return_tag); + void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata); void AddSendMessage(const google::protobuf::Message &message); void AddRecvMessage(google::protobuf::Message *message); diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index ca32d60810..631183ea55 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -330,27 +330,30 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface, ClientContext *context, const google::protobuf::Message &request, void* tag) : call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendMessage(request); - buf.AddClientSendClose(); - call_.PerformOps(&buf); + init_buf_.Reset(tag); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); } virtual void Read(R *msg, void* tag) override { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); } virtual void Finish(Status* status, void* tag) override { - CallOpBuffer buf; - buf.AddClientRecvStatus(status); - call_.PerformOps(&buf); + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); } private: CompletionQueue cq_; Call call_; + CallOpBuffer init_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; }; template <class W> @@ -365,28 +368,31 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface, call_(channel->CreateCall(method, context, &cq_)) {} virtual void Write(const W& msg, void* tag) override { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); } virtual void Finish(Status* status, void* tag) override { - CallOpBuffer buf; - buf.AddRecvMessage(response_); - buf.AddClientRecvStatus(status); - call_.PerformOps(&buf); + finish_buf_.Reset(tag); + finish_buf_.AddRecvMessage(response_); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); } private: google::protobuf::Message *const response_; CompletionQueue cq_; Call call_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; // Client-side interface for bi-directional streaming. @@ -400,32 +406,36 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, : call_(channel->CreateCall(method, context, &cq_)) {} virtual void Read(R *msg, void* tag) override { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); } virtual void Write(const W& msg, void* tag) override { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); } virtual void Finish(Status* status, void* tag) override { - CallOpBuffer buf; - buf.AddClientRecvStatus(status); - call_.PerformOps(&buf); + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); } private: CompletionQueue cq_; Call call_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; // TODO(yangg) Move out of stream.h |