From 38004a8e399118b523fdebb7d7c1144355594b01 Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Fri, 5 Feb 2016 13:40:22 -0800
Subject: Wrap groups of related CallOpSet's into a ref-counted structure
 whenever appropriate so as to avoid any unintentional free-before-use
 problems.

Potential performance issue: this triggers an additional allocation
for each Async call initiation, along with the cost of ref-counting
shared_ptr . But this is worth it for the additional safety provided
here without any change to the exposed C++ API.
---
 include/grpc++/impl/codegen/async_stream.h     | 341 +++++++++++++++----------
 include/grpc++/impl/codegen/async_unary_call.h |  96 ++++---
 include/grpc++/impl/codegen/call.h             |  15 ++
 3 files changed, 288 insertions(+), 164 deletions(-)

diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index b0410485f8..e449b92bb1 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -105,49 +105,62 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
   ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
                     const RpcMethod& method, ClientContext* context,
                     const W& request, void* tag)
-      : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_ops_.set_output_tag(tag);
-    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+      : context_(context),
+        call_(channel->CreateCall(method, context, cq)),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+    collection_->init_ops_.set_output_tag(tag);
+    collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(init_ops_.SendMessage(request).ok());
-    init_ops_.ClientSendClose();
-    call_.PerformOps(&init_ops_);
+    GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok());
+    collection_->init_ops_.ClientSendClose();
+    call_.PerformOps(&collection_->init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_ops_.set_output_tag(tag);
-    meta_ops_.RecvInitialMetadata(context_);
-    call_.PerformOps(&meta_ops_);
+    collection_->meta_ops_.set_output_tag(tag);
+    collection_->meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&collection_->meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_ops_.set_output_tag(tag);
+    collection_->read_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      read_ops_.RecvInitialMetadata(context_);
+      collection_->read_ops_.RecvInitialMetadata(context_);
     }
-    read_ops_.RecvMessage(msg);
-    call_.PerformOps(&read_ops_);
+    collection_->read_ops_.RecvMessage(msg);
+    call_.PerformOps(&collection_->read_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_ops_.RecvInitialMetadata(context_);
+      collection_->finish_ops_.RecvInitialMetadata(context_);
     }
-    finish_ops_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_ops_);
+    collection_->finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
-      init_ops_;
-  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
-  CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      init_ops_.SetCollection(shared_from_this());
+      meta_ops_.SetCollection(shared_from_this());
+      read_ops_.SetCollection(shared_from_this());
+      finish_ops_.SetCollection(shared_from_this());
+    }
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+              CallOpClientSendClose> init_ops_;
+    CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 /// Common interface for client side asynchronous writing.
@@ -168,53 +181,67 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
   ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
                     const RpcMethod& method, ClientContext* context,
                     R* response, void* tag)
-      : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    finish_ops_.RecvMessage(response);
-
-    init_ops_.set_output_tag(tag);
-    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
-    call_.PerformOps(&init_ops_);
+      : context_(context),
+        call_(channel->CreateCall(method, context, cq)),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+    collection_->finish_ops_.RecvMessage(response);
+
+    collection_->init_ops_.set_output_tag(tag);
+    collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&collection_->init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_ops_.set_output_tag(tag);
-    meta_ops_.RecvInitialMetadata(context_);
-    call_.PerformOps(&meta_ops_);
+    collection_->meta_ops_.set_output_tag(tag);
+    collection_->meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&collection_->meta_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_ops_.set_output_tag(tag);
+    collection_->write_ops_.set_output_tag(tag);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&write_ops_);
+    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&collection_->write_ops_);
   }
 
   void WritesDone(void* tag) GRPC_OVERRIDE {
-    writes_done_ops_.set_output_tag(tag);
-    writes_done_ops_.ClientSendClose();
-    call_.PerformOps(&writes_done_ops_);
+    collection_->writes_done_ops_.set_output_tag(tag);
+    collection_->writes_done_ops_.ClientSendClose();
+    call_.PerformOps(&collection_->writes_done_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_ops_.RecvInitialMetadata(context_);
+      collection_->finish_ops_.RecvInitialMetadata(context_);
     }
-    finish_ops_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_ops_);
+    collection_->finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  CallOpSet<CallOpSendInitialMetadata> init_ops_;
-  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-  CallOpSet<CallOpSendMessage> write_ops_;
-  CallOpSet<CallOpClientSendClose> writes_done_ops_;
-  CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
-            CallOpClientRecvStatus> finish_ops_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      init_ops_.SetCollection(shared_from_this());
+      meta_ops_.SetCollection(shared_from_this());
+      write_ops_.SetCollection(shared_from_this());
+      writes_done_ops_.SetCollection(shared_from_this());
+          finish_ops_.SetCollection(shared_from_this();
+    }
+    CallOpSet<CallOpSendInitialMetadata> init_ops_;
+    CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+    CallOpSet<CallOpSendMessage> write_ops_;
+    CallOpSet<CallOpClientSendClose> writes_done_ops_;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+              CallOpClientRecvStatus> finish_ops_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 /// Client-side interface for asynchronous bi-directional streaming.
@@ -236,60 +263,75 @@ class ClientAsyncReaderWriter GRPC_FINAL
   ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
                           const RpcMethod& method, ClientContext* context,
                           void* tag)
