aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-02-12 10:24:39 -0800
committerGravatar Yang Gao <yangg@google.com>2015-02-12 10:24:39 -0800
commit424bc92e377ad0f2aed23bb4bcde6bb06aa49774 (patch)
tree3c7e8f639af7962fa74a269bccb1427c296121e2 /include/grpc++
parent02a6e3a76ad7ea94695bb51813674e8f1e774d0d (diff)
implement ClientAsyncX api
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/stream.h105
1 files changed, 85 insertions, 20 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 74e7539aa4..52a764bfc4 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -365,6 +365,8 @@ class ClientAsyncStreamingInterface {
public:
virtual ~ClientAsyncStreamingInterface() {}
+ virtual void ReadInitialMetadata(void* tag) = 0;
+
virtual void Finish(Status* status, void* tag) = 0;
};
@@ -390,30 +392,50 @@ template <class R>
class ClientAsyncReader final : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
- // Blocking create a stream and write the first request out.
+ // Create a stream and write the first request out.
ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request, void* tag)
- : call_(channel->CreateCall(method, context, &cq_)) {
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose();
call_.PerformOps(&init_buf_);
}
- virtual void Read(R *msg, void* tag) override {
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpBuffer buf;
+ buf.Reset(tag);
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Read(R *msg, void* tag) override {
read_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
}
- virtual void Finish(Status* status, void* tag) override {
+ void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
- finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
+ ClientContext* context_ = nullptr;
CompletionQueue cq_;
Call call_;
CallOpBuffer init_buf_;
@@ -425,37 +447,56 @@ template <class W>
class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
public WriterInterface<W> {
public:
- // Blocking create a stream.
ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
- google::protobuf::Message *response)
- : response_(response),
- call_(channel->CreateCall(method, context, &cq_)) {}
+ google::protobuf::Message *response, void* tag)
+ : context_(context), response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&init_buf_);
+ }
- virtual void Write(const W& msg, void* tag) override {
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpBuffer buf;
+ buf.Reset(tag);
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
}
- virtual void WritesDone(void* tag) override {
+ void WritesDone(void* tag) override {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
}
- virtual void Finish(Status* status, void* tag) override {
+ void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
finish_buf_.AddRecvMessage(response_, &got_message_);
- finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
+ ClientContext* context_ = nullptr;
google::protobuf::Message *const response_;
bool got_message_;
CompletionQueue cq_;
Call call_;
+ CallOpBuffer init_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;
CallOpBuffer finish_buf_;
@@ -468,36 +509,60 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
ClientAsyncReaderWriter(ChannelInterface *channel,
- const RpcMethod &method, ClientContext *context)
- : call_(channel->CreateCall(method, context, &cq_)) {}
+ const RpcMethod &method, ClientContext *context, void* tag)
+ : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
+ call_.PerformOps(&init_buf_);
+ }
+
+ void ReadInitialMetadata(void* tag) override {
+ GPR_ASSERT(!context_->initial_metadata_received_);
- virtual void Read(R *msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.Reset(tag);
+ buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ call_.PerformOps(&buf);
+ context_->initial_metadata_received_ = true;
+ }
+
+ void Read(R *msg, void* tag) override {
read_buf_.Reset(tag);
+ if (!context_->initial_metadata_received_) {
+ read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
}
- virtual void Write(const W& msg, void* tag) override {
+ void Write(const W& msg, void* tag) override {
write_buf_.Reset(tag);
write_buf_.AddSendMessage(msg);
call_.PerformOps(&write_buf_);
}
- virtual void WritesDone(void* tag) override {
+ void WritesDone(void* tag) override {
writes_done_buf_.Reset(tag);
writes_done_buf_.AddClientSendClose();
call_.PerformOps(&writes_done_buf_);
}
- virtual void Finish(Status* status, void* tag) override {
+ void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
- finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata
+ if (!context_->initial_metadata_received_) {
+ finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
+ context_->initial_metadata_received_ = true;
+ }
+ finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
call_.PerformOps(&finish_buf_);
}
private:
+ ClientContext* context_ = nullptr;
CompletionQueue cq_;
Call call_;
+ CallOpBuffer init_buf_;
CallOpBuffer read_buf_;
CallOpBuffer write_buf_;
CallOpBuffer writes_done_buf_;