diff options
author | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2016-02-08 15:11:24 -0800 |
---|---|---|
committer | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2016-02-08 15:11:24 -0800 |
commit | e2c7671170802f8e4fd749eab178c0ce373f7023 (patch) | |
tree | 32e316f8f54f3a9ca8556271cb884ad289e6f723 | |
parent | 757c157d861dff34bb85b4f6e10d0708fecb88ce (diff) | |
parent | 4776292fc3053209485443a6a89b279bb2bc5bb5 (diff) |
Merge pull request #5099 from vjpai/cpp_races
Move internals of many C++ Async classes to ref-counted struct
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 46 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 18 |
2 files changed, 46 insertions, 18 deletions
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 481b20b535..f3c75dc3b1 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -62,40 +62,50 @@ class ClientAsyncResponseReader GRPC_FINAL ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->init_buf_.SetCollection(collection_); + collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_buf_.SendMessage(request).ok()); - init_buf_.ClientSendClose(); - call_.PerformOps(&init_buf_); + GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok()); + collection_->init_buf_.ClientSendClose(); + call_.PerformOps(&collection_->init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.set_output_tag(tag); - meta_buf_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + collection_->meta_buf_.SetCollection(collection_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.SetCollection(collection_); + collection_->finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.RecvInitialMetadata(context_); + collection_->finish_buf_.RecvInitialMetadata(context_); } - finish_buf_.RecvMessage(msg); - finish_buf_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.RecvMessage(msg); + collection_->finish_buf_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> init_buf_; - CallOpSet<CallOpRecvInitialMetadata> meta_buf_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> finish_buf_; + + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> finish_buf_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W> diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 1e06768ac4..e65349ddd3 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -472,6 +472,17 @@ class CallOpClientRecvStatus { size_t status_details_capacity_; }; +/// An abstract collection of CallOpSet's, to be used whenever +/// CallOpSet objects must be thought of as a group. Each member +/// of the group should have a shared_ptr back to the collection, +/// as will the object that instantiates the collection, allowing +/// for automatic ref-counting. In practice, any actual use should +/// derive from this base class. This is specifically necessary if +/// some of the CallOpSet's in the collection are "Sneaky" and don't +/// report back to the C++ layer CQ operations +class CallOpSetCollectionInterface + : public std::enable_shared_from_this<CallOpSetCollectionInterface> {}; + /// An abstract collection of call ops, used to generate the /// grpc_call_op structure to pass down to the lower layers, /// and as it is-a CompletionQueueTag, also massages the final @@ -488,8 +499,14 @@ class CallOpSetInterface : public CompletionQueueTag { max_message_size_ = max_message_size; } + /// Mark this as belonging to a collection if needed + void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { + collection_ = collection; + } + protected: int max_message_size_; + std::shared_ptr<CallOpSetCollectionInterface> collection_; }; /// Primary implementaiton of CallOpSetInterface. @@ -527,6 +544,7 @@ class CallOpSet : public CallOpSetInterface, this->Op5::FinishOp(status, max_message_size_); this->Op6::FinishOp(status, max_message_size_); *tag = return_tag_; + collection_.reset(); // drop the ref at this point return true; } |