-      : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_ops_.set_output_tag(tag);
-    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
-    call_.PerformOps(&init_ops_);
+      : context_(context),
+        call_(channel->CreateCall(method, context, cq)),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+    collection_->init_ops_.set_output_tag(tag);
+    collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&collection_->init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_ops_.set_output_tag(tag);
-    meta_ops_.RecvInitialMetadata(context_);
-    call_.PerformOps(&meta_ops_);
+    collection_->meta_ops_.set_output_tag(tag);
+    collection_->meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&collection_->meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_ops_.set_output_tag(tag);
+    collection_->read_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      read_ops_.RecvInitialMetadata(context_);
+      collection_->read_ops_.RecvInitialMetadata(context_);
     }
-    read_ops_.RecvMessage(msg);
-    call_.PerformOps(&read_ops_);
+    collection_->read_ops_.RecvMessage(msg);
+    call_.PerformOps(&collection_->read_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_ops_.set_output_tag(tag);
+    collection_->write_ops_.set_output_tag(tag);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&write_ops_);
+    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&collection_->write_ops_);
   }
 
   void WritesDone(void* tag) GRPC_OVERRIDE {
-    writes_done_ops_.set_output_tag(tag);
-    writes_done_ops_.ClientSendClose();
-    call_.PerformOps(&writes_done_ops_);
+    collection_->writes_done_ops_.set_output_tag(tag);
+    collection_->writes_done_ops_.ClientSendClose();
+    call_.PerformOps(&collection_->writes_done_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_ops_.RecvInitialMetadata(context_);
+      collection_->finish_ops_.RecvInitialMetadata(context_);
     }
-    finish_ops_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_ops_);
+    collection_->finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  CallOpSet<CallOpSendInitialMetadata> init_ops_;
-  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
-  CallOpSet<CallOpSendMessage> write_ops_;
-  CallOpSet<CallOpClientSendClose> writes_done_ops_;
-  CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      init_ops_.SetCollection(shared_from_this());
+      meta_ops_.SetCollection(shared_from_this());
+      read_ops_.SetCollection(shared_from_this());
+      write_ops_.SetCollection(shared_from_this());
+      writes_done_ops_.SetCollection(shared_from_this());
+          finish_ops_.SetCollection(shared_from_this();
+    }
+    CallOpSet<CallOpSendInitialMetadata> init_ops_;
+    CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+    CallOpSet<CallOpSendMessage> write_ops_;
+    CallOpSet<CallOpClientSendClose> writes_done_ops_;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 template <class W, class R>
@@ -297,48 +339,53 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
                                      public AsyncReaderInterface<R> {
  public:
   explicit ServerAsyncReader(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+      : call_(nullptr, nullptr, nullptr),
+        ctx_(ctx),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+  }
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_ops_.set_output_tag(tag);
-    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+    collection_->meta_ops_.set_output_tag(tag);
+    collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_ops_);
+    call_.PerformOps(&collection_->meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_ops_.set_output_tag(tag);
-    read_ops_.RecvMessage(msg);
-    call_.PerformOps(&read_ops_);
+    collection_->read_ops_.set_output_tag(tag);
+    collection_->read_ops_.RecvMessage(msg);
+    call_.PerformOps(&collection_->read_ops_);
   }
 
   void Finish(const W& msg, const Status& status, void* tag) {
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // The response is dropped if the status is not OK.
     if (status.ok()) {
-      finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
-                                   finish_ops_.SendMessage(msg));
+      collection_->finish_ops_.ServerSendStatus(
+          ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg));
     } else {
-      finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+      collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
+                                                status);
     }
-    call_.PerformOps(&finish_ops_);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
   void FinishWithError(const Status& status, void* tag) {
     GPR_ASSERT(!status.ok());
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_ops_);
+    collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
  private:
@@ -346,10 +393,19 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-  CallOpSet<CallOpRecvMessage<R>> read_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
-            CallOpServerSendStatus> finish_ops_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      meta_ops_.SetCollection(shared_from_this());
+      read_ops_.SetCollection(shared_from_this());
+          finish_ops_.SetCollection(shared_from_this();
+    }
+    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+    CallOpSet<CallOpRecvMessage<R>> read_ops_;
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+              CallOpServerSendStatus> finish_ops_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 template <class W>
@@ -357,36 +413,40 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
                                      public AsyncWriterInterface<W> {
  public:
   explicit ServerAsyncWriter(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+      : call_(nullptr, nullptr, nullptr),
+        ctx_(ctx),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+  }
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_ops_.set_output_tag(tag);
-    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+    collection_->meta_ops_.set_output_tag(tag);
+    collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_ops_);
+    call_.PerformOps(&collection_->meta_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_ops_.set_output_tag(tag);
+    collection_->write_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // TODO(ctiller): don't assert
-    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&write_ops_);
+    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&collection_->write_ops_);
   }
 
   void Finish(const Status& status, void* tag) {
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_ops_);
+    collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
  private:
