diff options
96 files changed, 1833 insertions, 208 deletions
@@ -245,6 +245,7 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/support/config.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", + "include/grpcpp/support/server_callback.h", "include/grpcpp/support/slice.h", "include/grpcpp/support/status.h", "include/grpcpp/support/status_code_enum.h", @@ -2088,6 +2089,7 @@ grpc_cc_library( "include/grpcpp/impl/codegen/rpc_service_method.h", "include/grpcpp/impl/codegen/security/auth_context.h", "include/grpcpp/impl/codegen/serialization_traits.h", + "include/grpcpp/impl/codegen/server_callback.h", "include/grpcpp/impl/codegen/server_context.h", "include/grpcpp/impl/codegen/server_interceptor.h", "include/grpcpp/impl/codegen/server_interface.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index efa02f1987..49c31baecb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3020,6 +3020,7 @@ foreach(_hdr include/grpcpp/support/config.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h + include/grpcpp/support/server_callback.h include/grpcpp/support/slice.h include/grpcpp/support/status.h include/grpcpp/support/status_code_enum.h @@ -3137,6 +3138,7 @@ foreach(_hdr include/grpcpp/impl/codegen/rpc_service_method.h include/grpcpp/impl/codegen/security/auth_context.h include/grpcpp/impl/codegen/serialization_traits.h + include/grpcpp/impl/codegen/server_callback.h include/grpcpp/impl/codegen/server_context.h include/grpcpp/impl/codegen/server_interceptor.h include/grpcpp/impl/codegen/server_interface.h @@ -3600,6 +3602,7 @@ foreach(_hdr include/grpcpp/support/config.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h + include/grpcpp/support/server_callback.h include/grpcpp/support/slice.h include/grpcpp/support/status.h include/grpcpp/support/status_code_enum.h @@ -3717,6 +3720,7 @@ foreach(_hdr include/grpcpp/impl/codegen/rpc_service_method.h include/grpcpp/impl/codegen/security/auth_context.h include/grpcpp/impl/codegen/serialization_traits.h + include/grpcpp/impl/codegen/server_callback.h include/grpcpp/impl/codegen/server_context.h include/grpcpp/impl/codegen/server_interceptor.h include/grpcpp/impl/codegen/server_interface.h @@ -4131,6 +4135,7 @@ foreach(_hdr include/grpcpp/impl/codegen/rpc_service_method.h include/grpcpp/impl/codegen/security/auth_context.h include/grpcpp/impl/codegen/serialization_traits.h + include/grpcpp/impl/codegen/server_callback.h include/grpcpp/impl/codegen/server_context.h include/grpcpp/impl/codegen/server_interceptor.h include/grpcpp/impl/codegen/server_interface.h @@ -4317,6 +4322,7 @@ foreach(_hdr include/grpcpp/impl/codegen/rpc_service_method.h include/grpcpp/impl/codegen/security/auth_context.h include/grpcpp/impl/codegen/serialization_traits.h + include/grpcpp/impl/codegen/server_callback.h include/grpcpp/impl/codegen/server_context.h include/grpcpp/impl/codegen/server_interceptor.h include/grpcpp/impl/codegen/server_interface.h @@ -4530,6 +4536,7 @@ foreach(_hdr include/grpcpp/support/config.h include/grpcpp/support/proto_buffer_reader.h include/grpcpp/support/proto_buffer_writer.h + include/grpcpp/support/server_callback.h include/grpcpp/support/slice.h include/grpcpp/support/status.h include/grpcpp/support/status_code_enum.h @@ -4647,6 +4654,7 @@ foreach(_hdr include/grpcpp/impl/codegen/rpc_service_method.h include/grpcpp/impl/codegen/security/auth_context.h include/grpcpp/impl/codegen/serialization_traits.h + include/grpcpp/impl/codegen/server_callback.h include/grpcpp/impl/codegen/server_context.h include/grpcpp/impl/codegen/server_interceptor.h include/grpcpp/impl/codegen/server_interface.h @@ -5371,6 +5371,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ + include/grpcpp/support/server_callback.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ @@ -5488,6 +5489,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ + include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ @@ -5960,6 +5962,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ + include/grpcpp/support/server_callback.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ @@ -6077,6 +6080,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ + include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ @@ -6476,6 +6480,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ + include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ @@ -6639,6 +6644,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ + include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ @@ -6857,6 +6863,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ + include/grpcpp/support/server_callback.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ @@ -6974,6 +6981,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ + include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ diff --git a/build.yaml b/build.yaml index bdb5abc8be..748258da0c 100644 --- a/build.yaml +++ b/build.yaml @@ -1245,6 +1245,7 @@ filegroups: - include/grpcpp/impl/codegen/rpc_service_method.h - include/grpcpp/impl/codegen/security/auth_context.h - include/grpcpp/impl/codegen/serialization_traits.h + - include/grpcpp/impl/codegen/server_callback.h - include/grpcpp/impl/codegen/server_context.h - include/grpcpp/impl/codegen/server_interceptor.h - include/grpcpp/impl/codegen/server_interface.h @@ -1363,6 +1364,7 @@ filegroups: - include/grpcpp/support/config.h - include/grpcpp/support/proto_buffer_reader.h - include/grpcpp/support/proto_buffer_writer.h + - include/grpcpp/support/server_callback.h - include/grpcpp/support/slice.h - include/grpcpp/support/status.h - include/grpcpp/support/status_code_enum.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 338eeb0cec..f8a4a78e8d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -118,6 +118,7 @@ Pod::Spec.new do |s| 'include/grpcpp/support/config.h', 'include/grpcpp/support/proto_buffer_reader.h', 'include/grpcpp/support/proto_buffer_writer.h', + 'include/grpcpp/support/server_callback.h', 'include/grpcpp/support/slice.h', 'include/grpcpp/support/status.h', 'include/grpcpp/support/status_code_enum.h', @@ -154,6 +155,7 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/codegen/rpc_service_method.h', 'include/grpcpp/impl/codegen/security/auth_context.h', 'include/grpcpp/impl/codegen/serialization_traits.h', + 'include/grpcpp/impl/codegen/server_callback.h', 'include/grpcpp/impl/codegen/server_context.h', 'include/grpcpp/impl/codegen/server_interceptor.h', 'include/grpcpp/impl/codegen/server_interface.h', diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h index 744b128141..89dcb12418 100644 --- a/include/grpcpp/impl/codegen/async_unary_call.h +++ b/include/grpcpp/impl/codegen/async_unary_call.h @@ -240,7 +240,7 @@ class ServerAsyncResponseWriter final /// metadata. void Finish(const W& msg, const Status& status, void* tag) { finish_buf_.set_output_tag(tag); - finish_buf_.set_cq_tag(&finish_buf_); + finish_buf_.set_core_cq_tag(&finish_buf_); if (!ctx_->sent_initial_metadata_) { finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index d54ae31852..abba5549b8 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -45,6 +45,8 @@ template <class ServiceType, class RequestType, class ResponseType> class RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler; template <StatusCode code> class ErrorMethodHandler; template <class R> @@ -154,6 +156,8 @@ class ByteBuffer final { friend class internal::RpcMethodHandler; template <class ServiceType, class RequestType, class ResponseType> friend class internal::ServerStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class internal::CallbackUnaryHandler; template <StatusCode code> friend class internal::ErrorMethodHandler; template <class R> diff --git a/include/grpcpp/impl/codegen/call_op_set.h b/include/grpcpp/impl/codegen/call_op_set.h index 785688e67f..5c52b027b2 100644 --- a/include/grpcpp/impl/codegen/call_op_set.h +++ b/include/grpcpp/impl/codegen/call_op_set.h @@ -770,19 +770,19 @@ class CallOpSet : public CallOpSetInterface, public Op5, public Op6 { public: - CallOpSet() : cq_tag_(this), return_tag_(this) {} + CallOpSet() : core_cq_tag_(this), return_tag_(this) {} // The copy constructor and assignment operator reset the value of - // cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ since - // those are only meaningful on a specific object, not across objects. + // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ + // since those are only meaningful on a specific object, not across objects. CallOpSet(const CallOpSet& other) - : cq_tag_(this), + : core_cq_tag_(this), return_tag_(this), call_(other.call_), done_intercepting_(false), interceptor_methods_(InterceptorBatchMethodsImpl()) {} CallOpSet& operator=(const CallOpSet& other) { - cq_tag_ = this; + core_cq_tag_ = this; return_tag_ = this; call_ = other.call_; done_intercepting_ = false; @@ -834,13 +834,13 @@ class CallOpSet : public CallOpSetInterface, void set_output_tag(void* return_tag) { return_tag_ = return_tag; } - void* cq_tag() override { return cq_tag_; } + void* core_cq_tag() override { return core_cq_tag_; } - /// set_cq_tag is used to provide a different core CQ tag than "this". + /// set_core_cq_tag is used to provide a different core CQ tag than "this". /// This is used for callback-based tags, where the core tag is the core /// callback function. It does not change the use or behavior of any other /// function (such as FinalizeResult) - void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } // This will be called while interceptors are run if the RPC is a hijacked // RPC. This should set hijacking state for each of the ops. @@ -866,7 +866,7 @@ class CallOpSet : public CallOpSetInterface, this->Op6::AddOp(ops, &nops); GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( - call_.call(), ops, nops, cq_tag(), nullptr)); + call_.call(), ops, nops, core_cq_tag(), nullptr)); } // Should be called after interceptors are done running on the finalize result @@ -875,7 +875,7 @@ class CallOpSet : public CallOpSetInterface, done_intercepting_ = true; GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( - call_.call(), nullptr, 0, cq_tag(), nullptr)); + call_.call(), nullptr, 0, core_cq_tag(), nullptr)); } private: @@ -906,7 +906,7 @@ class CallOpSet : public CallOpSetInterface, return interceptor_methods_.RunInterceptors(); } - void* cq_tag_; + void* core_cq_tag_; void* return_tag_; Call call_; bool done_intercepting_ = false; diff --git a/include/grpcpp/impl/codegen/call_op_set_interface.h b/include/grpcpp/impl/codegen/call_op_set_interface.h index 815227a299..3b74566a6d 100644 --- a/include/grpcpp/impl/codegen/call_op_set_interface.h +++ b/include/grpcpp/impl/codegen/call_op_set_interface.h @@ -38,9 +38,9 @@ class CallOpSetInterface : public CompletionQueueTag { virtual void FillOps(internal::Call* call) = 0; /// Get the tag to be used at the core completion queue. Generally, the - /// value of cq_tag will be "this". However, it can be overridden if we + /// value of core_cq_tag will be "this". However, it can be overridden if we /// want core to process the tag differently (e.g., as a core callback) - virtual void* cq_tag() = 0; + virtual void* core_cq_tag() = 0; // This will be called while interceptors are run if the RPC is a hijacked // RPC. This should set hijacking state for each of the ops. diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index eba9ec6edc..29deef658f 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -101,10 +101,11 @@ class CallbackWithStatusTag GPR_CODEGEN_ASSERT(ignored == ops_); // Last use of func_ or status_, so ok to move them out - CatchingCallback(std::move(func_), std::move(status_)); - + auto func = std::move(func_); + auto status = std::move(status_); func_ = nullptr; // reset to clear this out for sure status_ = Status(); // reset to clear this out for sure + CatchingCallback(std::move(func), std::move(status)); g_core_codegen_interface->grpc_call_unref(call_); } }; @@ -124,6 +125,8 @@ class CallbackWithSuccessTag // there are no tests catching the compiler warning. static void operator delete(void*, void*) { assert(0); } + CallbackWithSuccessTag() : call_(nullptr), ops_(nullptr) {} + CallbackWithSuccessTag(grpc_call* call, std::function<void(bool)> f, CompletionQueueTag* ops) : call_(call), func_(std::move(f)), ops_(ops) { @@ -138,6 +141,9 @@ class CallbackWithSuccessTag // that are detected before the operations are internally processed. void force_run(bool ok) { Run(ok); } + /// check if this tag has ever been set + operator bool() const { return call_ != nullptr; } + private: grpc_call* call_; std::function<void(bool)> func_; @@ -150,13 +156,19 @@ class CallbackWithSuccessTag void Run(bool ok) { void* ignored = ops_; bool new_ok = ok; - GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + // Allow a "false" return value from FinalizeResult to silence the + // callback, just as it silences a CQ tag in the async cases + bool do_callback = ops_->FinalizeResult(&ignored, &new_ok); GPR_CODEGEN_ASSERT(ignored == ops_); - // Last use of func_, so ok to move it out for rvalue call above - CatchingCallback(std::move(func_), ok); - - func_ = nullptr; // reset to clear this out for sure + if (do_callback) { + // Last use of func_, so ok to move it out for rvalue call above + auto func = std::move(func_); + func_ = nullptr; // reset to clear this out for sure + CatchingCallback(std::move(func), ok); + } else { + func_ = nullptr; // reset to clear this out for sure + } g_core_codegen_interface->grpc_call_unref(call_); } }; diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index 6fd1dd1d9b..39805a0ef5 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -21,6 +21,7 @@ #include <grpc/impl/codegen/connectivity_state.h> #include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/time.h> @@ -142,7 +143,7 @@ class ChannelInterface { // channel. If the return value is nullptr, this channel doesn't support // callback operations. // TODO(vjpai): Consider a better default like using a global CQ - // Returns nullptr (rather than being pure) since this is a new method + // Returns nullptr (rather than being pure) since this is a post-1.0 method // and adding a new pure method to an interface would be a breaking change // (even though this is private and non-API) virtual CompletionQueue* CallbackCQ() { return nullptr; } diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index ecb00a0769..4baa819091 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag); + ops->set_core_cq_tag(tag); call.PerformOps(ops); } }; diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h index 00113f04aa..0e08a7ce01 100644 --- a/include/grpcpp/impl/codegen/client_interceptor.h +++ b/include/grpcpp/impl/codegen/client_interceptor.h @@ -21,7 +21,6 @@ #include <vector> -#include <grpc/impl/codegen/log.h> #include <grpcpp/impl/codegen/interceptor.h> #include <grpcpp/impl/codegen/string_ref.h> diff --git a/include/grpcpp/impl/codegen/completion_queue.h b/include/grpcpp/impl/codegen/completion_queue.h index 5eef2c281f..d603c7c700 100644 --- a/include/grpcpp/impl/codegen/completion_queue.h +++ b/include/grpcpp/impl/codegen/completion_queue.h @@ -380,12 +380,18 @@ class ServerCompletionQueue : public CompletionQueue { ServerCompletionQueue() : polling_type_(GRPC_CQ_DEFAULT_POLLING) {} private: + /// \param completion_type indicates whether this is a NEXT or CALLBACK + /// completion queue. /// \param polling_type Informs the GRPC library about the type of polling /// allowed on this completion queue. See grpc_cq_polling_type's description /// in grpc_types.h for more details. - ServerCompletionQueue(grpc_cq_polling_type polling_type) + /// \param shutdown_cb is the shutdown callback used for CALLBACK api queues + ServerCompletionQueue(grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, + grpc_experimental_completion_queue_functor* shutdown_cb) : CompletionQueue(grpc_completion_queue_attributes{ - GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type, nullptr}), + GRPC_CQ_CURRENT_VERSION, completion_type, polling_type, + shutdown_cb}), polling_type_(polling_type) {} grpc_cq_polling_type polling_type_; diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index cdd34b80d1..15cab711e5 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -21,13 +21,13 @@ #include <grpc/impl/codegen/grpc_types.h> #include <grpcpp/impl/codegen/byte_buffer.h> -#include <grpcpp/impl/codegen/channel_interface.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/metadata_map.h> namespace grpc { +class ChannelInterface; class Status; namespace experimental { diff --git a/include/grpcpp/impl/codegen/rpc_service_method.h b/include/grpcpp/impl/codegen/rpc_service_method.h index e77f4046a3..f465c5fc2f 100644 --- a/include/grpcpp/impl/codegen/rpc_service_method.h +++ b/include/grpcpp/impl/codegen/rpc_service_method.h @@ -40,14 +40,28 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { + /// Constructor for HandlerParameter + /// + /// \param c : the gRPC Call structure for this server call + /// \param context : the ServerContext structure for this server call + /// \param req : the request payload, if appropriate for this RPC + /// \param req_status : the request status after any interceptors have run + /// \param rpc_requester : used only by the callback API. It is a function + /// called by the RPC Controller to request another RPC (and also + /// to set up the state required to make that request possible) HandlerParameter(Call* c, ServerContext* context, void* req, - Status req_status) - : call(c), server_context(context), request(req), status(req_status) {} + Status req_status, std::function<void()> requester) + : call(c), + server_context(context), + request(req), + status(req_status), + call_requester(std::move(requester)) {} ~HandlerParameter() {} Call* call; ServerContext* server_context; void* request; Status status; + std::function<void()> call_requester; }; virtual void RunHandler(const HandlerParameter& param) = 0; @@ -71,25 +85,29 @@ class RpcServiceMethod : public RpcMethod { MethodHandler* handler) : RpcMethod(name, type), server_tag_(nullptr), - async_type_(AsyncType::UNSET), + api_type_(ApiType::SYNC), handler_(handler) {} - enum class AsyncType { - UNSET, + enum class ApiType { + SYNC, ASYNC, RAW, + CALL_BACK, // not CALLBACK because that is reserved in Windows + RAW_CALL_BACK, }; void set_server_tag(void* tag) { server_tag_ = tag; } void* server_tag() const { return server_tag_; } /// if MethodHandler is nullptr, then this is an async method MethodHandler* handler() const { return handler_.get(); } + ApiType api_type() const { return api_type_; } void SetHandler(MethodHandler* handler) { handler_.reset(handler); } - void SetServerAsyncType(RpcServiceMethod::AsyncType type) { - if (async_type_ == AsyncType::UNSET) { + void SetServerApiType(RpcServiceMethod::ApiType type) { + if ((api_type_ == ApiType::SYNC) && + (type == ApiType::ASYNC || type == ApiType::RAW)) { // this marks this method as async handler_.reset(); - } else { + } else if (api_type_ != ApiType::SYNC) { // this is not an error condition, as it allows users to declare a server // like WithRawMethod_foo<AsyncService>. However since it // overwrites behavior, it should be logged. @@ -98,24 +116,28 @@ class RpcServiceMethod : public RpcMethod { "You are marking method %s as '%s', even though it was " "previously marked '%s'. This behavior will overwrite the original " "behavior. If you expected this then ignore this message.", - name(), TypeToString(async_type_), TypeToString(type)); + name(), TypeToString(api_type_), TypeToString(type)); } - async_type_ = type; + api_type_ = type; } private: void* server_tag_; - AsyncType async_type_; + ApiType api_type_; std::unique_ptr<MethodHandler> handler_; - const char* TypeToString(RpcServiceMethod::AsyncType type) { + const char* TypeToString(RpcServiceMethod::ApiType type) { switch (type) { - case AsyncType::UNSET: - return "unset"; - case AsyncType::ASYNC: + case ApiType::SYNC: + return "sync"; + case ApiType::ASYNC: return "async"; - case AsyncType::RAW: + case ApiType::RAW: return "raw"; + case ApiType::CALL_BACK: + return "callback"; + case ApiType::RAW_CALL_BACK: + return "raw_callback"; default: GPR_UNREACHABLE_CODE(return "unknown"); } diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h new file mode 100644 index 0000000000..5d56cbf1df --- /dev/null +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -0,0 +1,200 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H +#define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H + +#include <functional> + +#include <grpcpp/impl/codegen/call.h> +#include <grpcpp/impl/codegen/callback_common.h> +#include <grpcpp/impl/codegen/config.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/server_context.h> +#include <grpcpp/impl/codegen/server_interface.h> +#include <grpcpp/impl/codegen/status.h> + +namespace grpc { + +// forward declarations +namespace internal { +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler; +} // namespace internal + +namespace experimental { + +// For unary RPCs, the exposed controller class is only an interface +// and the actual implementation is an internal class. +class ServerCallbackRpcController { + public: + virtual ~ServerCallbackRpcController() {} + + // The method handler must call this function when it is done so that + // the library knows to free its resources + virtual void Finish(Status s) = 0; + + // Allow the method handler to push out the initial metadata before + // the response and status are ready + virtual void SendInitialMetadata(std::function<void(bool)>) = 0; +}; + +} // namespace experimental + +namespace internal { + +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler : public MethodHandler { + public: + CallbackUnaryHandler( + std::function<void(ServerContext*, const RequestType*, ResponseType*, + experimental::ServerCallbackRpcController*)> + func, + ServiceType* service) + : func_(func) {} + void RunHandler(const HandlerParameter& param) final { + // Arena allocate a controller structure (that includes request/response) + g_core_codegen_interface->grpc_call_ref(param.call->call()); + auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc( + param.call->call(), sizeof(ServerCallbackRpcControllerImpl))) + ServerCallbackRpcControllerImpl( + param.server_context, param.call, + static_cast<RequestType*>(param.request), + std::move(param.call_requester)); + Status status = param.status; + + if (status.ok()) { + // Call the actual function handler and expect the user to call finish + CatchingCallback(std::move(func_), param.server_context, + controller->request(), controller->response(), + controller); + } else { + // if deserialization failed, we need to fail the call + controller->Finish(status); + } + } + + void* Deserialize(grpc_call* call, grpc_byte_buffer* req, + Status* status) final { + ByteBuffer buf; + buf.set_buffer(req); + auto* request = new (g_core_codegen_interface->grpc_call_arena_alloc( + call, sizeof(RequestType))) RequestType(); + *status = SerializationTraits<RequestType>::Deserialize(&buf, request); + buf.Release(); + if (status->ok()) { + return request; + } + request->~RequestType(); + return nullptr; + } + + private: + std::function<void(ServerContext*, const RequestType*, ResponseType*, + experimental::ServerCallbackRpcController*)> + func_; + + // The implementation class of ServerCallbackRpcController is a private member + // of CallbackUnaryHandler since it is never exposed anywhere, and this allows + // it to take advantage of CallbackUnaryHandler's friendships. + class ServerCallbackRpcControllerImpl + : public experimental::ServerCallbackRpcController { + public: + void Finish(Status s) override { + finish_tag_ = CallbackWithSuccessTag( + call_.call(), + [this](bool) { + grpc_call* call = call_.call(); + auto call_requester = std::move(call_requester_); + this->~ServerCallbackRpcControllerImpl(); // explicitly call + // destructor + g_core_codegen_interface->grpc_call_unref(call); + call_requester(); + }, + &finish_buf_); + if (!ctx_->sent_initial_metadata_) { + finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + finish_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + } + // The response is dropped if the status is not OK. + if (s.ok()) { + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, + finish_buf_.SendMessage(resp_)); + } else { + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, s); + } + finish_buf_.set_core_cq_tag(&finish_tag_); + call_.PerformOps(&finish_buf_); + } + + void SendInitialMetadata(std::function<void(bool)> f) override { + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); + + meta_tag_ = + CallbackWithSuccessTag(call_.call(), std::move(f), &meta_buf_); + meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + if (ctx_->compression_level_set()) { + meta_buf_.set_compression_level(ctx_->compression_level()); + } + ctx_->sent_initial_metadata_ = true; + meta_buf_.set_core_cq_tag(&meta_tag_); + call_.PerformOps(&meta_buf_); + } + + private: + template <class SrvType, class ReqType, class RespType> + friend class CallbackUnaryHandler; + + ServerCallbackRpcControllerImpl(ServerContext* ctx, Call* call, + RequestType* req, + std::function<void()> call_requester) + : ctx_(ctx), + call_(*call), + req_(req), + call_requester_(std::move(call_requester)) {} + + ~ServerCallbackRpcControllerImpl() { req_->~RequestType(); } + + RequestType* request() { return req_; } + ResponseType* response() { return &resp_; } + + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallbackWithSuccessTag meta_tag_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> + finish_buf_; + CallbackWithSuccessTag finish_tag_; + + ServerContext* ctx_; + Call call_; + RequestType* req_; + ResponseType resp_; + std::function<void()> call_requester_; + }; +}; + +} // namespace internal + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 7559fb3b34..82ee862f61 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -27,6 +27,7 @@ #include <grpcpp/impl/codegen/call.h> #include <grpcpp/impl/codegen/call_op_set.h> +#include <grpcpp/impl/codegen/callback_common.h> #include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/create_auth_context.h> @@ -65,6 +66,8 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class ServiceType, class RequestType, class ResponseType> class BidiStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class CallbackUnaryHandler; template <class Streamer, bool WriteNeeded> class TemplatedBidiStreamingHandler; template <StatusCode code> @@ -137,7 +140,7 @@ class ServerContext { /// must end in "-bin". void AddTrailingMetadata(const grpc::string& key, const grpc::string& value); - /// IsCancelled is always safe to call when using sync API. + /// IsCancelled is always safe to call when using sync or callback API. /// When using async API, it is only safe to call IsCancelled after /// the AsyncNotifyWhenDone tag has been delivered. bool IsCancelled() const; @@ -267,6 +270,8 @@ class ServerContext { friend class ::grpc::internal::ServerStreamingHandler; template <class Streamer, bool WriteNeeded> friend class ::grpc::internal::TemplatedBidiStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ::grpc::internal::CallbackUnaryHandler; template <StatusCode code> friend class internal::ErrorMethodHandler; friend class ::grpc::ClientContext; @@ -277,7 +282,7 @@ class ServerContext { class CompletionOp; - void BeginCompletionOp(internal::Call* call); + void BeginCompletionOp(internal::Call* call, bool callback); /// Return the tag queued by BeginCompletionOp() internal::CompletionQueueTag* GetCompletionOpTag(); @@ -285,6 +290,12 @@ class ServerContext { void set_call(grpc_call* call) { call_ = call; } + void BindDeadlineAndMetadata(gpr_timespec deadline, grpc_metadata_array* arr); + + void Clear(); + + void Setup(gpr_timespec deadline); + uint32_t initial_metadata_flags() const { return 0; } experimental::ServerRpcInfo* set_server_rpc_info( @@ -302,6 +313,7 @@ class ServerContext { CompletionOp* completion_op_; bool has_notify_when_done_tag_; void* async_notify_when_done_tag_; + internal::CallbackWithSuccessTag completion_tag_; gpr_timespec deadline_; grpc_call* call_; @@ -321,7 +333,7 @@ class ServerContext { pending_ops_; bool has_pending_ops_; - experimental::ServerRpcInfo* rpc_info_ = nullptr; + experimental::ServerRpcInfo* rpc_info_; }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h index c39e9a988d..5fb5df28b7 100644 --- a/include/grpcpp/impl/codegen/server_interceptor.h +++ b/include/grpcpp/impl/codegen/server_interceptor.h @@ -22,7 +22,6 @@ #include <atomic> #include <vector> -#include <grpc/impl/codegen/log.h> #include <grpcpp/impl/codegen/interceptor.h> #include <grpcpp/impl/codegen/string_ref.h> diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 92c87a5f7e..8bfb10f704 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -333,11 +333,26 @@ class ServerInterface : public internal::CallHook { } private: - virtual const std::vector< + // EXPERIMENTAL + // Getter method for the vector of interceptor factory objects. + // Returns a nullptr (rather than being pure) since this is a new method and + // adding a new pure method to an interface would be a breaking change (even + // though this is private and non-API) + virtual std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators() { return nullptr; } + + // EXPERIMENTAL + // A method to get the callbackable completion queue associated with this + // server. If the return value is nullptr, this server doesn't support + // callback operations. + // TODO(vjpai): Consider a better default like using a global CQ + // Returns nullptr (rather than being pure) since this is a post-1.0 method + // and adding a new pure method to an interface would be a breaking change + // (even though this is private and non-API) + virtual CompletionQueue* CallbackCQ() { return nullptr; } }; } // namespace grpc diff --git a/include/grpcpp/impl/codegen/service_type.h b/include/grpcpp/impl/codegen/service_type.h index 9f1a052168..332a04c294 100644 --- a/include/grpcpp/impl/codegen/service_type.h +++ b/include/grpcpp/impl/codegen/service_type.h @@ -71,7 +71,20 @@ class Service { bool has_synchronous_methods() const { for (auto it = methods_.begin(); it != methods_.end(); ++it) { - if (*it && (*it)->handler() != nullptr) { + if (*it && + (*it)->api_type() == internal::RpcServiceMethod::ApiType::SYNC) { + return true; + } + } + return false; + } + + bool has_callback_methods() const { + for (auto it = methods_.begin(); it != methods_.end(); ++it) { + if (*it && ((*it)->api_type() == + internal::RpcServiceMethod::ApiType::CALL_BACK || + (*it)->api_type() == + internal::RpcServiceMethod::ApiType::RAW_CALL_BACK)) { return true; } } @@ -88,6 +101,43 @@ class Service { } protected: + // TODO(vjpai): Promote experimental contents once callback API is accepted + class experimental_type { + public: + explicit experimental_type(Service* service) : service_(service) {} + + void MarkMethodCallback(int index, internal::MethodHandler* handler) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); + GPR_CODEGEN_ASSERT( + service_->methods_[idx].get() != nullptr && + "Cannot mark the method as 'callback' because it has already been " + "marked as 'generic'."); + service_->methods_[idx]->SetHandler(handler); + service_->methods_[idx]->SetServerApiType( + internal::RpcServiceMethod::ApiType::CALL_BACK); + } + + void MarkMethodRawCallback(int index, internal::MethodHandler* handler) { + // This does not have to be a hard error, however no one has approached us + // with a use case yet. Please file an issue if you believe you have one. + size_t idx = static_cast<size_t>(index); + GPR_CODEGEN_ASSERT( + service_->methods_[idx].get() != nullptr && + "Cannot mark the method as 'raw callback' because it has already " + "been marked as 'generic'."); + service_->methods_[idx]->SetHandler(handler); + service_->methods_[idx]->SetServerApiType( + internal::RpcServiceMethod::ApiType::RAW_CALL_BACK); + } + + private: + Service* service_; + }; + + experimental_type experimental() { return experimental_type(this); } + template <class Message> void RequestAsyncUnary(int index, ServerContext* context, Message* request, internal::ServerAsyncStreamingInterface* stream, @@ -138,8 +188,7 @@ class Service { methods_[idx].get() != nullptr && "Cannot mark the method as 'async' because it has already been " "marked as 'generic'."); - methods_[idx]->SetServerAsyncType( - internal::RpcServiceMethod::AsyncType::ASYNC); + methods_[idx]->SetServerApiType(internal::RpcServiceMethod::ApiType::ASYNC); } void MarkMethodRaw(int index) { @@ -149,8 +198,7 @@ class Service { GPR_CODEGEN_ASSERT(methods_[idx].get() != nullptr && "Cannot mark the method as 'raw' because it has already " "been marked as 'generic'."); - methods_[idx]->SetServerAsyncType( - internal::RpcServiceMethod::AsyncType::RAW); + methods_[idx]->SetServerApiType(internal::RpcServiceMethod::ApiType::RAW); } void MarkMethodGeneric(int index) { diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h index 2b89ffd317..a14a4da578 100644 --- a/include/grpcpp/server.h +++ b/include/grpcpp/server.h @@ -191,8 +191,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { grpc_server* server() override { return server_; }; private: - const std::vector< - std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* + std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators() override { return &interceptor_creators_; } @@ -202,6 +201,7 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { friend class ServerInitializer; class SyncRequest; + class CallbackRequest; class UnimplementedAsyncRequest; class UnimplementedAsyncResponse; @@ -224,8 +224,18 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { return max_receive_message_size_; }; + CompletionQueue* CallbackCQ() override; + ServerInitializer* initializer(); + // A vector of interceptor factory objects. + // This should be destroyed after health_check_service_ and this requirement + // is satisfied by declaring interceptor_creators_ before + // health_check_service_. (C++ mandates that member objects be destroyed in + // the reverse order of initialization.) + std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + interceptor_creators_; + const int max_receive_message_size_; /// The following completion queues are ONLY used in case of Sync API @@ -238,6 +248,9 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { /// the \a sync_server_cqs) std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; + /// Outstanding callback requests + std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_; + // Server status std::mutex mu_; bool started_; @@ -262,8 +275,12 @@ class Server : public ServerInterface, private GrpcLibraryCodegen { // A special handler for resource exhausted in sync case std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; - std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> - interceptor_creators_; + // callback_cq_ references the callbackable completion queue associated + // with this server (if any). It is set on the first call to CallbackCQ(). + // It is _not owned_ by the server; ownership belongs with its internal + // shutdown callback tag (invoked when the CQ is fully shutdown). + // It is protected by mu_ + CompletionQueue* callback_cq_ = nullptr; }; } // namespace grpc diff --git a/include/grpcpp/support/server_callback.h b/include/grpcpp/support/server_callback.h new file mode 100644 index 0000000000..b0aeeb53c5 --- /dev/null +++ b/include/grpcpp/support/server_callback.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H +#define GRPCPP_SUPPORT_SERVER_CALLBACK_H + +#include <grpcpp/impl/codegen/server_callback.h> + +#endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H @@ -87,19 +87,19 @@ BUILD_WITH_CYTHON = os.environ.get('GRPC_PYTHON_BUILD_WITH_CYTHON', False) # Export this variable to use the system installation of openssl. You need to # have the header files installed (in /usr/include/openssl) and during -# runtime, the shared libary must be installed +# runtime, the shared library must be installed BUILD_WITH_SYSTEM_OPENSSL = os.environ.get('GRPC_PYTHON_BUILD_SYSTEM_OPENSSL', False) # Export this variable to use the system installation of zlib. You need to # have the header files installed (in /usr/include/) and during -# runtime, the shared libary must be installed +# runtime, the shared library must be installed BUILD_WITH_SYSTEM_ZLIB = os.environ.get('GRPC_PYTHON_BUILD_SYSTEM_ZLIB', False) # Export this variable to use the system installation of cares. You need to # have the header files installed (in /usr/include/) and during -# runtime, the shared libary must be installed +# runtime, the shared library must be installed BUILD_WITH_SYSTEM_CARES = os.environ.get('GRPC_PYTHON_BUILD_SYSTEM_CARES', False) @@ -202,7 +202,7 @@ DEFINE_MACROS = ( ('OPENSSL_NO_ASM', 1), ('_WIN32_WINNT', 0x600), ('GPR_BACKWARDS_COMPATIBILITY_MODE', 1)) if "win32" in sys.platform: - # TODO(zyc): Re-enble c-ares on x64 and x86 windows after fixing the + # TODO(zyc): Re-enable c-ares on x64 and x86 windows after fixing the # ares_library_init compilation issue DEFINE_MACROS += (('WIN32_LEAN_AND_MEAN', 1), ('CARES_STATICLIB', 1), ('GRPC_ARES', 0), ('NTDDI_VERSION', 0x06000000), diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 56716493dc..7986aca696 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -135,6 +135,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/proto_utils.h", "grpcpp/impl/codegen/rpc_method.h", + "grpcpp/impl/codegen/server_callback.h", "grpcpp/impl/codegen/service_type.h", "grpcpp/impl/codegen/status.h", "grpcpp/impl/codegen/stub_options.h", @@ -700,9 +701,9 @@ void PrintHeaderServerMethodSync(grpc_generator::Printer* printer, printer->Print(method->GetTrailingComments("//").c_str()); } -// Helper generator. Disabled the sync API for Request and Response, then adds +// Helper generator. Disables the sync API for Request and Response, then adds // in an async API for RealRequest and RealResponse types. This is to be used -// to generate async and raw APIs. +// to generate async and raw async APIs. void PrintHeaderServerAsyncMethodsHelper( grpc_generator::Printer* printer, const grpc_generator::Method* method, std::map<grpc::string, grpc::string>* vars) { @@ -829,6 +830,170 @@ void PrintHeaderServerMethodAsync(grpc_generator::Printer* printer, printer->Print(*vars, "};\n"); } +// Helper generator. Disables the sync API for Request and Response, then adds +// in a callback API for RealRequest and RealResponse types. This is to be used +// to generate callback and raw callback APIs. +void PrintHeaderServerCallbackMethodsHelper( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars) { + if (method->NoStreaming()) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, const $Request$* request, " + "$Response$* response) override {\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + printer->Print( + *vars, + "virtual void $Method$(" + "::grpc::ServerContext* context, const $RealRequest$* request, " + "$RealResponse$* response, " + "::grpc::experimental::ServerCallbackRpcController* " + "controller) { controller->Finish(::grpc::Status(" + "::grpc::StatusCode::UNIMPLEMENTED, \"\")); }\n"); + } else if (ClientOnlyStreaming(method)) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerReader< $Request$>* reader, " + "$Response$* response) override {\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + } else if (ServerOnlyStreaming(method)) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, const $Request$* request, " + "::grpc::ServerWriter< $Response$>* writer) override " + "{\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + } else if (method->BidiStreaming()) { + printer->Print( + *vars, + "// disable synchronous version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) " + " override {\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + } +} + +void PrintHeaderServerMethodCallback( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars) { + (*vars)["Method"] = method->name(); + // These will be disabled + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + // These will be used for the callback API + (*vars)["RealRequest"] = method->input_type_name(); + (*vars)["RealResponse"] = method->output_type_name(); + printer->Print(*vars, "template <class BaseClass>\n"); + printer->Print( + *vars, + "class ExperimentalWithCallbackMethod_$Method$ : public BaseClass {\n"); + printer->Print( + " private:\n" + " void BaseClassMustBeDerivedFromService(const Service *service) {}\n"); + printer->Print(" public:\n"); + printer->Indent(); + printer->Print(*vars, "ExperimentalWithCallbackMethod_$Method$() {\n"); + if (method->NoStreaming()) { + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n" + " new ::grpc::internal::CallbackUnaryHandler< " + "ExperimentalWithCallbackMethod_$Method$<BaseClass>, $RealRequest$, " + "$RealResponse$>(\n" + " [this](::grpc::ServerContext* context,\n" + " const $RealRequest$* request,\n" + " $RealResponse$* response,\n" + " ::grpc::experimental::ServerCallbackRpcController* " + "controller) {\n" + " this->$" + "Method$(context, request, response, controller);\n" + " }, this));\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add in code generation for all streaming methods + } + printer->Print(*vars, "}\n"); + printer->Print(*vars, + "~ExperimentalWithCallbackMethod_$Method$() override {\n" + " BaseClassMustBeDerivedFromService(this);\n" + "}\n"); + PrintHeaderServerCallbackMethodsHelper(printer, method, vars); + printer->Outdent(); + printer->Print(*vars, "};\n"); +} + +void PrintHeaderServerMethodRawCallback( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map<grpc::string, grpc::string>* vars) { + (*vars)["Method"] = method->name(); + // These will be disabled + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + // These will be used for raw API + (*vars)["RealRequest"] = "::grpc::ByteBuffer"; + (*vars)["RealResponse"] = "::grpc::ByteBuffer"; + printer->Print(*vars, "template <class BaseClass>\n"); + printer->Print(*vars, + "class ExperimentalWithRawCallbackMethod_$Method$ : public " + "BaseClass {\n"); + printer->Print( + " private:\n" + " void BaseClassMustBeDerivedFromService(const Service *service) {}\n"); + printer->Print(" public:\n"); + printer->Indent(); + printer->Print(*vars, "ExperimentalWithRawCallbackMethod_$Method$() {\n"); + if (method->NoStreaming()) { + printer->Print( + *vars, + " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n" + " new ::grpc::internal::CallbackUnaryHandler< " + "ExperimentalWithRawCallbackMethod_$Method$<BaseClass>, $RealRequest$, " + "$RealResponse$>(\n" + " [this](::grpc::ServerContext* context,\n" + " const $RealRequest$* request,\n" + " $RealResponse$* response,\n" + " ::grpc::experimental::ServerCallbackRpcController* " + "controller) {\n" + " this->$" + "Method$(context, request, response, controller);\n" + " }, this));\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add in code generation for all streaming methods + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add in code generation for all streaming methods + } + printer->Print(*vars, "}\n"); + printer->Print(*vars, + "~ExperimentalWithRawCallbackMethod_$Method$() override {\n" + " BaseClassMustBeDerivedFromService(this);\n" + "}\n"); + PrintHeaderServerCallbackMethodsHelper(printer, method, vars); + printer->Outdent(); + printer->Print(*vars, "};\n"); +} + void PrintHeaderServerMethodStreamedUnary( grpc_generator::Printer* printer, const grpc_generator::Method* method, std::map<grpc::string, grpc::string>* vars) { @@ -1137,7 +1302,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); printer->Print(*vars, "WithAsyncMethod_$method_name$<"); } printer->Print("Service"); @@ -1146,6 +1311,24 @@ void PrintHeaderService(grpc_generator::Printer* printer, } printer->Print(" AsyncService;\n"); + // Server side - Callback + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["Idx"] = as_string(i); + PrintHeaderServerMethodCallback(printer, service->method(i).get(), vars); + } + + printer->Print("typedef "); + + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["method_name"] = service->method(i)->name(); + printer->Print(*vars, "ExperimentalWithCallbackMethod_$method_name$<"); + } + printer->Print("Service"); + for (int i = 0; i < service->method_count(); ++i) { + printer->Print(" >"); + } + printer->Print(" ExperimentalCallbackService;\n"); + // Server side - Generic for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); @@ -1158,6 +1341,12 @@ void PrintHeaderService(grpc_generator::Printer* printer, PrintHeaderServerMethodRaw(printer, service->method(i).get(), vars); } + // Server side - Raw Callback + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["Idx"] = as_string(i); + PrintHeaderServerMethodRawCallback(printer, service->method(i).get(), vars); + } + // Server side - Streamed Unary for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); @@ -1167,7 +1356,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); if (service->method(i)->NoStreaming()) { printer->Print(*vars, "WithStreamedUnaryMethod_$method_name$<"); } @@ -1189,7 +1378,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); auto method = service->method(i); if (ServerOnlyStreaming(method.get())) { printer->Print(*vars, "WithSplitStreamingMethod_$method_name$<"); @@ -1207,7 +1396,7 @@ void PrintHeaderService(grpc_generator::Printer* printer, // Server side - typedef for controlled both unary and server-side streaming printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["method_name"] = service->method(i).get()->name(); + (*vars)["method_name"] = service->method(i)->name(); auto method = service->method(i); if (ServerOnlyStreaming(method.get())) { printer->Print(*vars, "WithSplitStreamingMethod_$method_name$<"); @@ -1333,6 +1522,7 @@ grpc::string GetSourceIncludes(grpc_generator::File* file, "grpcpp/impl/codegen/client_callback.h", "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/rpc_service_method.h", + "grpcpp/impl/codegen/server_callback.h", "grpcpp/impl/codegen/service_type.h", "grpcpp/impl/codegen/sync_stream.h"}; std::vector<grpc::string> headers(headers_strs, array_end(headers_strs)); @@ -1577,7 +1767,7 @@ void PrintSourceService(grpc_generator::Printer* printer, printer->Print(*vars, "static const char* $prefix$$Service$_method_names[] = {\n"); for (int i = 0; i < service->method_count(); ++i) { - (*vars)["Method"] = service->method(i).get()->name(); + (*vars)["Method"] = service->method(i)->name(); printer->Print(*vars, " \"/$Package$$Service$/$Method$\",\n"); } printer->Print(*vars, "};\n\n"); diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index daf1b89b09..91894689c3 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -2951,6 +2951,27 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } +// If the channel is in TRANSIENT_FAILURE and the call is not +// wait_for_ready=true, fails the call and returns true. +static bool fail_call_if_in_transient_failure(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch; + if (grpc_connectivity_state_check(&chand->state_tracker) == + GRPC_CHANNEL_TRANSIENT_FAILURE && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) { + pending_batches_fail( + elem, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "channel is in state TRANSIENT_FAILURE"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + true /* yield_call_combiner */); + return true; + } + return false; +} + // Invoked once resolver results are available. static void process_service_config_and_start_lb_pick_locked( grpc_call_element* elem) { @@ -2958,6 +2979,9 @@ static void process_service_config_and_start_lb_pick_locked( // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); + // Check this after applying service config, since it may have + // affected the call's wait_for_ready value. + if (fail_call_if_in_transient_failure(elem)) return; } // Start LB pick. grpc_core::LbPicker::StartLocked(elem); @@ -3127,6 +3151,16 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { // We do not yet have an LB policy, so wait for a resolver result. if (GPR_UNLIKELY(!chand->started_resolving)) { start_resolving_locked(chand); + } else { + // Normally, we want to do this check in + // process_service_config_and_start_lb_pick_locked(), so that we + // can honor the wait_for_ready setting in the service config. + // However, if the channel is in TRANSIENT_FAILURE at this point, that + // means that the resolver has returned a failure, so we're not going + // to get a service config right away. In that case, we fail the + // call now based on the wait_for_ready value passed in from the + // application. + if (fail_call_if_in_transient_failure(elem)) return; } // Create a new waiter, which will delete itself when done. grpc_core::New<grpc_core::ResolverResultWaiter>(elem); diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index a3c782d8c9..591637aa86 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -299,6 +299,11 @@ HealthCheckClient::CallState::~CallState() { health_check_client_.get(), this); } if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended"); + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) { + if (context_[i].destroy != nullptr) { + context_[i].destroy(context_[i].value); + } + } // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if // any, so that it can release any internal references it may be @@ -346,6 +351,7 @@ void HealthCheckClient::CallState::StartCall() { } // Initialize payload and batch. memset(&batch_, 0, sizeof(batch_)); + payload_.context = context_; batch_.payload = &payload_; // on_complete callback takes ref, handled manually. Ref(DEBUG_LOCATION, "on_complete").release(); diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 123be79c88..230b89b3fc 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -108,7 +108,8 @@ grpc_channel* grpc_channel_create_with_builder( channel->resource_user = resource_user; channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type); bool channelz_enabled = GRPC_ENABLE_CHANNELZ_DEFAULT; - size_t channel_tracer_max_memory = 0; // default to off + size_t channel_tracer_max_memory = + GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT; bool internal_channel = false; // this creates the default ChannelNode. Different types of channels may // override this to ensure a correct ChannelNode is created. @@ -147,7 +148,6 @@ grpc_channel* grpc_channel_create_with_builder( 0x1; /* always support no compression */ } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)) { - GPR_ASSERT(channel_tracer_max_memory == 0); const grpc_integer_options options = { GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; channel_tracer_max_memory = diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 15e3ccb3c9..f5c1b4e2c5 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -225,7 +225,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { static void Run(grpc_experimental_completion_queue_functor* cb, int) { auto* callback = static_cast<ShutdownCallback*>(cb); delete callback->cq_; - grpc_core::Delete(callback); + delete callback; } private: @@ -238,7 +238,7 @@ CompletionQueue* Channel::CallbackCQ() { // if there is no explicit per-channel CQ registered std::lock_guard<std::mutex> l(mu_); if (callback_cq_ == nullptr) { - auto* shutdown_callback = grpc_core::New<ShutdownCallback>(); + auto* shutdown_callback = new ShutdownCallback; callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}); diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index 6893201e2e..d93a54aed7 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -60,10 +60,10 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( case GRPC_QUEUE_SHUTDOWN: return SHUTDOWN; case GRPC_OP_COMPLETE: - auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); + auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag); *ok = ev.success != 0; - *tag = cq_tag; - if (cq_tag->FinalizeResult(tag, ok)) { + *tag = core_cq_tag; + if (core_cq_tag->FinalizeResult(tag, ok)) { return GOT_EVENT; } break; @@ -87,9 +87,9 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { flushed_ = true; if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, &res)) { - auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); + auto core_cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag); *ok = res == 1; - if (cq_tag->FinalizeResult(tag, ok)) { + if (core_cq_tag->FinalizeResult(tag, ok)) { return true; } } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index fc42b6c886..0dc03b6876 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -71,7 +71,9 @@ ServerBuilder::~ServerBuilder() { std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { ServerCompletionQueue* cq = new ServerCompletionQueue( - is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING); + GRPC_CQ_NEXT, + is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING, + nullptr); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -256,15 +258,22 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { - sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type)); + sync_server_cqs->emplace_back( + new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr)); } } - std::unique_ptr<Server> server(new Server( - max_receive_message_size_, &args, sync_server_cqs, - sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, - sync_server_settings_.cq_timeout_msec, resource_quota_, - std::move(interceptor_creators_))); + // == Determine if the server has any callback methods == + bool has_callback_methods = false; + for (auto it = services_.begin(); it != services_.end(); ++it) { + if ((*it)->service->has_callback_methods()) { + has_callback_methods = true; + break; + } + } + + // TODO(vjpai): Add a section here for plugins once they can support callback + // methods if (has_sync_methods) { // This is a Sync server @@ -276,6 +285,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.cq_timeout_msec); } + if (has_callback_methods) { + gpr_log(GPR_INFO, "Callback server."); + } + + std::unique_ptr<Server> server(new Server( + max_receive_message_size_, &args, sync_server_cqs, + sync_server_settings_.min_pollers, sync_server_settings_.max_pollers, + sync_server_settings_.cq_timeout_msec, resource_quota_, + std::move(interceptor_creators_))); + ServerInitializer* initializer = server->initializer(); // Register all the completion queues with the server. i.e @@ -289,6 +308,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { num_frequently_polled_cqs++; } + if (has_callback_methods) { + auto* cq = server->CallbackCQ(); + grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); + num_frequently_polled_cqs++; + } + // cqs_ contains the completion queue added by calling the ServerBuilder's // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // calling Next() or AsyncNext()) and hence are not safe to be used for diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 82a9d719fa..870ee84e3e 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -147,9 +147,9 @@ class Server::UnimplementedAsyncResponse final class Server::SyncRequest final : public internal::CompletionQueueTag { public: - SyncRequest(internal::RpcServiceMethod* method, void* tag) + SyncRequest(internal::RpcServiceMethod* method, void* method_tag) : method_(method), - tag_(tag), + method_tag_(method_tag), in_flight_(false), has_request_payload_( method->method_type() == internal::RpcMethod::NORMAL_RPC || @@ -176,10 +176,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - if (tag_) { + if (method_tag_) { if (GRPC_CALL_OK != grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, + server, method_tag_, &call_, &deadline_, &request_metadata_, has_request_payload_ ? &request_payload_ : nullptr, cq_, notify_cq, this)) { TeardownRequest(); @@ -211,6 +211,9 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { return true; } + // The CallData class represents a call that is "active" as opposed + // to just being requested. It wraps and takes ownership of the cq from + // the call request class CallData final { public: explicit CallData(Server* server, SyncRequest* mrd) @@ -276,12 +279,12 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { void ContinueRunAfterInterception() { { - ctx_.BeginCompletionOp(&call_); + ctx_.BeginCompletionOp(&call_, false); global_callbacks_->PreSynchronousRequest(&ctx_); auto* handler = resources_ ? method_->handler() : server_->resource_exhausted_handler_.get(); handler->RunHandler(internal::MethodHandler::HandlerParameter( - &call_, &ctx_, request_, request_status_)); + &call_, &ctx_, request_, request_status_, nullptr)); request_ = nullptr; global_callbacks_->PostSynchronousRequest(&ctx_); @@ -314,7 +317,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { private: internal::RpcServiceMethod* const method_; - void* const tag_; + void* const method_tag_; bool in_flight_; const bool has_request_payload_; grpc_call* call_; @@ -325,6 +328,176 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { grpc_completion_queue* cq_; }; +class Server::CallbackRequest final : public internal::CompletionQueueTag { + public: + CallbackRequest(Server* server, internal::RpcServiceMethod* method, + void* method_tag) + : server_(server), + method_(method), + method_tag_(method_tag), + has_request_payload_( + method->method_type() == internal::RpcMethod::NORMAL_RPC || + method->method_type() == internal::RpcMethod::SERVER_STREAMING), + cq_(server->CallbackCQ()), + tag_(this) { + Setup(); + } + + ~CallbackRequest() { Clear(); } + + void Request() { + if (method_tag_) { + if (GRPC_CALL_OK != + grpc_server_request_registered_call( + server_->c_server(), method_tag_, &call_, &deadline_, + &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(), + cq_->cq(), static_cast<void*>(&tag_))) { + return; + } + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + if (grpc_server_request_call(server_->c_server(), &call_, call_details_, + &request_metadata_, cq_->cq(), cq_->cq(), + static_cast<void*>(&tag_)) != GRPC_CALL_OK) { + return; + } + } + } + + bool FinalizeResult(void** tag, bool* status) override { return false; } + + private: + class CallbackCallTag : public grpc_experimental_completion_queue_functor { + public: + CallbackCallTag(Server::CallbackRequest* req) : req_(req) { + functor_run = &CallbackCallTag::StaticRun; + } + + // force_run can not be performed on a tag if operations using this tag + // have been sent to PerformOpsOnCall. It is intended for error conditions + // that are detected before the operations are internally processed. + void force_run(bool ok) { Run(ok); } + + private: + Server::CallbackRequest* req_; + internal::Call* call_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast<CallbackCallTag*>(cb)->Run(static_cast<bool>(ok)); + } + void Run(bool ok) { + void* ignored = req_; + bool new_ok = ok; + GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == req_); + + if (!ok) { + // The call has been shutdown + req_->Clear(); + return; + } + + // Bind the call, deadline, and metadata from what we got + req_->ctx_.set_call(req_->call_); + req_->ctx_.cq_ = req_->cq_; + req_->ctx_.BindDeadlineAndMetadata(req_->deadline_, + &req_->request_metadata_); + req_->request_metadata_.count = 0; + + // Create a C++ Call to control the underlying core call + call_ = new (grpc_call_arena_alloc(req_->call_, sizeof(internal::Call))) + internal::Call( + req_->call_, req_->server_, req_->cq_, + req_->server_->max_receive_message_size(), + req_->ctx_.set_server_rpc_info( + req_->method_->name(), req_->server_->interceptor_creators_)); + + req_->interceptor_methods_.SetCall(call_); + req_->interceptor_methods_.SetReverse(); + // Set interception point for RECV INITIAL METADATA + req_->interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); + req_->interceptor_methods_.SetRecvInitialMetadata( + &req_->ctx_.client_metadata_); + + if (req_->has_request_payload_) { + // Set interception point for RECV MESSAGE + req_->request_ = req_->method_->handler()->Deserialize( + req_->call_, req_->request_payload_, &req_->request_status_); + req_->request_payload_ = nullptr; + req_->interceptor_methods_.AddInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_MESSAGE); + req_->interceptor_methods_.SetRecvMessage(req_->request_); + } + + if (req_->interceptor_methods_.RunInterceptors( + [this] { ContinueRunAfterInterception(); })) { + ContinueRunAfterInterception(); + } else { + // There were interceptors to be run, so ContinueRunAfterInterception + // will be run when interceptors are done. + } + } + void ContinueRunAfterInterception() { + req_->ctx_.BeginCompletionOp(call_, true); + req_->method_->handler()->RunHandler( + internal::MethodHandler::HandlerParameter( + call_, &req_->ctx_, req_->request_, req_->request_status_, + [this] { + req_->Reset(); + req_->Request(); + })); + } + }; + + void Reset() { + Clear(); + Setup(); + } + + void Clear() { + if (call_details_) { + delete call_details_; + call_details_ = nullptr; + } + grpc_metadata_array_destroy(&request_metadata_); + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); + } + ctx_.Clear(); + interceptor_methods_.ClearState(); + } + + void Setup() { + grpc_metadata_array_init(&request_metadata_); + ctx_.Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); + request_payload_ = nullptr; + request_ = nullptr; + request_status_ = Status(); + } + + Server* const server_; + internal::RpcServiceMethod* const method_; + void* const method_tag_; + const bool has_request_payload_; + grpc_byte_buffer* request_payload_; + void* request_; + Status request_status_; + grpc_call_details* call_details_ = nullptr; + grpc_call* call_; + gpr_timespec deadline_; + grpc_metadata_array request_metadata_; + CompletionQueue* cq_; + CallbackCallTag tag_; + ServerContext ctx_; + internal::InterceptorBatchMethodsImpl interceptor_methods_; +}; + // Implementation of ThreadManager. Each instance of SyncRequestThreadManager // manages a pool of threads that poll for incoming Sync RPCs and call the // appropriate RPC handlers @@ -447,7 +620,8 @@ Server::Server( std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> interceptor_creators) - : max_receive_message_size_(max_receive_message_size), + : interceptor_creators_(std::move(interceptor_creators)), + max_receive_message_size_(max_receive_message_size), sync_server_cqs_(std::move(sync_server_cqs)), started_(false), shutdown_(false), @@ -455,8 +629,7 @@ Server::Server( has_generic_service_(false), server_(nullptr), server_initializer_(new ServerInitializer(this)), - health_check_service_disabled_(false), - interceptor_creators_(std::move(interceptor_creators)) { + health_check_service_disabled_(false) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; @@ -504,6 +677,9 @@ Server::Server( Server::~Server() { { std::unique_lock<std::mutex> lock(mu_); + if (callback_cq_ != nullptr) { + callback_cq_->Shutdown(); + } if (started_ && !shutdown_) { lock.unlock(); Shutdown(); @@ -576,21 +752,28 @@ bool Server::RegisterService(const grpc::string* host, Service* service) { } internal::RpcServiceMethod* method = it->get(); - void* tag = grpc_server_register_method( + void* method_registration_tag = grpc_server_register_method( server_, method->name(), host ? host->c_str() : nullptr, PayloadHandlingForMethod(method), 0); - if (tag == nullptr) { + if (method_registration_tag == nullptr) { gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); return false; } - if (method->handler() == nullptr) { // Async method - method->set_server_tag(tag); - } else { + if (method->handler() == nullptr) { // Async method without handler + method->set_server_tag(method_registration_tag); + } else if (method->api_type() == + internal::RpcServiceMethod::ApiType::SYNC) { for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) { - (*it)->AddSyncMethod(method, tag); + (*it)->AddSyncMethod(method, method_registration_tag); } + } else { + // a callback method + auto* req = new CallbackRequest(this, method, method_registration_tag); + callback_reqs_.emplace_back(req); + // Enqueue it so that it will be Request'ed later once + // all request matchers are created at core server startup } method_name = method->name(); @@ -641,7 +824,8 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { // performance. This ensures that we don't introduce thread hops // for application requests that wind up on this CQ, which is polled // in its own thread. - health_check_cq = new ServerCompletionQueue(GRPC_CQ_NON_POLLING); + health_check_cq = + new ServerCompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_NON_POLLING, nullptr); grpc_server_register_completion_queue(server_, health_check_cq->cq(), nullptr); default_health_check_service_impl = @@ -678,6 +862,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { (*it)->Start(); } + for (auto& cbreq : callback_reqs_) { + cbreq->Request(); + } + if (default_health_check_service_impl != nullptr) { default_health_check_service_impl->StartServingThread(); } @@ -806,7 +994,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, } } if (*status && call_) { - context_->BeginCompletionOp(&call_wrapper_); + context_->BeginCompletionOp(&call_wrapper_, false); } *tag = tag_; if (delete_on_finalize_) { @@ -817,7 +1005,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { - context_->BeginCompletionOp(&call_wrapper_); + context_->BeginCompletionOp(&call_wrapper_, false); // Queue a tag which will be returned immediately grpc_core::ExecCtx exec_ctx; grpc_cq_begin_op(notification_cq_->cq(), this); @@ -910,4 +1098,41 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse( ServerInitializer* Server::initializer() { return server_initializer_.get(); } +namespace { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { + public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } + // TakeCQ takes ownership of the cq into the shutdown callback + // so that the shutdown callback will be responsible for destroying it + void TakeCQ(CompletionQueue* cq) { cq_ = cq; } + + // The Run function will get invoked by the completion queue library + // when the shutdown is actually complete + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast<ShutdownCallback*>(cb); + delete callback->cq_; + delete callback; + } + + private: + CompletionQueue* cq_ = nullptr; +}; +} // namespace + +CompletionQueue* Server::CallbackCQ() { + // TODO(vjpai): Consider using a single global CQ for the default CQ + // if there is no explicit per-server CQ registered + std::lock_guard<std::mutex> l(mu_); + if (callback_cq_ == nullptr) { + auto* shutdown_callback = new ShutdownCallback; + callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, + shutdown_callback}); + + // Transfer ownership of the new cq to its own shutdown callback + shutdown_callback->TakeCQ(callback_cq_); + } + return callback_cq_; +}; + } // namespace grpc diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 995e787785..355debb3fb 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -45,11 +45,18 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { : call_(*call), has_tag_(false), tag_(nullptr), + core_cq_tag_(this), refs_(2), finalized_(false), cancelled_(0), done_intercepting_(false) {} + // CompletionOp isn't copyable or movable + CompletionOp(const CompletionOp&) = delete; + CompletionOp& operator=(const CompletionOp&) = delete; + CompletionOp(CompletionOp&&) = delete; + CompletionOp& operator=(CompletionOp&&) = delete; + ~CompletionOp() { if (call_.server_rpc_info()) { call_.server_rpc_info()->Unref(); @@ -85,8 +92,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { tag_ = tag; } - /// TODO(vjpai): Allow override of cq_tag if appropriate for callback API - void* cq_tag() override { return this; } + void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } + + void* core_cq_tag() override { return core_cq_tag_; } void Unref(); @@ -130,6 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { internal::Call call_; bool has_tag_; void* tag_; + void* core_cq_tag_; std::mutex mu_; int refs_; bool finalized_; @@ -157,8 +166,8 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { interceptor_methods_.SetCall(&call_); interceptor_methods_.SetReverse(); interceptor_methods_.SetCallOpSetInterface(this); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), &ops, 1, this, nullptr)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), &ops, 1, + core_cq_tag_, nullptr)); /* No interceptors to run here */ } @@ -209,31 +218,35 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { // ServerContext body -ServerContext::ServerContext() - : completion_op_(nullptr), - has_notify_when_done_tag_(false), - async_notify_when_done_tag_(nullptr), - deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), - call_(nullptr), - cq_(nullptr), - sent_initial_metadata_(false), - compression_level_set_(false), - has_pending_ops_(false) {} - -ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) - : completion_op_(nullptr), - has_notify_when_done_tag_(false), - async_notify_when_done_tag_(nullptr), - deadline_(deadline), - call_(nullptr), - cq_(nullptr), - sent_initial_metadata_(false), - compression_level_set_(false), - has_pending_ops_(false) { +ServerContext::ServerContext() { Setup(gpr_inf_future(GPR_CLOCK_REALTIME)); } + +ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) { + Setup(deadline); std::swap(*client_metadata_.arr(), *arr); } -ServerContext::~ServerContext() { +void ServerContext::Setup(gpr_timespec deadline) { + completion_op_ = nullptr; + has_notify_when_done_tag_ = false; + async_notify_when_done_tag_ = nullptr; + deadline_ = deadline; + call_ = nullptr; + cq_ = nullptr; + sent_initial_metadata_ = false; + compression_level_set_ = false; + has_pending_ops_ = false; + rpc_info_ = nullptr; +} + +void ServerContext::BindDeadlineAndMetadata(gpr_timespec deadline, + grpc_metadata_array* arr) { + deadline_ = deadline; + std::swap(*client_metadata_.arr(), *arr); +} + +ServerContext::~ServerContext() { Clear(); } + +void ServerContext::Clear() { if (call_) { grpc_call_unref(call_); } @@ -243,9 +256,11 @@ ServerContext::~ServerContext() { if (rpc_info_) { rpc_info_->Unref(); } + // Don't need to clear out call_, completion_op_, or rpc_info_ because this is + // either called from destructor or just before Setup } -void ServerContext::BeginCompletionOp(internal::Call* call) { +void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) { GPR_ASSERT(!completion_op_); if (rpc_info_) { rpc_info_->Ref(); @@ -254,7 +269,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call) { completion_op_ = new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp))) CompletionOp(call); - if (has_notify_when_done_tag_) { + if (callback) { + completion_tag_ = + internal::CallbackWithSuccessTag(call->call(), nullptr, completion_op_); + completion_op_->set_core_cq_tag(&completion_tag_); + } else if (has_notify_when_done_tag_) { completion_op_->set_tag(async_notify_when_done_tag_); } call->PerformOps(completion_op_); @@ -283,12 +302,15 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - if (has_notify_when_done_tag_) { - // when using async API, but the result is only valid + if (completion_tag_) { + // When using callback API, this result is always valid. + return completion_op_->CheckCancelledAsync(); + } else if (has_notify_when_done_tag_) { + // When using async API, the result is only valid // if the tag has already been delivered at the completion queue return completion_op_ && completion_op_->CheckCancelledAsync(); } else { - // when using sync API + // when using sync API, the result is always valid return completion_op_ && completion_op_->CheckCancelled(cq_); } } diff --git a/src/python/.gitignore b/src/python/.gitignore index 7b520579a0..41813129bd 100644 --- a/src/python/.gitignore +++ b/src/python/.gitignore @@ -1,3 +1,4 @@ gens/ *_pb2.py *_pb2_grpc.py +*.egg-info/ diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 0a3097111f..3cb0eb179e 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -274,8 +274,14 @@ class BuildExt(build_ext.build_ext): extra_defines = [ 'EXTRA_DEFINES="GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1"' ] + # Ensure the BoringSSL are built instead of using system provided + # libraries. It prevents dependency issues while distributing to + # Mac users who use MacPorts to manage their libraries. #17002 + mod_env = dict(os.environ) + mod_env['REQUIRE_CUSTOM_LIBRARIES_opt'] = '1' make_process = subprocess.Popen( ['make'] + extra_defines + targets, + env=mod_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) make_out, make_err = make_process.communicate() diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index eeeb4ddb33..734eac3801 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -24,8 +24,8 @@ from grpc import _grpcio_metadata from grpc._cython import cygrpc from grpc.framework.foundation import callable_util -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) @@ -980,8 +980,9 @@ class Channel(grpc.Channel): # for as long as they are in use and to close them after using them, # then deletion of this grpc._channel.Channel instance can be made to # effect closure of the underlying cygrpc.Channel instance. - cygrpc.fork_unregister_channel(self) + if cygrpc is not None: # Globals may have already been collected. + cygrpc.fork_unregister_channel(self) # This prevent the failed-at-initializing object removal from failing. # Though the __init__ failed, the removal will still trigger __del__. - if hasattr(self, "_connectivity_state"): + if _moot is not None and hasattr(self, "_connectivity_state"): _moot(self._connectivity_state) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index 3805c7e82a..d35f4566bd 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -20,8 +20,8 @@ import six import grpc from grpc._cython import cygrpc -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { cygrpc.ConnectivityState.idle: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi index 334e561baa..fa356d913e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi @@ -14,8 +14,8 @@ import logging -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) # This function will ascii encode unicode string inputs if neccesary. # In Python3, unicode strings are the default str type. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 5779437b92..f9d1e863ca 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -18,8 +18,8 @@ import logging import time import grpc -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) cdef class Server: diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index 88ab4d8371..53af2ff913 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -20,8 +20,8 @@ import grpc from grpc import _common from grpc._cython import cygrpc -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) class _AuthMetadataContext( diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index daa000a6e1..7276a7fd90 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -27,7 +27,6 @@ from grpc import _interceptor from grpc._cython import cygrpc from grpc.framework.foundation import callable_util -logging.basicConfig() _LOGGER = logging.getLogger(__name__) _SHUTDOWN_TAG = 'shutdown' diff --git a/src/python/grpcio/grpc/framework/foundation/callable_util.py b/src/python/grpcio/grpc/framework/foundation/callable_util.py index fb8d5f7c1e..36066e19df 100644 --- a/src/python/grpcio/grpc/framework/foundation/callable_util.py +++ b/src/python/grpcio/grpc/framework/foundation/callable_util.py @@ -21,8 +21,8 @@ import logging import six -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) class Outcome(six.with_metaclass(abc.ABCMeta)): diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py index 7702d1785f..dfb8dbdc30 100644 --- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py +++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py @@ -17,8 +17,8 @@ import logging from concurrent import futures -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) def _wrap(behavior): diff --git a/src/python/grpcio/grpc/framework/foundation/stream_util.py b/src/python/grpcio/grpc/framework/foundation/stream_util.py index 9184f95873..e03130cced 100644 --- a/src/python/grpcio/grpc/framework/foundation/stream_util.py +++ b/src/python/grpcio/grpc/framework/foundation/stream_util.py @@ -19,8 +19,8 @@ import threading from grpc.framework.foundation import stream _NO_VALUE = object() -logging.basicConfig() _LOGGER = logging.getLogger(__name__) +_LOGGER.addHandler(logging.NullHandler()) class TransformingConsumer(stream.Consumer): diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index 768cdaf468..72f88a1c98 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -25,8 +25,8 @@ from tests.interop import methods from tests.interop import resources from tests.unit import test_common -logging.basicConfig() _ONE_DAY_IN_SECONDS = 60 * 60 * 24 +logging.basicConfig() _LOGGER = logging.getLogger(__name__) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 76d5d22d57..5505369867 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -46,6 +46,7 @@ "unit._interceptor_test.InterceptorTest", "unit._invalid_metadata_test.InvalidMetadataTest", "unit._invocation_defects_test.InvocationDefectsTest", + "unit._logging_test.LoggingTest", "unit._metadata_code_details_test.MetadataCodeDetailsTest", "unit._metadata_test.MetadataTest", "unit._reconnect_test.ReconnectTest", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index dcd6d9fbb2..de33b81e32 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -17,6 +17,7 @@ GRPCIO_TESTS_UNIT = [ "_interceptor_test.py", "_invalid_metadata_test.py", "_invocation_defects_test.py", + "_logging_test.py", "_metadata_code_details_test.py", "_metadata_test.py", # TODO: Issue 16336 diff --git a/src/python/grpcio_tests/tests/unit/_api_test.py b/src/python/grpcio_tests/tests/unit/_api_test.py index f6245be77d..38072861a4 100644 --- a/src/python/grpcio_tests/tests/unit/_api_test.py +++ b/src/python/grpcio_tests/tests/unit/_api_test.py @@ -14,6 +14,7 @@ """Test of gRPC Python's application-layer API.""" import unittest +import logging import six @@ -102,4 +103,5 @@ class ChannelTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index d174051070..b1b5bbdcab 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -15,6 +15,7 @@ import pickle import unittest +import logging import grpc from grpc import _channel @@ -187,4 +188,5 @@ class AuthContextTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_auth_test.py b/src/python/grpcio_tests/tests/unit/_auth_test.py index e2cb938936..d9df2add4f 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_test.py @@ -16,6 +16,7 @@ import collections import threading import unittest +import logging from grpc import _auth @@ -77,4 +78,5 @@ class AccessTokenAuthMetadataPluginTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_channel_args_test.py b/src/python/grpcio_tests/tests/unit/_channel_args_test.py index dd1d2969a2..228c0e4c16 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_args_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_args_test.py @@ -15,6 +15,7 @@ from concurrent import futures import unittest +import logging import grpc @@ -62,4 +63,5 @@ class ChannelArgsTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py index af3a9ee1ee..82fa165710 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_close_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests server and client side compression.""" +import logging import threading import time import unittest @@ -182,4 +183,5 @@ class ChannelCloseTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index f9eb0011dc..727fb7d65f 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests of grpc._channel.Channel connectivity.""" +import logging import threading import time import unittest @@ -142,4 +143,5 @@ class ChannelConnectivityTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py index 30b486079c..345460ef40 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py @@ -15,6 +15,7 @@ import threading import unittest +import logging import grpc from tests.unit.framework.common import test_constants @@ -85,4 +86,5 @@ class ChannelReadyFutureTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 0b11f03adf..876d8e827e 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -15,6 +15,7 @@ import unittest +import logging import grpc from grpc import _grpcio_metadata @@ -117,4 +118,5 @@ class CompressionTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_credentials_test.py b/src/python/grpcio_tests/tests/unit/_credentials_test.py index f487fe66a2..be7378ecbc 100644 --- a/src/python/grpcio_tests/tests/unit/_credentials_test.py +++ b/src/python/grpcio_tests/tests/unit/_credentials_test.py @@ -14,6 +14,7 @@ """Tests of credentials.""" import unittest +import logging import grpc @@ -54,4 +55,5 @@ class CredentialsTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index c55ef61c13..3e8393b53c 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -13,6 +13,7 @@ # limitations under the License. import unittest +import logging import grpc @@ -118,4 +119,5 @@ class EmptyMessageTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py index 0a0239a63d..f489db12cb 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py @@ -16,6 +16,7 @@ import argparse import threading import time +import logging import grpc @@ -161,6 +162,7 @@ def infinite_request_iterator(): if __name__ == '__main__': + logging.basicConfig() parser = argparse.ArgumentParser() parser.add_argument('scenario', type=str) parser.add_argument( diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py index f40f3ae07c..5226537579 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_test.py +++ b/src/python/grpcio_tests/tests/unit/_exit_test.py @@ -26,6 +26,7 @@ import sys import threading import time import unittest +import logging from tests.unit import _exit_scenarios @@ -187,4 +188,5 @@ class ExitTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index 3d547b71cd..99db0ac58b 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -17,6 +17,7 @@ import collections import itertools import threading import unittest +import logging from concurrent import futures import grpc @@ -598,4 +599,5 @@ class InterceptorTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index f153089a24..9771764635 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -14,6 +14,7 @@ """Test of RPCs made against gRPC Python's application-layer API.""" import unittest +import logging import grpc @@ -131,4 +132,5 @@ class InvalidMetadataTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 93a5fdf9ff..00949e2236 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -15,6 +15,7 @@ import itertools import threading import unittest +import logging import grpc @@ -271,4 +272,5 @@ class InvocationDefectsTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_logging_test.py b/src/python/grpcio_tests/tests/unit/_logging_test.py new file mode 100644 index 0000000000..80b1f1b3c1 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_logging_test.py @@ -0,0 +1,73 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test of gRPC Python's interaction with the python logging module""" + +import unittest +import six +from six.moves import reload_module +import logging +import grpc +import functools +import sys + + +def patch_stderr(f): + + @functools.wraps(f) + def _impl(*args, **kwargs): + old_stderr = sys.stderr + sys.stderr = six.StringIO() + try: + f(*args, **kwargs) + finally: + sys.stderr = old_stderr + + return _impl + + +def isolated_logging(f): + + @functools.wraps(f) + def _impl(*args, **kwargs): + reload_module(logging) + reload_module(grpc) + try: + f(*args, **kwargs) + finally: + reload_module(logging) + + return _impl + + +class LoggingTest(unittest.TestCase): + + @isolated_logging + def test_logger_not_occupied(self): + self.assertEqual(0, len(logging.getLogger().handlers)) + + @patch_stderr + @isolated_logging + def test_handler_found(self): + self.assertEqual(0, len(sys.stderr.getvalue())) + + @isolated_logging + def test_can_configure_logger(self): + intended_stream = six.StringIO() + logging.basicConfig(stream=intended_stream) + self.assertEqual(1, len(logging.getLogger().handlers)) + self.assertIs(logging.getLogger().handlers[0].stream, intended_stream) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index ca10bd4dab..0dafab827a 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -15,6 +15,7 @@ import threading import unittest +import logging import grpc @@ -656,4 +657,5 @@ class MetadataCodeDetailsTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 5908421011..777ab683e3 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -15,6 +15,7 @@ import unittest import weakref +import logging import grpc from grpc import _channel @@ -237,4 +238,5 @@ class MetadataTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index a708d8d862..f6d4fcbd0a 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -15,6 +15,7 @@ import socket import time +import logging import unittest import grpc @@ -100,4 +101,5 @@ class ReconnectTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py index df4b129018..4fead8fcd5 100644 --- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py +++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py @@ -15,6 +15,7 @@ import threading import unittest +import logging import grpc from grpc import _channel @@ -253,4 +254,5 @@ class ResourceExhaustedTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index 34e7831a98..a768d6c7c1 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -16,6 +16,7 @@ import itertools import threading import unittest +import logging from concurrent import futures import grpc @@ -846,4 +847,5 @@ class RPCTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py index 0d78034b7b..e733a59a5b 100644 --- a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py @@ -35,6 +35,7 @@ import os import six import threading import unittest +import logging from concurrent import futures @@ -518,4 +519,5 @@ class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_server_test.py b/src/python/grpcio_tests/tests/unit/_server_test.py index acf4a17921..2c8205f365 100644 --- a/src/python/grpcio_tests/tests/unit/_server_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_test.py @@ -14,6 +14,7 @@ from concurrent import futures import unittest +import logging import grpc @@ -49,4 +50,5 @@ class ServerTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_session_cache_test.py b/src/python/grpcio_tests/tests/unit/_session_cache_test.py index b4e4670fa7..9223a6da03 100644 --- a/src/python/grpcio_tests/tests/unit/_session_cache_test.py +++ b/src/python/grpcio_tests/tests/unit/_session_cache_test.py @@ -15,6 +15,7 @@ import pickle import unittest +import logging import grpc from grpc import _channel @@ -142,4 +143,5 @@ class SSLSessionCacheTest(unittest.TestCase): if __name__ == '__main__': + logging.basicConfig() unittest.main(verbosity=2) diff --git a/templates/tools/dockerfile/bazel.include b/templates/tools/dockerfile/bazel.include index ee4f9556af..c7714f5663 100644 --- a/templates/tools/dockerfile/bazel.include +++ b/templates/tools/dockerfile/bazel.include @@ -1,5 +1,6 @@ #======================== # Bazel installation -RUN echo "deb [arch=amd64] http://storage.googleapis.com/bazel-apt stable jdk1.8" > /etc/apt/sources.list.d/bazel.list -RUN curl https://bazel.build/bazel-release.pub.gpg | apt-key add - -RUN apt-get -y update && apt-get -y install bazel=0.15.0 && apt-get clean + +RUN apt-get update && apt-get install -y wget && apt-get clean +RUN wget -q https://github.com/bazelbuild/bazel/releases/download/0.17.1/bazel-0.17.1-linux-x86_64 -O /usr/local/bin/bazel +RUN chmod 755 /usr/local/bin/bazel diff --git a/test/core/memory_usage/BUILD b/test/core/memory_usage/BUILD new file mode 100644 index 0000000000..f39c309e36 --- /dev/null +++ b/test/core/memory_usage/BUILD @@ -0,0 +1,60 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") + +grpc_package(name = "test/core/memory_usage") + +licenses(["notice"]) # Apache v2 + +grpc_cc_library( + name = "client", + testonly = 1, + srcs = ["client.cc"], + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_library( + name = "server", + testonly = 1, + srcs = ["server.cc"], + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/core/end2end:ssl_test_data" + ], +) + +grpc_cc_test( + name = "memory_usage_test", + srcs = ["memory_usage_test.cc"], + language = "C++", + data = [ + ":client", + ":server", + ], + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/memory_usage/server.cc b/test/core/memory_usage/server.cc index 3e7bb7e11f..6f8a9bc0d4 100644 --- a/test/core/memory_usage/server.cc +++ b/test/core/memory_usage/server.cc @@ -292,9 +292,9 @@ int main(int argc, char** argv) { send_status(&calls[k]); } } + /* fallthrough */ // no break here since we want to continue to case // FLING_SERVER_SEND_STATUS_SNAPSHOT to destroy the snapshot call - /* fallthrough */ case FLING_SERVER_SEND_STATUS_SNAPSHOT: grpc_byte_buffer_destroy(payload_buffer); grpc_byte_buffer_destroy(terminal_buffer); diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 93e1e68654..fdc67969d9 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -33,6 +33,7 @@ #include <grpcpp/impl/codegen/method_handler_impl.h> #include <grpcpp/impl/codegen/proto_utils.h> #include <grpcpp/impl/codegen/rpc_method.h> +#include <grpcpp/impl/codegen/server_callback.h> #include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/stub_options.h> @@ -308,6 +309,80 @@ class ServiceA final { }; typedef WithAsyncMethod_MethodA1<WithAsyncMethod_MethodA2<WithAsyncMethod_MethodA3<WithAsyncMethod_MethodA4<Service > > > > AsyncService; template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA1() { + ::grpc::Service::experimental().MarkMethodCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodA1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>( + [this](::grpc::ServerContext* context, + const ::grpc::testing::Request* request, + ::grpc::testing::Response* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodA1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithCallbackMethod_MethodA1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodA1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA2 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA2() { + } + ~ExperimentalWithCallbackMethod_MethodA2() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA2(::grpc::ServerContext* context, ::grpc::ServerReader< ::grpc::testing::Request>* reader, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA3 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA3() { + } + ~ExperimentalWithCallbackMethod_MethodA3() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodA4 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodA4() { + } + ~ExperimentalWithCallbackMethod_MethodA4() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + typedef ExperimentalWithCallbackMethod_MethodA1<ExperimentalWithCallbackMethod_MethodA2<ExperimentalWithCallbackMethod_MethodA3<ExperimentalWithCallbackMethod_MethodA4<Service > > > > ExperimentalCallbackService; + template <class BaseClass> class WithGenericMethod_MethodA1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} @@ -456,6 +531,79 @@ class ServiceA final { } }; template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA1() { + ::grpc::Service::experimental().MarkMethodRawCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodA1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this](::grpc::ServerContext* context, + const ::grpc::ByteBuffer* request, + ::grpc::ByteBuffer* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodA1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithRawCallbackMethod_MethodA1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodA1(::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA2 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA2() { + } + ~ExperimentalWithRawCallbackMethod_MethodA2() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA2(::grpc::ServerContext* context, ::grpc::ServerReader< ::grpc::testing::Request>* reader, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA3 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA3() { + } + ~ExperimentalWithRawCallbackMethod_MethodA3() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodA4 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodA4() { + } + ~ExperimentalWithRawCallbackMethod_MethodA4() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + }; + template <class BaseClass> class WithStreamedUnaryMethod_MethodA1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} @@ -591,6 +739,32 @@ class ServiceB final { }; typedef WithAsyncMethod_MethodB1<Service > AsyncService; template <class BaseClass> + class ExperimentalWithCallbackMethod_MethodB1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithCallbackMethod_MethodB1() { + ::grpc::Service::experimental().MarkMethodCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithCallbackMethod_MethodB1<BaseClass>, ::grpc::testing::Request, ::grpc::testing::Response>( + [this](::grpc::ServerContext* context, + const ::grpc::testing::Request* request, + ::grpc::testing::Response* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodB1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithCallbackMethod_MethodB1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodB1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodB1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + typedef ExperimentalWithCallbackMethod_MethodB1<Service > ExperimentalCallbackService; + template <class BaseClass> class WithGenericMethod_MethodB1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} @@ -628,6 +802,31 @@ class ServiceB final { } }; template <class BaseClass> + class ExperimentalWithRawCallbackMethod_MethodB1 : public BaseClass { + private: + void BaseClassMustBeDerivedFromService(const Service *service) {} + public: + ExperimentalWithRawCallbackMethod_MethodB1() { + ::grpc::Service::experimental().MarkMethodRawCallback(0, + new ::grpc::internal::CallbackUnaryHandler< ExperimentalWithRawCallbackMethod_MethodB1<BaseClass>, ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this](::grpc::ServerContext* context, + const ::grpc::ByteBuffer* request, + ::grpc::ByteBuffer* response, + ::grpc::experimental::ServerCallbackRpcController* controller) { + this->MethodB1(context, request, response, controller); + }, this)); + } + ~ExperimentalWithRawCallbackMethod_MethodB1() override { + BaseClassMustBeDerivedFromService(this); + } + // disable synchronous version of this method + ::grpc::Status MethodB1(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response) override { + abort(); + return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); + } + virtual void MethodB1(::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + }; + template <class BaseClass> class WithStreamedUnaryMethod_MethodB1 : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 62a85641c7..7ffc610ce2 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -41,13 +41,38 @@ namespace grpc { namespace testing { namespace { -class ClientCallbackEnd2endTest : public ::testing::Test { +class TestScenario { + public: + TestScenario(bool serve_callback) : callback_server(serve_callback) {} + void Log() const; + bool callback_server; +}; + +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{callback_server=" + << (scenario.callback_server ? "true" : "false") << "}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_DEBUG, "%s", out.str().c_str()); +} + +class ClientCallbackEnd2endTest + : public ::testing::TestWithParam<TestScenario> { protected: - ClientCallbackEnd2endTest() {} + ClientCallbackEnd2endTest() { GetParam().Log(); } void SetUp() override { ServerBuilder builder; - builder.RegisterService(&service_); + + if (!GetParam().callback_server) { + builder.RegisterService(&service_); + } else { + builder.RegisterService(&callback_service_); + } server_ = builder.BuildAndStart(); is_server_started_ = true; @@ -151,37 +176,38 @@ class ClientCallbackEnd2endTest : public ::testing::Test { std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<grpc::GenericStub> generic_stub_; TestServiceImpl service_; + CallbackTestServiceImpl callback_service_; std::unique_ptr<Server> server_; }; -TEST_F(ClientCallbackEnd2endTest, SimpleRpc) { +TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { ResetStub(); SendRpcs(1, false); } -TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { +TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) { ResetStub(); SendRpcs(10, false); } -TEST_F(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { +TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { ResetStub(); SendRpcs(10, true); } -TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) { +TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { ResetStub(); SendRpcsGeneric(10, false); } #if GRPC_ALLOW_EXCEPTIONS -TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) { +TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); SendRpcsGeneric(10, true); } #endif -TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { +TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { ResetStub(); std::vector<std::thread> threads; threads.reserve(10); @@ -193,7 +219,7 @@ TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { } } -TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) { +TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) { ResetStub(); std::vector<std::thread> threads; threads.reserve(10); @@ -205,7 +231,7 @@ TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) { } } -TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { +TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { ResetStub(); EchoRequest request; EchoResponse response; @@ -230,6 +256,11 @@ TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { } } +TestScenario scenarios[] = {TestScenario{false}, TestScenario{true}}; + +INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, + ::testing::ValuesIn(scenarios)); + } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 5720e87478..e8ffd46344 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -152,7 +152,9 @@ class HijackingInterceptor : public experimental::Interceptor { EchoRequest req; auto* buffer = methods->GetSendMessage(); auto copied_buffer = *buffer; - SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req); + EXPECT_TRUE( + SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req) + .ok()); EXPECT_EQ(req.message(), "Hello"); } if (methods->QueryInterceptionHookPoint( @@ -255,7 +257,9 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor { EchoRequest req; auto* buffer = methods->GetSendMessage(); auto copied_buffer = *buffer; - SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req); + EXPECT_TRUE( + SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req) + .ok()); EXPECT_EQ(req.message(), "Hello"); req_ = req; stub_ = grpc::testing::EchoTestService::NewStub( @@ -367,7 +371,9 @@ class LoggingInterceptor : public experimental::Interceptor { EchoRequest req; auto* buffer = methods->GetSendMessage(); auto copied_buffer = *buffer; - SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req); + EXPECT_TRUE( + SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req) + .ok()); EXPECT_TRUE(req.message().find("Hello") == 0); } if (methods->QueryInterceptionHookPoint( diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 77c25c1416..9218c85717 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -42,6 +42,9 @@ #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/security/credentials/fake/fake_credentials.h" +#include "src/cpp/client/secure_credentials.h" +#include "src/cpp/server/secure_server_credentials.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -207,19 +210,22 @@ class ClientLbEnd2endTest : public ::testing::Test { } // else, default to pick first args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); - return CreateCustomChannel("fake:///", InsecureChannelCredentials(), args); + std::shared_ptr<ChannelCredentials> creds(new SecureChannelCredentials( + grpc_fake_transport_security_credentials_create())); + return CreateCustomChannel("fake:///", std::move(creds), args); } bool SendRpc( const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub, EchoResponse* response = nullptr, int timeout_ms = 1000, - Status* result = nullptr) { + Status* result = nullptr, bool wait_for_ready = false) { const bool local_response = (response == nullptr); if (local_response) response = new EchoResponse; EchoRequest request; request.set_message(kRequestMessage_); ClientContext context; context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); + if (wait_for_ready) context.set_wait_for_ready(true); Status status = stub->Echo(&context, request, response); if (result != nullptr) *result = status; if (local_response) delete response; @@ -228,10 +234,11 @@ class ClientLbEnd2endTest : public ::testing::Test { void CheckRpcSendOk( const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub, - const grpc_core::DebugLocation& location) { + const grpc_core::DebugLocation& location, bool wait_for_ready = false) { EchoResponse response; Status status; - const bool success = SendRpc(stub, &response, 2000, &status); + const bool success = + SendRpc(stub, &response, 2000, &status, wait_for_ready); ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line() << "\n" << "Error: " << status.error_message() << " " @@ -275,8 +282,9 @@ class ClientLbEnd2endTest : public ::testing::Test { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; - builder.AddListeningPort(server_address.str(), - InsecureServerCredentials()); + std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials( + grpc_fake_transport_security_server_credentials_create())); + builder.AddListeningPort(server_address.str(), std::move(creds)); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); std::lock_guard<std::mutex> lock(*mu); @@ -306,7 +314,7 @@ class ClientLbEnd2endTest : public ::testing::Test { if (ignore_failure) { SendRpc(stub); } else { - CheckRpcSendOk(stub, location); + CheckRpcSendOk(stub, location, true); } } while (servers_[server_idx]->service_.request_count() == 0); ResetCounters(); @@ -518,7 +526,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { do { channel_state = channel->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); - GPR_ASSERT(channel_state != GRPC_CHANNEL_READY); + ASSERT_NE(channel_state, GRPC_CHANNEL_READY); servers_[0]->service_.ResetCounters(); // Next update introduces servers_[1], making the channel recover. @@ -835,7 +843,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { do { channel_state = channel->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); - GPR_ASSERT(channel_state != GRPC_CHANNEL_READY); + ASSERT_NE(channel_state, GRPC_CHANNEL_READY); servers_[0]->service_.ResetCounters(); // Next update introduces servers_[1], making the channel recover. @@ -844,7 +852,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { SetNextResolution(ports); WaitForServer(stub, 1, DEBUG_LOCATION); channel_state = channel->GetState(false /* try to connect */); - GPR_ASSERT(channel_state == GRPC_CHANNEL_READY); + ASSERT_EQ(channel_state, GRPC_CHANNEL_READY); // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); @@ -954,7 +962,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) { if (SendRpc(stub)) break; now = gpr_now(GPR_CLOCK_MONOTONIC); } - GPR_ASSERT(gpr_time_cmp(deadline, now) > 0); + ASSERT_GT(gpr_time_cmp(deadline, now), 0); } TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index b69b861fcf..6ce0696114 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -539,13 +539,15 @@ class GrpclbEnd2endTest : public ::testing::Test { balancers_.at(i)->add_response(response, delay_ms); } - Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) { + Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000, + bool wait_for_ready = false) { const bool local_response = (response == nullptr); if (local_response) response = new EchoResponse; EchoRequest request; request.set_message(kRequestMessage_); ClientContext context; context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms)); + if (wait_for_ready) context.set_wait_for_ready(true); Status status = stub_->Echo(&context, request, response); if (local_response) delete response; return status; @@ -1366,7 +1368,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) { {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, {"load_balancing", num_of_drop_by_load_balancing_addresses}}), 0); - const Status status = SendRpc(); + const Status status = SendRpc(nullptr, 1000, true); EXPECT_FALSE(status.ok()); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); } @@ -1391,7 +1393,7 @@ TEST_F(SingleBalancerTest, DropAll) { // fail. Status status; do { - status = SendRpc(); + status = SendRpc(nullptr, 1000, true); } while (status.ok()); EXPECT_FALSE(status.ok()); EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy"); diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index 44ba2a6009..e08a4493d3 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -103,7 +103,9 @@ class LoggingInterceptor : public experimental::Interceptor { EchoRequest req; auto* buffer = methods->GetSendMessage(); auto copied_buffer = *buffer; - SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req); + EXPECT_TRUE( + SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req) + .ok()); EXPECT_TRUE(req.message().find("Hello") == 0); } if (methods->QueryInterceptionHookPoint( diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 3c3a5d9cd4..605356724f 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -165,6 +165,138 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, return Status::OK; } +void CallbackTestServiceImpl::Echo( + ServerContext* context, const EchoRequest* request, EchoResponse* response, + experimental::ServerCallbackRpcController* controller) { + // A bit of sleep to make sure that short deadline tests fail + if (request->has_param() && request->param().server_sleep_us() > 0) { + // Set an alarm for that much time + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(request->param().server_sleep_us(), + GPR_TIMESPAN)), + [this, context, request, response, controller](bool) { + EchoNonDelayed(context, request, response, controller); + }); + } else { + EchoNonDelayed(context, request, response, controller); + } +} + +void CallbackTestServiceImpl::EchoNonDelayed( + ServerContext* context, const EchoRequest* request, EchoResponse* response, + experimental::ServerCallbackRpcController* controller) { + if (request->has_param() && request->param().server_die()) { + gpr_log(GPR_ERROR, "The request should not reach application handler."); + GPR_ASSERT(0); + } + if (request->has_param() && request->param().has_expected_error()) { + const auto& error = request->param().expected_error(); + controller->Finish(Status(static_cast<StatusCode>(error.code()), + error.error_message(), + error.binary_error_details())); + } + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + // Now wait until it's really canceled + + std::function<void(bool)> recurrence = [this, context, controller, + &recurrence](bool) { + if (!context->IsCancelled()) { + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN)), + recurrence); + } else { + controller->Finish(Status::CANCELLED); + } + }; + recurrence(true); + return; + } + + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (host_) { + response->mutable_param()->set_host(*host_); + } + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock<std::mutex> lock(mu_); + signal_client_ = true; + } + std::function<void(bool)> recurrence = [this, context, request, controller, + &recurrence](bool) { + if (!context->IsCancelled()) { + alarm_.experimental().Set( + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN)), + recurrence); + } else { + controller->Finish(Status::CANCELLED); + } + }; + recurrence(true); + return; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + alarm_.experimental().Set( + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN)), + [controller](bool) { controller->Finish(Status::CANCELLED); }); + return; + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { + EXPECT_FALSE(context->IsCancelled()); + } + + if (request->has_param() && request->param().echo_metadata()) { + const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = + context->client_metadata(); + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator + iter = client_metadata.begin(); + iter != client_metadata.end(); ++iter) { + context->AddTrailingMetadata(ToString(iter->first), + ToString(iter->second)); + } + // Terminate rpc with error and debug info in trailer. + if (request->param().debug_info().stack_entries_size() || + !request->param().debug_info().detail().empty()) { + grpc::string serialized_debug_info = + request->param().debug_info().SerializeAsString(); + context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); + controller->Finish(Status::CANCELLED); + } + } + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext(context, + request->param().expected_transport_security_type(), + request->param().expected_client_identity()); + } + if (request->has_param() && request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } + if (request->has_param() && request->param().echo_peer()) { + response->mutable_param()->set_peer(context->peer()); + } + controller->Finish(Status::OK); +} + // Unimplemented is left unimplemented to test the returned error. Status TestServiceImpl::RequestStream(ServerContext* context, @@ -332,7 +464,8 @@ Status TestServiceImpl::BidiStream( return Status::OK; } -int TestServiceImpl::GetIntValueFromMetadata( +namespace { +int GetIntValueFromMetadataHelper( const char* key, const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, int default_value) { @@ -344,6 +477,21 @@ int TestServiceImpl::GetIntValueFromMetadata( return default_value; } +}; // namespace + +int TestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} + +int CallbackTestServiceImpl::GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} void TestServiceImpl::ServerTryCancel(ServerContext* context) { EXPECT_FALSE(context->IsCancelled()); diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 052543a03e..ddfe94487e 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -22,6 +22,7 @@ #include <mutex> #include <grpc/grpc.h> +#include <grpcpp/alarm.h> #include <grpcpp/server_context.h> #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -78,7 +79,39 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { void ServerTryCancel(ServerContext* context); + bool signal_client_; + std::mutex mu_; + std::unique_ptr<grpc::string> host_; +}; + +class CallbackTestServiceImpl + : public ::grpc::testing::EchoTestService::ExperimentalCallbackService { + public: + CallbackTestServiceImpl() : signal_client_(false), host_() {} + explicit CallbackTestServiceImpl(const grpc::string& host) + : signal_client_(false), host_(new grpc::string(host)) {} + + void Echo(ServerContext* context, const EchoRequest* request, + EchoResponse* response, + experimental::ServerCallbackRpcController* controller) override; + + // Unimplemented is left unimplemented to test the returned error. + bool signal_client() { + std::unique_lock<std::mutex> lock(mu_); + return signal_client_; + } + private: + void EchoNonDelayed(ServerContext* context, const EchoRequest* request, + EchoResponse* response, + experimental::ServerCallbackRpcController* controller); + + int GetIntValueFromMetadata( + const char* key, + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + int default_value); + + Alarm alarm_; bool signal_client_; std::mutex mu_; std::unique_ptr<grpc::string> host_; diff --git a/tools/dockerfile/test/bazel/Dockerfile b/tools/dockerfile/test/bazel/Dockerfile index 4f913dc396..0aa6209f4f 100644 --- a/tools/dockerfile/test/bazel/Dockerfile +++ b/tools/dockerfile/test/bazel/Dockerfile @@ -44,9 +44,10 @@ RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 t #======================== # Bazel installation -RUN echo "deb [arch=amd64] http://storage.googleapis.com/bazel-apt stable jdk1.8" > /etc/apt/sources.list.d/bazel.list -RUN curl https://bazel.build/bazel-release.pub.gpg | apt-key add - -RUN apt-get -y update && apt-get -y install bazel=0.15.0 && apt-get clean + +RUN apt-get update && apt-get install -y wget && apt-get clean +RUN wget -q https://github.com/bazelbuild/bazel/releases/download/0.17.1/bazel-0.17.1-linux-x86_64 -O /usr/local/bin/bazel +RUN chmod 755 /usr/local/bin/bazel RUN mkdir -p /var/local/jenkins diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index f2bb5df7d3..392113c284 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -971,6 +971,7 @@ include/grpcpp/impl/codegen/rpc_method.h \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ +include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ @@ -1008,6 +1009,7 @@ include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ +include/grpcpp/support/server_callback.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 9af44e75ff..7f5f9bc0db 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -973,6 +973,7 @@ include/grpcpp/impl/codegen/rpc_method.h \ include/grpcpp/impl/codegen/rpc_service_method.h \ include/grpcpp/impl/codegen/security/auth_context.h \ include/grpcpp/impl/codegen/serialization_traits.h \ +include/grpcpp/impl/codegen/server_callback.h \ include/grpcpp/impl/codegen/server_context.h \ include/grpcpp/impl/codegen/server_interceptor.h \ include/grpcpp/impl/codegen/server_interface.h \ @@ -1010,6 +1011,7 @@ include/grpcpp/support/client_callback.h \ include/grpcpp/support/config.h \ include/grpcpp/support/proto_buffer_reader.h \ include/grpcpp/support/proto_buffer_writer.h \ +include/grpcpp/support/server_callback.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ diff --git a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh index c5eb9b398b..74778d9d29 100755 --- a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh +++ b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh @@ -21,11 +21,10 @@ set -ex mkdir -p ${KOKORO_KEYSTORE_DIR} cp ${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json ${KOKORO_KEYSTORE_DIR}/4321_grpc-testing-service -# Get latest release of bazel -# TODO(jtattermusch): replace by open-source installation of bazel -temp_dir=$(mktemp -d) -ln -f "${KOKORO_GFILE_DIR}/bazel-latest-release" ${temp_dir}/bazel -chmod 755 "${KOKORO_GFILE_DIR}/bazel-latest-release" +# Download bazel +temp_dir="$(mktemp -d)" +wget -q https://github.com/bazelbuild/bazel/releases/download/0.17.1/bazel-0.17.1-linux-x86_64 -O "${temp_dir}/bazel" +chmod 755 "${temp_dir}/bazel" export PATH="${temp_dir}:${PATH}" # This should show ${temp_dir}/bazel which bazel diff --git a/tools/internal_ci/macos/grpc_basictests_objc_dbg.cfg b/tools/internal_ci/macos/grpc_basictests_objc_dbg.cfg new file mode 100644 index 0000000000..068961234b --- /dev/null +++ b/tools/internal_ci/macos/grpc_basictests_objc_dbg.cfg @@ -0,0 +1,31 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh" +gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json" +timeout_mins: 120 +action { + define_artifacts { + regex: "**/*sponge_log.*" + regex: "github/grpc/reports/**" + } +} + +env_vars { + key: "RUN_TESTS_FLAGS" + value: "-f basictests macos objc dbg --internal_ci -j 1 --inner_jobs 4 --bq_result_table aggregate_results" +} diff --git a/tools/internal_ci/macos/grpc_basictests_objc_opt.cfg b/tools/internal_ci/macos/grpc_basictests_objc_opt.cfg new file mode 100644 index 0000000000..927fa50deb --- /dev/null +++ b/tools/internal_ci/macos/grpc_basictests_objc_opt.cfg @@ -0,0 +1,31 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh" +gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json" +timeout_mins: 120 +action { + define_artifacts { + regex: "**/*sponge_log.*" + regex: "github/grpc/reports/**" + } +} + +env_vars { + key: "RUN_TESTS_FLAGS" + value: "-f basictests macos objc opt --internal_ci -j 1 --inner_jobs 4 --bq_result_table aggregate_results" +} diff --git a/tools/internal_ci/macos/pull_request/grpc_basictests_objc_dbg.cfg b/tools/internal_ci/macos/pull_request/grpc_basictests_objc_dbg.cfg new file mode 100644 index 0000000000..775fd355a5 --- /dev/null +++ b/tools/internal_ci/macos/pull_request/grpc_basictests_objc_dbg.cfg @@ -0,0 +1,31 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh" +gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json" +timeout_mins: 120 +action { + define_artifacts { + regex: "**/*sponge_log.*" + regex: "github/grpc/reports/**" + } +} + +env_vars { + key: "RUN_TESTS_FLAGS" + value: "-f basictests macos objc dbg --internal_ci -j 1 --inner_jobs 4 --max_time=3600" +} diff --git a/tools/internal_ci/macos/pull_request/grpc_basictests_objc_opt.cfg b/tools/internal_ci/macos/pull_request/grpc_basictests_objc_opt.cfg new file mode 100644 index 0000000000..652ef1bb77 --- /dev/null +++ b/tools/internal_ci/macos/pull_request/grpc_basictests_objc_opt.cfg @@ -0,0 +1,31 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_run_tests_matrix.sh" +gfile_resources: "/bigstore/grpc-testing-secrets/gcp_credentials/GrpcTesting-d0eeee2db331.json" +timeout_mins: 120 +action { + define_artifacts { + regex: "**/*sponge_log.*" + regex: "github/grpc/reports/**" + } +} + +env_vars { + key: "RUN_TESTS_FLAGS" + value: "-f basictests macos objc opt --internal_ci -j 1 --inner_jobs 4 --max_time=3600" +} diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index c4935e8eee..ff3344cd95 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -204,6 +204,9 @@ LANG_RELEASE_MATRIX = { { 'v1.15.0': None }, + { + 'v1.16.1': None + }, ], 'python': [ { diff --git a/tools/remote_build/README.md b/tools/remote_build/README.md index 7492de75a5..c4d03547a2 100644 --- a/tools/remote_build/README.md +++ b/tools/remote_build/README.md @@ -28,3 +28,6 @@ Sanitizer runs (asan, msan, tsan, ubsan): # manual run of bazel tests remotely on Foundry with given sanitizer bazel --bazelrc=tools/remote_build/manual.bazelrc test --config=asan //test/... ``` + +Available command line options can be found in +[Bazel command line reference](https://docs.bazel.build/versions/master/command-line-reference.html) diff --git a/tools/remote_build/kokoro.bazelrc b/tools/remote_build/kokoro.bazelrc index b06fb83d11..11462bd301 100644 --- a/tools/remote_build/kokoro.bazelrc +++ b/tools/remote_build/kokoro.bazelrc @@ -36,5 +36,3 @@ build --test_env=USER=anon build --jobs=200 build --test_output=errors build --keep_going=true -build --remote_accept_cached=true -build --remote_local_fallback=true diff --git a/tools/remote_build/manual.bazelrc b/tools/remote_build/manual.bazelrc index d67bafedf3..b4fdc70637 100644 --- a/tools/remote_build/manual.bazelrc +++ b/tools/remote_build/manual.bazelrc @@ -40,3 +40,6 @@ build --jobs=100 # TODO(jtattermusch): this should be part of the common config # but currently sanitizers use different test_timeout values build --test_timeout=300,450,1200,3600 + +# print output for tests that fail (default is "summary") +build --test_output=errors diff --git a/tools/remote_build/rbe_common.bazelrc b/tools/remote_build/rbe_common.bazelrc index 9db867948f..e8c7f0b9cb 100644 --- a/tools/remote_build/rbe_common.bazelrc +++ b/tools/remote_build/rbe_common.bazelrc @@ -49,12 +49,6 @@ build:asan --copt=-gmlt # TODO(jtattermusch): use more reasonable test timeout build:asan --test_timeout=3600 build:asan --test_tag_filters=-qps_json_driver,-json_run_localhost -# TODO: revisit these from bazel.rc: -#build:asan --copt=-O0 -#build:asan --copt=-fno-omit-frame-pointer -#build:asan --copt=-DGPR_NO_DIRECT_SYSCALLS -#build:asan --action_env=ASAN_OPTIONS=detect_leaks=1:color=always -#build:asan --action_env=LSAN_OPTIONS=suppressions=test/core/util/lsan_suppressions.txt:report_objects=1 # memory sanitizer: most settings are already in %workspace%/.bazelrc # we only need a few additional ones that are Foundry specific @@ -62,7 +56,7 @@ build:msan --copt=-gmlt # TODO(jtattermusch): use more reasonable test timeout build:msan --test_timeout=3600 build:msan --cxxopt=--stdlib=libc++ -# TODO(jtattermusch): setting LD_LIBRARY_PATH is necessary +# setting LD_LIBRARY_PATH is necessary # to avoid "libc++.so.1: cannot open shared object file" build:msan --action_env=LD_LIBRARY_PATH=/usr/local/lib build:msan --host_crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/default:toolchain @@ -75,23 +69,14 @@ build:tsan --copt=-gmlt # TODO(jtattermusch): use more reasonable test timeout build:tsan --test_timeout=3600 build:tsan --test_tag_filters=-qps_json_driver,-json_run_localhost -# TODO: revisit these from bazel.rc: -#build:tsan --copt=-fno-omit-frame-pointer -#build:tsan --copt=-DGPR_NO_DIRECT_SYSCALLS -#build:tsan --copt=-DGRPC_TSAN # undefined behavior sanitizer: most settings are already in %workspace%/.bazelrc # we only need a few additional ones that are Foundry specific build:ubsan --copt=-gmlt # TODO(jtattermusch): use more reasonable test timeout build:ubsan --test_timeout=3600 -# TODO: revisit these from bazel.rc: -#build:ubsan --copt=-fno-omit-frame-pointer -#build:ubsan --copt=-DGRPC_UBSAN -#build:ubsan --copt=-DNDEBUG -#build:ubsan --copt=-fno-sanitize=function,vptr -# TODO: revisit this from grpc_ubsan_on_foundry.sh: -#--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.15.0/ubsan:toolchain +# override the config-agnostic crosstool_top +--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.16.1/ubsan:toolchain # TODO(jtattermusch): remove this once Foundry adds the env to the docker image. # ubsan needs symbolizer to work properly, otherwise the suppression file doesn't work # and we get test failures. diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index f045c2719b..f79b391681 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -11221,6 +11221,7 @@ "include/grpcpp/impl/codegen/rpc_service_method.h", "include/grpcpp/impl/codegen/security/auth_context.h", "include/grpcpp/impl/codegen/serialization_traits.h", + "include/grpcpp/impl/codegen/server_callback.h", "include/grpcpp/impl/codegen/server_context.h", "include/grpcpp/impl/codegen/server_interceptor.h", "include/grpcpp/impl/codegen/server_interface.h", @@ -11296,6 +11297,7 @@ "include/grpcpp/impl/codegen/rpc_service_method.h", "include/grpcpp/impl/codegen/security/auth_context.h", "include/grpcpp/impl/codegen/serialization_traits.h", + "include/grpcpp/impl/codegen/server_callback.h", "include/grpcpp/impl/codegen/server_context.h", "include/grpcpp/impl/codegen/server_interceptor.h", "include/grpcpp/impl/codegen/server_interface.h", @@ -11445,6 +11447,7 @@ "include/grpcpp/support/config.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", + "include/grpcpp/support/server_callback.h", "include/grpcpp/support/slice.h", "include/grpcpp/support/status.h", "include/grpcpp/support/status_code_enum.h", @@ -11549,6 +11552,7 @@ "include/grpcpp/support/config.h", "include/grpcpp/support/proto_buffer_reader.h", "include/grpcpp/support/proto_buffer_writer.h", + "include/grpcpp/support/server_callback.h", "include/grpcpp/support/slice.h", "include/grpcpp/support/status.h", "include/grpcpp/support/status_code_enum.h", |