diff options
author | Yang Gao <yangg@google.com> | 2015-02-12 10:24:39 -0800 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-02-12 10:24:39 -0800 |
commit | 424bc92e377ad0f2aed23bb4bcde6bb06aa49774 (patch) | |
tree | 3c7e8f639af7962fa74a269bccb1427c296121e2 /include/grpc++ | |
parent | 02a6e3a76ad7ea94695bb51813674e8f1e774d0d (diff) |
implement ClientAsyncX api
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/stream.h | 105 |
1 files changed, 85 insertions, 20 deletions
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 74e7539aa4..52a764bfc4 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -365,6 +365,8 @@ class ClientAsyncStreamingInterface { public: virtual ~ClientAsyncStreamingInterface() {} + virtual void ReadInitialMetadata(void* tag) = 0; + virtual void Finish(Status* status, void* tag) = 0; }; @@ -390,30 +392,50 @@ template <class R> class ClientAsyncReader final : public ClientAsyncStreamingInterface, public AsyncReaderInterface<R> { public: - // Blocking create a stream and write the first request out. + // Create a stream and write the first request out. ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, const google::protobuf::Message &request, void* tag) - : call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); init_buf_.AddSendMessage(request); init_buf_.AddClientSendClose(); call_.PerformOps(&init_buf_); } - virtual void Read(R *msg, void* tag) override { + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.Reset(tag); + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + context_->initial_metadata_received_ = true; + } + + void Read(R *msg, void* tag) override { read_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } read_buf_.AddRecvMessage(msg); call_.PerformOps(&read_buf_); } - virtual void Finish(Status* status, void* tag) override { + void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); - finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } private: + ClientContext* context_ = nullptr; CompletionQueue cq_; Call call_; CallOpBuffer init_buf_; @@ -425,37 +447,56 @@ template <class W> class ClientAsyncWriter final : public ClientAsyncStreamingInterface, public WriterInterface<W> { public: - // Blocking create a stream. ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, - google::protobuf::Message *response) - : response_(response), - call_(channel->CreateCall(method, context, &cq_)) {} + google::protobuf::Message *response, void* tag) + : context_(context), response_(response), + call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&init_buf_); + } - virtual void Write(const W& msg, void* tag) override { + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpBuffer buf; + buf.Reset(tag); + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + context_->initial_metadata_received_ = true; + } + + void Write(const W& msg, void* tag) override { write_buf_.Reset(tag); write_buf_.AddSendMessage(msg); call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) override { + void WritesDone(void* tag) override { writes_done_buf_.Reset(tag); writes_done_buf_.AddClientSendClose(); call_.PerformOps(&writes_done_buf_); } - virtual void Finish(Status* status, void* tag) override { + void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } finish_buf_.AddRecvMessage(response_, &got_message_); - finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } private: + ClientContext* context_ = nullptr; google::protobuf::Message *const response_; bool got_message_; CompletionQueue cq_; Call call_; + CallOpBuffer init_buf_; CallOpBuffer write_buf_; CallOpBuffer writes_done_buf_; CallOpBuffer finish_buf_; @@ -468,36 +509,60 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, public AsyncReaderInterface<R> { public: ClientAsyncReaderWriter(ChannelInterface *channel, - const RpcMethod &method, ClientContext *context) - : call_(channel->CreateCall(method, context, &cq_)) {} + const RpcMethod &method, ClientContext *context, void* tag) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + call_.PerformOps(&init_buf_); + } + + void ReadInitialMetadata(void* tag) override { + GPR_ASSERT(!context_->initial_metadata_received_); - virtual void Read(R *msg, void* tag) override { + CallOpBuffer buf; + buf.Reset(tag); + buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + call_.PerformOps(&buf); + context_->initial_metadata_received_ = true; + } + + void Read(R *msg, void* tag) override { read_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } read_buf_.AddRecvMessage(msg); call_.PerformOps(&read_buf_); } - virtual void Write(const W& msg, void* tag) override { + void Write(const W& msg, void* tag) override { write_buf_.Reset(tag); write_buf_.AddSendMessage(msg); call_.PerformOps(&write_buf_); } - virtual void WritesDone(void* tag) override { + void WritesDone(void* tag) override { writes_done_buf_.Reset(tag); writes_done_buf_.AddClientSendClose(); call_.PerformOps(&writes_done_buf_); } - virtual void Finish(Status* status, void* tag) override { + void Finish(Status* status, void* tag) override { finish_buf_.Reset(tag); - finish_buf_.AddClientRecvStatus(nullptr, status); // TODO metadata + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_); + context_->initial_metadata_received_ = true; + } + finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } private: + ClientContext* context_ = nullptr; CompletionQueue cq_; Call call_; + CallOpBuffer init_buf_; CallOpBuffer read_buf_; CallOpBuffer write_buf_; CallOpBuffer writes_done_buf_; |