From d6ef707422c8afba5046f395419a2a524a248472 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 1 Aug 2018 11:22:32 -0700 Subject: Add more filter priority levels --- test/core/channel/minimal_stack_is_minimal_test.cc | 16 ++++++++-------- test/core/end2end/tests/filter_call_init_fails.cc | 15 ++++++++------- test/core/end2end/tests/filter_latency.cc | 19 +++++++++---------- test/core/end2end/tests/filter_status_code.cc | 19 +++++++++---------- 4 files changed, 34 insertions(+), 35 deletions(-) (limited to 'test/core') diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc index e5953acedc..5b651ed39b 100644 --- a/test/core/channel/minimal_stack_is_minimal_test.cc +++ b/test/core/channel/minimal_stack_is_minimal_test.cc @@ -85,21 +85,21 @@ int main(int argc, char** argv) { // tests with a default stack errors += - CHECK_STACK("unknown", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority", - "message_size", "deadline", "connected", NULL); + CHECK_STACK("unknown", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "deadline", + "authority", "message_size", "connected", NULL); errors += CHECK_STACK("unknown", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority", "message_size", "connected", NULL); errors += CHECK_STACK("unknown", nullptr, GRPC_SERVER_CHANNEL, "server", - "message_size", "deadline", "connected", NULL); + "deadline", "message_size", "connected", NULL); errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, - "authority", "message_size", "deadline", "http-client", - "message_compress", "connected", NULL); + "deadline", "authority", "message_size", + "message_compress", "http-client", "connected", NULL); errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority", - "message_size", "http-client", "message_compress", + "message_size", "message_compress", "http-client", "connected", NULL); errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server", - "message_size", "deadline", "http-server", - "message_compress", "connected", NULL); + "deadline", "message_size", "message_compress", + "http-server", "connected", NULL); errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel", NULL); diff --git a/test/core/end2end/tests/filter_call_init_fails.cc b/test/core/end2end/tests/filter_call_init_fails.cc index ab96879fe4..07e1421446 100644 --- a/test/core/end2end/tests/filter_call_init_fails.cc +++ b/test/core/end2end/tests/filter_call_init_fails.cc @@ -438,7 +438,6 @@ static bool maybe_add_server_channel_filter(grpc_channel_stack_builder* builder, // must be the last one. So we add it right before the last one. grpc_channel_stack_builder_iterator* it = grpc_channel_stack_builder_create_iterator_at_last(builder); - GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); const bool retval = grpc_channel_stack_builder_add_filter_before( it, &test_filter, nullptr, nullptr); grpc_channel_stack_builder_iterator_destroy(it); @@ -457,7 +456,6 @@ static bool maybe_add_client_channel_filter(grpc_channel_stack_builder* builder, // must be the last one. So we add it right before the last one. grpc_channel_stack_builder_iterator* it = grpc_channel_stack_builder_create_iterator_at_last(builder); - GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); const bool retval = grpc_channel_stack_builder_add_filter_before( it, &test_filter, nullptr, nullptr); grpc_channel_stack_builder_iterator_destroy(it); @@ -476,7 +474,6 @@ static bool maybe_add_client_subchannel_filter( // must be the last one. So we add it right before the last one. grpc_channel_stack_builder_iterator* it = grpc_channel_stack_builder_create_iterator_at_last(builder); - GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); const bool retval = grpc_channel_stack_builder_add_filter_before( it, &test_filter, nullptr, nullptr); grpc_channel_stack_builder_iterator_destroy(it); @@ -487,13 +484,17 @@ static bool maybe_add_client_subchannel_filter( } static void init_plugin(void) { - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, + grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, + GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_server_channel_filter, nullptr); - grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, + grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, + GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_client_channel_filter, nullptr); - grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX, + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_client_subchannel_filter, nullptr); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, + grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, + GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_client_channel_filter, nullptr); } diff --git a/test/core/end2end/tests/filter_latency.cc b/test/core/end2end/tests/filter_latency.cc index a89db7b094..02a4d07927 100644 --- a/test/core/end2end/tests/filter_latency.cc +++ b/test/core/end2end/tests/filter_latency.cc @@ -314,7 +314,6 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) { // must be the last one. So we add it right before the last one. grpc_channel_stack_builder_iterator* it = grpc_channel_stack_builder_create_iterator_at_last(builder); - GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); const bool retval = grpc_channel_stack_builder_add_filter_before( it, filter, nullptr, nullptr); grpc_channel_stack_builder_iterator_destroy(it); @@ -326,15 +325,15 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) { static void init_plugin(void) { gpr_mu_init(&g_mu); - grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, - maybe_add_filter, - (void*)&test_client_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, - maybe_add_filter, - (void*)&test_client_filter); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_filter, - (void*)&test_server_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter, + (void*)&test_client_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, + maybe_add_filter, (void*)&test_client_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter, + (void*)&test_server_filter); } static void destroy_plugin(void) { gpr_mu_destroy(&g_mu); } diff --git a/test/core/end2end/tests/filter_status_code.cc b/test/core/end2end/tests/filter_status_code.cc index ba3cbfa6d1..6ed1de15c6 100644 --- a/test/core/end2end/tests/filter_status_code.cc +++ b/test/core/end2end/tests/filter_status_code.cc @@ -333,7 +333,6 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) { // So we add it right before the last one. grpc_channel_stack_builder_iterator* it = grpc_channel_stack_builder_create_iterator_at_last(builder); - GPR_ASSERT(grpc_channel_stack_builder_move_prev(it)); const bool retval = grpc_channel_stack_builder_add_filter_before( it, filter, nullptr, nullptr); grpc_channel_stack_builder_iterator_destroy(it); @@ -350,15 +349,15 @@ static void init_plugin(void) { g_client_code_recv = false; g_server_code_recv = false; - grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, - maybe_add_filter, - (void*)&test_client_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, - maybe_add_filter, - (void*)&test_client_filter); - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_filter, - (void*)&test_server_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter, + (void*)&test_client_filter); + grpc_channel_init_register_stage( + GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, + maybe_add_filter, (void*)&test_client_filter); + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter, + (void*)&test_server_filter); } static void destroy_plugin(void) { -- cgit v1.2.3 From 8c5f24cf3c00891cf061678ad0e3937d2ea7aece Mon Sep 17 00:00:00 2001 From: yang-g Date: Mon, 6 Aug 2018 10:59:15 -0700 Subject: update test --- test/core/end2end/tests/filter_causes_close.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'test/core') diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc index a7f4268803..891c1b8c1f 100644 --- a/test/core/end2end/tests/filter_causes_close.cc +++ b/test/core/end2end/tests/filter_causes_close.cc @@ -261,8 +261,9 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) { } static void init_plugin(void) { - grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, 0, maybe_add_filter, - nullptr); + grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, + GRPC_CHANNEL_INIT_PRIORITY_HIGH, + maybe_add_filter, nullptr); } static void destroy_plugin(void) {} -- cgit v1.2.3 From f7e72560b664cce34bdf3c64b411cd6a153219ad Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 6 Aug 2018 14:46:37 -0700 Subject: Add experimental API for resetting connection backoff. --- grpc.def | 1 + include/grpc/grpc.h | 5 ++++ include/grpcpp/channel.h | 9 +++++++ .../ext/filters/client_channel/client_channel.cc | 11 ++++++++ src/core/ext/filters/client_channel/lb_policy.h | 5 +++- .../client_channel/lb_policy/grpclb/grpclb.cc | 10 ++++++++ .../lb_policy/pick_first/pick_first.cc | 8 ++++++ .../lb_policy/round_robin/round_robin.cc | 8 ++++++ .../client_channel/lb_policy/subchannel_list.h | 27 +++++++++++++++++++ src/core/ext/filters/client_channel/resolver.h | 8 ++++++ .../resolver/dns/c_ares/dns_resolver_ares.cc | 9 +++++++ .../resolver/dns/native/dns_resolver.cc | 9 +++++++ src/core/ext/filters/client_channel/subchannel.cc | 18 +++++++++++++ src/core/ext/filters/client_channel/subchannel.h | 7 +++++ src/core/lib/surface/channel.cc | 11 ++++++++ src/core/lib/transport/transport.h | 2 ++ src/cpp/client/channel_cc.cc | 8 ++++++ src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 ++ src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 +++ test/core/surface/public_headers_must_be_c89.c | 1 + test/cpp/end2end/client_lb_end2end_test.cc | 30 ++++++++++++++++++++++ 21 files changed, 191 insertions(+), 1 deletion(-) (limited to 'test/core') diff --git a/grpc.def b/grpc.def index 5b98792662..5e9d86c769 100644 --- a/grpc.def +++ b/grpc.def @@ -42,6 +42,7 @@ EXPORTS grpc_census_call_get_context grpc_channel_get_target grpc_channel_get_info + grpc_channel_reset_connect_backoff grpc_insecure_channel_create grpc_lame_client_channel_create grpc_channel_destroy diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index f0eb2c0121..bc6cd340af 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -269,6 +269,11 @@ GRPCAPI char* grpc_channel_get_target(grpc_channel* channel); GRPCAPI void grpc_channel_get_info(grpc_channel* channel, const grpc_channel_info* channel_info); +/** EXPERIMENTAL. Resets the channel's connect backoff. + TODO(roth): When we see whether this proves useful, either promote + to non-experimental or remove it. */ +GRPCAPI void grpc_channel_reset_connect_backoff(grpc_channel* channel); + /** Create a client channel to 'target'. Additional channel level configuration MAY be provided by grpc_channel_args, though the expectation is that most clients will want to simply pass NULL. The user data in 'args' need only diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index 4b45d5382c..fed02bf7bc 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -30,6 +30,14 @@ struct grpc_channel; namespace grpc { + +namespace experimental { +/// Resets the channel's connection backoff. +/// TODO(roth): Once we see whether this proves useful, either create a gRFC +/// and change this to be a method of the Channel class, or remove it. +void ChannelResetConnectionBackoff(Channel* channel); +} // namespace experimental + /// Channels represent a connection to an endpoint. Created by \a CreateChannel. class Channel final : public ChannelInterface, public internal::CallHook, @@ -52,6 +60,7 @@ class Channel final : public ChannelInterface, private: template friend class internal::BlockingUnaryCallImpl; + friend void experimental::ChannelResetConnectionBackoff(Channel* channel); friend std::shared_ptr CreateChannelInternal( const grpc::string& host, grpc_channel* c_channel); Channel(const grpc::string& host, grpc_channel* c_channel); diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index fead8feb17..b06f09d8c7 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -622,6 +622,17 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { } GRPC_ERROR_UNREF(op->disconnect_with_error); } + + if (op->reset_connect_backoff) { + if (chand->resolver != nullptr) { + chand->resolver->ResetBackoffLocked(); + chand->resolver->RequestReresolutionLocked(); + } + if (chand->lb_policy != nullptr) { + chand->lb_policy->ResetBackoffLocked(); + } + } + GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op"); GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 31c08246ae..3c0a9c1118 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -144,7 +144,10 @@ class LoadBalancingPolicy /// consider whether this method is still needed. virtual void ExitIdleLocked() GRPC_ABSTRACT; - /// populates child_subchannels and child_channels with the uuids of this + /// Resets connection backoff. + virtual void ResetBackoffLocked() GRPC_ABSTRACT; + + /// Populates child_subchannels and child_channels with the uuids of this /// LB policy's referenced children. This is not invoked from the /// client_channel's combiner. The implementation is responsible for /// providing its own synchronization. diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 2d1f777474..cf029ef4c1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -134,6 +134,7 @@ class GrpcLb : public LoadBalancingPolicy { grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void ExitIdleLocked() override; + void ResetBackoffLocked() override; void FillChildRefsForChannelz(ChildRefsList* child_subchannels, ChildRefsList* child_channels) override; @@ -1214,6 +1215,15 @@ void GrpcLb::ExitIdleLocked() { } } +void GrpcLb::ResetBackoffLocked() { + if (lb_channel_ != nullptr) { + grpc_channel_reset_connect_backoff(lb_channel_); + } + if (rr_policy_ != nullptr) { + rr_policy_->ResetBackoffLocked(); + } +} + bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) { PendingPick* pp = PendingPickCreate(pick); bool pick_done = false; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 46acbf628b..2b6a9ba8c5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -57,6 +57,7 @@ class PickFirst : public LoadBalancingPolicy { grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void ExitIdleLocked() override; + void ResetBackoffLocked() override; void FillChildRefsForChannelz(ChildRefsList* child_subchannels, ChildRefsList* ignored) override; @@ -259,6 +260,13 @@ void PickFirst::ExitIdleLocked() { } } +void PickFirst::ResetBackoffLocked() { + subchannel_list_->ResetBackoffLocked(); + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->ResetBackoffLocked(); + } +} + bool PickFirst::PickLocked(PickState* pick, grpc_error** error) { // If we have a selected subchannel already, return synchronously. if (selected_ != nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 9c3a15c67b..fea84331d8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -68,6 +68,7 @@ class RoundRobin : public LoadBalancingPolicy { grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void ExitIdleLocked() override; + void ResetBackoffLocked() override; void FillChildRefsForChannelz(ChildRefsList* child_subchannels, ChildRefsList* ignored) override; @@ -333,6 +334,13 @@ void RoundRobin::ExitIdleLocked() { } } +void RoundRobin::ResetBackoffLocked() { + subchannel_list_->ResetBackoffLocked(); + if (latest_pending_subchannel_list_ != nullptr) { + latest_pending_subchannel_list_->ResetBackoffLocked(); + } +} + bool RoundRobin::DoPickLocked(PickState* pick) { const size_t next_ready_index = subchannel_list_->GetNextReadySubchannelIndexLocked(); diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 018ac3bb86..0fa2f04e73 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -107,6 +107,11 @@ class SubchannelData { // being unreffed. virtual void UnrefSubchannelLocked(const char* reason); + // Resets the connection backoff. + // TODO(roth): This method should go away when we move the backoff + // code out of the subchannel and into the LB policies. + void ResetBackoffLocked(); + // Starts watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called when the // connectivity state changes. @@ -206,6 +211,11 @@ class SubchannelList LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } + // Resets connection backoff of all subchannels. + // TODO(roth): We will probably need to rethink this as part of moving + // the backoff code out of subchannels and into LB policies. + void ResetBackoffLocked(); + // Note: Caller must ensure that this is invoked inside of the combiner. void Orphan() override { ShutdownLocked(); @@ -298,6 +308,14 @@ void SubchannelData:: } } +template +void SubchannelData::ResetBackoffLocked() { + if (subchannel_ != nullptr) { + grpc_subchannel_reset_backoff(subchannel_); + } +} + template void SubchannelData::StartConnectivityWatchLocked() { @@ -544,6 +562,15 @@ void SubchannelList::ShutdownLocked() { } } +template +void SubchannelList::ResetBackoffLocked() { + for (size_t i = 0; i < subchannels_.size(); i++) { + SubchannelDataType* sd = &subchannels_[i]; + sd->ResetBackoffLocked(); + } +} + } // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index c7e37e4468..48f2e89095 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -94,6 +94,14 @@ class Resolver : public InternallyRefCountedWithTracing { /// throw away unselected subchannels. virtual void RequestReresolutionLocked() GRPC_ABSTRACT; + /// Resets the re-resolution backoff, if any. + /// This needs to be implemented only by pull-based implementations; + /// for push-based implementations, it will be a no-op. + /// TODO(roth): Pull the backoff code out of resolver and into + /// client_channel, so that it can be shared across resolver + /// implementations. At that point, this method can go away. + virtual void ResetBackoffLocked() {} + void Orphan() override { // Invoke ShutdownAndUnrefLocked() inside of the combiner. GRPC_CLOSURE_SCHED( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 7050e82121..f2bb5f3c71 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -66,6 +66,8 @@ class AresDnsResolver : public Resolver { void RequestReresolutionLocked() override; + void ResetBackoffLocked() override; + void ShutdownLocked() override; private: @@ -187,6 +189,13 @@ void AresDnsResolver::RequestReresolutionLocked() { } } +void AresDnsResolver::ResetBackoffLocked() { + if (have_next_resolution_timer_) { + grpc_timer_cancel(&next_resolution_timer_); + } + backoff_.Reset(); +} + void AresDnsResolver::ShutdownLocked() { if (have_next_resolution_timer_) { grpc_timer_cancel(&next_resolution_timer_); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index fae4c33a17..282caf215c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -58,6 +58,8 @@ class NativeDnsResolver : public Resolver { void RequestReresolutionLocked() override; + void ResetBackoffLocked() override; + void ShutdownLocked() override; private: @@ -158,6 +160,13 @@ void NativeDnsResolver::RequestReresolutionLocked() { } } +void NativeDnsResolver::ResetBackoffLocked() { + if (have_next_resolution_timer_) { + grpc_timer_cancel(&next_resolution_timer_); + } + backoff_.Reset(); +} + void NativeDnsResolver::ShutdownLocked() { if (have_next_resolution_timer_) { grpc_timer_cancel(&next_resolution_timer_); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 71ef8c518b..48c6030c89 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -132,6 +132,8 @@ struct grpc_subchannel { bool have_alarm; /** have we started the backoff loop */ bool backoff_begun; + // reset_backoff() was called while alarm was pending + bool deferred_reset_backoff; /** our alarm */ grpc_timer alarm; @@ -438,6 +440,9 @@ static void on_alarm(void* arg, grpc_error* error) { if (c->disconnected) { error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", &error, 1); + } else if (c->deferred_reset_backoff) { + c->deferred_reset_backoff = false; + error = GRPC_ERROR_NONE; } else { GRPC_ERROR_REF(error); } @@ -675,6 +680,19 @@ static void on_subchannel_connected(void* arg, grpc_error* error) { grpc_channel_args_destroy(delete_channel_args); } +void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) { + gpr_mu_lock(&subchannel->mu); + if (subchannel->have_alarm) { + subchannel->deferred_reset_backoff = true; + grpc_timer_cancel(&subchannel->alarm); + } else { + subchannel->backoff_begun = false; + subchannel->backoff->Reset(); + maybe_start_connecting_locked(subchannel); + } + gpr_mu_unlock(&subchannel->mu); +} + /* * grpc_subchannel_call implementation */ diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 9e53f7d542..a135035d62 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -145,6 +145,13 @@ grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); const grpc_subchannel_key* grpc_subchannel_get_key( const grpc_subchannel* subchannel); +// Resets the connection backoff of the subchannel. +// TODO(roth): Move connection backoff out of subchannels and up into LB +// policy code (probably by adding a SubchannelGroup between +// SubchannelList and SubchannelData), at which point this method can +// go away. +void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel); + /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_subchannel_call* subchannel_call, grpc_transport_stream_op_batch* op); diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 7cbd61adef..82635d3c21 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -281,6 +281,17 @@ void grpc_channel_get_info(grpc_channel* channel, elem->filter->get_channel_info(elem, channel_info); } +void grpc_channel_reset_connect_backoff(grpc_channel* channel) { + grpc_core::ExecCtx exec_ctx; + GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1, + (channel)); + grpc_transport_op* op = grpc_make_transport_op(nullptr); + op->reset_connect_backoff = true; + grpc_channel_element* elem = + grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); + elem->filter->start_transport_op(elem, op); +} + static grpc_call* grpc_channel_create_call_internal( grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask, grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 585b9dfae9..9e784635c6 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -282,6 +282,8 @@ typedef struct grpc_transport_op { /** Called when the ping ack is received */ grpc_closure* on_ack; } send_ping; + // If true, will reset the channel's connection backoff. + bool reset_connect_backoff; /*************************************************************************** * remaining fields are initialized and used at the discretion of the diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 867f31f025..39b891c2e1 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -84,6 +84,14 @@ grpc::string Channel::GetServiceConfigJSON() const { &channel_info.service_config_json); } +namespace experimental { + +void ChannelResetConnectionBackoff(Channel* channel) { + grpc_channel_reset_connect_backoff(channel->c_channel_); +} + +} // namespace experimental + internal::Call Channel::CreateCall(const internal::RpcMethod& method, ClientContext* context, CompletionQueue* cq) { diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 2443532bb8..38b68462df 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -65,6 +65,7 @@ grpc_census_call_set_context_type grpc_census_call_set_context_import; grpc_census_call_get_context_type grpc_census_call_get_context_import; grpc_channel_get_target_type grpc_channel_get_target_import; grpc_channel_get_info_type grpc_channel_get_info_import; +grpc_channel_reset_connect_backoff_type grpc_channel_reset_connect_backoff_import; grpc_insecure_channel_create_type grpc_insecure_channel_create_import; grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import; grpc_channel_destroy_type grpc_channel_destroy_import; @@ -315,6 +316,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_census_call_get_context_import = (grpc_census_call_get_context_type) GetProcAddress(library, "grpc_census_call_get_context"); grpc_channel_get_target_import = (grpc_channel_get_target_type) GetProcAddress(library, "grpc_channel_get_target"); grpc_channel_get_info_import = (grpc_channel_get_info_type) GetProcAddress(library, "grpc_channel_get_info"); + grpc_channel_reset_connect_backoff_import = (grpc_channel_reset_connect_backoff_type) GetProcAddress(library, "grpc_channel_reset_connect_backoff"); grpc_insecure_channel_create_import = (grpc_insecure_channel_create_type) GetProcAddress(library, "grpc_insecure_channel_create"); grpc_lame_client_channel_create_import = (grpc_lame_client_channel_create_type) GetProcAddress(library, "grpc_lame_client_channel_create"); grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index b08a1f94f7..d6add00d12 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -170,6 +170,9 @@ extern grpc_channel_get_target_type grpc_channel_get_target_import; typedef void(*grpc_channel_get_info_type)(grpc_channel* channel, const grpc_channel_info* channel_info); extern grpc_channel_get_info_type grpc_channel_get_info_import; #define grpc_channel_get_info grpc_channel_get_info_import +typedef void(*grpc_channel_reset_connect_backoff_type)(grpc_channel* channel); +extern grpc_channel_reset_connect_backoff_type grpc_channel_reset_connect_backoff_import; +#define grpc_channel_reset_connect_backoff grpc_channel_reset_connect_backoff_import typedef grpc_channel*(*grpc_insecure_channel_create_type)(const char* target, const grpc_channel_args* args, void* reserved); extern grpc_insecure_channel_create_type grpc_insecure_channel_create_import; #define grpc_insecure_channel_create grpc_insecure_channel_create_import diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c index 9f4ad2b4d7..7b3e875cf0 100644 --- a/test/core/surface/public_headers_must_be_c89.c +++ b/test/core/surface/public_headers_must_be_c89.c @@ -104,6 +104,7 @@ int main(int argc, char **argv) { printf("%lx", (unsigned long) grpc_census_call_get_context); printf("%lx", (unsigned long) grpc_channel_get_target); printf("%lx", (unsigned long) grpc_channel_get_info); + printf("%lx", (unsigned long) grpc_channel_reset_connect_backoff); printf("%lx", (unsigned long) grpc_insecure_channel_create); printf("%lx", (unsigned long) grpc_lame_client_channel_create); printf("%lx", (unsigned long) grpc_channel_destroy); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c5a73a2469..7fe0da8aae 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -408,6 +408,36 @@ TEST_F(ClientLbEnd2endTest, PickFirstBackOffMinReconnect) { gpr_atm_rel_store(&g_connection_delay_ms, 0); } +TEST_F(ClientLbEnd2endTest, PickFirstResetConnectionBackoff) { + ChannelArguments args; + constexpr int kInitialBackOffMs = 1000; + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, kInitialBackOffMs); + const std::vector ports = {grpc_pick_unused_port_or_die()}; + auto channel = BuildChannel("pick_first", args); + auto stub = BuildStub(channel); + SetNextResolution(ports); + // The channel won't become connected (there's no server). + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Bring up a server on the chosen port. + StartServers(1, ports); + const gpr_timespec t0 = gpr_now(GPR_CLOCK_MONOTONIC); + // Wait for connect, but not long enough. This proves that we're + // being throttled by initial backoff. + EXPECT_FALSE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + // Reset connection backoff. + experimental::ChannelResetConnectionBackoff(channel.get()); + // Wait for connect. Should happen ~immediately. + EXPECT_TRUE( + channel->WaitForConnected(grpc_timeout_milliseconds_to_deadline(10))); + const gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC); + const grpc_millis waited_ms = gpr_time_to_millis(gpr_time_sub(t1, t0)); + gpr_log(GPR_DEBUG, "Waited %" PRId64 " milliseconds", waited_ms); + // We should have waited less than kInitialBackOffMs. + EXPECT_LT(waited_ms, kInitialBackOffMs); +} + TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { // Start servers and send one RPC per server. const int kNumServers = 3; -- cgit v1.2.3