diff options
21 files changed, 217 insertions, 109 deletions
diff --git a/include/grpcpp/impl/codegen/channel_interface.h b/include/grpcpp/impl/codegen/channel_interface.h index 728a7b9049..5353f5feaa 100644 --- a/include/grpcpp/impl/codegen/channel_interface.h +++ b/include/grpcpp/impl/codegen/channel_interface.h @@ -21,7 +21,6 @@ #include <grpc/impl/codegen/connectivity_state.h> #include <grpcpp/impl/codegen/call.h> -#include <grpcpp/impl/codegen/client_context.h> #include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/time.h> diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 142cfa35dd..0a71f3d9b6 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -46,6 +46,7 @@ #include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/create_auth_context.h> #include <grpcpp/impl/codegen/metadata_map.h> +#include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/security/auth_context.h> #include <grpcpp/impl/codegen/slice.h> #include <grpcpp/impl/codegen/status.h> @@ -418,12 +419,13 @@ class ClientContext { void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel); experimental::ClientRpcInfo* set_client_rpc_info( - const char* method, grpc::ChannelInterface* channel, + const char* method, internal::RpcMethod::RpcType type, + grpc::ChannelInterface* channel, const std::vector< std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>& creators, size_t interceptor_pos) { - rpc_info_ = experimental::ClientRpcInfo(this, method, channel); + rpc_info_ = experimental::ClientRpcInfo(this, type, method, channel); rpc_info_.RegisterInterceptors(creators, interceptor_pos); return &rpc_info_; } diff --git a/include/grpcpp/impl/codegen/client_interceptor.h b/include/grpcpp/impl/codegen/client_interceptor.h index f69c99ab22..2bae11a251 100644 --- a/include/grpcpp/impl/codegen/client_interceptor.h +++ b/include/grpcpp/impl/codegen/client_interceptor.h @@ -23,6 +23,7 @@ #include <vector> #include <grpcpp/impl/codegen/interceptor.h> +#include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/string_ref.h> namespace grpc { @@ -52,23 +53,56 @@ extern experimental::ClientInterceptorFactoryInterface* namespace experimental { class ClientRpcInfo { public: - ClientRpcInfo() {} + // TODO(yashykt): Stop default-constructing ClientRpcInfo and remove UNKNOWN + // from the list of possible Types. + enum class Type { + UNARY, + CLIENT_STREAMING, + SERVER_STREAMING, + BIDI_STREAMING, + UNKNOWN // UNKNOWN is not API and will be removed later + }; ~ClientRpcInfo(){}; ClientRpcInfo(const ClientRpcInfo&) = delete; ClientRpcInfo(ClientRpcInfo&&) = default; - ClientRpcInfo& operator=(ClientRpcInfo&&) = default; // Getter methods - const char* method() { return method_; } + const char* method() const { return method_; } ChannelInterface* channel() { return channel_; } grpc::ClientContext* client_context() { return ctx_; } + Type type() const { return type_; } private: - ClientRpcInfo(grpc::ClientContext* ctx, const char* method, - grpc::ChannelInterface* channel) - : ctx_(ctx), method_(method), channel_(channel) {} + static_assert(Type::UNARY == + static_cast<Type>(internal::RpcMethod::NORMAL_RPC), + "violated expectation about Type enum"); + static_assert(Type::CLIENT_STREAMING == + static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::SERVER_STREAMING == + static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::BIDI_STREAMING == + static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), + "violated expectation about Type enum"); + + // Default constructor should only be used by ClientContext + ClientRpcInfo() = default; + + // Constructor will only be called from ClientContext + ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type, + const char* method, grpc::ChannelInterface* channel) + : ctx_(ctx), + type_(static_cast<Type>(type)), + method_(method), + channel_(channel) {} + + // Move assignment should only be used by ClientContext + // TODO(yashykt): Delete move assignment + ClientRpcInfo& operator=(ClientRpcInfo&&) = default; + // Runs interceptor at pos \a pos. void RunInterceptor( experimental::InterceptorBatchMethods* interceptor_methods, size_t pos) { @@ -97,6 +131,8 @@ class ClientRpcInfo { } grpc::ClientContext* ctx_ = nullptr; + // TODO(yashykt): make type_ const once move-assignment is deleted + Type type_{Type::UNKNOWN}; const char* method_ = nullptr; grpc::ChannelInterface* channel_ = nullptr; std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; diff --git a/include/grpcpp/impl/codegen/server_context.h b/include/grpcpp/impl/codegen/server_context.h index 4a5f9e2dd9..ccb5925e7d 100644 --- a/include/grpcpp/impl/codegen/server_context.h +++ b/include/grpcpp/impl/codegen/server_context.h @@ -314,12 +314,12 @@ class ServerContext { uint32_t initial_metadata_flags() const { return 0; } experimental::ServerRpcInfo* set_server_rpc_info( - const char* method, + const char* method, internal::RpcMethod::RpcType type, const std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>& creators) { if (creators.size() != 0) { - rpc_info_ = new experimental::ServerRpcInfo(this, method); + rpc_info_ = new experimental::ServerRpcInfo(this, method, type); rpc_info_->RegisterInterceptors(creators); } return rpc_info_; diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h index 5fb5df28b7..cd7c0600b6 100644 --- a/include/grpcpp/impl/codegen/server_interceptor.h +++ b/include/grpcpp/impl/codegen/server_interceptor.h @@ -23,6 +23,7 @@ #include <vector> #include <grpcpp/impl/codegen/interceptor.h> +#include <grpcpp/impl/codegen/rpc_method.h> #include <grpcpp/impl/codegen/string_ref.h> namespace grpc { @@ -44,6 +45,8 @@ class ServerInterceptorFactoryInterface { class ServerRpcInfo { public: + enum class Type { UNARY, CLIENT_STREAMING, SERVER_STREAMING, BIDI_STREAMING }; + ~ServerRpcInfo(){}; ServerRpcInfo(const ServerRpcInfo&) = delete; @@ -51,12 +54,27 @@ class ServerRpcInfo { ServerRpcInfo& operator=(ServerRpcInfo&&) = default; // Getter methods - const char* method() { return method_; } + const char* method() const { return method_; } + Type type() const { return type_; } grpc::ServerContext* server_context() { return ctx_; } private: - ServerRpcInfo(grpc::ServerContext* ctx, const char* method) - : ctx_(ctx), method_(method) { + static_assert(Type::UNARY == + static_cast<Type>(internal::RpcMethod::NORMAL_RPC), + "violated expectation about Type enum"); + static_assert(Type::CLIENT_STREAMING == + static_cast<Type>(internal::RpcMethod::CLIENT_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::SERVER_STREAMING == + static_cast<Type>(internal::RpcMethod::SERVER_STREAMING), + "violated expectation about Type enum"); + static_assert(Type::BIDI_STREAMING == + static_cast<Type>(internal::RpcMethod::BIDI_STREAMING), + "violated expectation about Type enum"); + + ServerRpcInfo(grpc::ServerContext* ctx, const char* method, + internal::RpcMethod::RpcType type) + : ctx_(ctx), method_(method), type_(static_cast<Type>(type)) { ref_.store(1); } @@ -86,6 +104,7 @@ class ServerRpcInfo { grpc::ServerContext* ctx_ = nullptr; const char* method_ = nullptr; + const Type type_; std::atomic_int ref_; std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_; diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 55c94f4d2f..e0e2629827 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -174,13 +174,14 @@ class ServerInterface : public internal::CallHook { bool done_intercepting_; }; + /// RegisteredAsyncRequest is not part of the C++ API class RegisteredAsyncRequest : public BaseAsyncRequest { public: RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, - const char* name); + const char* name, internal::RpcMethod::RpcType type); virtual bool FinalizeResult(void** tag, bool* status) override { /* If we are done intercepting, then there is nothing more for us to do */ @@ -189,7 +190,7 @@ class ServerInterface : public internal::CallHook { } call_wrapper_ = internal::Call( call_, server_, call_cq_, server_->max_receive_message_size(), - context_->set_server_rpc_info(name_, + context_->set_server_rpc_info(name_, type_, *server_->interceptor_creators())); return BaseAsyncRequest::FinalizeResult(tag, status); } @@ -198,6 +199,7 @@ class ServerInterface : public internal::CallHook { void IssueRequest(void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue* notification_cq); const char* name_; + const internal::RpcMethod::RpcType type_; }; class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { @@ -207,9 +209,9 @@ class ServerInterface : public internal::CallHook { internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) - : RegisteredAsyncRequest(server, context, stream, call_cq, - notification_cq, tag, - registered_method->name()) { + : RegisteredAsyncRequest( + server, context, stream, call_cq, notification_cq, tag, + registered_method->name(), registered_method->method_type()) { IssueRequest(registered_method->server_tag(), nullptr, notification_cq); } @@ -225,9 +227,9 @@ class ServerInterface : public internal::CallHook { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* request) - : RegisteredAsyncRequest(server, context, stream, call_cq, - notification_cq, tag, - registered_method->name()), + : RegisteredAsyncRequest( + server, context, stream, call_cq, notification_cq, tag, + registered_method->name(), registered_method->method_type()), registered_method_(registered_method), server_(server), context_(context), diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 3347676a48..ebc412b468 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -570,12 +570,6 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { } else { grpc_error* error = GRPC_ERROR_NONE; grpc_core::LoadBalancingPolicy::PickState pick_state; - pick_state.initial_metadata = nullptr; - pick_state.initial_metadata_flags = 0; - pick_state.on_complete = nullptr; - memset(&pick_state.subchannel_call_context, 0, - sizeof(pick_state.subchannel_call_context)); - pick_state.user_data = nullptr; // Pick must return synchronously, because pick_state.on_complete is null. GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error)); if (pick_state.connected_subchannel != nullptr) { diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0817b1dd39..af55f7710e 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -153,7 +153,7 @@ struct grpc_subchannel { /** have we started the backoff loop */ bool backoff_begun; // reset_backoff() was called while alarm was pending - bool deferred_reset_backoff; + bool retry_immediately; /** our alarm */ grpc_timer alarm; @@ -709,8 +709,8 @@ static void on_alarm(void* arg, grpc_error* error) { if (c->disconnected) { error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", &error, 1); - } else if (c->deferred_reset_backoff) { - c->deferred_reset_backoff = false; + } else if (c->retry_immediately) { + c->retry_immediately = false; error = GRPC_ERROR_NONE; } else { GRPC_ERROR_REF(error); @@ -887,12 +887,12 @@ static void on_subchannel_connected(void* arg, grpc_error* error) { void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) { gpr_mu_lock(&subchannel->mu); + subchannel->backoff->Reset(); if (subchannel->have_alarm) { - subchannel->deferred_reset_backoff = true; + subchannel->retry_immediately = true; grpc_timer_cancel(&subchannel->alarm); } else { subchannel->backoff_begun = false; - subchannel->backoff->Reset(); maybe_start_connecting_locked(subchannel); } gpr_mu_unlock(&subchannel->mu); diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index 1c23a6c4be..aa8441f17b 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -91,7 +91,7 @@ void grpc_subchannel_key_destroy(grpc_subchannel_key* k) { gpr_free(k); } -static void sck_avl_destroy(void* p, void* user_data) { +static void sck_avl_destroy(void* p, void* unused) { grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p)); } @@ -104,7 +104,7 @@ static long sck_avl_compare(void* a, void* b, void* unused) { static_cast<grpc_subchannel_key*>(b)); } -static void scv_avl_destroy(void* p, void* user_data) { +static void scv_avl_destroy(void* p, void* unused) { GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index"); } @@ -137,7 +137,7 @@ void grpc_subchannel_index_shutdown(void) { void grpc_subchannel_index_unref(void) { if (gpr_unref(&g_refcount)) { gpr_mu_destroy(&g_mu); - grpc_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(g_subchannel_index, nullptr); } } @@ -147,13 +147,12 @@ grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) { // Lock, and take a reference to the subchannel index. // We don't need to do the search under a lock as avl's are immutable. gpr_mu_lock(&g_mu); - grpc_avl index = grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr); gpr_mu_unlock(&g_mu); grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( - (grpc_subchannel*)grpc_avl_get(index, key, grpc_core::ExecCtx::Get()), - "index_find"); - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + (grpc_subchannel*)grpc_avl_get(index, key, nullptr), "index_find"); + grpc_avl_unref(index, nullptr); return c; } @@ -169,13 +168,11 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - grpc_avl index = - grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr); gpr_mu_unlock(&g_mu); // - Check to see if a subchannel already exists - c = static_cast<grpc_subchannel*>( - grpc_avl_get(index, key, grpc_core::ExecCtx::Get())); + c = static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr)); if (c != nullptr) { c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); } @@ -184,11 +181,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, need_to_unref_constructed = true; } else { // no -> update the avl and compare/swap - grpc_avl updated = - grpc_avl_add(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), - subchannel_key_copy(key), - GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), - grpc_core::ExecCtx::Get()); + grpc_avl updated = grpc_avl_add( + grpc_avl_ref(index, nullptr), subchannel_key_copy(key), + GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), nullptr); // it may happen (but it's expected to be unlikely) // that some other thread has changed the index: @@ -200,9 +195,9 @@ grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key, } gpr_mu_unlock(&g_mu); - grpc_avl_unref(updated, grpc_core::ExecCtx::Get()); + grpc_avl_unref(updated, nullptr); } - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(index, nullptr); } if (need_to_unref_constructed) { @@ -219,24 +214,22 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - grpc_avl index = - grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get()); + grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr); gpr_mu_unlock(&g_mu); // Check to see if this key still refers to the previously // registered subchannel - grpc_subchannel* c = static_cast<grpc_subchannel*>( - grpc_avl_get(index, key, grpc_core::ExecCtx::Get())); + grpc_subchannel* c = + static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr)); if (c != constructed) { - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(index, nullptr); break; } // compare and swap the update (some other thread may have // mutated the index behind us) grpc_avl updated = - grpc_avl_remove(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), key, - grpc_core::ExecCtx::Get()); + grpc_avl_remove(grpc_avl_ref(index, nullptr), key, nullptr); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { @@ -245,8 +238,8 @@ void grpc_subchannel_index_unregister(grpc_subchannel_key* key, } gpr_mu_unlock(&g_mu); - grpc_avl_unref(updated, grpc_core::ExecCtx::Get()); - grpc_avl_unref(index, grpc_core::ExecCtx::Get()); + grpc_avl_unref(updated, nullptr); + grpc_avl_unref(index, nullptr); } } diff --git a/src/core/lib/debug/trace.cc b/src/core/lib/debug/trace.cc index 01c1e867d9..cafdb15c69 100644 --- a/src/core/lib/debug/trace.cc +++ b/src/core/lib/debug/trace.cc @@ -21,6 +21,7 @@ #include "src/core/lib/debug/trace.h" #include <string.h> +#include <type_traits> #include <grpc/grpc.h> #include <grpc/support/alloc.h> @@ -79,6 +80,8 @@ void TraceFlagList::LogAllTracers() { // Flags register themselves on the list during construction TraceFlag::TraceFlag(bool default_enabled, const char* name) : name_(name) { + static_assert(std::is_trivially_destructible<TraceFlag>::value, + "TraceFlag needs to be trivially destructible."); set_enabled(default_enabled); TraceFlagList::Add(this); } diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h index 5ed52454bd..4623494520 100644 --- a/src/core/lib/debug/trace.h +++ b/src/core/lib/debug/trace.h @@ -53,7 +53,8 @@ void grpc_tracer_enable_flag(grpc_core::TraceFlag* flag); class TraceFlag { public: TraceFlag(bool default_enabled, const char* name); - ~TraceFlag() {} + // This needs to be trivially destructible as it is used as global variable. + ~TraceFlag() = default; const char* name() const { return name_; } diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index d1c55319f7..a31d0b30b1 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -149,8 +149,9 @@ internal::Call Channel::CreateCallInternal(const internal::RpcMethod& method, // ClientRpcInfo should be set before call because set_call also checks // whether the call has been cancelled, and if the call was cancelled, we // should notify the interceptors too/ - auto* info = context->set_client_rpc_info( - method.name(), this, interceptor_creators_, interceptor_pos); + auto* info = + context->set_client_rpc_info(method.name(), method.method_type(), this, + interceptor_creators_, interceptor_pos); context->set_call(c_call, shared_from_this()); return internal::Call(c_call, this, cq, info); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 69af43a656..1e3c57446f 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -236,9 +236,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag { : nullptr), request_(nullptr), method_(mrd->method_), - call_(mrd->call_, server, &cq_, server->max_receive_message_size(), - ctx_.set_server_rpc_info(method_->name(), - server->interceptor_creators_)), + call_( + mrd->call_, server, &cq_, server->max_receive_message_size(), + ctx_.set_server_rpc_info(method_->name(), method_->method_type(), + server->interceptor_creators_)), server_(server), global_callbacks_(nullptr), resources_(false) { @@ -427,7 +428,8 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag { req_->call_, req_->server_, req_->cq_, req_->server_->max_receive_message_size(), req_->ctx_.set_server_rpc_info( - req_->method_->name(), req_->server_->interceptor_creators_)); + req_->method_->name(), req_->method_->method_type(), + req_->server_->interceptor_creators_)); req_->interceptor_methods_.SetCall(call_); req_->interceptor_methods_.SetReverse(); @@ -1041,10 +1043,12 @@ void ServerInterface::BaseAsyncRequest:: ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest( ServerInterface* server, ServerContext* context, internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag, const char* name) + ServerCompletionQueue* notification_cq, void* tag, const char* name, + internal::RpcMethod::RpcType type) : BaseAsyncRequest(server, context, stream, call_cq, notification_cq, tag, true), - name_(name) {} + name_(name), + type_(type) {} void ServerInterface::RegisteredAsyncRequest::IssueRequest( void* registered_method, grpc_byte_buffer** payload, @@ -1091,6 +1095,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, call_, server_, call_cq_, server_->max_receive_message_size(), context_->set_server_rpc_info( static_cast<GenericServerContext*>(context_)->method_.c_str(), + internal::RpcMethod::BIDI_STREAMING, *server_->interceptor_creators())); return BaseAsyncRequest::FinalizeResult(tag, status); } diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index ab154d8512..35fa82d56b 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -175,6 +175,7 @@ def _event_handler(state, response_deserializer): return handle_event +#pylint: disable=too-many-statements def _consume_request_iterator(request_iterator, state, call, request_serializer, event_handler): if cygrpc.is_fork_support_enabled(): diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 2a8ddd8ce4..fc0ad77eb9 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -135,9 +135,12 @@ class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call): def __iter__(self): return self - def next(self): + def __next__(self): raise self._exception + def next(self): + return self.__next__() + class _UnaryOutcome(grpc.Call, grpc.Future): diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py index 84f8594689..8ca5189522 100644 --- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -69,32 +69,28 @@ class _ChannelServerPair(object): def __init__(self): # Server will enable channelz service - # Bind as attribute, so its `del` can be called explicitly, during - # the destruction process. Otherwise, if the removal of server - # rely on gc cycle, the test will become non-deterministic. - self._server = grpc.server( + self.server = grpc.server( futures.ThreadPoolExecutor(max_workers=3), options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ) - port = self._server.add_insecure_port('[::]:0') - self._server.add_generic_rpc_handlers((_GenericHandler(),)) - self._server.start() + port = self.server.add_insecure_port('[::]:0') + self.server.add_generic_rpc_handlers((_GenericHandler(),)) + self.server.start() # Channel will enable channelz service... self.channel = grpc.insecure_channel('localhost:%d' % port, _ENABLE_CHANNELZ) - def __del__(self): - self._server.__del__() - self.channel.close() - def _generate_channel_server_pairs(n): return [_ChannelServerPair() for i in range(n)] -def _clean_channel_server_pairs(pairs): +def _close_channel_server_pairs(pairs): for pair in pairs: - pair.__del__() + pair.server.stop(None) + # TODO(ericgribkoff) This del should not be required + del pair.server + pair.channel.close() class ChannelzServicerTest(unittest.TestCase): @@ -147,9 +143,9 @@ class ChannelzServicerTest(unittest.TestCase): self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) def tearDown(self): - self._server.__del__() + self._server.stop(None) self._channel.close() - _clean_channel_server_pairs(self._pairs) + _close_channel_server_pairs(self._pairs) def test_get_top_channels_basic(self): self._pairs = _generate_channel_server_pairs(1) @@ -278,20 +274,12 @@ class ChannelzServicerTest(unittest.TestCase): self.assertEqual(gtc_resp.channel[i].data.calls_failed, gsc_resp.subchannel.data.calls_failed) - @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ - 'immediately when the reference goes out of scope, so ' \ - 'servers from multiple test cases are not hermetic. ' \ - 'TODO(https://github.com/grpc/grpc/issues/17258)') def test_server_basic(self): self._pairs = _generate_channel_server_pairs(1) resp = self._channelz_stub.GetServers( channelz_pb2.GetServersRequest(start_server_id=0)) self.assertEqual(len(resp.server), 1) - @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ - 'immediately when the reference goes out of scope, so ' \ - 'servers from multiple test cases are not hermetic. ' \ - 'TODO(https://github.com/grpc/grpc/issues/17258)') def test_get_one_server(self): self._pairs = _generate_channel_server_pairs(1) gss_resp = self._channelz_stub.GetServers( @@ -303,10 +291,6 @@ class ChannelzServicerTest(unittest.TestCase): self.assertEqual(gss_resp.server[0].ref.server_id, gs_resp.server.ref.server_id) - @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ - 'immediately when the reference goes out of scope, so ' \ - 'servers from multiple test cases are not hermetic. ' \ - 'TODO(https://github.com/grpc/grpc/issues/17258)') def test_server_call(self): self._pairs = _generate_channel_server_pairs(1) k_success = 23 @@ -401,10 +385,6 @@ class ChannelzServicerTest(unittest.TestCase): self.assertEqual(gs_resp.socket.data.messages_received, test_constants.STREAM_LENGTH) - @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ - 'immediately when the reference goes out of scope, so ' \ - 'servers from multiple test cases are not hermetic. ' \ - 'TODO(https://github.com/grpc/grpc/issues/17258)') def test_server_sockets(self): self._pairs = _generate_channel_server_pairs(1) self._send_successful_unary_unary(0) @@ -423,10 +403,6 @@ class ChannelzServicerTest(unittest.TestCase): # If the RPC call failed, it will raise a grpc.RpcError # So, if there is no exception raised, considered pass - @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ - 'immediately when the reference goes out of scope, so ' \ - 'servers from multiple test cases are not hermetic. ' \ - 'TODO(https://github.com/grpc/grpc/issues/17258)') def test_server_listen_sockets(self): self._pairs = _generate_channel_server_pairs(1) diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 740bdcf1eb..337a94b546 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -39,7 +39,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): self._quit_event = threading.Event() def RunServer(self, request_iterator, context): - config = next(request_iterator).setup + config = next(request_iterator).setup #pylint: disable=stop-iteration-return server, port = self._create_server(config) cores = multiprocessing.cpu_count() server.start() @@ -102,7 +102,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): return (server, port) def RunClient(self, request_iterator, context): - config = next(request_iterator).setup + config = next(request_iterator).setup #pylint: disable=stop-iteration-return client_runners = [] qps_data = histogram.Histogram(config.histogram_params.resolution, config.histogram_params.max_possible) diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py index f489db12cb..d1263c2c56 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py @@ -87,7 +87,7 @@ def hang_stream_stream(request_iterator, servicer_context): def hang_partial_stream_stream(request_iterator, servicer_context): for _ in range(test_constants.STREAM_LENGTH // 2): - yield next(request_iterator) + yield next(request_iterator) #pylint: disable=stop-iteration-return time.sleep(WAIT_TIME) diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 3a191d1e03..f55ece1c2b 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -50,6 +50,7 @@ class HijackingInterceptor : public experimental::Interceptor { info_ = info; // Make sure it is the right method EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0); + EXPECT_EQ(info->type(), experimental::ClientRpcInfo::Type::UNARY); } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 759847be3e..1dfa91a76c 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -537,6 +537,51 @@ TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { EXPECT_LT(waited_ms, kInitialBackOffMs); } +TEST_F(ClientLbEnd2endTest, + PickFirstResetConnectionBackoffNextAttemptStartsImmediately) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 1000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + const std::vector<int> ports = {grpc_pick_unused_port_or_die()}; + auto channel = BuildChannel("pick_first", args); + auto stub = BuildStub(channel); + SetNextResolution(ports); + // Wait for connect, which should fail ~immediately, because the server + // is not up. + gpr_log(GPR_INFO, "=== INITIAL CONNECTION ATTEMPT"); + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Reset connection backoff. + // Note that the time at which the third attempt will be started is + // actually computed at this point, so we record the start time here. + gpr_log(GPR_INFO, "=== RESETTING BACKOFF"); + const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); + experimental::ChannelResetConnectionBackoff(channel.get()); + // Trigger a second connection attempt. This should also fail + // ~immediately, but the retry should be scheduled for + // kInitialBackOffMs instead of applying the multiplier. + gpr_log(GPR_INFO, "=== POLLING FOR SECOND CONNECTION ATTEMPT"); + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Bring up a server on the chosen port. + gpr_log(GPR_INFO, "=== STARTING BACKEND"); + StartServers(1, ports); + // Wait for connect. Should happen within kInitialBackOffMs. + // Give an extra 100ms to account for the time spent in the second and + // third connection attempts themselves (since what we really want to + // measure is the time between the two). As long as this is less than + // the 1.6x increase we would see if the backoff state was not reset + // properly, the test is still proving that the backoff was reset. + constexpr int kWaitMs = kInitialBackOffMs + 100; + gpr_log(GPR_INFO, "=== POLLING FOR THIRD CONNECTION ATTEMPT"); + EXPECT_TRUE(channel->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kWaitMs))); + const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); + gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms); + EXPECT_LT(waited_ms, kWaitMs); +} + TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index c98b6143c6..9460a7d6c6 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -44,7 +44,34 @@ namespace { class LoggingInterceptor : public experimental::Interceptor { public: - LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; } + LoggingInterceptor(experimental::ServerRpcInfo* info) { + info_ = info; + + // Check the method name and compare to the type + const char* method = info->method(); + experimental::ServerRpcInfo::Type type = info->type(); + + // Check that we use one of our standard methods with expected type. + // Also allow the health checking service. + // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService + // being tested (the GenericRpc test). + // The empty method is for the Unimplemented requests that arise + // when draining the CQ. + EXPECT_TRUE( + strstr(method, "/grpc.health") == method || + (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 && + (type == experimental::ServerRpcInfo::Type::UNARY || + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) || + (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 && + type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) || + (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 && + type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) || + (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 && + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) || + strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 || + (strcmp(method, "") == 0 && + type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)); + } virtual void Intercept(experimental::InterceptorBatchMethods* methods) { if (methods->QueryInterceptionHookPoint( |