diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/impl/codegen/async_stream.h | 129 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 91 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 40 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/status_helper.h | 47 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 5 | ||||
-rw-r--r-- | include/grpc/grpc.h | 16 |
8 files changed, 175 insertions, 161 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h index 8f529895ca..f97d824baf 100644 --- a/include/grpc++/impl/codegen/async_stream.h +++ b/include/grpc++/impl/codegen/async_stream.h @@ -145,17 +145,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { public: /// Create a stream and write the first request out. template <class W> - ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_ops_.set_output_tag(tag); - init_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); - init_ops_.ClientSendClose(); - call_.PerformOps(&init_ops_); + static ClientAsyncReader* Create(ChannelInterface* channel, + CompletionQueue* cq, const RpcMethod& method, + ClientContext* context, const W& request, + void* tag) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReader))) + ClientAsyncReader(call, context, request, tag); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncReader)); } void ReadInitialMetadata(void* tag) override { @@ -185,6 +187,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { } private: + template <class W> + ClientAsyncReader(Call call, ClientContext* context, const W& request, + void* tag) + : context_(context), call_(call) { + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); + } + ClientContext* context_; Call call_; CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> @@ -210,23 +225,19 @@ template <class W> class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { public: template <class R> - ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - R* response, void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - finish_ops_.RecvMessage(response); - finish_ops_.AllowNoMessage(); - // if corked bit is set in context, we buffer up the initial metadata to - // coalesce with later message to be sent. No op is performed. - if (context_->initial_metadata_corked_) { - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - } else { - write_ops_.set_output_tag(tag); - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&write_ops_); - } + static ClientAsyncWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, const RpcMethod& method, + ClientContext* context, R* response, + void* tag) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncWriter))) + ClientAsyncWriter(call, context, response, tag); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncWriter)); } void ReadInitialMetadata(void* tag) override { @@ -271,6 +282,24 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { } private: + template <class R> + ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag) + : context_(context), call_(call) { + finish_ops_.RecvMessage(response); + finish_ops_.AllowNoMessage(); + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + if (context_->initial_metadata_corked_) { + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } + } + ClientContext* context_; Call call_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; @@ -298,21 +327,20 @@ template <class W, class R> class ClientAsyncReaderWriter final : public ClientAsyncReaderWriterInterface<W, R> { public: - ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - void* tag) - : context_(context), call_(channel->CreateCall(method, context, cq)) { - if (context_->initial_metadata_corked_) { - // if corked bit is set in context, we buffer up the initial metadata to - // coalesce with later message to be sent. No op is performed. - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - } else { - write_ops_.set_output_tag(tag); - write_ops_.SendInitialMetadata(context->send_initial_metadata_, - context->initial_metadata_flags()); - call_.PerformOps(&write_ops_); - } + static ClientAsyncReaderWriter* Create(ChannelInterface* channel, + CompletionQueue* cq, + const RpcMethod& method, + ClientContext* context, void* tag) { + Call call = channel->CreateCall(method, context, cq); + + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncReaderWriter))) + ClientAsyncReaderWriter(call, context, tag); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncReaderWriter)); } void ReadInitialMetadata(void* tag) override { @@ -366,6 +394,21 @@ class ClientAsyncReaderWriter final } private: + ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag) + : context_(context), call_(call) { + if (context_->initial_metadata_corked_) { + // if corked bit is set in context, we buffer up the initial metadata to + // coalesce with later message to be sent. No op is performed. + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + } else { + write_ops_.set_output_tag(tag); + write_ops_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + call_.PerformOps(&write_ops_); + } + } + ClientContext* context_; Call call_; CallOpSet<CallOpRecvInitialMetadata> meta_ops_; diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index b77a16b699..a147a6acbf 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -34,6 +34,7 @@ #ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H #define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H +#include <assert.h> #include <grpc++/impl/codegen/call.h> #include <grpc++/impl/codegen/channel_interface.h> #include <grpc++/impl/codegen/client_context.h> @@ -59,57 +60,67 @@ class ClientAsyncResponseReader final : public ClientAsyncResponseReaderInterface<R> { public: template <class W> - ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, - const RpcMethod& method, ClientContext* context, - const W& request) - : context_(context), - call_(channel->CreateCall(method, context, cq)), - collection_(std::make_shared<CallOpSetCollection>()) { - collection_->init_buf_.SetCollection(collection_); - collection_->init_buf_.SendInitialMetadata( - context->send_initial_metadata_, context->initial_metadata_flags()); - // TODO(ctiller): don't assert - GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok()); - collection_->init_buf_.ClientSendClose(); - call_.PerformOps(&collection_->init_buf_); + static ClientAsyncResponseReader* Create(ChannelInterface* channel, + CompletionQueue* cq, + const RpcMethod& method, + ClientContext* context, + const W& request) { + Call call = channel->CreateCall(method, context, cq); + return new (g_core_codegen_interface->grpc_call_arena_alloc( + call.call(), sizeof(ClientAsyncResponseReader))) + ClientAsyncResponseReader(call, context, request); + } + + // always allocated against a call arena, no memory free required + static void operator delete(void* ptr, std::size_t size) { + assert(size == sizeof(ClientAsyncResponseReader)); } void ReadInitialMetadata(void* tag) { GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); - collection_->meta_buf_.SetCollection(collection_); - collection_->meta_buf_.set_output_tag(tag); - collection_->meta_buf_.RecvInitialMetadata(context_); - call_.PerformOps(&collection_->meta_buf_); + meta_buf_.set_output_tag(tag); + meta_buf_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - collection_->finish_buf_.SetCollection(collection_); - collection_->finish_buf_.set_output_tag(tag); + finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - collection_->finish_buf_.RecvInitialMetadata(context_); + finish_buf_.RecvInitialMetadata(context_); } - collection_->finish_buf_.RecvMessage(msg); - collection_->finish_buf_.AllowNoMessage(); - collection_->finish_buf_.ClientRecvStatus(context_, status); - call_.PerformOps(&collection_->finish_buf_); + finish_buf_.RecvMessage(msg); + finish_buf_.AllowNoMessage(); + finish_buf_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_buf_); } private: - ClientContext* context_; + ClientContext* const context_; Call call_; - class CallOpSetCollection : public CallOpSetCollectionInterface { - public: - SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, - CallOpClientSendClose> - init_buf_; - CallOpSet<CallOpRecvInitialMetadata> meta_buf_; - CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, - CallOpClientRecvStatus> - finish_buf_; - }; - std::shared_ptr<CallOpSetCollection> collection_; + template <class W> + ClientAsyncResponseReader(Call call, ClientContext* context, const W& request) + : context_(context), call_(call) { + init_buf_.SendInitialMetadata(context->send_initial_metadata_, + context->initial_metadata_flags()); + // TODO(ctiller): don't assert + GPR_CODEGEN_ASSERT(init_buf_.SendMessage(request).ok()); + init_buf_.ClientSendClose(); + call_.PerformOps(&init_buf_); + } + + // disable operator new + static void* operator new(std::size_t size); + static void* operator new(std::size_t size, void* p) { return p; }; + + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> + init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> + finish_buf_; }; template <class W> @@ -179,4 +190,12 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface { } // namespace grpc +namespace std { +template <class R> +class default_delete<grpc::ClientAsyncResponseReader<R>> { + public: + void operator()(void* p) {} +}; +} + #endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index dd63c21ff1..f334ba61d6 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -34,6 +34,7 @@ #ifndef GRPCXX_IMPL_CODEGEN_CALL_H #define GRPCXX_IMPL_CODEGEN_CALL_H +#include <assert.h> #include <cstring> #include <functional> #include <map> @@ -47,9 +48,9 @@ #include <grpc++/impl/codegen/serialization_traits.h> #include <grpc++/impl/codegen/slice.h> #include <grpc++/impl/codegen/status.h> -#include <grpc++/impl/codegen/status_helper.h> #include <grpc++/impl/codegen/string_ref.h> +#include <grpc/impl/codegen/atm.h> #include <grpc/impl/codegen/compression_types.h> #include <grpc/impl/codegen/grpc_types.h> @@ -246,8 +247,10 @@ class CallOpSendInitialMetadata { op->data.send_initial_metadata.metadata = initial_metadata_; op->data.send_initial_metadata.maybe_compression_level.is_set = maybe_compression_level_.is_set; - op->data.send_initial_metadata.maybe_compression_level.level = - maybe_compression_level_.level; + if (maybe_compression_level_.is_set) { + op->data.send_initial_metadata.maybe_compression_level.level = + maybe_compression_level_.level; + } } void FinishOp(bool* status) { if (!send_) return; @@ -468,7 +471,7 @@ class CallOpServerSendStatus { trailing_metadata_ = FillMetadataArray( trailing_metadata, &trailing_metadata_count_, send_error_details_); send_status_available_ = true; - send_status_code_ = static_cast<grpc_status_code>(GetCanonicalCode(status)); + send_status_code_ = static_cast<grpc_status_code>(status.error_code()); send_error_message_ = status.error_message(); } @@ -579,17 +582,6 @@ class CallOpClientRecvStatus { grpc_slice error_message_; }; -/// An abstract collection of CallOpSet's, to be used whenever -/// CallOpSet objects must be thought of as a group. Each member -/// of the group should have a shared_ptr back to the collection, -/// as will the object that instantiates the collection, allowing -/// for automatic ref-counting. In practice, any actual use should -/// derive from this base class. This is specifically necessary if -/// some of the CallOpSet's in the collection are "Sneaky" and don't -/// report back to the C++ layer CQ operations -class CallOpSetCollectionInterface - : public std::enable_shared_from_this<CallOpSetCollectionInterface> {}; - /// An abstract collection of call ops, used to generate the /// grpc_call_op structure to pass down to the lower layers, /// and as it is-a CompletionQueueTag, also massages the final @@ -597,18 +589,9 @@ class CallOpSetCollectionInterface /// API. class CallOpSetInterface : public CompletionQueueTag { public: - CallOpSetInterface() {} /// Fills in grpc_op, starting from ops[*nops] and moving /// upwards. - virtual void FillOps(grpc_op* ops, size_t* nops) = 0; - - /// Mark this as belonging to a collection if needed - void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) { - collection_ = collection; - } - - protected: - std::shared_ptr<CallOpSetCollectionInterface> collection_; + virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0; }; /// Primary implementaiton of CallOpSetInterface. @@ -629,13 +612,15 @@ class CallOpSet : public CallOpSetInterface, public Op6 { public: CallOpSet() : return_tag_(this) {} - void FillOps(grpc_op* ops, size_t* nops) override { + void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override { this->Op1::AddOp(ops, nops); this->Op2::AddOp(ops, nops); this->Op3::AddOp(ops, nops); this->Op4::AddOp(ops, nops); this->Op5::AddOp(ops, nops); this->Op6::AddOp(ops, nops); + g_core_codegen_interface->grpc_call_ref(call); + call_ = call; } bool FinalizeResult(void** tag, bool* status) override { @@ -646,7 +631,7 @@ class CallOpSet : public CallOpSetInterface, this->Op5::FinishOp(status); this->Op6::FinishOp(status); *tag = return_tag_; - collection_.reset(); // drop the ref at this point + g_core_codegen_interface->grpc_call_unref(call_); return true; } @@ -654,6 +639,7 @@ class CallOpSet : public CallOpSetInterface, private: void* return_tag_; + grpc_call* call_; }; /// A CallOpSet that does not post completions to the completion queue. diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 65151590b2..86601076bd 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -68,6 +68,10 @@ class CoreCodegen : public CoreCodegenInterface { void gpr_cv_signal(gpr_cv* cv) override; void gpr_cv_broadcast(gpr_cv* cv) override; + void grpc_call_ref(grpc_call* call) override; + void grpc_call_unref(grpc_call* call) override; + virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override; + void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 529bef687b..a0665f4e06 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -96,6 +96,10 @@ class CoreCodegenInterface { virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) = 0; + virtual void grpc_call_ref(grpc_call* call) = 0; + virtual void grpc_call_unref(grpc_call* call) = 0; + virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0; + virtual grpc_slice grpc_empty_slice() = 0; virtual grpc_slice grpc_slice_malloc(size_t length) = 0; virtual void grpc_slice_unref(grpc_slice slice) = 0; diff --git a/include/grpc++/impl/codegen/status_helper.h b/include/grpc++/impl/codegen/status_helper.h deleted file mode 100644 index bfe45d9e5b..0000000000 --- a/include/grpc++/impl/codegen/status_helper.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_IMPL_CODEGEN_STATUS_HELPER_H -#define GRPCXX_IMPL_CODEGEN_STATUS_HELPER_H - -#include <grpc++/impl/codegen/status.h> - -namespace grpc { - -inline StatusCode GetCanonicalCode(const Status& status) { - return status.error_code(); -} - -} // namespace grpc - -#endif // GRPCXX_IMPL_CODEGEN_STATUS_HELPER_H diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index d707100a52..7ac23349c8 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -195,10 +195,7 @@ class ServerBuilder { struct SyncServerSettings { SyncServerSettings() - : num_cqs(1), - min_pollers(1), - max_pollers(INT_MAX), - cq_timeout_msec(1000) {} + : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} // Number of server completion queues to create to listen to incoming RPCs. int num_cqs; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 9a65d34ce3..5ded451540 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -265,6 +265,10 @@ GRPCAPI grpc_call *grpc_channel_create_registered_call( grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec deadline, void *reserved); +/** Allocate memory in the grpc_call arena: this memory is automatically + discarded at call completion */ +GRPCAPI void *grpc_call_arena_alloc(grpc_call *call, size_t size); + /** Start a batch of operations defined in the array ops; when complete, post a completion of type 'tag' to the completion queue bound to the call. The order of ops specified in the batch has no significance. @@ -341,7 +345,7 @@ GRPCAPI void grpc_channel_destroy(grpc_channel *channel); /** Called by clients to cancel an RPC on the server. Can be called multiple times, from any thread. THREAD-SAFETY grpc_call_cancel and grpc_call_cancel_with_status - are thread-safe, and can be called at any point before grpc_call_destroy + are thread-safe, and can be called at any point before grpc_call_unref is called.*/ GRPCAPI grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved); @@ -356,9 +360,13 @@ GRPCAPI grpc_call_error grpc_call_cancel_with_status(grpc_call *call, const char *description, void *reserved); -/** Destroy a call. - THREAD SAFETY: grpc_call_destroy is thread-compatible */ -GRPCAPI void grpc_call_destroy(grpc_call *call); +/** Ref a call. + THREAD SAFETY: grpc_call_unref is thread-compatible */ +GRPCAPI void grpc_call_ref(grpc_call *call); + +/** Unref a call. + THREAD SAFETY: grpc_call_unref is thread-compatible */ +GRPCAPI void grpc_call_unref(grpc_call *call); /** Request notification of a new call. Once a call is received, a notification tagged with \a tag_new is added to |