From 38004a8e399118b523fdebb7d7c1144355594b01 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 5 Feb 2016 13:40:22 -0800 Subject: Wrap groups of related CallOpSet's into a ref-counted structure whenever appropriate so as to avoid any unintentional free-before-use problems. Potential performance issue: this triggers an additional allocation for each Async call initiation, along with the cost of ref-counting shared_ptr . But this is worth it for the additional safety provided here without any change to the exposed C++ API. --- include/grpc++/impl/codegen/async_stream.h | 341 +++++++++++++++---------- include/grpc++/impl/codegen/async_unary_call.h | 96 ++++--- include/grpc++/impl/codegen/call.h | 15 ++ 3 files changed, 288 insertions(+), 164 deletions(-) (limited to 'include') diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index b0410485f8..e449b92bb1 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -105,49 +105,62 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); + GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok()); + collection_->init_ops_.ClientSendClose(); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); + collection_->read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); + collection_->read_ops_.RecvInitialMetadata(context_); } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet - init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet finish_ops_; + }; + std::shared_ptr collection_; }; /// Common interface for client side asynchronous writing. @@ -168,53 +181,67 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->finish_ops_.RecvMessage(response); + + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + collection_->writes_done_ops_.set_output_tag(tag); + collection_->writes_done_ops_.ClientSendClose(); + call_.PerformOps(&collection_->writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet init_ops_; - CallOpSet meta_ops_; - CallOpSet write_ops_; - CallOpSet writes_done_ops_; - CallOpSet finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + writes_done_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet write_ops_; + CallOpSet writes_done_ops_; + CallOpSet finish_ops_; + }; + std::shared_ptr collection_; }; /// Client-side interface for asynchronous bi-directional streaming. @@ -236,60 +263,75 @@ class ClientAsyncReaderWriter GRPC_FINAL ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); + collection_->read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); + collection_->read_ops_.RecvInitialMetadata(context_); } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + collection_->writes_done_ops_.set_output_tag(tag); + collection_->writes_done_ops_.ClientSendClose(); + call_.PerformOps(&collection_->writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet init_ops_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet write_ops_; - CallOpSet writes_done_ops_; - CallOpSet finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + writes_done_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet init_ops_; + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet write_ops_; + CallOpSet writes_done_ops_; + CallOpSet finish_ops_; + }; + std::shared_ptr collection_; }; template @@ -297,48 +339,53 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface { public: explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.set_output_tag(tag); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); + collection_->finish_ops_.ServerSendStatus( + ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg)); } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + status); } - call_.PerformOps(&finish_ops_); + call_.PerformOps(&collection_->finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -346,10 +393,19 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet finish_ops_; + }; + std::shared_ptr collection_; }; template @@ -357,36 +413,40 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncWriterInterface { public: explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -394,9 +454,18 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet write_ops_; - CallOpSet finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet meta_ops_; + CallOpSet write_ops_; + CallOpSet finish_ops_; + }; + std::shared_ptr collection_; }; /// Server-side interface for asynchronous bi-directional streaming. @@ -406,42 +475,46 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.set_output_tag(tag); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -451,10 +524,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet meta_ops_; - CallOpSet> read_ops_; - CallOpSet write_ops_; - CallOpSet finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet meta_ops_; + CallOpSet> read_ops_; + CallOpSet write_ops_; + CallOpSet finish_ops_; + }; + std::shared_ptr collection_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 481b20b535..c1c637e928 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -62,40 +62,53 @@ class ClientAsyncResponseReader GRPC_FINAL ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_buf_.SendMessage(request).ok()); - init_buf_.ClientSendClose(); - call_.PerformOps(&init_buf_); + GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok()); + collection_->init_buf_.ClientSendClose(); + call_.PerformOps(&collection_->init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.set_output_tag(tag); - meta_buf_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.RecvInitialMetadata(context_); + collection_->finish_buf_.RecvInitialMetadata(context_); } - finish_buf_.RecvMessage(msg); - finish_buf_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.RecvMessage(msg); + collection_->finish_buf_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpSet init_buf_; - CallOpSet meta_buf_; - CallOpSet, - CallOpClientRecvStatus> finish_buf_; + + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_buf_.SetCollection(shared_from_this()); + meta_buf_.SetCollection(shared_from_this()); + finish_buf_.SetCollection(shared_from_this()); + } + SneakyCallOpSet init_buf_; + CallOpSet meta_buf_; + CallOpSet, + CallOpClientRecvStatus> finish_buf_; + }; + std::shared_ptr collection_; }; template @@ -103,42 +116,47 @@ class ServerAsyncResponseWriter GRPC_FINAL : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, - finish_buf_.SendMessage(msg)); + collection_->finish_buf_.ServerSendStatus( + ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg)); } else { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + status); } - call_.PerformOps(&finish_buf_); + call_.PerformOps(&collection_->finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_buf_); } private: @@ -146,9 +164,17 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - CallOpSet meta_buf_; - CallOpSet finish_buf_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_buf_.SetCollection(shared_from_this()); + finish_buf_.SetCollection(shared_from_this()); + } + CallOpSet meta_buf_; + CallOpSet finish_buf_; + }; + std::shared_ptr collection_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 1e06768ac4..94a8243ac6 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -472,6 +472,15 @@ class CallOpClientRecvStatus { size_t status_details_capacity_; }; +/// An abstract collection of CallOpSet's, to be used whenever +/// CallOpSet objects must be thought of as a group. Each member +/// of the group should have a shared_ptr back to the collection, +/// as will the object that instantiates the collection, allowing +/// for automatic ref-counting. In practice, any actual use should +/// derive from this base class +class CallOpSetCollectionInterface + : public std::enable_shared_from_this {}; + /// An abstract collection of call ops, used to generate the /// grpc_call_op structure to pass down to the lower layers, /// and as it is-a CompletionQueueTag, also massages the final @@ -488,8 +497,14 @@ class CallOpSetInterface : public CompletionQueueTag { max_message_size_ = max_message_size; } + /// Mark this as belonging to a collection + void SetCollection(std::shared_ptr collection) { + collection_ = collection; + } + protected: int max_message_size_; + std::shared_ptr collection_; }; /// Primary implementaiton of CallOpSetInterface. -- cgit v1.2.3