aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-02-05 13:40:22 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-02-05 13:40:22 -0800
commit38004a8e399118b523fdebb7d7c1144355594b01 (patch)
treefc88f23167e93fda79f9a9cd9ce4b8b287d2df6e /include
parent29cc1e46eba483f699ba92c68294930cb319457b (diff)
Wrap groups of related CallOpSet's into a ref-counted structure
whenever appropriate so as to avoid any unintentional free-before-use problems. Potential performance issue: this triggers an additional allocation for each Async call initiation, along with the cost of ref-counting shared_ptr . But this is worth it for the additional safety provided here without any change to the exposed C++ API.
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/impl/codegen/async_stream.h341
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h96
-rw-r--r--include/grpc++/impl/codegen/call.h15
3 files changed, 288 insertions, 164 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index b0410485f8..e449b92bb1 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -105,49 +105,62 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request, void* tag)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ : context_(context),
+ call_(channel->CreateCall(method, context, cq)),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ collection_->init_ops_.set_output_tag(tag);
+ collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
- GPR_ASSERT(init_ops_.SendMessage(request).ok());
- init_ops_.ClientSendClose();
- call_.PerformOps(&init_ops_);
+ GPR_ASSERT(collection_->init_ops_.SendMessage(request).ok());
+ collection_->init_ops_.ClientSendClose();
+ call_.PerformOps(&collection_->init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_ops_.set_output_tag(tag);
- meta_ops_.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_ops_);
+ collection_->meta_ops_.set_output_tag(tag);
+ collection_->meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&collection_->meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.set_output_tag(tag);
+ collection_->read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- read_ops_.RecvInitialMetadata(context_);
+ collection_->read_ops_.RecvInitialMetadata(context_);
}
- read_ops_.RecvMessage(msg);
- call_.PerformOps(&read_ops_);
+ collection_->read_ops_.RecvMessage(msg);
+ call_.PerformOps(&collection_->read_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_ops_.RecvInitialMetadata(context_);
+ collection_->finish_ops_.RecvInitialMetadata(context_);
}
- finish_ops_.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_ops_);
+ collection_->finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&collection_->finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
- init_ops_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ init_ops_.SetCollection(shared_from_this());
+ meta_ops_.SetCollection(shared_from_this());
+ read_ops_.SetCollection(shared_from_this());
+ finish_ops_.SetCollection(shared_from_this());
+ }
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
/// Common interface for client side asynchronous writing.
@@ -168,53 +181,67 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
R* response, void* tag)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- finish_ops_.RecvMessage(response);
-
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_);
- call_.PerformOps(&init_ops_);
+ : context_(context),
+ call_(channel->CreateCall(method, context, cq)),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ collection_->finish_ops_.RecvMessage(response);
+
+ collection_->init_ops_.set_output_tag(tag);
+ collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&collection_->init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_ops_.set_output_tag(tag);
- meta_ops_.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_ops_);
+ collection_->meta_ops_.set_output_tag(tag);
+ collection_->meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&collection_->meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.set_output_tag(tag);
+ collection_->write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
- call_.PerformOps(&write_ops_);
+ GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&collection_->write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ collection_->writes_done_ops_.set_output_tag(tag);
+ collection_->writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&collection_->writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_ops_.RecvInitialMetadata(context_);
+ collection_->finish_ops_.RecvInitialMetadata(context_);
}
- finish_ops_.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_ops_);
+ collection_->finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&collection_->finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
- CallOpClientRecvStatus> finish_ops_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ init_ops_.SetCollection(shared_from_this());
+ meta_ops_.SetCollection(shared_from_this());
+ write_ops_.SetCollection(shared_from_this());
+ writes_done_ops_.SetCollection(shared_from_this());
+ finish_ops_.SetCollection(shared_from_this();
+ }
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+ CallOpClientRecvStatus> finish_ops_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
/// Client-side interface for asynchronous bi-directional streaming.
@@ -236,60 +263,75 @@ class ClientAsyncReaderWriter GRPC_FINAL
ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
void* tag)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_);
- call_.PerformOps(&init_ops_);
+ : context_(context),
+ call_(channel->CreateCall(method, context, cq)),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ collection_->init_ops_.set_output_tag(tag);
+ collection_->init_ops_.SendInitialMetadata(context->send_initial_metadata_);
+ call_.PerformOps(&collection_->init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_ops_.set_output_tag(tag);
- meta_ops_.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_ops_);
+ collection_->meta_ops_.set_output_tag(tag);
+ collection_->meta_ops_.RecvInitialMetadata(context_);
+ call_.PerformOps(&collection_->meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.set_output_tag(tag);
+ collection_->read_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- read_ops_.RecvInitialMetadata(context_);
+ collection_->read_ops_.RecvInitialMetadata(context_);
}
- read_ops_.RecvMessage(msg);
- call_.PerformOps(&read_ops_);
+ collection_->read_ops_.RecvMessage(msg);
+ call_.PerformOps(&collection_->read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.set_output_tag(tag);
+ collection_->write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
- call_.PerformOps(&write_ops_);
+ GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&collection_->write_ops_);
}
void WritesDone(void* tag) GRPC_OVERRIDE {
- writes_done_ops_.set_output_tag(tag);
- writes_done_ops_.ClientSendClose();
- call_.PerformOps(&writes_done_ops_);
+ collection_->writes_done_ops_.set_output_tag(tag);
+ collection_->writes_done_ops_.ClientSendClose();
+ call_.PerformOps(&collection_->writes_done_ops_);
}
void Finish(Status* status, void* tag) GRPC_OVERRIDE {
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_ops_.RecvInitialMetadata(context_);
+ collection_->finish_ops_.RecvInitialMetadata(context_);
}
- finish_ops_.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_ops_);
+ collection_->finish_ops_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&collection_->finish_ops_);
}
private:
ClientContext* context_;
Call call_;
- CallOpSet<CallOpSendInitialMetadata> init_ops_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendMessage> write_ops_;
- CallOpSet<CallOpClientSendClose> writes_done_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ init_ops_.SetCollection(shared_from_this());
+ meta_ops_.SetCollection(shared_from_this());
+ read_ops_.SetCollection(shared_from_this());
+ write_ops_.SetCollection(shared_from_this());
+ writes_done_ops_.SetCollection(shared_from_this());
+ finish_ops_.SetCollection(shared_from_this();
+ }
+ CallOpSet<CallOpSendInitialMetadata> init_ops_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpClientSendClose> writes_done_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
template <class W, class R>
@@ -297,48 +339,53 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
explicit ServerAsyncReader(ServerContext* ctx)
- : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+ : call_(nullptr, nullptr, nullptr),
+ ctx_(ctx),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ }
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_ops_.set_output_tag(tag);
- meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->meta_ops_.set_output_tag(tag);
+ collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_ops_);
+ call_.PerformOps(&collection_->meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.set_output_tag(tag);
- read_ops_.RecvMessage(msg);
- call_.PerformOps(&read_ops_);
+ collection_->read_ops_.set_output_tag(tag);
+ collection_->read_ops_.RecvMessage(msg);
+ call_.PerformOps(&collection_->read_ops_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
- finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
- finish_ops_.SendMessage(msg));
+ collection_->finish_ops_.ServerSendStatus(
+ ctx_->trailing_metadata_, collection_->finish_ops_.SendMessage(msg));
} else {
- finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
+ status);
}
- call_.PerformOps(&finish_ops_);
+ call_.PerformOps(&collection_->finish_ops_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_ops_);
+ collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&collection_->finish_ops_);
}
private:
@@ -346,10 +393,19 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> finish_ops_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ meta_ops_.SetCollection(shared_from_this());
+ read_ops_.SetCollection(shared_from_this());
+ finish_ops_.SetCollection(shared_from_this();
+ }
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> finish_ops_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
template <class W>
@@ -357,36 +413,40 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncWriterInterface<W> {
public:
explicit ServerAsyncWriter(ServerContext* ctx)
- : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+ : call_(nullptr, nullptr, nullptr),
+ ctx_(ctx),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ }
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_ops_.set_output_tag(tag);
- meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->meta_ops_.set_output_tag(tag);
+ collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_ops_);
+ call_.PerformOps(&collection_->meta_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.set_output_tag(tag);
+ collection_->write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
- call_.PerformOps(&write_ops_);
+ GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&collection_->write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_ops_);
+ collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&collection_->finish_ops_);
}
private:
@@ -394,9 +454,18 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ meta_ops_.SetCollection(shared_from_this());
+ write_ops_.SetCollection(shared_from_this());
+ finish_ops_.SetCollection(shared_from_this();
+ }
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
/// Server-side interface for asynchronous bi-directional streaming.
@@ -406,42 +475,46 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
public AsyncReaderInterface<R> {
public:
explicit ServerAsyncReaderWriter(ServerContext* ctx)
- : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+ : call_(nullptr, nullptr, nullptr),
+ ctx_(ctx),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ }
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_ops_.set_output_tag(tag);
- meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->meta_ops_.set_output_tag(tag);
+ collection_->meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_ops_);
+ call_.PerformOps(&collection_->meta_ops_);
}
void Read(R* msg, void* tag) GRPC_OVERRIDE {
- read_ops_.set_output_tag(tag);
- read_ops_.RecvMessage(msg);
- call_.PerformOps(&read_ops_);
+ collection_->read_ops_.set_output_tag(tag);
+ collection_->read_ops_.RecvMessage(msg);
+ call_.PerformOps(&collection_->read_ops_);
}
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
- write_ops_.set_output_tag(tag);
+ collection_->write_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
- GPR_ASSERT(write_ops_.SendMessage(msg).ok());
- call_.PerformOps(&write_ops_);
+ GPR_ASSERT(collection_->write_ops_.SendMessage(msg).ok());
+ call_.PerformOps(&collection_->write_ops_);
}
void Finish(const Status& status, void* tag) {
- finish_ops_.set_output_tag(tag);
+ collection_->finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_ops_);
+ collection_->finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&collection_->finish_ops_);
}
private:
@@ -451,10 +524,20 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ meta_ops_.SetCollection(shared_from_this());
+ read_ops_.SetCollection(shared_from_this());
+ write_ops_.SetCollection(shared_from_this());
+ finish_ops_.SetCollection(shared_from_this();
+ }
+ CallOpSet<CallOpSendInitialMetadata> meta_ops_;
+ CallOpSet<CallOpRecvMessage<R>> read_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index 481b20b535..c1c637e928 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -62,40 +62,53 @@ class ClientAsyncResponseReader GRPC_FINAL
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_buf_.SendInitialMetadata(context->send_initial_metadata_);
+ : context_(context),
+ call_(channel->CreateCall(method, context, cq)),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ collection_->init_buf_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
- GPR_ASSERT(init_buf_.SendMessage(request).ok());
- init_buf_.ClientSendClose();
- call_.PerformOps(&init_buf_);
+ GPR_ASSERT(collection_->init_buf_.SendMessage(request).ok());
+ collection_->init_buf_.ClientSendClose();
+ call_.PerformOps(&collection_->init_buf_);
}
void ReadInitialMetadata(void* tag) {
GPR_ASSERT(!context_->initial_metadata_received_);
- meta_buf_.set_output_tag(tag);
- meta_buf_.RecvInitialMetadata(context_);
- call_.PerformOps(&meta_buf_);
+ collection_->meta_buf_.set_output_tag(tag);
+ collection_->meta_buf_.RecvInitialMetadata(context_);
+ call_.PerformOps(&collection_->meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- finish_buf_.set_output_tag(tag);
+ collection_->finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- finish_buf_.RecvInitialMetadata(context_);
+ collection_->finish_buf_.RecvInitialMetadata(context_);
}
- finish_buf_.RecvMessage(msg);
- finish_buf_.ClientRecvStatus(context_, status);
- call_.PerformOps(&finish_buf_);
+ collection_->finish_buf_.RecvMessage(msg);
+ collection_->finish_buf_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&collection_->finish_buf_);
}
private:
ClientContext* context_;
Call call_;
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose> init_buf_;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus> finish_buf_;
+
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ init_buf_.SetCollection(shared_from_this());
+ meta_buf_.SetCollection(shared_from_this());
+ finish_buf_.SetCollection(shared_from_this());
+ }
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose> init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus> finish_buf_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
template <class W>
@@ -103,42 +116,47 @@ class ServerAsyncResponseWriter GRPC_FINAL
: public ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
- : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
+ : call_(nullptr, nullptr, nullptr),
+ ctx_(ctx),
+ collection_(new CallOpSetCollection) {
+ collection_->SetCollection();
+ }
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
- meta_buf_.set_output_tag(tag);
- meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->meta_buf_.set_output_tag(tag);
+ collection_->meta_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
- call_.PerformOps(&meta_buf_);
+ call_.PerformOps(&collection_->meta_buf_);
}
void Finish(const W& msg, const Status& status, void* tag) {
- finish_buf_.set_output_tag(tag);
+ collection_->finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
// The response is dropped if the status is not OK.
if (status.ok()) {
- finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
- finish_buf_.SendMessage(msg));
+ collection_->finish_buf_.ServerSendStatus(
+ ctx_->trailing_metadata_, collection_->finish_buf_.SendMessage(msg));
} else {
- finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_,
+ status);
}
- call_.PerformOps(&finish_buf_);
+ call_.PerformOps(&collection_->finish_buf_);
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
- finish_buf_.set_output_tag(tag);
+ collection_->finish_buf_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
- finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
+ collection_->finish_buf_.SendInitialMetadata(ctx_->initial_metadata_);
ctx_->sent_initial_metadata_ = true;
}
- finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
- call_.PerformOps(&finish_buf_);
+ collection_->finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status);
+ call_.PerformOps(&collection_->finish_buf_);
}
private:
@@ -146,9 +164,17 @@ class ServerAsyncResponseWriter GRPC_FINAL
Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_buf_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus> finish_buf_;
+ class CallOpSetCollection : public CallOpSetCollectionInterface {
+ public:
+ void SetCollection() {
+ meta_buf_.SetCollection(shared_from_this());
+ finish_buf_.SetCollection(shared_from_this());
+ }
+ CallOpSet<CallOpSendInitialMetadata> meta_buf_;
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpServerSendStatus> finish_buf_;
+ };
+ std::shared_ptr<CallOpSetCollection> collection_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 1e06768ac4..94a8243ac6 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -472,6 +472,15 @@ class CallOpClientRecvStatus {
size_t status_details_capacity_;
};
+/// An abstract collection of CallOpSet's, to be used whenever
+/// CallOpSet objects must be thought of as a group. Each member
+/// of the group should have a shared_ptr back to the collection,
+/// as will the object that instantiates the collection, allowing
+/// for automatic ref-counting. In practice, any actual use should
+/// derive from this base class
+class CallOpSetCollectionInterface
+ : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
+
/// An abstract collection of call ops, used to generate the
/// grpc_call_op structure to pass down to the lower layers,
/// and as it is-a CompletionQueueTag, also massages the final
@@ -488,8 +497,14 @@ class CallOpSetInterface : public CompletionQueueTag {
max_message_size_ = max_message_size;
}
+ /// Mark this as belonging to a collection
+ void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
+ collection_ = collection;
+ }
+
protected:
int max_message_size_;
+ std::shared_ptr<CallOpSetCollectionInterface> collection_;
};
/// Primary implementaiton of CallOpSetInterface.