From 38004a8e399118b523fdebb7d7c1144355594b01 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Fri, 5 Feb 2016 13:40:22 -0800 Subject: Wrap groups of related CallOpSet's into a ref-counted structure whenever appropriate so as to avoid any unintentional free-before-use problems. Potential performance issue: this triggers an additional allocation for each Async call initiation, along with the cost of ref-counting shared_ptr . But this is worth it for the additional safety provided here without any change to the exposed C++ API. --- include/grpc++/impl/codegen/async_stream.h | 341 +++++++++++++++---------- include/grpc++/impl/codegen/async_unary_call.h | 96 ++++--- include/grpc++/impl/codegen/call.h | 15 ++ 3 files changed, 288 insertions(+), 164 deletions(-) diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index b0410485f8..e449b92bb1 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -105,49 +105,62 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); + GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok()); + collection_->init_ops_.ClientSendClose(); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); + collection_->read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); + collection_->read_ops_.RecvInitialMetadata(context_); } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> - init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; /// Common interface for client side asynchronous writing. @@ -168,53 +181,67 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->finish_ops_.RecvMessage(response); + + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + collection_->writes_done_ops_.set_output_tag(tag); + collection_->writes_done_ops_.ClientSendClose(); + call_.PerformOps(&collection_->writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + writes_done_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; /// Client-side interface for asynchronous bi-directional streaming. @@ -236,60 +263,75 @@ class ClientAsyncReaderWriter GRPC_FINAL ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); + collection_->read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); + collection_->read_ops_.RecvInitialMetadata(context_); } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + collection_->writes_done_ops_.set_output_tag(tag); + collection_->writes_done_ops_.ClientSendClose(); + call_.PerformOps(&collection_->writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + writes_done_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W, class R> @@ -297,48 +339,53 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.set_output_tag(tag); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); + collection_->finish_ops_.ServerSendStatus( + ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg)); } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + status); } - call_.PerformOps(&finish_ops_); + call_.PerformOps(&collection_->finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -346,10 +393,19 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W> @@ -357,36 +413,40 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -394,9 +454,18 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; /// Server-side interface for asynchronous bi-directional streaming. @@ -406,42 +475,46 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.set_output_tag(tag); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -451,10 +524,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this(); + } + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 481b20b535..c1c637e928 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -62,40 +62,53 @@ class ClientAsyncResponseReader GRPC_FINAL ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_buf_.SendMessage(request).ok()); - init_buf_.ClientSendClose(); - call_.PerformOps(&init_buf_); + GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok()); + collection_->init_buf_.ClientSendClose(); + call_.PerformOps(&collection_->init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.set_output_tag(tag); - meta_buf_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.RecvInitialMetadata(context_); + collection_->finish_buf_.RecvInitialMetadata(context_); } - finish_buf_.RecvMessage(msg); - finish_buf_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.RecvMessage(msg); + collection_->finish_buf_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> init_buf_; - CallOpSet<CallOpRecvInitialMetadata> meta_buf_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> finish_buf_; + + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_buf_.SetCollection(shared_from_this()); + meta_buf_.SetCollection(shared_from_this()); + finish_buf_.SetCollection(shared_from_this()); + } + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> finish_buf_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W> @@ -103,42 +116,47 @@ class ServerAsyncResponseWriter GRPC_FINAL : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_buf_.set_output_tag(tag); + collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&collection_->meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, - finish_buf_.SendMessage(msg)); + collection_->finish_buf_.ServerSendStatus( + ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg)); } else { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + status); } - call_.PerformOps(&finish_buf_); + call_.PerformOps(&collection_->finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_buf_.set_output_tag(tag); + collection_->finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_buf_); } private: @@ -146,9 +164,17 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_buf_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_buf_.SetCollection(shared_from_this()); + finish_buf_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_buf_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 1e06768ac4..94a8243ac6 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -472,6 +472,15 @@ class CallOpClientRecvStatus { size_t status_details_capacity_; }; +/// An abstract collection of CallOpSet's, to be used whenever +/// CallOpSet objects must be thought of as a group. Each member +/// of the group should have a shared_ptr back to the collection, +/// as will the object that instantiates the collection, allowing +/// for automatic ref-counting. In practice, any actual use should +/// derive from this base class +class CallOpSetCollectionInterface + : public std::enable_shared_from_this<CallOpSetCollectionInterface> {}; + /// An abstract collection of call ops, used to generate the /// grpc_call_op structure to pass down to the lower layers, /// and as it is-a CompletionQueueTag, also massages the final @@ -488,8 +497,14 @@ class CallOpSetInterface : public CompletionQueueTag { max_message_size_ = max_message_size; } + /// Mark this as belonging to a collection + void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { + collection_ = collection; + } + protected: int max_message_size_; + std::shared_ptr<CallOpSetCollectionInterface> collection_; }; /// Primary implementaiton of CallOpSetInterface. -- cgit v1.2.3 From 7e3f9b0178eda9f4fe5aa407294e890f2953b317 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Fri, 5 Feb 2016 13:43:17 -0800 Subject: Forgot to include these edits in the last commit --- include/grpc++/impl/codegen/async_stream.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index e449b92bb1..e13bab81e2 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -232,7 +232,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { meta_ops_.SetCollection(shared_from_this()); write_ops_.SetCollection(shared_from_this()); writes_done_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this(); + finish_ops_.SetCollection(shared_from_this()); } CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; @@ -322,7 +322,7 @@ class ClientAsyncReaderWriter GRPC_FINAL read_ops_.SetCollection(shared_from_this()); write_ops_.SetCollection(shared_from_this()); writes_done_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this(); + finish_ops_.SetCollection(shared_from_this()); } CallOpSet<CallOpSendInitialMetadata> init_ops_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; @@ -398,7 +398,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void SetCollection() { meta_ops_.SetCollection(shared_from_this()); read_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this(); + finish_ops_.SetCollection(shared_from_this()); } CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_; @@ -459,7 +459,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SetCollection() { meta_ops_.SetCollection(shared_from_this()); write_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this(); + finish_ops_.SetCollection(shared_from_this()); } CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; @@ -530,7 +530,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, meta_ops_.SetCollection(shared_from_this()); read_ops_.SetCollection(shared_from_this()); write_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this(); + finish_ops_.SetCollection(shared_from_this()); } CallOpSet<CallOpSendInitialMetadata> meta_ops_; CallOpSet<CallOpRecvMessage<R>> read_ops_; -- cgit v1.2.3 From 5506bea349ccac69e9ee0410647ea8793cbee6f2 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Fri, 5 Feb 2016 14:23:24 -0800 Subject: Undo much of the previous commits so that only CallOpSet groups with a Sneaky member are in a collection. --- include/grpc++/impl/codegen/async_stream.h | 341 ++++++++++--------------- include/grpc++/impl/codegen/async_unary_call.h | 47 ++-- include/grpc++/impl/codegen/call.h | 6 +- 3 files changed, 150 insertions(+), 244 deletions(-) diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index e13bab81e2..b0410485f8 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -105,62 +105,49 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - collection_->init_ops_.set_output_tag(tag); - collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok()); - collection_->init_ops_.ClientSendClose(); - call_.PerformOps(&collection_->init_ops_); + GPR_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_ops_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->read_ops_.RecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_ops_.RecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - collection_->finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - init_ops_.SetCollection(shared_from_this()); - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; /// Common interface for client side asynchronous writing. @@ -181,67 +168,53 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - collection_->finish_ops_.RecvMessage(response); - - collection_->init_ops_.set_output_tag(tag); - collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&collection_->init_ops_); + : context_(context), call_(channel->CreateCall(method, context, cq)) { + finish_ops_.RecvMessage(response); + + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_ops_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - collection_->writes_done_ops_.set_output_tag(tag); - collection_->writes_done_ops_.ClientSendClose(); - call_.PerformOps(&collection_->writes_done_ops_); + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_ops_.RecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - collection_->finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - init_ops_.SetCollection(shared_from_this()); - meta_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - writes_done_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; }; /// Client-side interface for asynchronous bi-directional streaming. @@ -263,75 +236,60 @@ class ClientAsyncReaderWriter GRPC_FINAL ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - collection_->init_ops_.set_output_tag(tag); - collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&collection_->init_ops_); + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_ops_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->read_ops_.RecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - collection_->writes_done_ops_.set_output_tag(tag); - collection_->writes_done_ops_.ClientSendClose(); - call_.PerformOps(&collection_->writes_done_ops_); + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_ops_.RecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - collection_->finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - init_ops_.SetCollection(shared_from_this()); - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - writes_done_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W, class R> @@ -339,53 +297,48 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_ops_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - collection_->finish_ops_.ServerSendStatus( - ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg)); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + finish_ops_.SendMessage(msg)); } else { - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - status); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); } - call_.PerformOps(&collection_->finish_ops_); + call_.PerformOps(&finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -393,19 +346,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_ops_; }; template <class W> @@ -413,40 +357,36 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_ops_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -454,18 +394,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; /// Server-side interface for asynchronous bi-directional streaming. @@ -475,46 +406,42 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_ops_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -524,20 +451,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index c1c637e928..f78ba891e8 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -116,47 +116,42 @@ class ServerAsyncResponseWriter GRPC_FINAL : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_buf_.set_output_tag(tag); - collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + meta_buf_.set_output_tag(tag); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_buf_); + call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - collection_->finish_buf_.set_output_tag(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - collection_->finish_buf_.ServerSendStatus( - ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg)); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + finish_buf_.SendMessage(msg)); } else { - collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, - status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); } - call_.PerformOps(&collection_->finish_buf_); + call_.PerformOps(&finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - collection_->finish_buf_.set_output_tag(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_buf_); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); } private: @@ -164,17 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_buf_.SetCollection(shared_from_this()); - finish_buf_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_buf_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_buf_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 94a8243ac6..03280b7e3e 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -477,7 +477,9 @@ class CallOpClientRecvStatus { /// of the group should have a shared_ptr back to the collection, /// as will the object that instantiates the collection, allowing /// for automatic ref-counting. In practice, any actual use should -/// derive from this base class +/// derive from this base class. This is specifically necessary if +/// some of the CallOpSet's in the collection are "Sneaky" and don't +/// report back to the C++ layer CQ operations class CallOpSetCollectionInterface : public std::enable_shared_from_this<CallOpSetCollectionInterface> {}; @@ -497,7 +499,7 @@ class CallOpSetInterface : public CompletionQueueTag { max_message_size_ = max_message_size; } - /// Mark this as belonging to a collection + /// Mark this as belonging to a collection if needed void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { collection_ = collection; } -- cgit v1.2.3 From 2b5638668e6c72016028b1fe11b78d78711c29d8 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Fri, 5 Feb 2016 15:11:36 -0800 Subject: Drop the ref --- include/grpc++/impl/codegen/call.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 03280b7e3e..d342a3a25a 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -544,6 +544,7 @@ class CallOpSet : public CallOpSetInterface, this->Op5::FinishOp(status, max_message_size_); this->Op6::FinishOp(status, max_message_size_); *tag = return_tag_; + SetCollection(nullptr); // drop the ref at this point return true; } -- cgit v1.2.3 From bedf57fe8c323c2e26a2ad64e29521a353343a6b Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Fri, 5 Feb 2016 16:45:54 -0800 Subject: Ref the collection only when it will be used (and later finalized) --- include/grpc++/impl/codegen/async_unary_call.h | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index f78ba891e8..f3c75dc3b1 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -65,7 +65,7 @@ class ClientAsyncResponseReader GRPC_FINAL : context_(context), call_(channel->CreateCall(method, context, cq)), collection_(new CallOpSetCollection) { - collection_->SetCollection(); + collection_->init_buf_.SetCollection(collection_); collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok()); @@ -76,12 +76,14 @@ class ClientAsyncResponseReader GRPC_FINAL void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); + collection_->meta_buf_.SetCollection(collection_); collection_->meta_buf_.set_output_tag(tag); collection_->meta_buf_.RecvInitialMetadata(context_); call_.PerformOps(&collection_->meta_buf_); } void Finish(R* msg, Status* status, void* tag) { + collection_->finish_buf_.SetCollection(collection_); collection_->finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { collection_->finish_buf_.RecvInitialMetadata(context_); @@ -97,11 +99,6 @@ class ClientAsyncResponseReader GRPC_FINAL class CallOpSetCollection : public CallOpSetCollectionInterface { public: - void SetCollection() { - init_buf_.SetCollection(shared_from_this()); - meta_buf_.SetCollection(shared_from_this()); - finish_buf_.SetCollection(shared_from_this()); - } SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> init_buf_; CallOpSet<CallOpRecvInitialMetadata> meta_buf_; -- cgit v1.2.3 From c593ca017161e48367bf7182f01dac477109cf21 Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Mon, 8 Feb 2016 11:38:42 -0800 Subject: reset the shared_ptr --- include/grpc++/impl/codegen/call.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index d342a3a25a..c075388266 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -544,7 +544,7 @@ class CallOpSet : public CallOpSetInterface, this->Op5::FinishOp(status, max_message_size_); this->Op6::FinishOp(status, max_message_size_); *tag = return_tag_; - SetCollection(nullptr); // drop the ref at this point + collection_.reset(); // drop the ref at this point return true; } -- cgit v1.2.3 From 30bf3ea4b94eb3ded9296e3e7825d94d402be8be Mon Sep 17 00:00:00 2001 From: Vijay Pai <vpai@google.com> Date: Mon, 8 Feb 2016 13:21:07 -0800 Subject: clang-format --- include/grpc++/impl/codegen/call.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index c075388266..e65349ddd3 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -544,7 +544,7 @@ class CallOpSet : public CallOpSetInterface, this->Op5::FinishOp(status, max_message_size_); this->Op6::FinishOp(status, max_message_size_); *tag = return_tag_; - collection_.reset(); // drop the ref at this point + collection_.reset(); // drop the ref at this point return true; } -- cgit v1.2.3