diff options
Diffstat (limited to 'include/grpc++/impl/codegen/async_stream.h')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 341 |
1 files changed, 212 insertions, 129 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index b0410485f8..e13bab81e2 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -105,49 +105,62 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); + GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok()); + collection_->init_ops_.ClientSendClose(); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); + collection_->read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); + collection_->read_ops_.RecvInitialMetadata(context_); } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> - init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; /// Common interface for client side asynchronous writing. @@ -168,53 +181,67 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->finish_ops_.RecvMessage(response); + + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + collection_->writes_done_ops_.set_output_tag(tag); + collection_->writes_done_ops_.ClientSendClose(); + call_.PerformOps(&collection_->writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, - CallOpClientRecvStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + writes_done_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; /// Client-side interface for asynchronous bi-directional streaming. @@ -236,60 +263,75 @@ class ClientAsyncReaderWriter GRPC_FINAL ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_); - call_.PerformOps(&init_ops_); + : context_(context), + call_(channel->CreateCall(method, context, cq)), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + collection_->init_ops_.set_output_tag(tag); + collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&collection_->init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_ops_.set_output_tag(tag); - meta_ops_.RecvInitialMetadata(context_); - call_.PerformOps(&meta_ops_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); + collection_->read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_ops_.RecvInitialMetadata(context_); + collection_->read_ops_.RecvInitialMetadata(context_); } - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_ops_.set_output_tag(tag); - writes_done_ops_.ClientSendClose(); - call_.PerformOps(&writes_done_ops_); + collection_->writes_done_ops_.set_output_tag(tag); + collection_->writes_done_ops_.ClientSendClose(); + call_.PerformOps(&collection_->writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_ops_.RecvInitialMetadata(context_); + collection_->finish_ops_.RecvInitialMetadata(context_); } - finish_ops_.ClientRecvStatus(context_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&collection_->finish_ops_); } private: ClientContext* context_; Call call_; - CallOpSet<CallOpSendInitialMetadata> init_ops_; - CallOpSet<CallOpRecvInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendMessage> write_ops_; - CallOpSet<CallOpClientSendClose> writes_done_ops_; - CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + init_ops_.SetCollection(shared_from_this()); + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + writes_done_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W, class R> @@ -297,48 +339,53 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReader(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.set_output_tag(tag); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, - finish_ops_.SendMessage(msg)); + collection_->finish_ops_.ServerSendStatus( + ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg)); } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + status); } - call_.PerformOps(&finish_ops_); + call_.PerformOps(&collection_->finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -346,10 +393,19 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpServerSendStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; template <class W> @@ -357,36 +413,40 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncWriterInterface<W> { public: explicit ServerAsyncWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -394,9 +454,18 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; /// Server-side interface for asynchronous bi-directional streaming. @@ -406,42 +475,46 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, public AsyncReaderInterface<R> { public: explicit ServerAsyncReaderWriter(ServerContext* ctx) - : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} + : call_(nullptr, nullptr, nullptr), + ctx_(ctx), + collection_(new CallOpSetCollection) { + collection_->SetCollection(); + } void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->meta_ops_.set_output_tag(tag); + collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_ops_); + call_.PerformOps(&collection_->meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_ops_.set_output_tag(tag); - read_ops_.RecvMessage(msg); - call_.PerformOps(&read_ops_); + collection_->read_ops_.set_output_tag(tag); + collection_->read_ops_.RecvMessage(msg); + call_.PerformOps(&collection_->read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_ops_.set_output_tag(tag); + collection_->write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); - call_.PerformOps(&write_ops_); + GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&collection_->write_ops_); } void Finish(const Status& status, void* tag) { - finish_ops_.set_output_tag(tag); + collection_->finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); + collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_ops_); + collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&collection_->finish_ops_); } private: @@ -451,10 +524,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpSet<CallOpSendInitialMetadata> meta_ops_; - CallOpSet<CallOpRecvMessage<R>> read_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; - CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + class CallOpSetCollection : public CallOpSetCollectionInterface { + public: + void SetCollection() { + meta_ops_.SetCollection(shared_from_this()); + read_ops_.SetCollection(shared_from_this()); + write_ops_.SetCollection(shared_from_this()); + finish_ops_.SetCollection(shared_from_this()); + } + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; + }; + std::shared_ptr<CallOpSetCollection> collection_; }; } // namespace grpc |