aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2016-02-08 15:11:24 -0800
committerGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2016-02-08 15:11:24 -0800
commite2c7671170802f8e4fd749eab178c0ce373f7023 (patch)
tree32e316f8f54f3a9ca8556271cb884ad289e6f723
parent757c157d861dff34bb85b4f6e10d0708fecb88ce (diff)
parent4776292fc3053209485443a6a89b279bb2bc5bb5 (diff)
Merge pull request #5099 from vjpai/cpp_races
Move internals of many C++ Async classes to ref-counted struct
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h46
-rw-r--r--include/grpc++/impl/codegen/call.h18
2 files changed, 46 insertions, 18 deletions
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index 481b20b535..f3c75dc3b1 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -62,40 +62,50 @@ class ClientAsyncResponseReader GRPC_FINAL
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ : context_(context),
+ call_(channel->CreateCall(method, context, cq)),
+ collection_(new CallOpSetCollection) {
+ collection_->init_buf_.SetCollection(collection_);
+ collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
- GPR_ASSERT(init_buf_.SendMessage(request).ok());
- init_buf_.ClientSendClose();
- call_.PerformOps(&init_buf_);
+ GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok());
+ collection_->init_buf_.ClientSendClose();
+ call_.PerformOps(&collection_->init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.set_output_tag(tag);
- meta_buf_.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ collection_->meta_buf_.SetCollection(collection_);
+ collection_->meta_buf_.set_output_tag(tag);
+ collection_->meta_buf_.RecvInitialMetadata(context_);
+ call_.PerformOps(&collection_->meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.set_output_tag(tag);
+ collection_->finish_buf_.SetCollection(collection_);
+ collection_->finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.RecvInitialMetadata(context_);
+ collection_->finish_buf_.RecvInitialMetadata(context_);
}
- finish_buf_.RecvMessage(msg);
- finish_buf_.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ collection_->finish_buf_.RecvMessage(msg);
+ collection_->finish_buf_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&collection_->finish_buf_);
}
private:
ClientContext* context_;
Call call_;
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose> init_buf_;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus> finish_buf_;
+
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus> finish_buf_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
template <class W>
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 1e06768ac4..e65349ddd3 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -472,6 +472,17 @@ class CallOpClientRecvStatus {
size_t status_details_capacity_;
};
+/// An abstract collection of CallOpSet's, to be used whenever
+/// CallOpSet objects must be thought of as a group. Each member
+/// of the group should have a shared_ptr back to the collection,
+/// as will the object that instantiates the collection, allowing
+/// for automatic ref-counting. In practice, any actual use should
+/// derive from this base class. This is specifically necessary if
+/// some of the CallOpSet's in the collection are "Sneaky" and don't
+/// report back to the C++ layer CQ operations
+class CallOpSetCollectionInterface
+ : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
+
/// An abstract collection of call ops, used to generate the
/// grpc_call_op structure to pass down to the lower layers,
/// and as it is-a CompletionQueueTag, also massages the final
@@ -488,8 +499,14 @@ class CallOpSetInterface : public CompletionQueueTag {
max_message_size_ = max_message_size;
}
+ /// Mark this as belonging to a collection if needed
+ void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
+ collection_ = collection;
+ }
+
protected:
int max_message_size_;
+ std::shared_ptr<CallOpSetCollectionInterface> collection_;
};
/// Primary implementaiton of CallOpSetInterface.
@@ -527,6 +544,7 @@ class CallOpSet : public CallOpSetInterface,
this->Op5::FinishOp(status, max_message_size_);
this->Op6::FinishOp(status, max_message_size_);
*tag = return_tag_;
+ collection_.reset(); // drop the ref at this point
return true;
}