@@ -394,9 +454,18 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      meta_ops_.SetCollection(shared_from_this());
+      write_ops_.SetCollection(shared_from_this());
+          finish_ops_.SetCollection(shared_from_this();
+    }
+    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 /// Server-side interface for asynchronous bi-directional streaming.
@@ -406,42 +475,46 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
                                            public AsyncReaderInterface<R> {
  public:
   explicit ServerAsyncReaderWriter(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+      : call_(nullptr, nullptr, nullptr),
+        ctx_(ctx),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+  }
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_ops_.set_output_tag(tag);
-    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+    collection_->meta_ops_.set_output_tag(tag);
+    collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_ops_);
+    call_.PerformOps(&collection_->meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    read_ops_.set_output_tag(tag);
-    read_ops_.RecvMessage(msg);
-    call_.PerformOps(&read_ops_);
+    collection_->read_ops_.set_output_tag(tag);
+    collection_->read_ops_.RecvMessage(msg);
+    call_.PerformOps(&collection_->read_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    write_ops_.set_output_tag(tag);
+    collection_->write_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // TODO(ctiller): don't assert
-    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&write_ops_);
+    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&collection_->write_ops_);
   }
 
   void Finish(const Status& status, void* tag) {
-    finish_ops_.set_output_tag(tag);
+    collection_->finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_ops_);
+    collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&collection_->finish_ops_);
   }
 
  private:
@@ -451,10 +524,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-  CallOpSet<CallOpRecvMessage<R>> read_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      meta_ops_.SetCollection(shared_from_this());
+      read_ops_.SetCollection(shared_from_this());
+      write_ops_.SetCollection(shared_from_this());
+          finish_ops_.SetCollection(shared_from_this();
+    }
+    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+    CallOpSet<CallOpRecvMessage<R>> read_ops_;
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index 481b20b535..c1c637e928 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -62,40 +62,53 @@ class ClientAsyncResponseReader GRPC_FINAL
   ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
                             const RpcMethod& method, ClientContext* context,
                             const W& request)
-      : context_(context), call_(channel->CreateCall(method, context, cq)) {
-    init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+      : context_(context),
+        call_(channel->CreateCall(method, context, cq)),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+    collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(init_buf_.SendMessage(request).ok());
-    init_buf_.ClientSendClose();
-    call_.PerformOps(&init_buf_);
+    GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok());
+    collection_->init_buf_.ClientSendClose();
+    call_.PerformOps(&collection_->init_buf_);
   }
 
   void ReadInitialMetadata(void* tag) {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    meta_buf_.set_output_tag(tag);
-    meta_buf_.RecvInitialMetadata(context_);
-    call_.PerformOps(&meta_buf_);
+    collection_->meta_buf_.set_output_tag(tag);
+    collection_->meta_buf_.RecvInitialMetadata(context_);
+    call_.PerformOps(&collection_->meta_buf_);
   }
 
   void Finish(R* msg, Status* status, void* tag) {
-    finish_buf_.set_output_tag(tag);
+    collection_->finish_buf_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      finish_buf_.RecvInitialMetadata(context_);
+      collection_->finish_buf_.RecvInitialMetadata(context_);
     }
-    finish_buf_.RecvMessage(msg);
-    finish_buf_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&finish_buf_);
+    collection_->finish_buf_.RecvMessage(msg);
+    collection_->finish_buf_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&collection_->finish_buf_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
-                  CallOpClientSendClose> init_buf_;
-  CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
-  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
-            CallOpClientRecvStatus> finish_buf_;
+
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      init_buf_.SetCollection(shared_from_this());
+      meta_buf_.SetCollection(shared_from_this());
+      finish_buf_.SetCollection(shared_from_this());
+    }
+    SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+                    CallOpClientSendClose> init_buf_;
+    CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+              CallOpClientRecvStatus> finish_buf_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 template <class W>
@@ -103,42 +116,47 @@ class ServerAsyncResponseWriter GRPC_FINAL
     : public ServerAsyncStreamingInterface {
  public:
   explicit ServerAsyncResponseWriter(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+      : call_(nullptr, nullptr, nullptr),
+        ctx_(ctx),
+        collection_(new CallOpSetCollection) {
+    collection_->SetCollection();
+  }
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    meta_buf_.set_output_tag(tag);
-    meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+    collection_->meta_buf_.set_output_tag(tag);
+    collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&meta_buf_);
+    call_.PerformOps(&collection_->meta_buf_);
   }
 
   void Finish(const W& msg, const Status& status, void* tag) {
-    finish_buf_.set_output_tag(tag);
+    collection_->finish_buf_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // The response is dropped if the status is not OK.
     if (status.ok()) {
-      finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
-                                   finish_buf_.SendMessage(msg));
+      collection_->finish_buf_.ServerSendStatus(
+          ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg));
     } else {
-      finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+      collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
+                                                status);
     }
