aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-09 22:13:44 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-09 22:13:44 -0800
commit549a74daa87c871b7bf47d11f6f0539f3235b631 (patch)
treea9fcaeaf2b4e31639ddba835e855a149e36425e4 /include/grpc++
parent80e00a8c63bd801b697fbe0cd1d8e00b14a81c76 (diff)
Rephrase async streaming methods
To ensure that the CallOpBuffers stay alive until completion.
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/call.h2
-rw-r--r--include/grpc++/stream.h78
2 files changed, 46 insertions, 34 deletions
diff --git a/include/grpc++/call.h b/include/grpc++/call.h
index 94215bfa98..de789febe6 100644
--- a/include/grpc++/call.h
+++ b/include/grpc++/call.h
@@ -57,6 +57,8 @@ class CallOpBuffer final : public CompletionQueueTag {
public:
CallOpBuffer() : return_tag_(this) {}
+ void Reset(void *next_return_tag);
+
void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index ca32d60810..631183ea55 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -330,27 +330,30 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
ClientContext *context,
const google::protobuf::Message &request, void* tag)
: call_(channel->CreateCall(method, context, &cq_)) {
- CallOpBuffer buf;
- buf.AddSendMessage(request);
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
+ init_buf_.Reset(tag);
+ init_buf_.AddSendMessage(request);
+ init_buf_.AddClientSendClose();
+ call_.PerformOps(&init_buf_);
}
virtual void Read(R *msg, void* tag) override {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
}
virtual void Finish(Status* status, void* tag) override {
- CallOpBuffer buf;
- buf.AddClientRecvStatus(status);
- call_.PerformOps(&buf);
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
}
private:
CompletionQueue cq_;
Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
};
template <class W>
@@ -365,28 +368,31 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
call_(channel->CreateCall(method, context, &cq_)) {}
virtual void Write(const W& msg, void* tag) override {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual void WritesDone(void* tag) {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
}
virtual void Finish(Status* status, void* tag) override {
- CallOpBuffer buf;
- buf.AddRecvMessage(response_);
- buf.AddClientRecvStatus(status);
- call_.PerformOps(&buf);
+ finish_buf_.Reset(tag);
+ finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
}
private:
google::protobuf::Message *const response_;
CompletionQueue cq_;
Call call_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
// Client-side interface for bi-directional streaming.
@@ -400,32 +406,36 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
: call_(channel->CreateCall(method, context, &cq_)) {}
virtual void Read(R *msg, void* tag) override {
- CallOpBuffer buf;
- buf.AddRecvMessage(msg);
- call_.PerformOps(&buf);
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
}
virtual void Write(const W& msg, void* tag) override {
- CallOpBuffer buf;
- buf.AddSendMessage(msg);
- call_.PerformOps(&buf);
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual void WritesDone(void* tag) {
- CallOpBuffer buf;
- buf.AddClientSendClose();
- call_.PerformOps(&buf);
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
}
virtual void Finish(Status* status, void* tag) override {
- CallOpBuffer buf;
- buf.AddClientRecvStatus(status);
- call_.PerformOps(&buf);
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
}
private:
CompletionQueue cq_;
Call call_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
// TODO(yangg) Move out of stream.h