diff options
Diffstat (limited to 'src')
15 files changed, 146 insertions, 114 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 50d562f946..e1356d90b3 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -402,13 +402,9 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { if (chand->resolver_result != nullptr) { if (chand->resolver != nullptr) { // Find LB policy name. - const char* lb_policy_name = nullptr; const grpc_arg* channel_arg = grpc_channel_args_find( chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - lb_policy_name = channel_arg->value.string; - } + const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg); // Special case: If at least one balancer address is present, we use // the grpclb policy, regardless of what the resolver actually specified. channel_arg = @@ -475,17 +471,17 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { // Find service config. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - service_config_json = gpr_strdup(channel_arg->value.string); + service_config_json = + gpr_strdup(grpc_channel_arg_get_string(channel_arg)); + if (service_config_json != nullptr) { grpc_service_config* service_config = grpc_service_config_create(service_config_json); if (service_config != nullptr) { channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); - GPR_ASSERT(channel_arg != nullptr); - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true); + const char* server_uri = grpc_channel_arg_get_string(channel_arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 6bb4cefe73..248a6347d5 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -254,7 +254,8 @@ static void http_connect_handshaker_do_handshake( // If not found, invoke on_handshake_done without doing anything. const grpc_arg* arg = grpc_channel_args_find(args->args, GRPC_ARG_HTTP_CONNECT_SERVER); - if (arg == nullptr) { + char* server_name = grpc_channel_arg_get_string(arg); + if (server_name == nullptr) { // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. gpr_mu_lock(&handshaker->mu); @@ -263,17 +264,15 @@ static void http_connect_handshaker_do_handshake( GRPC_CLOSURE_SCHED(on_handshake_done, GRPC_ERROR_NONE); return; } - GPR_ASSERT(arg->type == GRPC_ARG_STRING); - char* server_name = arg->value.string; // Get headers from channel args. arg = grpc_channel_args_find(args->args, GRPC_ARG_HTTP_CONNECT_HEADERS); + char* arg_header_string = grpc_channel_arg_get_string(arg); grpc_http_header* headers = nullptr; size_t num_headers = 0; char** header_strings = nullptr; size_t num_header_strings = 0; - if (arg != nullptr) { - GPR_ASSERT(arg->type == GRPC_ARG_STRING); - gpr_string_split(arg->value.string, "\n", &header_strings, + if (arg_header_string != nullptr) { + gpr_string_split(arg_header_string, "\n", &header_strings, &num_header_strings); headers = static_cast<grpc_http_header*>( gpr_malloc(sizeof(grpc_http_header) * num_header_strings)); diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 27fb2ad1f4..cc4fe7ec62 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -118,8 +118,7 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy, void grpc_lb_policy_set_reresolve_closure_locked( grpc_lb_policy* policy, grpc_closure* request_reresolution) { - GPR_ASSERT(policy->request_reresolution == nullptr); - policy->request_reresolution = request_reresolution; + policy->vtable->set_reresolve_closure_locked(policy, request_reresolution); } void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, @@ -134,8 +133,8 @@ void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy, grpc_lb_trace->name(), policy, grpc_error_string(error)); } } else { - if (grpc_lb_trace->enabled()) { - gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.", + if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) { + gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.", grpc_lb_trace->name(), policy); } } diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 6edd314d5e..30660cb83d 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -107,6 +107,10 @@ struct grpc_lb_policy_vtable { void (*update_locked)(grpc_lb_policy* policy, const grpc_lb_policy_args* args); + + /** \see grpc_lb_policy_set_reresolve_closure */ + void (*set_reresolve_closure_locked)(grpc_lb_policy* policy, + grpc_closure* request_reresolution); }; #ifndef NDEBUG diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 1c8809eabc..21c95460c8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -249,7 +249,7 @@ typedef struct glb_lb_policy { /** the RR policy to use of the backend servers returned by the LB server */ grpc_lb_policy* rr_policy; - /** the connectivity state of the embedded RR policy */ + grpc_closure on_rr_connectivity_changed; grpc_connectivity_state rr_connectivity_state; bool started_picking; @@ -292,12 +292,6 @@ typedef struct glb_lb_policy { /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; - /** called upon changes to the RR's connectivity. */ - grpc_closure rr_on_connectivity_changed; - - /** called upon reresolution request from the RR policy. */ - grpc_closure rr_on_reresolution_requested; - /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ @@ -596,8 +590,9 @@ static grpc_lb_addresses* extract_backend_addresses_locked( return backend_addresses; } -static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy, - grpc_error* rr_state_error) { +static void update_lb_connectivity_status_locked( + glb_lb_policy* glb_policy, grpc_connectivity_state rr_state, + grpc_error* rr_state_error) { const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new @@ -629,7 +624,7 @@ static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy, * * (*) This function mustn't be called during shutting down. */ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); - switch (glb_policy->rr_connectivity_state) { + switch (rr_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); @@ -643,12 +638,11 @@ static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy, gpr_log( GPR_INFO, "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.", - glb_policy, - grpc_connectivity_state_name(glb_policy->rr_connectivity_state), + glb_policy, grpc_connectivity_state_name(rr_state), glb_policy->rr_policy); } - grpc_connectivity_state_set(&glb_policy->state_tracker, - glb_policy->rr_connectivity_state, rr_state_error, + grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state, + rr_state_error, "update_lb_connectivity_status_locked"); } @@ -746,36 +740,11 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) { gpr_free(args); } -static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) { - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "rr_on_reresolution_requested_locked"); - return; - } - if (grpc_lb_glb_trace.enabled()) { - gpr_log( - GPR_DEBUG, - "[grpclb %p] Re-resolution requested from the internal RR policy (%p).", - glb_policy, glb_policy->rr_policy); - } - // If we are talking to a balancer, we expect to get updated addresses form - // the balancer, so we can ignore the re-resolution request from the RR - // policy. Otherwise, handle the re-resolution request using glb's original - // re-resolution closure. - if (glb_policy->lb_calld == nullptr || - !glb_policy->lb_calld->seen_initial_response) { - grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace, - GRPC_ERROR_NONE); - } - // Give back the wrapper closure to the RR policy. - grpc_lb_policy_set_reresolve_closure_locked( - glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested); -} - +static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error); static void create_rr_locked(glb_lb_policy* glb_policy, grpc_lb_policy_args* args) { GPR_ASSERT(glb_policy->rr_policy == nullptr); + grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args); if (new_rr_policy == nullptr) { gpr_log(GPR_ERROR, @@ -788,25 +757,29 @@ static void create_rr_locked(glb_lb_policy* glb_policy, glb_policy->rr_policy); return; } - GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked"); grpc_lb_policy_set_reresolve_closure_locked( - new_rr_policy, &glb_policy->rr_on_reresolution_requested); + new_rr_policy, glb_policy->base.request_reresolution); + glb_policy->base.request_reresolution = nullptr; glb_policy->rr_policy = new_rr_policy; grpc_error* rr_state_error = nullptr; glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked( glb_policy->rr_policy, &rr_state_error); /* Connectivity state is a function of the RR policy updated/created */ - update_lb_connectivity_status_locked(glb_policy, rr_state_error); + update_lb_connectivity_status_locked( + glb_policy, glb_policy->rr_connectivity_state, rr_state_error); /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); + GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed, + on_rr_connectivity_changed_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); /* Subscribe to changes to the connectivity of the new RR */ - GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked"); + GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb"); grpc_lb_policy_notify_on_state_change_locked( glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->rr_on_connectivity_changed); + &glb_policy->on_rr_connectivity_changed); grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy); // Send pending picks to RR policy. pending_pick* pp; @@ -854,18 +827,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) { lb_policy_args_destroy(args); } -static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; +static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) { + glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg); if (glb_policy->shutting_down) { - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "rr_on_connectivity_changed_locked"); + GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); + return; + } + if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + /* An RR policy that has transitioned into the SHUTDOWN connectivity state + * should not be considered for picks or updates: the SHUTDOWN state is a + * sink, policies can't transition back from it. .*/ + GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown"); + glb_policy->rr_policy = nullptr; + GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb"); return; } - update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error)); - // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref. + /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */ + update_lb_connectivity_status_locked( + glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error)); + /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */ grpc_lb_policy_notify_on_state_change_locked( glb_policy->rr_policy, &glb_policy->rr_connectivity_state, - &glb_policy->rr_on_connectivity_changed); + &glb_policy->on_rr_connectivity_changed); } static void destroy_balancer_name(void* balancer_name) { @@ -988,6 +971,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, if (glb_policy->rr_policy != nullptr) { grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr); GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown"); + } else { + grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); } // We destroy the LB channel here because // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy @@ -999,7 +984,6 @@ static void glb_shutdown_locked(grpc_lb_policy* pol, } grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "glb_shutdown"); - grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED); // Clear pending picks. pending_pick* pp = glb_policy->pending_picks; glb_policy->pending_picks = nullptr; @@ -1639,8 +1623,6 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { lb_calld, lb_calld->lb_call, grpc_error_string(error)); gpr_free(status_details); } - grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace, - GRPC_ERROR_NONE); // If this lb_calld is still in use, this call ended because of a failure so // we want to retry connecting. Otherwise, we have deliberately ended this // call and no further action is required. @@ -1669,15 +1651,16 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) { glb_policy->fallback_timer_callback_pending = false; /* If we receive a serverlist after the timer fires but before this callback * actually runs, don't fall back. */ - if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down && - error == GRPC_ERROR_NONE) { - if (grpc_lb_glb_trace.enabled()) { - gpr_log(GPR_INFO, - "[grpclb %p] Falling back to use backends from resolver", - glb_policy); + if (glb_policy->serverlist == nullptr) { + if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, + "[grpclb %p] Falling back to use backends from resolver", + glb_policy); + } + GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); + rr_handover_locked(glb_policy); } - GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr); - rr_handover_locked(glb_policy); } GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer"); } @@ -1798,6 +1781,19 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg, } } +static void glb_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy); + GPR_ASSERT(!glb_policy->shutting_down); + GPR_ASSERT(glb_policy->base.request_reresolution == nullptr); + if (glb_policy->rr_policy != nullptr) { + grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy, + request_reresolution); + } else { + glb_policy->base.request_reresolution = request_reresolution; + } +} + /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_destroy, @@ -1809,7 +1805,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_exit_idle_locked, glb_check_connectivity_locked, glb_notify_on_state_change_locked, - glb_update_locked}; + glb_update_locked, + glb_set_reresolve_closure_locked}; static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) { @@ -1832,9 +1829,9 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, /* Get server name. */ arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri* uri = grpc_uri_parse(arg->value.string, true); + const char* server_uri = grpc_channel_arg_get_string(arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); @@ -1890,12 +1887,6 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory, return nullptr; } grpc_subchannel_index_ref(); - GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed, - rr_on_connectivity_changed_locked, glb_policy, - grpc_combiner_scheduler(args->combiner)); - GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested, - rr_on_reresolution_requested_locked, glb_policy, - grpc_combiner_scheduler(args->combiner)); GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, grpc_combiner_scheduler(args->combiner)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 296bdcb247..1485f7caf5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -520,6 +520,14 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) { } } +static void pf_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy); + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown_locked, @@ -530,7 +538,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_exit_idle_locked, pf_check_connectivity_locked, pf_notify_on_state_change_locked, - pf_update_locked}; + pf_update_locked, + pf_set_reresolve_closure_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index b5b4c44ef1..cefd0d8d7d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -620,6 +620,14 @@ static void rr_update_locked(grpc_lb_policy* policy, } } +static void rr_set_reresolve_closure_locked( + grpc_lb_policy* policy, grpc_closure* request_reresolution) { + round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy); + GPR_ASSERT(!p->shutdown); + GPR_ASSERT(policy->request_reresolution == nullptr); + policy->request_reresolution = request_reresolution; +} + static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown_locked, @@ -630,7 +638,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_exit_idle_locked, rr_check_connectivity_locked, rr_notify_on_state_change_locked, - rr_update_locked}; + rr_update_locked, + rr_set_reresolve_closure_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 179e3f27ac..fbe07c58f7 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -728,9 +728,9 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args, const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); - GPR_ASSERT(addr_arg != nullptr); // Should have been set by LB policy. - GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING); - return addr_arg->value.string; + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; } grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 49d9ae60ae..0fb3935609 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -249,10 +249,10 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, // Get method config table from channel args. const grpc_arg* channel_arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); - if (channel_arg != nullptr) { - GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); + const char* service_config_str = grpc_channel_arg_get_string(channel_arg); + if (service_config_str != nullptr) { grpc_service_config* service_config = - grpc_service_config_create(channel_arg->value.string); + grpc_service_config_create(service_config_str); if (service_config != nullptr) { chand->method_limit_table = grpc_service_config_create_method_config_table( diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index 8c4025ed13..8f896f70b4 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -63,9 +63,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args( // To which address are we connecting? By default, use the server URI. const grpc_arg* server_uri_arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(server_uri_arg != nullptr); - GPR_ASSERT(server_uri_arg->type == GRPC_ARG_STRING); - const char* server_uri_str = server_uri_arg->value.string; + const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg); GPR_ASSERT(server_uri_str != nullptr); grpc_uri* server_uri = grpc_uri_parse(server_uri_str, true /* supress errors */); diff --git a/src/core/lib/channel/channel_args.cc b/src/core/lib/channel/channel_args.cc index 98fdfa6b9a..66a86c2286 100644 --- a/src/core/lib/channel/channel_args.cc +++ b/src/core/lib/channel/channel_args.cc @@ -354,6 +354,15 @@ int grpc_channel_arg_get_integer(const grpc_arg* arg, return arg->value.integer; } +char* grpc_channel_arg_get_string(const grpc_arg* arg) { + if (arg == nullptr) return nullptr; + if (arg->type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s ignored: it must be an string", arg->key); + return nullptr; + } + return arg->value.string; +} + bool grpc_channel_arg_get_bool(const grpc_arg* arg, bool default_value) { if (arg == nullptr) return default_value; if (arg->type != GRPC_ARG_INTEGER) { diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 73e9122e75..c59bd0c7e7 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -109,6 +109,11 @@ typedef struct grpc_integer_options { int grpc_channel_arg_get_integer(const grpc_arg* arg, const grpc_integer_options options); +/** Returns the value of \a arg if \a arg is of type GRPC_ARG_STRING. + Otherwise, emits a warning log, and returns nullptr. + If arg is nullptr, returns nullptr, and does not emit a warning. */ +char* grpc_channel_arg_get_string(const grpc_arg* arg); + bool grpc_channel_arg_get_bool(const grpc_arg* arg, bool default_value); // Helpers for creating channel args. diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 3ad94a4ecd..bb7622c4cc 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -57,7 +57,7 @@ //#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 #define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 1 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -196,6 +196,7 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; + gpr_atm worker_count; pollable* active_pollable; bool kicked_without_poller; grpc_closure* shutdown_closure; @@ -683,6 +684,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); + gpr_atm_no_barrier_store(&pollset->worker_count, 0); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); pollset->kicked_without_poller = false; pollset->shutdown_closure = nullptr; @@ -756,8 +758,20 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, pollable* pollable_obj, bool drain) { GPR_TIMER_SCOPE("pollable_process_events", 0); static const char* err_desc = "pollset_process_events"; + // Use a simple heuristic to determine how many fd events to process + // per loop iteration. (events/workers) + int handle_count = 1; + int worker_count = gpr_atm_no_barrier_load(&pollset->worker_count); + GPR_ASSERT(worker_count > 0); + handle_count = + (pollable_obj->event_count - pollable_obj->event_cursor) / worker_count; + if (handle_count == 0) { + handle_count = 1; + } else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) { + handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL; + } grpc_error* error = GRPC_ERROR_NONE; - for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && + for (int i = 0; (drain || i < handle_count) && pollable_obj->event_cursor != pollable_obj->event_count; i++) { int n = pollable_obj->event_cursor++; @@ -882,6 +896,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, GPR_TIMER_SCOPE("begin_worker", 0); bool do_poll = (pollset->shutdown_closure == nullptr && !pollset->already_shutdown); + gpr_atm_no_barrier_fetch_add(&pollset->worker_count, 1); if (worker_hdl != nullptr) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; @@ -962,6 +977,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } + gpr_atm_no_barrier_fetch_add(&pollset->worker_count, -1); } #ifndef NDEBUG diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc index 3b29db2efa..fa0f89c583 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.cc +++ b/src/core/lib/security/credentials/fake/fake_credentials.cc @@ -87,11 +87,7 @@ const char* grpc_fake_transport_get_expected_targets( const grpc_channel_args* args) { const grpc_arg* expected_target_arg = grpc_channel_args_find(args, GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS); - if (expected_target_arg != nullptr && - expected_target_arg->type == GRPC_ARG_STRING) { - return expected_target_arg->value.string; - } - return nullptr; + return grpc_channel_arg_get_string(expected_target_arg); } /* -- Metadata-only test credentials. -- */ diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 79793a710e..7fa7303691 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -173,7 +173,8 @@ class Future(six.with_metaclass(abc.ABCMeta)): """Adds a function to be called at completion of the computation. The callback will be passed this Future object describing the outcome - of the computation. + of the computation. Callbacks will be invoked after the future is + terimated, whether successfully or not. If the computation has already completed, the callback will be called immediately. |