-    call_.PerformOps(&finish_buf_);
+    call_.PerformOps(&collection_->finish_buf_);
   }
 
   void FinishWithError(const Status& status, void* tag) {
     GPR_ASSERT(!status.ok());
-    finish_buf_.set_output_tag(tag);
+    collection_->finish_buf_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+      collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&finish_buf_);
+    collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&collection_->finish_buf_);
   }
 
  private:
@@ -146,9 +164,17 @@ class ServerAsyncResponseWriter GRPC_FINAL
 
   Call call_;
   ServerContext* ctx_;
-  CallOpSet<CallOpSendInitialMetadata> meta_buf_;
-  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
-            CallOpServerSendStatus> finish_buf_;
+  class CallOpSetCollection : public CallOpSetCollectionInterface {
+   public:
+    void SetCollection() {
+      meta_buf_.SetCollection(shared_from_this());
+      finish_buf_.SetCollection(shared_from_this());
+    }
+    CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+              CallOpServerSendStatus> finish_buf_;
+  };
+  std::shared_ptr<CallOpSetCollection> collection_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 1e06768ac4..94a8243ac6 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -472,6 +472,15 @@ class CallOpClientRecvStatus {
   size_t status_details_capacity_;
 };
 
+/// An abstract collection of CallOpSet's, to be used whenever
+/// CallOpSet objects must be thought of as a group. Each member
+/// of the group should have a shared_ptr back to the collection,
+/// as will the object that instantiates the collection, allowing
+/// for automatic ref-counting. In practice, any actual use should
+/// derive from this base class
+class CallOpSetCollectionInterface
+    : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
+
 /// An abstract collection of call ops, used to generate the
 /// grpc_call_op structure to pass down to the lower layers,
 /// and as it is-a CompletionQueueTag, also massages the final
@@ -488,8 +497,14 @@ class CallOpSetInterface : public CompletionQueueTag {
     max_message_size_ = max_message_size;
   }
 
+  /// Mark this as belonging to a collection
+  void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
+    collection_ = collection;
+  }
+
  protected:
   int max_message_size_;
+  std::shared_ptr<CallOpSetCollectionInterface> collection_;
 };
 
 /// Primary implementaiton of CallOpSetInterface.
-- 
cgit v1.2.3


From 7e3f9b0178eda9f4fe5aa407294e890f2953b317 Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Fri, 5 Feb 2016 13:43:17 -0800
Subject: Forgot to include these edits in the last commit

---
 include/grpc++/impl/codegen/async_stream.h | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index e449b92bb1..e13bab81e2 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -232,7 +232,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
       meta_ops_.SetCollection(shared_from_this());
       write_ops_.SetCollection(shared_from_this());
       writes_done_ops_.SetCollection(shared_from_this());
-          finish_ops_.SetCollection(shared_from_this();
+      finish_ops_.SetCollection(shared_from_this());
     }
     CallOpSet<CallOpSendInitialMetadata> init_ops_;
     CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
@@ -322,7 +322,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
       read_ops_.SetCollection(shared_from_this());
       write_ops_.SetCollection(shared_from_this());
       writes_done_ops_.SetCollection(shared_from_this());
-          finish_ops_.SetCollection(shared_from_this();
+      finish_ops_.SetCollection(shared_from_this());
     }
     CallOpSet<CallOpSendInitialMetadata> init_ops_;
     CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
