diff options
Diffstat (limited to 'src')
20 files changed, 139 insertions, 80 deletions
diff --git a/src/core/ext/filters/client_channel/backup_poller.cc b/src/core/ext/filters/client_channel/backup_poller.cc index ed437d255c..5b86e8631f 100644 --- a/src/core/ext/filters/client_channel/backup_poller.cc +++ b/src/core/ext/filters/client_channel/backup_poller.cc @@ -83,8 +83,8 @@ static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } static void g_poller_unref(grpc_exec_ctx* exec_ctx) { + gpr_mu_lock(&g_poller_mu); if (gpr_unref(&g_poller->refs)) { - gpr_mu_lock(&g_poller_mu); backup_poller* p = g_poller; g_poller = nullptr; gpr_mu_unlock(&g_poller_mu); @@ -95,6 +95,8 @@ static void g_poller_unref(grpc_exec_ctx* exec_ctx) { p, grpc_schedule_on_exec_ctx)); gpr_mu_unlock(p->pollset_mu); grpc_timer_cancel(exec_ctx, &p->polling_timer); + } else { + gpr_mu_unlock(&g_poller_mu); } } diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index aced9adf9f..fc8210569e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -643,16 +643,22 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, op->connectivity_state = nullptr; } - if (op->send_ping != nullptr) { + if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (chand->lb_policy == nullptr) { GRPC_CLOSURE_SCHED( - exec_ctx, op->send_ping, + exec_ctx, op->send_ping.on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); + GRPC_CLOSURE_SCHED( + exec_ctx, op->send_ping.on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); } else { - grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); + grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, + op->send_ping.on_initiate, + op->send_ping.on_ack); op->bind_pollset = nullptr; } - op->send_ping = nullptr; + op->send_ping.on_initiate = nullptr; + op->send_ping.on_ack = nullptr; } if (op->disconnect_with_error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index db566f1b56..6b6022c247 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -138,8 +138,9 @@ void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx* exec_ctx, void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* closure) { - policy->vtable->ping_one_locked(exec_ctx, policy, closure); + grpc_closure* on_initiate, + grpc_closure* on_ack) { + policy->vtable->ping_one_locked(exec_ctx, policy, on_initiate, on_ack); } void grpc_lb_policy_notify_on_state_change_locked( diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index d3159eebf3..38cc26422f 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -78,7 +78,7 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_ping_one */ void (*ping_one_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* closure); + grpc_closure* on_initiate, grpc_closure* on_ack); /** Try to enter a READY connectivity state */ void (*exit_idle_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy); @@ -171,7 +171,8 @@ int grpc_lb_policy_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* closure); + grpc_closure* on_initiate, + grpc_closure* on_ack); /** Cancel picks for \a target. The \a on_complete callback of the pending picks will be invoked with \a diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index db06fc20b6..704cbd8a3b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -275,18 +275,30 @@ static void add_pending_pick(pending_pick** root, typedef struct pending_ping { struct pending_ping* next; - /* args for wrapped_notify */ - wrapped_rr_closure_arg wrapped_notify_arg; + /* args for sending the ping */ + wrapped_rr_closure_arg* on_initiate; + wrapped_rr_closure_arg* on_ack; } pending_ping; -static void add_pending_ping(pending_ping** root, grpc_closure* notify) { +static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, + grpc_closure* on_ack) { pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - pping->wrapped_notify_arg.wrapped_closure = notify; - pping->wrapped_notify_arg.free_when_done = pping; + if (on_initiate != nullptr) { + pping->on_initiate = + (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_initiate)); + pping->on_initiate->wrapped_closure = on_initiate; + pping->on_initiate->free_when_done = pping->on_initiate; + GRPC_CLOSURE_INIT(&pping->on_initiate->wrapper_closure, wrapped_rr_closure, + &pping->on_initiate, grpc_schedule_on_exec_ctx); + } + if (on_ack != nullptr) { + pping->on_ack = (wrapped_rr_closure_arg*)gpr_zalloc(sizeof(*pping->on_ack)); + pping->on_ack->wrapped_closure = on_ack; + pping->on_ack->free_when_done = pping->on_ack; + GRPC_CLOSURE_INIT(&pping->on_ack->wrapper_closure, wrapped_rr_closure, + &pping->on_ack, grpc_schedule_on_exec_ctx); + } pping->next = *root; - GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure, - wrapped_rr_closure, &pping->wrapped_notify_arg, - grpc_schedule_on_exec_ctx); *root = pping; } @@ -822,14 +834,25 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; + grpc_closure* on_initiate = nullptr; + grpc_closure* on_ack = nullptr; + if (pping->on_initiate != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_initiate->rr_policy = glb_policy->rr_policy; + on_initiate = &pping->on_initiate->wrapper_closure; + } + if (pping->on_ack != nullptr) { + GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); + pping->on_ack->rr_policy = glb_policy->rr_policy; + on_ack = &pping->on_ack->wrapper_closure; + } if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, - &pping->wrapped_notify_arg.wrapper_closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate, + on_ack); + gpr_free(pping); } } @@ -1052,8 +1075,16 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_REF(error)); + if (pping->on_initiate != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_initiate->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_initiate); + } + if (pping->on_ack != nullptr) { + GRPC_CLOSURE_SCHED(exec_ctx, &pping->on_ack->wrapper_closure, + GRPC_ERROR_REF(error)); + gpr_free(pping->on_ack); + } gpr_free(pping); pping = next; } @@ -1251,12 +1282,14 @@ static grpc_connectivity_state glb_check_connectivity_locked( } static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate, + on_ack); } else { - add_pending_ping(&glb_policy->pending_pings, closure); + add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { start_picking_locked(exec_ctx, glb_policy); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc index 2c8d7f4291..fc781da330 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc @@ -215,9 +215,6 @@ grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist( return nullptr; } } - if (res.server_list.has_expiration_interval) { - sl->expiration_interval = res.server_list.expiration_interval; - } return sl; } @@ -237,8 +234,6 @@ grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy( grpc_grpclb_serverlist* copy = (grpc_grpclb_serverlist*)gpr_zalloc(sizeof(grpc_grpclb_serverlist)); copy->num_servers = sl->num_servers; - memcpy(©->expiration_interval, &sl->expiration_interval, - sizeof(grpc_grpclb_duration)); copy->servers = (grpc_grpclb_server**)gpr_malloc(sizeof(grpc_grpclb_server*) * sl->num_servers); for (size_t i = 0; i < sl->num_servers; i++) { @@ -257,10 +252,6 @@ bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs, if (lhs->num_servers != rhs->num_servers) { return false; } - if (grpc_grpclb_duration_compare(&lhs->expiration_interval, - &rhs->expiration_interval) != 0) { - return false; - } for (size_t i = 0; i < lhs->num_servers; i++) { if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { return false; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 017c40ec1a..ccb0212643 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -35,7 +35,6 @@ typedef grpc_lb_v1_Duration grpc_grpclb_duration; typedef struct { grpc_grpclb_server** servers; size_t num_servers; - grpc_grpclb_duration expiration_interval; } grpc_grpclb_serverlist; /** Create a request for a gRPC LB service under \a lb_service_name */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c index 6a5d54c82a..4e6c5cc832 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c @@ -61,9 +61,8 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = { PB_LAST_FIELD }; -const pb_field_t grpc_lb_v1_ServerList_fields[3] = { +const pb_field_t grpc_lb_v1_ServerList_fields[2] = { PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v1_ServerList, servers, servers, &grpc_lb_v1_Server_fields), - PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ServerList, expiration_interval, servers, &grpc_lb_v1_Duration_fields), PB_LAST_FIELD }; @@ -85,7 +84,7 @@ const pb_field_t grpc_lb_v1_Server_fields[5] = { * numbers or field sizes that are larger than what can fit in 8 or 16 bit * field descriptors. */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) #endif #if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT) @@ -96,7 +95,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) * numbers or field sizes that are larger than what can fit in the default * 8 bit descriptors. */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) #endif diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h index 93333d1aed..066c076202 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h @@ -14,6 +14,11 @@ extern "C" { #endif /* Struct definitions */ +typedef struct _grpc_lb_v1_ServerList { + pb_callback_t servers; +/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */ +} grpc_lb_v1_ServerList; + typedef struct _grpc_lb_v1_ClientStatsPerToken { pb_callback_t load_balance_token; bool has_num_calls; @@ -79,13 +84,6 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse { /* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */ } grpc_lb_v1_InitialLoadBalanceResponse; -typedef struct _grpc_lb_v1_ServerList { - pb_callback_t servers; - bool has_expiration_interval; - grpc_lb_v1_Duration expiration_interval; -/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */ -} grpc_lb_v1_ServerList; - typedef struct _grpc_lb_v1_LoadBalanceRequest { bool has_initial_request; grpc_lb_v1_InitialLoadBalanceRequest initial_request; @@ -113,7 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}} #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default} #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default} -#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default} +#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}} #define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0} #define grpc_lb_v1_Duration_init_zero {false, 0, false, 0} #define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0} @@ -123,10 +121,11 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}} #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero} #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero} -#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero} +#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}} #define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0} /* Field tags (for use in manual encoding/decoding) */ +#define grpc_lb_v1_ServerList_servers_tag 1 #define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1 #define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2 #define grpc_lb_v1_Duration_seconds_tag 1 @@ -146,8 +145,6 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8 #define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1 #define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2 -#define grpc_lb_v1_ServerList_servers_tag 1 -#define grpc_lb_v1_ServerList_expiration_interval_tag 3 #define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1 #define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2 #define grpc_lb_v1_LoadBalanceResponse_initial_response_tag 1 @@ -162,7 +159,7 @@ extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3]; extern const pb_field_t grpc_lb_v1_ClientStats_fields[7]; extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3]; extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3]; -extern const pb_field_t grpc_lb_v1_ServerList_fields[3]; +extern const pb_field_t grpc_lb_v1_ServerList_fields[2]; extern const pb_field_t grpc_lb_v1_Server_fields[5]; /* Maximum encoded size of messages (where known) */ diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 228a77d9db..b2007ca301 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -226,13 +226,16 @@ static void pf_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, } static void pf_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, - closure); + on_initiate, on_ack); } else { - GRPC_CLOSURE_SCHED(exec_ctx, closure, + GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); + GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index c05557ba6f..df458d339d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -540,7 +540,8 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, } static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { @@ -548,11 +549,14 @@ static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(exec_ctx, target, closure); + grpc_connected_subchannel_ping(exec_ctx, target, on_initiate, on_ack); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); } else { GRPC_CLOSURE_SCHED( - exec_ctx, closure, + exec_ctx, on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); + GRPC_CLOSURE_SCHED( + exec_ctx, on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); } } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 58e294d597..bff2ae487d 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -584,10 +584,12 @@ void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_channel_element* elem; - op->send_ping = closure; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem->filter->start_transport_op(exec_ctx, elem, op); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 1f326fc1d2..3916ea00ca 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -135,7 +135,8 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_closure* notify); void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* channel, - grpc_closure* notify); + grpc_closure* on_initiate, + grpc_closure* on_ack); /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 63ac65ac78..ea637e6bec 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1815,8 +1815,9 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); } - if (op->send_ping) { - send_ping_locked(exec_ctx, t, nullptr, op->send_ping); + if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { + send_ping_locked(exec_ctx, t, op->send_ping.on_initiate, + op->send_ping.on_ack); grpc_chttp2_initiate_write(exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } diff --git a/src/core/lib/surface/channel_ping.cc b/src/core/lib/surface/channel_ping.cc index e8f47f01cf..7b1964fd55 100644 --- a/src/core/lib/surface/channel_ping.cc +++ b/src/core/lib/surface/channel_ping.cc @@ -57,7 +57,7 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq, pr->tag = tag; pr->cq = cq; GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); - op->send_ping = &pr->closure; + op->send_ping.on_ack = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index c32c9af50e..559d7af43e 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -104,9 +104,14 @@ static void lame_start_transport_op(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE); } - if (op->send_ping != nullptr) { + if (op->send_ping.on_initiate != nullptr) { GRPC_CLOSURE_SCHED( - exec_ctx, op->send_ping, + exec_ctx, op->send_ping.on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); + } + if (op->send_ping.on_ack != nullptr) { + GRPC_CLOSURE_SCHED( + exec_ctx, op->send_ping.on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index b3cf04c22d..73264142d9 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -245,8 +245,14 @@ typedef struct grpc_transport_op { grpc_pollset* bind_pollset; /** add this transport to a pollset_set */ grpc_pollset_set* bind_pollset_set; - /** send a ping, call this back if not NULL */ - grpc_closure* send_ping; + /** send a ping, if either on_initiate or on_ack is not NULL */ + struct { + /** Ping may be delayed by the transport, on_initiate callback will be + called when the ping is actually being sent. */ + grpc_closure* on_initiate; + /** Called when the ping ack is received */ + grpc_closure* on_ack; + } send_ping; /*************************************************************************** * remaining fields are initialized and used at the discretion of the diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index e69ab02570..c0f82fea0d 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -187,7 +187,7 @@ char* grpc_transport_op_string(grpc_transport_op* op) { gpr_strvec_add(&b, gpr_strdup("BIND_POLLSET_SET")); } - if (op->send_ping != nullptr) { + if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); // first = false; gpr_strvec_add(&b, gpr_strdup("SEND_PING")); diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto index 0a33568bd6..75c916defa 100644 --- a/src/proto/grpc/lb/v1/load_balancer.proto +++ b/src/proto/grpc/lb/v1/load_balancer.proto @@ -133,11 +133,8 @@ message ServerList { // unless instructed otherwise via the client_config. repeated Server servers = 1; - // Indicates the amount of time that the client should consider this server - // list as valid. It may be considered stale after waiting this interval of - // time after receiving the list. If the interval is not positive, the - // client can assume the list is valid until the next list is received. - Duration expiration_interval = 3; + // Was google.protobuf.Duration expiration_interval. + reserved 3; } // Contains server information. When the drop field is not true, use the other diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 59f6f275e4..8f078cfbed 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -20,7 +20,7 @@ Thread.abort_on_exception = true include GRPC::Core::ConnectivityStates -def watch_state(ch) +def watch_state(ch, sleep_time) thd = Thread.new do state = ch.connectivity_state(false) fail "non-idle state: #{state}" unless state == IDLE @@ -28,23 +28,34 @@ def watch_state(ch) end # sleep to get the thread into the middle of a # "watch connectivity state" call - sleep 0.1 + sleep sleep_time thd.kill end -def main +def run_multiple_killed_watches(num_threads, sleep_time) channels = [] - 10.times do + num_threads.times do ch = GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure) - watch_state(ch) + watch_state(ch, sleep_time) channels << ch end # checking state should still be safe to call channels.each do |c| - fail unless c.connectivity_state(false) == FATAL_FAILURE + connectivity_state = c.connectivity_state(false) + # The state should be FATAL_FAILURE in the case that it was interrupted + # while watching connectivity state, and IDLE if it we never started + # watching the channel's connectivity state + unless [FATAL_FAILURE, IDLE].include?(connectivity_state) + fail "unexpected connectivity state: #{connectivity_state}" + end end end +def main + run_multiple_killed_watches(10, 0.1) + run_multiple_killed_watches(1000, 0.001) +end + main |