diff options
author | 2016-02-05 13:40:22 -0800 | |
---|---|---|
committer | 2016-02-05 13:40:22 -0800 | |
commit | 38004a8e399118b523fdebb7d7c1144355594b01 (patch) | |
tree | fc88f23167e93fda79f9a9cd9ce4b8b287d2df6e /include/grpc++/impl/codegen/async_unary_call.h | |
parent | 29cc1e46eba483f699ba92c68294930cb319457b (diff) |
Wrap groups of related CallOpSet's into a ref-counted structure
whenever appropriate so as to avoid any unintentional free-before-use
problems.
Potential performance issue: this triggers an additional allocation
for each Async call initiation, along with the cost of ref-counting
shared_ptr . But this is worth it for the additional safety provided
here without any change to the exposed C++ API.
Diffstat (limited to 'include/grpc++/impl/codegen/async_unary_call.h')
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 96 |
1 files changed, 61 insertions, 35 deletions
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 481b20b535..c1c637e928 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -62,40 +62,53 @@ class ClientAsyncResponseReader GRPC_FINAL ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_buf_.SendMessage(request).ok()); - init_buf_.ClientSendClose(); - call_.PerformOps(&init_buf_); + GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok()); + collection_->init_buf_.ClientSendClose(); + call_.PerformOps(&collection_->init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.set_output_tag(tag); - meta_buf_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.RecvInitialMetadata(context_); + collection_->finish_buf_.RecvInitialMetadata(context_); } - finish_buf_.RecvMessage(msg); - finish_buf_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.RecvMessage(msg); + collection_->finish_buf_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> init_buf_; - CallOpSet<CallOpRecvInitialMetadata> meta_buf_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> finish_buf_; + + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_buf_.SetCollection(shared_from_this()); + meta_buf_.SetCollection(shared_from_this()); + finish_buf_.SetCollection(shared_from_this()); + } + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> finish_buf_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W> @@ -103,42 +116,47 @@ class ServerAsyncResponseWriter GRPC_FINAL : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, - finish_buf_.SendMessage(msg)); + collection_->finish_buf_.ServerSendStatus( + ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg)); } else { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + status); } - call_.PerformOps(&finish_buf_); + call_.PerformOps(&collection_->finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_buf_); } private: @@ -146,9 +164,17 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_buf_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_buf_.SetCollection(shared_from_this()); + finish_buf_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_buf_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; } // namespace grpc |