@@ -398,7 +398,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
     void SetCollection() {
       meta_ops_.SetCollection(shared_from_this());
       read_ops_.SetCollection(shared_from_this());
-          finish_ops_.SetCollection(shared_from_this();
+      finish_ops_.SetCollection(shared_from_this());
     }
     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
     CallOpSet<CallOpRecvMessage<R>> read_ops_;
@@ -459,7 +459,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
     void SetCollection() {
       meta_ops_.SetCollection(shared_from_this());
       write_ops_.SetCollection(shared_from_this());
-          finish_ops_.SetCollection(shared_from_this();
+      finish_ops_.SetCollection(shared_from_this());
     }
     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
@@ -530,7 +530,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
       meta_ops_.SetCollection(shared_from_this());
       read_ops_.SetCollection(shared_from_this());
       write_ops_.SetCollection(shared_from_this());
-          finish_ops_.SetCollection(shared_from_this();
+      finish_ops_.SetCollection(shared_from_this());
     }
     CallOpSet<CallOpSendInitialMetadata> meta_ops_;
     CallOpSet<CallOpRecvMessage<R>> read_ops_;
-- 
cgit v1.2.3


From 5506bea349ccac69e9ee0410647ea8793cbee6f2 Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Fri, 5 Feb 2016 14:23:24 -0800
Subject: Undo much of the previous commits so that only CallOpSet groups with
 a Sneaky member are in a collection.

---
 include/grpc++/impl/codegen/async_stream.h     | 341 ++++++++++---------------
 include/grpc++/impl/codegen/async_unary_call.h |  47 ++--
 include/grpc++/impl/codegen/call.h             |   6 +-
 3 files changed, 150 insertions(+), 244 deletions(-)

diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index e13bab81e2..b0410485f8 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -105,62 +105,49 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
   ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
                     const RpcMethod& method, ClientContext* context,
                     const W& request, void* tag)
-      : context_(context),
-        call_(channel->CreateCall(method, context, cq)),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-    collection_->init_ops_.set_output_tag(tag);
-    collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+      : context_(context), call_(channel->CreateCall(method, context, cq)) {
+    init_ops_.set_output_tag(tag);
+    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok());
-    collection_->init_ops_.ClientSendClose();
-    call_.PerformOps(&collection_->init_ops_);
+    GPR_ASSERT(init_ops_.SendMessage(request).ok());
+    init_ops_.ClientSendClose();
+    call_.PerformOps(&init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    collection_->meta_ops_.set_output_tag(tag);
-    collection_->meta_ops_.RecvInitialMetadata(context_);
-    call_.PerformOps(&collection_->meta_ops_);
+    meta_ops_.set_output_tag(tag);
+    meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    collection_->read_ops_.set_output_tag(tag);
+    read_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      collection_->read_ops_.RecvInitialMetadata(context_);
+      read_ops_.RecvInitialMetadata(context_);
     }
-    collection_->read_ops_.RecvMessage(msg);
-    call_.PerformOps(&collection_->read_ops_);
+    read_ops_.RecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      collection_->finish_ops_.RecvInitialMetadata(context_);
+      finish_ops_.RecvInitialMetadata(context_);
     }
-    collection_->finish_ops_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&collection_->finish_ops_);
+    finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      init_ops_.SetCollection(shared_from_this());
-      meta_ops_.SetCollection(shared_from_this());
-      read_ops_.SetCollection(shared_from_this());
-      finish_ops_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
-              CallOpClientSendClose> init_ops_;
-    CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
-    CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+      init_ops_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
 };
 
 /// Common interface for client side asynchronous writing.
@@ -181,67 +168,53 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
   ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
                     const RpcMethod& method, ClientContext* context,
                     R* response, void* tag)
-      : context_(context),
-        call_(channel->CreateCall(method, context, cq)),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-    collection_->finish_ops_.RecvMessage(response);
-
-    collection_->init_ops_.set_output_tag(tag);
-    collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
-    call_.PerformOps(&collection_->init_ops_);
+      : context_(context), call_(channel->CreateCall(method, context, cq)) {
+    finish_ops_.RecvMessage(response);
+
+    init_ops_.set_output_tag(tag);
+    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    collection_->meta_ops_.set_output_tag(tag);
-    collection_->meta_ops_.RecvInitialMetadata(context_);
-    call_.PerformOps(&collection_->meta_ops_);
+    meta_ops_.set_output_tag(tag);
+    meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    collection_->write_ops_.set_output_tag(tag);
+    write_ops_.set_output_tag(tag);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&collection_->write_ops_);
+    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&write_ops_);
   }
 
   void WritesDone(void* tag) GRPC_OVERRIDE {
-    collection_->writes_done_ops_.set_output_tag(tag);
-    collection_->writes_done_ops_.ClientSendClose();
-    call_.PerformOps(&collection_->writes_done_ops_);
+    writes_done_ops_.set_output_tag(tag);
+    writes_done_ops_.ClientSendClose();
+    call_.PerformOps(&writes_done_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      collection_->finish_ops_.RecvInitialMetadata(context_);
+      finish_ops_.RecvInitialMetadata(context_);
     }
-    collection_->finish_ops_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&collection_->finish_ops_);
+    finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      init_ops_.SetCollection(shared_from_this());
-      meta_ops_.SetCollection(shared_from_this());
-      write_ops_.SetCollection(shared_from_this());
-      writes_done_ops_.SetCollection(shared_from_this());
-      finish_ops_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata> init_ops_;
-    CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-    CallOpSet<CallOpSendMessage> write_ops_;
-    CallOpSet<CallOpClientSendClose> writes_done_ops_;
-    CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
-              CallOpClientRecvStatus> finish_ops_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata> init_ops_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+  CallOpSet<CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpClientSendClose> writes_done_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+            CallOpClientRecvStatus> finish_ops_;
 };
 
 /// Client-side interface for asynchronous bi-directional streaming.
@@ -263,75 +236,60 @@ class ClientAsyncReaderWriter GRPC_FINAL
   ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
                           const RpcMethod& method, ClientContext* context,
                           void* tag)
-      : context_(context),
-        call_(channel->CreateCall(method, context, cq)),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-    collection_->init_ops_.set_output_tag(tag);
-    collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
-    call_.PerformOps(&collection_->init_ops_);
+      : context_(context), call_(channel->CreateCall(method, context, cq)) {
+    init_ops_.set_output_tag(tag);
+    init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+    call_.PerformOps(&init_ops_);
   }
 
   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
-    collection_->meta_ops_.set_output_tag(tag);
-    collection_->meta_ops_.RecvInitialMetadata(context_);
-    call_.PerformOps(&collection_->meta_ops_);
+    meta_ops_.set_output_tag(tag);
+    meta_ops_.RecvInitialMetadata(context_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    collection_->read_ops_.set_output_tag(tag);
+    read_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      collection_->read_ops_.RecvInitialMetadata(context_);
+      read_ops_.RecvInitialMetadata(context_);
     }
-    collection_->read_ops_.RecvMessage(msg);
-    call_.PerformOps(&collection_->read_ops_);
+    read_ops_.RecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    collection_->write_ops_.set_output_tag(tag);
+    write_ops_.set_output_tag(tag);
     // TODO(ctiller): don't assert
-    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&collection_->write_ops_);
+    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&write_ops_);
   }
 
   void WritesDone(void* tag) GRPC_OVERRIDE {
-    collection_->writes_done_ops_.set_output_tag(tag);
-    collection_->writes_done_ops_.ClientSendClose();
-    call_.PerformOps(&collection_->writes_done_ops_);
+    writes_done_ops_.set_output_tag(tag);
+    writes_done_ops_.ClientSendClose();
+    call_.PerformOps(&writes_done_ops_);
   }
 
   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
-      collection_->finish_ops_.RecvInitialMetadata(context_);
+      finish_ops_.RecvInitialMetadata(context_);
     }
-    collection_->finish_ops_.ClientRecvStatus(context_, status);
-    call_.PerformOps(&collection_->finish_ops_);
+    finish_ops_.ClientRecvStatus(context_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
   ClientContext* context_;
   Call call_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      init_ops_.SetCollection(shared_from_this());
-      meta_ops_.SetCollection(shared_from_this());
-      read_ops_.SetCollection(shared_from_this());
-      write_ops_.SetCollection(shared_from_this());
-      writes_done_ops_.SetCollection(shared_from_this());
-      finish_ops_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata> init_ops_;
-    CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
-    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
-    CallOpSet<CallOpSendMessage> write_ops_;
-    CallOpSet<CallOpClientSendClose> writes_done_ops_;
-    CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata> init_ops_;
+  CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpClientSendClose> writes_done_ops_;
+  CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
 };
 
 template <class W, class R>
@@ -339,53 +297,48 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
                                      public AsyncReaderInterface<R> {
  public:
   explicit ServerAsyncReader(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr),
-        ctx_(ctx),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-  }
+      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    collection_->meta_ops_.set_output_tag(tag);
-    collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+    meta_ops_.set_output_tag(tag);
+    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&collection_->meta_ops_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    collection_->read_ops_.set_output_tag(tag);
-    collection_->read_ops_.RecvMessage(msg);
-    call_.PerformOps(&collection_->read_ops_);
+    read_ops_.set_output_tag(tag);
+    read_ops_.RecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Finish(const W& msg, const Status& status, void* tag) {
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // The response is dropped if the status is not OK.
     if (status.ok()) {
-      collection_->finish_ops_.ServerSendStatus(
-          ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg));
+      finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
+                                   finish_ops_.SendMessage(msg));
     } else {
-      collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
-                                                status);
+      finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     }
-    call_.PerformOps(&collection_->finish_ops_);
+    call_.PerformOps(&finish_ops_);
   }
 
   void FinishWithError(const Status& status, void* tag) {
     GPR_ASSERT(!status.ok());
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&collection_->finish_ops_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
@@ -393,19 +346,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      meta_ops_.SetCollection(shared_from_this());
-      read_ops_.SetCollection(shared_from_this());
-      finish_ops_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-    CallOpSet<CallOpRecvMessage<R>> read_ops_;
-    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
-              CallOpServerSendStatus> finish_ops_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+            CallOpServerSendStatus> finish_ops_;
 };
 
 template <class W>
@@ -413,40 +357,36 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
                                      public AsyncWriterInterface<W> {
  public:
   explicit ServerAsyncWriter(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr),
-        ctx_(ctx),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-  }
+      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    collection_->meta_ops_.set_output_tag(tag);
-    collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+    meta_ops_.set_output_tag(tag);
+    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&collection_->meta_ops_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    collection_->write_ops_.set_output_tag(tag);
+    write_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // TODO(ctiller): don't assert
-    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&collection_->write_ops_);
+    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&write_ops_);
   }
 
   void Finish(const Status& status, void* tag) {
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&collection_->finish_ops_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
@@ -454,18 +394,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      meta_ops_.SetCollection(shared_from_this());
-      write_ops_.SetCollection(shared_from_this());
-      finish_ops_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
-    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 };
 
 /// Server-side interface for asynchronous bi-directional streaming.
@@ -475,46 +406,42 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
                                            public AsyncReaderInterface<R> {
  public:
   explicit ServerAsyncReaderWriter(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr),
-        ctx_(ctx),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-  }
+      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    collection_->meta_ops_.set_output_tag(tag);
-    collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+    meta_ops_.set_output_tag(tag);
+    meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&collection_->meta_ops_);
+    call_.PerformOps(&meta_ops_);
   }
 
   void Read(R* msg, void* tag) GRPC_OVERRIDE {
-    collection_->read_ops_.set_output_tag(tag);
-    collection_->read_ops_.RecvMessage(msg);
-    call_.PerformOps(&collection_->read_ops_);
+    read_ops_.set_output_tag(tag);
+    read_ops_.RecvMessage(msg);
+    call_.PerformOps(&read_ops_);
   }
 
   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
-    collection_->write_ops_.set_output_tag(tag);
+    write_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // TODO(ctiller): don't assert
-    GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
-    call_.PerformOps(&collection_->write_ops_);
+    GPR_ASSERT(write_ops_.SendMessage(msg).ok());
+    call_.PerformOps(&write_ops_);
   }
 
   void Finish(const Status& status, void* tag) {
-    collection_->finish_ops_.set_output_tag(tag);
+    finish_ops_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+      finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&collection_->finish_ops_);
+    finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_ops_);
   }
 
  private:
@@ -524,20 +451,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
 
   Call call_;
   ServerContext* ctx_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      meta_ops_.SetCollection(shared_from_this());
-      read_ops_.SetCollection(shared_from_this());
-      write_ops_.SetCollection(shared_from_this());
-      finish_ops_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata> meta_ops_;
-    CallOpSet<CallOpRecvMessage<R>> read_ops_;
-    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
-    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+  CallOpSet<CallOpRecvMessage<R>> read_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index c1c637e928..f78ba891e8 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -116,47 +116,42 @@ class ServerAsyncResponseWriter GRPC_FINAL
     : public ServerAsyncStreamingInterface {
  public:
   explicit ServerAsyncResponseWriter(ServerContext* ctx)
-      : call_(nullptr, nullptr, nullptr),
-        ctx_(ctx),
-        collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
-  }
+      : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
 
   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     GPR_ASSERT(!ctx_->sent_initial_metadata_);
 
-    collection_->meta_buf_.set_output_tag(tag);
-    collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+    meta_buf_.set_output_tag(tag);
+    meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
     ctx_->sent_initial_metadata_ = true;
-    call_.PerformOps(&collection_->meta_buf_);
+    call_.PerformOps(&meta_buf_);
   }
 
   void Finish(const W& msg, const Status& status, void* tag) {
-    collection_->finish_buf_.set_output_tag(tag);
+    finish_buf_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+      finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
     // The response is dropped if the status is not OK.
     if (status.ok()) {
-      collection_->finish_buf_.ServerSendStatus(
-          ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg));
+      finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
+                                   finish_buf_.SendMessage(msg));
     } else {
-      collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
-                                                status);
+      finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
     }
-    call_.PerformOps(&collection_->finish_buf_);
+    call_.PerformOps(&finish_buf_);
   }
 
   void FinishWithError(const Status& status, void* tag) {
     GPR_ASSERT(!status.ok());
-    collection_->finish_buf_.set_output_tag(tag);
+    finish_buf_.set_output_tag(tag);
     if (!ctx_->sent_initial_metadata_) {
-      collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+      finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
       ctx_->sent_initial_metadata_ = true;
     }
-    collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
-    call_.PerformOps(&collection_->finish_buf_);
+    finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+    call_.PerformOps(&finish_buf_);
   }
 
  private:
@@ -164,17 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL
 
   Call call_;
   ServerContext* ctx_;
