diff options
author | Vijay Pai <vpai@google.com> | 2018-10-31 01:02:07 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2018-10-31 01:02:07 -0700 |
commit | 932abf48a3fd0757a05bdbae4038c20fd521f312 (patch) | |
tree | 74c60a14a3d9e6b37de5def54ff51065a893e860 | |
parent | 2f47137a6e4dc05df4e09e47348aadbe21889d13 (diff) |
Address reviewer comments.
-rw-r--r-- | include/grpcpp/impl/codegen/async_unary_call.h | 2 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/call_op_set.h | 22 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/call_op_set_interface.h | 4 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/client_callback.h | 2 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/rpc_service_method.h | 11 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_callback.h | 80 | ||||
-rw-r--r-- | src/compiler/cpp_generator.cc | 6 | ||||
-rw-r--r-- | src/cpp/client/channel_cc.cc | 4 | ||||
-rw-r--r-- | src/cpp/common/completion_queue_cc.cc | 10 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 4 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 16 |
11 files changed, 82 insertions, 79 deletions
diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h index 744b128141..89dcb12418 100644 --- a/include/grpcpp/impl/codegen/async_unary_call.h +++ b/include/grpcpp/impl/codegen/async_unary_call.h @@ -240,7 +240,7 @@ class ServerAsyncResponseWriter final /// metadata. void Finish(const W& msg, const Status& status, void* tag) { finish_buf_.set_output_tag(tag); - finish_buf_.set_cq_tag(&finish_buf_); + finish_buf_.set_core_cq_tag(&finish_buf_); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index 785688e67f..5c52b027b2 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -770,19 +770,19 @@ class CallOpSet : public CallOpSetInterface, public Op5, public Op6 { public: - CallOpSet() : cq_tag_(this), return_tag_(this) {} + CallOpSet() : core_cq_tag_(this), return_tag_(this) {} // The copy constructor and assignment operator reset the value of - // cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ since - // those are only meaningful on a specific object, not across objects. + // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ + // since those are only meaningful on a specific object, not across objects. CallOpSet(const CallOpSet& other) - : cq_tag_(this), + : core_cq_tag_(this), return_tag_(this), call_(other.call_), done_intercepting_(false), interceptor_methods_(InterceptorBatchMethodsImpl()) {} CallOpSet& operator=(const CallOpSet& other) { - cq_tag_ = this; + core_cq_tag_ = this; return_tag_ = this; call_ = other.call_; done_intercepting_ = false; @@ -834,13 +834,13 @@ class CallOpSet : public CallOpSetInterface, void set_output_tag(void* return_tag) { return_tag_ = return_tag; } - void* cq_tag() override { return cq_tag_; } + void* core_cq_tag() override { return core_cq_tag_; } - /// set_cq_tag is used to provide a different core CQ tag than "this". + /// set_core_cq_tag is used to provide a different core CQ tag than "this". /// This is used for callback-based tags, where the core tag is the core /// callback function. It does not change the use or behavior of any other /// function (such as FinalizeResult) - void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } // This will be called while interceptors are run if the RPC is a hijacked // RPC. This should set hijacking state for each of the ops. @@ -866,7 +866,7 @@ class CallOpSet : public CallOpSetInterface, this->Op6::AddOp(ops, &nops); GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( - call_.call(), ops, nops, cq_tag(), nullptr)); + call_.call(), ops, nops, core_cq_tag(), nullptr)); } // Should be called after interceptors are done running on the finalize result @@ -875,7 +875,7 @@ class CallOpSet : public CallOpSetInterface, done_intercepting_ = true; GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( - call_.call(), nullptr, 0, cq_tag(), nullptr)); + call_.call(), nullptr, 0, core_cq_tag(), nullptr)); } private: @@ -906,7 +906,7 @@ class CallOpSet : public CallOpSetInterface, return interceptor_methods_.RunInterceptors(); } - void* cq_tag_; + void* core_cq_tag_; void* return_tag_; Call call_; bool done_intercepting_ = false; diff --git a/include/grpcpp/impl/codegen/call_op_set_interface.h b/include/grpcpp/impl/codegen/call_op_set_interface.h index 815227a299..3b74566a6d 100644 --- a/include/grpcpp/impl/codegen/call_op_set_interface.h +++ b/include/grpcpp/impl/codegen/call_op_set_interface.h @@ -38,9 +38,9 @@ class CallOpSetInterface : public CompletionQueueTag { virtual void FillOps(internal::Call* call) = 0; /// Get the tag to be used at the core completion queue. Generally, the - /// value of cq_tag will be "this". However, it can be overridden if we + /// value of core_cq_tag will be "this". However, it can be overridden if we /// want core to process the tag differently (e.g., as a core callback) - virtual void* cq_tag() = 0; + virtual void* core_cq_tag() = 0; // This will be called while interceptors are run if the RPC is a hijacked // RPC. This should set hijacking state for each of the ops. diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index ecb00a0769..4baa819091 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag); + ops->set_core_cq_tag(tag); call.PerformOps(ops); } }; diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h index 1890e4254d..f465c5fc2f 100644 --- a/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/include/grpcpp/impl/codegen/rpc_service_method.h @@ -46,21 +46,22 @@ class MethodHandler { /// \param context : the ServerContext structure for this server call /// \param req : the request payload, if appropriate for this RPC /// \param req_status : the request status after any interceptors have run - /// \param renew : used only by the callback API. It is a function - /// called by the RPC Controller to request another RPC + /// \param rpc_requester : used only by the callback API. It is a function + /// called by the RPC Controller to request another RPC (and also + /// to set up the state required to make that request possible) HandlerParameter(Call* c, ServerContext* context, void* req, - Status req_status, std::function<void()> renew) + Status req_status, std::function<void()> requester) : call(c), server_context(context), request(req), status(req_status), - renewer(std::move(renew)) {} + call_requester(std::move(requester)) {} ~HandlerParameter() {} Call* call; ServerContext* server_context; void* request; Status status; - std::function<void()> renewer; + std::function<void()> call_requester; }; virtual void RunHandler(const HandlerParameter& param) = 0; diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index c8f7510ed5..5d56cbf1df 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -48,7 +48,6 @@ class ServerCallbackRpcController { // The method handler must call this function when it is done so that // the library knows to free its resources virtual void Finish(Status s) = 0; - virtual void FinishWithError(Status s) = 0; // Allow the method handler to push out the initial metadata before // the response and status are ready @@ -75,7 +74,8 @@ class CallbackUnaryHandler : public MethodHandler { param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) ServerCallbackRpcControllerImpl( param.server_context, param.call, - static_cast<RequestType*>(param.request), std::move(param.renewer)); + static_cast<RequestType*>(param.request), + std::move(param.call_requester)); Status status = param.status; if (status.ok()) { @@ -112,52 +112,19 @@ class CallbackUnaryHandler : public MethodHandler { // The implementation class of ServerCallbackRpcController is a private member // of CallbackUnaryHandler since it is never exposed anywhere, and this allows // it to take advantage of CallbackUnaryHandler's friendships. - class ServerCallbackRpcControllerImpl : public experimental::ServerCallbackRpcController { public: - void Finish(Status s) override { FinishInternal(std::move(s), false); } - - void FinishWithError(Status s) override { - FinishInternal(std::move(s), true); - } - - void SendInitialMetadata(std::function<void(bool)> f) override { - GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); - - meta_tag_ = - CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_); - meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - if (ctx_->compression_level_set()) { - meta_buf_.set_compression_level(ctx_->compression_level()); - } - ctx_->sent_initial_metadata_ = true; - meta_buf_.set_cq_tag(&meta_tag_); - call_.PerformOps(&meta_buf_); - } - - private: - template <class SrvType, class ReqType, class RespType> - friend class CallbackUnaryHandler; - - ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, - RequestType* req, - std::function<void()> renewer) - : ctx_(ctx), call_(*call), req_(req), renewer_(std::move(renewer)) {} - - ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } - - void FinishInternal(Status s, bool allow_error) { + void Finish(Status s) override { finish_tag_ = CallbackWithSuccessTag( call_.call(), [this](bool) { grpc_call* call = call_.call(); - auto renewer = std::move(renewer_); + auto call_requester = std::move(call_requester_); this->~ServerCallbackRpcControllerImpl(); // explicitly call // destructor g_core_codegen_interface->grpc_call_unref(call); - renewer(); + call_requester(); }, &finish_buf_); if (!ctx_->sent_initial_metadata_) { @@ -168,17 +135,46 @@ class CallbackUnaryHandler : public MethodHandler { } ctx_->sent_initial_metadata_ = true; } - // The response may be dropped if the status is not OK. - if (allow_error || s.ok()) { + // The response is dropped if the status is not OK. + if (s.ok()) { finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, finish_buf_.SendMessage(resp_)); } else { finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s); } - finish_buf_.set_cq_tag(&finish_tag_); + finish_buf_.set_core_cq_tag(&finish_tag_); call_.PerformOps(&finish_buf_); } + void SendInitialMetadata(std::function<void(bool)> f) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_tag_ = + CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_); + meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_buf_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_buf_); + } + + private: + template <class SrvType, class ReqType, class RespType> + friend class CallbackUnaryHandler; + + ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, + RequestType* req, + std::function<void()> call_requester) + : ctx_(ctx), + call_(*call), + req_(req), + call_requester_(std::move(call_requester)) {} + + ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } + RequestType* request() { return req_; } ResponseType* response() { return &resp_; } @@ -193,7 +189,7 @@ class CallbackUnaryHandler : public MethodHandler { Call call_; RequestType* req_; ResponseType resp_; - std::function<void()> renewer_; + std::function<void()> call_requester_; }; }; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index fcff75a420..34d5beb653 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -926,8 +926,11 @@ void PrintHeaderServerMethodCallback( "Method$(context, request, response, controller);\n" " }, this));\n"); } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods } else if (method->BidiStreaming()) { + // TODO(vjpai): Add in code generation for all streaming methods } printer->Print(*vars, "}\n"); printer->Print(*vars, @@ -975,8 +978,11 @@ void PrintHeaderServerMethodRawCallback( "Method$(context, request, response, controller);\n" " }, this));\n"); } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods } else if (method->BidiStreaming()) { + // TODO(vjpai): Add in code generation for all streaming methods } printer->Print(*vars, "}\n"); printer->Print(*vars, diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 15e3ccb3c9..f5c1b4e2c5 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -225,7 +225,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void Run(grpc_experimental_completion_queue_functor* cb, int) { auto* callback = static_cast<ShutdownCallback*>(cb); delete callback->cq_; - grpc_core::Delete(callback); + delete callback; } private: @@ -238,7 +238,7 @@ CompletionQueue* Channel::CallbackCQ() { // if there is no explicit per-channel CQ registered std::lock_guard<std::mutex> l(mu_); if (callback_cq_ == nullptr) { - auto* shutdown_callback = grpc_core::New<ShutdownCallback>(); + auto* shutdown_callback = new ShutdownCallback; callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 6893201e2e..d93a54aed7 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,10 +60,10 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); + auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; - *tag = cq_tag; - if (cq_tag->FinalizeResult(tag, ok)) { + *tag = core_cq_tag; + if (core_cq_tag->FinalizeResult(tag, ok)) { return GOT_EVENT; } break; @@ -87,9 +87,9 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); + auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); *ok = res == 1; - if (cq_tag->FinalizeResult(tag, ok)) { + if (core_cq_tag->FinalizeResult(tag, ok)) { return true; } } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index a9f0315af6..870ee84e3e 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -1111,7 +1111,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void Run(grpc_experimental_completion_queue_functor* cb, int) { auto* callback = static_cast<ShutdownCallback*>(cb); delete callback->cq_; - grpc_core::Delete(callback); + delete callback; } private: @@ -1124,7 +1124,7 @@ CompletionQueue* Server::CallbackCQ() { // if there is no explicit per-server CQ registered std::lock_guard<std::mutex> l(mu_); if (callback_cq_ == nullptr) { - auto* shutdown_callback = grpc_core::New<ShutdownCallback>(); + auto* shutdown_callback = new ShutdownCallback; callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 8ded7a8972..355debb3fb 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -45,7 +45,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { : call_(*call), has_tag_(false), tag_(nullptr), - cq_tag_(this), + core_cq_tag_(this), refs_(2), finalized_(false), cancelled_(0), @@ -92,9 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } - void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } - void* cq_tag() override { return cq_tag_; } + void* core_cq_tag() override { return core_cq_tag_; } void Unref(); @@ -138,7 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { internal::Call call_; bool has_tag_; void* tag_; - void* cq_tag_; + void* core_cq_tag_; std::mutex mu_; int refs_; bool finalized_; @@ -166,8 +166,8 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { interceptor_methods_.SetCall(&call_); interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), &ops, 1, cq_tag_, nullptr)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1, + core_cq_tag_, nullptr)); /* No interceptors to run here */ } @@ -272,7 +272,7 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { if (callback) { completion_tag_ = internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); - completion_op_->set_cq_tag(&completion_tag_); + completion_op_->set_core_cq_tag(&completion_tag_); } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } @@ -305,7 +305,7 @@ bool ServerContext::IsCancelled() const { if (completion_tag_) { // When using callback API, this result is always valid. return completion_op_->CheckCancelledAsync(); - } else if (completion_tag_ || has_notify_when_done_tag_) { + } else if (has_notify_when_done_tag_) { // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); |