diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-02-18 14:50:14 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-02-18 14:50:14 -0800 |
commit | 8a287d1a1bfbc2250abe43aa433397f68d07a632 (patch) | |
tree | 6b32636acbee57c60a7a098248774289576cadfe /include | |
parent | 1ff680a545c0212008bb568f198c91b48a27df89 (diff) | |
parent | 68bc778b63c1e82ec8c68cf9e2b9be23c0b9104d (diff) |
Merge pull request #588 from yang-g/c++api
Async client api change. Add a ClientAsyncResponseReader.
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/async_unary_call.h | 144 | ||||
-rw-r--r-- | include/grpc++/client_context.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 7 | ||||
-rw-r--r-- | include/grpc++/stream.h | 54 |
4 files changed, 148 insertions, 61 deletions
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h new file mode 100644 index 0000000000..105250ce9d --- /dev/null +++ b/include/grpc++/async_unary_call.h @@ -0,0 +1,144 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_ASYNC_UNARY_CALL_H__ +#define __GRPCPP_ASYNC_UNARY_CALL_H__ + +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> +#include <grpc++/server_context.h> +#include <grpc++/impl/call.h> +#include <grpc++/impl/service_type.h> +#include <grpc++/status.h> +#include <grpc/support/log.h> + +namespace grpc { +template <class R> +class ClientAsyncResponseReader final { + public: + ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, + const RpcMethod& method, ClientContext* context, + const google::protobuf::Message& request, void* tag) + : context_(context), + call_(channel->CreateCall(method, context, cq)) { + init_buf_.Reset(tag); + init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); + } + + void ReadInitialMetadata(void* tag) { + GPR_ASSERT(!context_->initial_metadata_received_); + + meta_buf_.Reset(tag); + meta_buf_.AddRecvInitialMetadata(context_); + call_.PerformOps(&meta_buf_); + } + + void Finish(R* msg, Status* status, void* tag) { + finish_buf_.Reset(tag); + if (!context_->initial_metadata_received_) { + finish_buf_.AddRecvInitialMetadata(context_); + } + finish_buf_.AddRecvMessage(msg); + finish_buf_.AddClientRecvStatus(context_, status); + call_.PerformOps(&finish_buf_); + } + + + private: + ClientContext* context_ = nullptr; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer meta_buf_; + CallOpBuffer finish_buf_; +}; + +template <class W> +class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { + public: + explicit ServerAsyncResponseWriter(ServerContext* ctx) + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + + void SendInitialMetadata(void* tag) { + GPR_ASSERT(!ctx_->sent_initial_metadata_); + + meta_buf_.Reset(tag); + meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + call_.PerformOps(&meta_buf_); + } + + void Finish(const W& msg, const Status& status, void* tag) { + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (status.IsOk()) { + finish_buf_.AddSendMessage(msg); + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + void FinishWithError(const Status& status, void* tag) { + GPR_ASSERT(!status.IsOk()); + finish_buf_.Reset(tag); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + ctx_->sent_initial_metadata_ = true; + } + bool cancelled = false; + finish_buf_.AddServerRecvClose(&cancelled); + finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); + } + + private: + void BindCall(Call* call) override { call_ = *call; } + + Call call_; + ServerContext* ctx_; + CallOpBuffer meta_buf_; + CallOpBuffer finish_buf_; +}; + +} // namespace grpc + +#endif // __GRPCPP_ASYNC_UNARY_CALL_H__ diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 52bedd4d38..1e7e6bfad7 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -72,6 +72,8 @@ template <class W> class ClientAsyncWriter; template <class R, class W> class ClientAsyncReaderWriter; +template <class R> +class ClientAsyncResponseReader; class ClientContext { public: @@ -119,6 +121,8 @@ class ClientContext { friend class ::grpc::ClientAsyncWriter; template <class R, class W> friend class ::grpc::ClientAsyncReaderWriter; + template <class R> + friend class ::grpc::ClientAsyncResponseReader; grpc_call *call() { return call_; } void set_call(grpc_call *call) { diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index a29621edb3..f25ded7a24 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -48,13 +48,6 @@ class CompletionQueue; class RpcMethod; class Status; -// Wrapper that begins an asynchronous unary call -void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result, Status *status, - CompletionQueue *cq, void *tag); - // Wrapper that performs a blocking unary call Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, ClientContext *context, diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 491dfc8136..01deac2ce1 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -550,60 +550,6 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, CallOpBuffer finish_buf_; }; -// TODO(yangg) Move out of stream.h -template <class W> -class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { - public: - explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} - - void SendInitialMetadata(void* tag) { - GPR_ASSERT(!ctx_->sent_initial_metadata_); - - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); - } - - void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); - if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - // The response is dropped if the status is not OK. - if (status.IsOk()) { - finish_buf_.AddSendMessage(msg); - } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); - } - - void FinishWithError(const Status& status, void* tag) { - GPR_ASSERT(!status.IsOk()); - finish_buf_.Reset(tag); - if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); - ctx_->sent_initial_metadata_ = true; - } - bool cancelled = false; - finish_buf_.AddServerRecvClose(&cancelled); - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); - } - - private: - void BindCall(Call* call) override { call_ = *call; } - - Call call_; - ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; -}; - template <class W, class R> class ServerAsyncReader : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { |