diff options
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 341 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 47 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 6 |
3 files changed, 150 insertions, 244 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index e13bab81e2..b0410485f8 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -105,62 +105,49 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - collection_->init_ops_.set_output_tag(tag); - collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok()); - collection_->init_ops_.ClientSendClose(); - call_.PerformOps(&collection_->init_ops_); + GPR_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_ops_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->read_ops_.RecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_ops_.RecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - collection_->finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - init_ops_.SetCollection(shared_from_this()); - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; /// Common interface for client side asynchronous writing. @@ -181,67 +168,53 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - collection_->finish_ops_.RecvMessage(response); - - collection_->init_ops_.set_output_tag(tag); - collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&collection_->init_ops_); + : context_(context), call_(channel->CreateCall(method, context, cq)) { + finish_ops_.RecvMessage(response); + + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_ops_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - collection_->writes_done_ops_.set_output_tag(tag); - collection_->writes_done_ops_.ClientSendClose(); - call_.PerformOps(&collection_->writes_done_ops_); + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_ops_.RecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - collection_->finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - init_ops_.SetCollection(shared_from_this()); - meta_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - writes_done_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; }; /// Client-side interface for asynchronous bi-directional streaming. @@ -263,75 +236,60 @@ class ClientAsyncReaderWriter GRPC_FINAL ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - collection_->init_ops_.set_output_tag(tag); - collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&collection_->init_ops_); + : context_(context), call_(channel->CreateCall(method, context, cq)) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_ops_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->read_ops_.RecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - collection_->writes_done_ops_.set_output_tag(tag); - collection_->writes_done_ops_.ClientSendClose(); - call_.PerformOps(&collection_->writes_done_ops_); + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_ops_.RecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - collection_->finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - init_ops_.SetCollection(shared_from_this()); - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - writes_done_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W, class R> @@ -339,53 +297,48 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_ops_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - collection_->finish_ops_.ServerSendStatus( - ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg)); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + finish_ops_.SendMessage(msg)); } else { - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - status); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); } - call_.PerformOps(&collection_->finish_ops_); + call_.PerformOps(&finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -393,19 +346,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_ops_; }; template <class W> @@ -413,40 +357,36 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_ops_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -454,18 +394,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; /// Server-side interface for asynchronous bi-directional streaming. @@ -475,46 +406,42 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_ops_.set_output_tag(tag); - collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_ops_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - collection_->read_ops_.set_output_tag(tag); - collection_->read_ops_.RecvMessage(msg); - call_.PerformOps(&collection_->read_ops_); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - collection_->write_ops_.set_output_tag(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&collection_->write_ops_); + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - collection_->finish_ops_.set_output_tag(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_ops_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -524,20 +451,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_ops_.SetCollection(shared_from_this()); - read_ops_.SetCollection(shared_from_this()); - write_ops_.SetCollection(shared_from_this()); - finish_ops_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index c1c637e928..f78ba891e8 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -116,47 +116,42 @@ class ServerAsyncResponseWriter GRPC_FINAL : public ServerAsyncStreamingInterface { public: explicit ServerAsyncResponseWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), - ctx_(ctx), - collection_(new CallOpSetCollection) { - collection_->SetCollection(); - } + : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - collection_->meta_buf_.set_output_tag(tag); - collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); + meta_buf_.set_output_tag(tag); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&collection_->meta_buf_); + call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - collection_->finish_buf_.set_output_tag(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - collection_->finish_buf_.ServerSendStatus( - ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg)); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + finish_buf_.SendMessage(msg)); } else { - collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, - status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); } - call_.PerformOps(&collection_->finish_buf_); + call_.PerformOps(&finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - collection_->finish_buf_.set_output_tag(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&collection_->finish_buf_); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_buf_); } private: @@ -164,17 +159,9 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - void SetCollection() { - meta_buf_.SetCollection(shared_from_this()); - finish_buf_.SetCollection(shared_from_this()); - } - CallOpSet<CallOpSendInitialMetadata> meta_buf_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_buf_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_buf_; }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 94a8243ac6..03280b7e3e 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -477,7 +477,9 @@ class CallOpClientRecvStatus { /// of the group should have a shared_ptr back to the collection, /// as will the object that instantiates the collection, allowing /// for automatic ref-counting. In practice, any actual use should -/// derive from this base class +/// derive from this base class. This is specifically necessary if +/// some of the CallOpSet's in the collection are "Sneaky" and don't +/// report back to the C++ layer CQ operations class CallOpSetCollectionInterface : public std::enable_shared_from_this<CallOpSetCollectionInterface> {}; @@ -497,7 +499,7 @@ class CallOpSetInterface : public CompletionQueueTag { max_message_size_ = max_message_size; } - /// Mark this as belonging to a collection + /// Mark this as belonging to a collection if needed void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { collection_ = collection; } |