-  class CallOpSetCollection : public CallOpSetCollectionInterface {
-   public:
-    void SetCollection() {
-      meta_buf_.SetCollection(shared_from_this());
-      finish_buf_.SetCollection(shared_from_this());
-    }
-    CallOpSet<CallOpSendInitialMetadata> meta_buf_;
-    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
-              CallOpServerSendStatus> finish_buf_;
-  };
-  std::shared_ptr<CallOpSetCollection> collection_;
+  CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+            CallOpServerSendStatus> finish_buf_;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 94a8243ac6..03280b7e3e 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -477,7 +477,9 @@ class CallOpClientRecvStatus {
 /// of the group should have a shared_ptr back to the collection,
 /// as will the object that instantiates the collection, allowing
 /// for automatic ref-counting. In practice, any actual use should
-/// derive from this base class
+/// derive from this base class. This is specifically necessary if
+/// some of the CallOpSet's in the collection are "Sneaky" and don't
+/// report back to the C++ layer CQ operations
 class CallOpSetCollectionInterface
     : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
 
@@ -497,7 +499,7 @@ class CallOpSetInterface : public CompletionQueueTag {
     max_message_size_ = max_message_size;
   }
 
-  /// Mark this as belonging to a collection
+  /// Mark this as belonging to a collection if needed
   void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
     collection_ = collection;
   }
-- 
cgit v1.2.3


From 2b5638668e6c72016028b1fe11b78d78711c29d8 Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Fri, 5 Feb 2016 15:11:36 -0800
Subject: Drop the ref

---
 include/grpc++/impl/codegen/call.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 03280b7e3e..d342a3a25a 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -544,6 +544,7 @@ class CallOpSet : public CallOpSetInterface,
     this->Op5::FinishOp(status, max_message_size_);
     this->Op6::FinishOp(status, max_message_size_);
     *tag = return_tag_;
+    SetCollection(nullptr); // drop the ref at this point
     return true;
   }
 
-- 
cgit v1.2.3


From bedf57fe8c323c2e26a2ad64e29521a353343a6b Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Fri, 5 Feb 2016 16:45:54 -0800
Subject: Ref the collection only when it will be used (and later finalized)

---
 include/grpc++/impl/codegen/async_unary_call.h | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index f78ba891e8..f3c75dc3b1 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -65,7 +65,7 @@ class ClientAsyncResponseReader GRPC_FINAL
       : context_(context),
         call_(channel->CreateCall(method, context, cq)),
         collection_(new CallOpSetCollection) {
-    collection_->SetCollection();
+    collection_->init_buf_.SetCollection(collection_);
     collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
     // TODO(ctiller): don't assert
     GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok());
@@ -76,12 +76,14 @@ class ClientAsyncResponseReader GRPC_FINAL
   void ReadInitialMetadata(void* tag) {
     GPR_ASSERT(!context_->initial_metadata_received_);
 
+    collection_->meta_buf_.SetCollection(collection_);
     collection_->meta_buf_.set_output_tag(tag);
     collection_->meta_buf_.RecvInitialMetadata(context_);
     call_.PerformOps(&collection_->meta_buf_);
   }
 
   void Finish(R* msg, Status* status, void* tag) {
+    collection_->finish_buf_.SetCollection(collection_);
     collection_->finish_buf_.set_output_tag(tag);
     if (!context_->initial_metadata_received_) {
       collection_->finish_buf_.RecvInitialMetadata(context_);
@@ -97,11 +99,6 @@ class ClientAsyncResponseReader GRPC_FINAL
 
   class CallOpSetCollection : public CallOpSetCollectionInterface {
    public:
-    void SetCollection() {
-      init_buf_.SetCollection(shared_from_this());
-      meta_buf_.SetCollection(shared_from_this());
-      finish_buf_.SetCollection(shared_from_this());
-    }
     SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
                     CallOpClientSendClose> init_buf_;
     CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
-- 
cgit v1.2.3


From c593ca017161e48367bf7182f01dac477109cf21 Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Mon, 8 Feb 2016 11:38:42 -0800
Subject: reset the shared_ptr

---
 include/grpc++/impl/codegen/call.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index d342a3a25a..c075388266 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -544,7 +544,7 @@ class CallOpSet : public CallOpSetInterface,
     this->Op5::FinishOp(status, max_message_size_);
     this->Op6::FinishOp(status, max_message_size_);
     *tag = return_tag_;
-    SetCollection(nullptr); // drop the ref at this point
+    collection_.reset(); // drop the ref at this point
     return true;
   }
 
-- 
cgit v1.2.3


From 30bf3ea4b94eb3ded9296e3e7825d94d402be8be Mon Sep 17 00:00:00 2001
From: Vijay Pai <vpai@google.com>
Date: Mon, 8 Feb 2016 13:21:07 -0800
Subject: clang-format

---
 include/grpc++/impl/codegen/call.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index c075388266..e65349ddd3 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -544,7 +544,7 @@ class CallOpSet : public CallOpSetInterface,
     this->Op5::FinishOp(status, max_message_size_);
     this->Op6::FinishOp(status, max_message_size_);
     *tag = return_tag_;
-    collection_.reset(); // drop the ref at this point
+    collection_.reset();  // drop the ref at this point
     return true;
   }
 
-- 
cgit v1.2.3