aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h91
-rw-r--r--include/grpc++/impl/codegen/call.h31
-rw-r--r--include/grpc++/impl/codegen/core_codegen.h3
-rw-r--r--include/grpc++/impl/codegen/core_codegen_interface.h3
-rw-r--r--include/grpc/grpc.h16
5 files changed, 82 insertions, 62 deletions
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index b77a16b699..b120b37f1f 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
#define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
+#include <assert.h>
#include <grpc++/impl/codegen/call.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/impl/codegen/client_context.h>
@@ -41,6 +42,8 @@
#include <grpc++/impl/codegen/service_type.h>
#include <grpc++/impl/codegen/status.h>
+extern "C" void* grpc_call_arena_alloc(grpc_call* call, size_t size);
+
namespace grpc {
class CompletionQueue;
@@ -59,57 +62,67 @@ class ClientAsyncResponseReader final
: public ClientAsyncResponseReaderInterface<R> {
public:
template <class W>
- ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- const W& request)
- : context_(context),
- call_(channel->CreateCall(method, context, cq)),
- collection_(std::make_shared<CallOpSetCollection>()) {
- collection_->init_buf_.SetCollection(collection_);
- collection_->init_buf_.SendInitialMetadata(
- context->send_initial_metadata_, context->initial_metadata_flags());
+ static ClientAsyncResponseReader* Create(ChannelInterface* channel,
+ CompletionQueue* cq,
+ const RpcMethod& method,
+ ClientContext* context,
+ const W& request) {
+ Call call = channel->CreateCall(method, context, cq);
+ ClientAsyncResponseReader* reader =
+ new (grpc_call_arena_alloc(call.call(), sizeof(*reader)))
+ ClientAsyncResponseReader(call, context);
+
+ reader->init_buf_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
// TODO(ctiller): don't assert
- GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok());
- collection_->init_buf_.ClientSendClose();
- call_.PerformOps(&collection_->init_buf_);
+ GPR_CODEGEN_ASSERT(reader->init_buf_.SendMessage(request).ok());
+ reader->init_buf_.ClientSendClose();
+ reader->call_.PerformOps(&reader->init_buf_);
+ return reader;
+ }
+
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientAsyncResponseReader));
}
void ReadInitialMetadata(void* tag) {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- collection_->meta_buf_.SetCollection(collection_);
- collection_->meta_buf_.set_output_tag(tag);
- collection_->meta_buf_.RecvInitialMetadata(context_);
- call_.PerformOps(&collection_->meta_buf_);
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- collection_->finish_buf_.SetCollection(collection_);
- collection_->finish_buf_.set_output_tag(tag);
+ finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- collection_->finish_buf_.RecvInitialMetadata(context_);
+ finish_buf_.RecvInitialMetadata(context_);
}
- collection_->finish_buf_.RecvMessage(msg);
- collection_->finish_buf_.AllowNoMessage();
- collection_->finish_buf_.ClientRecvStatus(context_, status);
- call_.PerformOps(&collection_->finish_buf_);
+ finish_buf_.RecvMessage(msg);
+ finish_buf_.AllowNoMessage();
+ finish_buf_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_buf_);
}
private:
- ClientContext* context_;
+ ClientContext* const context_;
Call call_;
- class CallOpSetCollection : public CallOpSetCollectionInterface {
- public:
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
- init_buf_;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus>
- finish_buf_;
- };
- std::shared_ptr<CallOpSetCollection> collection_;
+ ClientAsyncResponseReader(Call call, ClientContext* context)
+ : context_(context), call_(call) {}
+
+ // disable operator new
+ static void* operator new(std::size_t size);
+ static void* operator new(std::size_t size, void* p) { return p; };
+
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus>
+ finish_buf_;
};
template <class W>
@@ -179,4 +192,12 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
} // namespace grpc
+namespace std {
+template <class R>
+class default_delete<grpc::ClientAsyncResponseReader<R>> {
+ public:
+ void operator()(void* p) {}
+};
+}
+
#endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index a3f2be6bb1..56dd7b9685 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_CODEGEN_CALL_H
#define GRPCXX_IMPL_CODEGEN_CALL_H
+#include <assert.h>
#include <cstring>
#include <functional>
#include <map>
@@ -50,6 +51,7 @@
#include <grpc++/impl/codegen/status_helper.h>
#include <grpc++/impl/codegen/string_ref.h>
+#include <grpc/impl/codegen/atm.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>
@@ -559,17 +561,6 @@ class CallOpClientRecvStatus {
grpc_slice status_details_;
};
-/// An abstract collection of CallOpSet's, to be used whenever
-/// CallOpSet objects must be thought of as a group. Each member
-/// of the group should have a shared_ptr back to the collection,
-/// as will the object that instantiates the collection, allowing
-/// for automatic ref-counting. In practice, any actual use should
-/// derive from this base class. This is specifically necessary if
-/// some of the CallOpSet's in the collection are "Sneaky" and don't
-/// report back to the C++ layer CQ operations
-class CallOpSetCollectionInterface
- : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
-
/// An abstract collection of call ops, used to generate the
/// grpc_call_op structure to pass down to the lower layers,
/// and as it is-a CompletionQueueTag, also massages the final
@@ -577,18 +568,9 @@ class CallOpSetCollectionInterface
/// API.
class CallOpSetInterface : public CompletionQueueTag {
public:
- CallOpSetInterface() {}
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
- virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
-
- /// Mark this as belonging to a collection if needed
- void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
- collection_ = collection;
- }
-
- protected:
- std::shared_ptr<CallOpSetCollectionInterface> collection_;
+ virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
};
/// Primary implementaiton of CallOpSetInterface.
@@ -609,13 +591,15 @@ class CallOpSet : public CallOpSetInterface,
public Op6 {
public:
CallOpSet() : return_tag_(this) {}
- void FillOps(grpc_op* ops, size_t* nops) override {
+ void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
this->Op3::AddOp(ops, nops);
this->Op4::AddOp(ops, nops);
this->Op5::AddOp(ops, nops);
this->Op6::AddOp(ops, nops);
+ g_core_codegen_interface->grpc_call_ref(call);
+ call_ = call;
}
bool FinalizeResult(void** tag, bool* status) override {
@@ -626,7 +610,7 @@ class CallOpSet : public CallOpSetInterface,
this->Op5::FinishOp(status);
this->Op6::FinishOp(status);
*tag = return_tag_;
- collection_.reset(); // drop the ref at this point
+ g_core_codegen_interface->grpc_call_unref(call_);
return true;
}
@@ -634,6 +618,7 @@ class CallOpSet : public CallOpSetInterface,
private:
void* return_tag_;
+ grpc_call* call_;
};
/// A CallOpSet that does not post completions to the completion queue.
diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h
index 754bf14b25..b579849aca 100644
--- a/include/grpc++/impl/codegen/core_codegen.h
+++ b/include/grpc++/impl/codegen/core_codegen.h
@@ -64,6 +64,9 @@ class CoreCodegen : public CoreCodegenInterface {
void gpr_cv_signal(gpr_cv* cv) override;
void gpr_cv_broadcast(gpr_cv* cv) override;
+ void grpc_call_ref(grpc_call* call) override;
+ void grpc_call_unref(grpc_call* call) override;
+
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index 45ea040303..12464591a4 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -94,6 +94,9 @@ class CoreCodegenInterface {
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0;
+ virtual void grpc_call_ref(grpc_call* call) = 0;
+ virtual void grpc_call_unref(grpc_call* call) = 0;
+
virtual grpc_slice grpc_slice_malloc(size_t length) = 0;
virtual void grpc_slice_unref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index e088435d6c..3af0ca25c9 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -263,6 +263,10 @@ GRPCAPI grpc_call *grpc_channel_create_registered_call(
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline, void *reserved);
+/** Allocate memory in the grpc_call arena: this memory is automatically
+ discarded at call completion */
+GRPCAPI void *grpc_call_arena_alloc(grpc_call *call, size_t size);
+
/** Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
The order of ops specified in the batch has no significance.
@@ -345,7 +349,7 @@ GRPCAPI void grpc_channel_destroy(grpc_channel *channel);
/** Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread.
THREAD-SAFETY grpc_call_cancel and grpc_call_cancel_with_status
- are thread-safe, and can be called at any point before grpc_call_destroy
+ are thread-safe, and can be called at any point before grpc_call_unref
is called.*/
GRPCAPI grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved);
@@ -360,9 +364,13 @@ GRPCAPI grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
const char *description,
void *reserved);
-/** Destroy a call.
- THREAD SAFETY: grpc_call_destroy is thread-compatible */
-GRPCAPI void grpc_call_destroy(grpc_call *call);
+/** Ref a call.
+ THREAD SAFETY: grpc_call_unref is thread-compatible */
+GRPCAPI void grpc_call_ref(grpc_call *call);
+
+/** Unref a call.
+ THREAD SAFETY: grpc_call_unref is thread-compatible */
+GRPCAPI void grpc_call_unref(grpc_call *call);
/** Request notification of a new call.
Once a call is received, a notification tagged with \a tag_new is added to