aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc18
-rw-r--r--src/core/ext/filters/client_channel/http_connect_handshaker.cc11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.cc7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h4
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc139
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc11
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc11
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc6
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc6
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc4
-rw-r--r--src/core/lib/channel/channel_args.cc9
-rw-r--r--src/core/lib/channel/channel_args.h5
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc20
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.cc6
-rw-r--r--src/python/grpcio/grpc/__init__.py3
15 files changed, 146 insertions, 114 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 50d562f946..e1356d90b3 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -402,13 +402,9 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
if (chand->resolver_result != nullptr) {
if (chand->resolver != nullptr) {
// Find LB policy name.
- const char* lb_policy_name = nullptr;
const grpc_arg* channel_arg = grpc_channel_args_find(
chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- lb_policy_name = channel_arg->value.string;
- }
+ const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver actually specified.
channel_arg =
@@ -475,17 +471,17 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// Find service config.
channel_arg = grpc_channel_args_find(chand->resolver_result,
GRPC_ARG_SERVICE_CONFIG);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- service_config_json = gpr_strdup(channel_arg->value.string);
+ service_config_json =
+ gpr_strdup(grpc_channel_arg_get_string(channel_arg));
+ if (service_config_json != nullptr) {
grpc_service_config* service_config =
grpc_service_config_create(service_config_json);
if (service_config != nullptr) {
channel_arg = grpc_channel_args_find(chand->resolver_result,
GRPC_ARG_SERVER_URI);
- GPR_ASSERT(channel_arg != nullptr);
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true);
+ const char* server_uri = grpc_channel_arg_get_string(channel_arg);
+ GPR_ASSERT(server_uri != nullptr);
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index 6bb4cefe73..248a6347d5 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -254,7 +254,8 @@ static void http_connect_handshaker_do_handshake(
// If not found, invoke on_handshake_done without doing anything.
const grpc_arg* arg =
grpc_channel_args_find(args->args, GRPC_ARG_HTTP_CONNECT_SERVER);
- if (arg == nullptr) {
+ char* server_name = grpc_channel_arg_get_string(arg);
+ if (server_name == nullptr) {
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
gpr_mu_lock(&handshaker->mu);
@@ -263,17 +264,15 @@ static void http_connect_handshaker_do_handshake(
GRPC_CLOSURE_SCHED(on_handshake_done, GRPC_ERROR_NONE);
return;
}
- GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- char* server_name = arg->value.string;
// Get headers from channel args.
arg = grpc_channel_args_find(args->args, GRPC_ARG_HTTP_CONNECT_HEADERS);
+ char* arg_header_string = grpc_channel_arg_get_string(arg);
grpc_http_header* headers = nullptr;
size_t num_headers = 0;
char** header_strings = nullptr;
size_t num_header_strings = 0;
- if (arg != nullptr) {
- GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- gpr_string_split(arg->value.string, "\n", &header_strings,
+ if (arg_header_string != nullptr) {
+ gpr_string_split(arg_header_string, "\n", &header_strings,
&num_header_strings);
headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * num_header_strings));
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 27fb2ad1f4..cc4fe7ec62 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -118,8 +118,7 @@ void grpc_lb_policy_update_locked(grpc_lb_policy* policy,
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_lb_policy* policy, grpc_closure* request_reresolution) {
- GPR_ASSERT(policy->request_reresolution == nullptr);
- policy->request_reresolution = request_reresolution;
+ policy->vtable->set_reresolve_closure_locked(policy, request_reresolution);
}
void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
@@ -134,8 +133,8 @@ void grpc_lb_policy_try_reresolve(grpc_lb_policy* policy,
grpc_lb_trace->name(), policy, grpc_error_string(error));
}
} else {
- if (grpc_lb_trace->enabled()) {
- gpr_log(GPR_DEBUG, "%s %p: no available re-resolution closure.",
+ if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
+ gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
grpc_lb_trace->name(), policy);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 6edd314d5e..30660cb83d 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -107,6 +107,10 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
+
+ /** \see grpc_lb_policy_set_reresolve_closure */
+ void (*set_reresolve_closure_locked)(grpc_lb_policy* policy,
+ grpc_closure* request_reresolution);
};
#ifndef NDEBUG
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 1c8809eabc..21c95460c8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -249,7 +249,7 @@ typedef struct glb_lb_policy {
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy* rr_policy;
- /** the connectivity state of the embedded RR policy */
+ grpc_closure on_rr_connectivity_changed;
grpc_connectivity_state rr_connectivity_state;
bool started_picking;
@@ -292,12 +292,6 @@ typedef struct glb_lb_policy {
/** called upon changes to the LB channel's connectivity. */
grpc_closure lb_channel_on_connectivity_changed;
- /** called upon changes to the RR's connectivity. */
- grpc_closure rr_on_connectivity_changed;
-
- /** called upon reresolution request from the RR policy. */
- grpc_closure rr_on_reresolution_requested;
-
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
@@ -596,8 +590,9 @@ static grpc_lb_addresses* extract_backend_addresses_locked(
return backend_addresses;
}
-static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
- grpc_error* rr_state_error) {
+static void update_lb_connectivity_status_locked(
+ glb_lb_policy* glb_policy, grpc_connectivity_state rr_state,
+ grpc_error* rr_state_error) {
const grpc_connectivity_state curr_glb_state =
grpc_connectivity_state_check(&glb_policy->state_tracker);
/* The new connectivity status is a function of the previous one and the new
@@ -629,7 +624,7 @@ static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
*
* (*) This function mustn't be called during shutting down. */
GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
- switch (glb_policy->rr_connectivity_state) {
+ switch (rr_state) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_SHUTDOWN:
GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
@@ -643,12 +638,11 @@ static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
gpr_log(
GPR_INFO,
"[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
- glb_policy,
- grpc_connectivity_state_name(glb_policy->rr_connectivity_state),
+ glb_policy, grpc_connectivity_state_name(rr_state),
glb_policy->rr_policy);
}
- grpc_connectivity_state_set(&glb_policy->state_tracker,
- glb_policy->rr_connectivity_state, rr_state_error,
+ grpc_connectivity_state_set(&glb_policy->state_tracker, rr_state,
+ rr_state_error,
"update_lb_connectivity_status_locked");
}
@@ -746,36 +740,11 @@ static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
gpr_free(args);
}
-static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
- if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) {
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "rr_on_reresolution_requested_locked");
- return;
- }
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(
- GPR_DEBUG,
- "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
- glb_policy, glb_policy->rr_policy);
- }
- // If we are talking to a balancer, we expect to get updated addresses form
- // the balancer, so we can ignore the re-resolution request from the RR
- // policy. Otherwise, handle the re-resolution request using glb's original
- // re-resolution closure.
- if (glb_policy->lb_calld == nullptr ||
- !glb_policy->lb_calld->seen_initial_response) {
- grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
- GRPC_ERROR_NONE);
- }
- // Give back the wrapper closure to the RR policy.
- grpc_lb_policy_set_reresolve_closure_locked(
- glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested);
-}
-
+static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error);
static void create_rr_locked(glb_lb_policy* glb_policy,
grpc_lb_policy_args* args) {
GPR_ASSERT(glb_policy->rr_policy == nullptr);
+
grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
if (new_rr_policy == nullptr) {
gpr_log(GPR_ERROR,
@@ -788,25 +757,29 @@ static void create_rr_locked(glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
- GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked");
grpc_lb_policy_set_reresolve_closure_locked(
- new_rr_policy, &glb_policy->rr_on_reresolution_requested);
+ new_rr_policy, glb_policy->base.request_reresolution);
+ glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
glb_policy->rr_policy, &rr_state_error);
/* Connectivity state is a function of the RR policy updated/created */
- update_lb_connectivity_status_locked(glb_policy, rr_state_error);
+ update_lb_connectivity_status_locked(
+ glb_policy, glb_policy->rr_connectivity_state, rr_state_error);
/* Add the gRPC LB's interested_parties pollset_set to that of the newly
* created RR policy. This will make the RR policy progress upon activity on
* gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
+ GRPC_CLOSURE_INIT(&glb_policy->on_rr_connectivity_changed,
+ on_rr_connectivity_changed_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
/* Subscribe to changes to the connectivity of the new RR */
- GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked");
+ GRPC_LB_POLICY_REF(&glb_policy->base, "glb_rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
- &glb_policy->rr_on_connectivity_changed);
+ &glb_policy->on_rr_connectivity_changed);
grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
// Send pending picks to RR policy.
pending_pick* pp;
@@ -854,18 +827,28 @@ static void rr_handover_locked(glb_lb_policy* glb_policy) {
lb_policy_args_destroy(args);
}
-static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) {
- glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+static void on_rr_connectivity_changed_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
if (glb_policy->shutting_down) {
- GRPC_LB_POLICY_UNREF(&glb_policy->base,
- "rr_on_connectivity_changed_locked");
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
+ return;
+ }
+ if (glb_policy->rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ /* An RR policy that has transitioned into the SHUTDOWN connectivity state
+ * should not be considered for picks or updates: the SHUTDOWN state is a
+ * sink, policies can't transition back from it. .*/
+ GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "rr_connectivity_shutdown");
+ glb_policy->rr_policy = nullptr;
+ GRPC_LB_POLICY_UNREF(&glb_policy->base, "glb_rr_connectivity_cb");
return;
}
- update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error));
- // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref.
+ /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
+ update_lb_connectivity_status_locked(
+ glb_policy, glb_policy->rr_connectivity_state, GRPC_ERROR_REF(error));
+ /* Resubscribe. Reuse the "glb_rr_connectivity_cb" ref. */
grpc_lb_policy_notify_on_state_change_locked(
glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
- &glb_policy->rr_on_connectivity_changed);
+ &glb_policy->on_rr_connectivity_changed);
}
static void destroy_balancer_name(void* balancer_name) {
@@ -988,6 +971,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
+ } else {
+ grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -999,7 +984,6 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
}
grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "glb_shutdown");
- grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
// Clear pending picks.
pending_pick* pp = glb_policy->pending_picks;
glb_policy->pending_picks = nullptr;
@@ -1639,8 +1623,6 @@ static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
lb_calld, lb_calld->lb_call, grpc_error_string(error));
gpr_free(status_details);
}
- grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
- GRPC_ERROR_NONE);
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
@@ -1669,15 +1651,16 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
glb_policy->fallback_timer_callback_pending = false;
/* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't fall back. */
- if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down &&
- error == GRPC_ERROR_NONE) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Falling back to use backends from resolver",
- glb_policy);
+ if (glb_policy->serverlist == nullptr) {
+ if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
+ if (grpc_lb_glb_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Falling back to use backends from resolver",
+ glb_policy);
+ }
+ GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
+ rr_handover_locked(glb_policy);
}
- GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
- rr_handover_locked(glb_policy);
}
GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
}
@@ -1798,6 +1781,19 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
}
}
+static void glb_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy);
+ GPR_ASSERT(!glb_policy->shutting_down);
+ GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
+ if (glb_policy->rr_policy != nullptr) {
+ grpc_lb_policy_set_reresolve_closure_locked(glb_policy->rr_policy,
+ request_reresolution);
+ } else {
+ glb_policy->base.request_reresolution = request_reresolution;
+ }
+}
+
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@@ -1809,7 +1805,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
- glb_update_locked};
+ glb_update_locked,
+ glb_set_reresolve_closure_locked};
static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
grpc_lb_policy_args* args) {
@@ -1832,9 +1829,9 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(arg != nullptr);
- GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri* uri = grpc_uri_parse(arg->value.string, true);
+ const char* server_uri = grpc_channel_arg_get_string(arg);
+ GPR_ASSERT(server_uri != nullptr);
+ grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
@@ -1890,12 +1887,6 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
return nullptr;
}
grpc_subchannel_index_ref();
- GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed,
- rr_on_connectivity_changed_locked, glb_policy,
- grpc_combiner_scheduler(args->combiner));
- GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested,
- rr_on_reresolution_requested_locked, glb_policy,
- grpc_combiner_scheduler(args->combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 296bdcb247..1485f7caf5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -520,6 +520,14 @@ static void pf_connectivity_changed_locked(void* arg, grpc_error* error) {
}
}
+static void pf_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ pick_first_lb_policy* p = reinterpret_cast<pick_first_lb_policy*>(policy);
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@@ -530,7 +538,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
- pf_update_locked};
+ pf_update_locked,
+ pf_set_reresolve_closure_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b5b4c44ef1..cefd0d8d7d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -620,6 +620,14 @@ static void rr_update_locked(grpc_lb_policy* policy,
}
}
+static void rr_set_reresolve_closure_locked(
+ grpc_lb_policy* policy, grpc_closure* request_reresolution) {
+ round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy);
+ GPR_ASSERT(!p->shutdown);
+ GPR_ASSERT(policy->request_reresolution == nullptr);
+ policy->request_reresolution = request_reresolution;
+}
+
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@@ -630,7 +638,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
- rr_update_locked};
+ rr_update_locked,
+ rr_set_reresolve_closure_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 179e3f27ac..fbe07c58f7 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -728,9 +728,9 @@ void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
const grpc_arg* addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
- GPR_ASSERT(addr_arg != nullptr); // Should have been set by LB policy.
- GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING);
- return addr_arg->value.string;
+ const char* addr_str = grpc_channel_arg_get_string(addr_arg);
+ GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
+ return addr_str;
}
grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index 49d9ae60ae..0fb3935609 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -249,10 +249,10 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
// Get method config table from channel args.
const grpc_arg* channel_arg =
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
- if (channel_arg != nullptr) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ const char* service_config_str = grpc_channel_arg_get_string(channel_arg);
+ if (service_config_str != nullptr) {
grpc_service_config* service_config =
- grpc_service_config_create(channel_arg->value.string);
+ grpc_service_config_create(service_config_str);
if (service_config != nullptr) {
chand->method_limit_table =
grpc_service_config_create_method_config_table(
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
index 8c4025ed13..8f896f70b4 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
@@ -63,9 +63,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
// To which address are we connecting? By default, use the server URI.
const grpc_arg* server_uri_arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(server_uri_arg != nullptr);
- GPR_ASSERT(server_uri_arg->type == GRPC_ARG_STRING);
- const char* server_uri_str = server_uri_arg->value.string;
+ const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg);
GPR_ASSERT(server_uri_str != nullptr);
grpc_uri* server_uri =
grpc_uri_parse(server_uri_str, true /* supress errors */);
diff --git a/src/core/lib/channel/channel_args.cc b/src/core/lib/channel/channel_args.cc
index 98fdfa6b9a..66a86c2286 100644
--- a/src/core/lib/channel/channel_args.cc
+++ b/src/core/lib/channel/channel_args.cc
@@ -354,6 +354,15 @@ int grpc_channel_arg_get_integer(const grpc_arg* arg,
return arg->value.integer;
}
+char* grpc_channel_arg_get_string(const grpc_arg* arg) {
+ if (arg == nullptr) return nullptr;
+ if (arg->type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an string", arg->key);
+ return nullptr;
+ }
+ return arg->value.string;
+}
+
bool grpc_channel_arg_get_bool(const grpc_arg* arg, bool default_value) {
if (arg == nullptr) return default_value;
if (arg->type != GRPC_ARG_INTEGER) {
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 73e9122e75..c59bd0c7e7 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -109,6 +109,11 @@ typedef struct grpc_integer_options {
int grpc_channel_arg_get_integer(const grpc_arg* arg,
const grpc_integer_options options);
+/** Returns the value of \a arg if \a arg is of type GRPC_ARG_STRING.
+ Otherwise, emits a warning log, and returns nullptr.
+ If arg is nullptr, returns nullptr, and does not emit a warning. */
+char* grpc_channel_arg_get_string(const grpc_arg* arg);
+
bool grpc_channel_arg_get_bool(const grpc_arg* arg, bool default_value);
// Helpers for creating channel args.
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 3ad94a4ecd..bb7622c4cc 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -57,7 +57,7 @@
//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
#define MAX_EPOLL_EVENTS 100
-#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 1
+#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount");
@@ -196,6 +196,7 @@ struct grpc_pollset_worker {
struct grpc_pollset {
gpr_mu mu;
+ gpr_atm worker_count;
pollable* active_pollable;
bool kicked_without_poller;
grpc_closure* shutdown_closure;
@@ -683,6 +684,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
gpr_mu_init(&pollset->mu);
+ gpr_atm_no_barrier_store(&pollset->worker_count, 0);
pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
pollset->kicked_without_poller = false;
pollset->shutdown_closure = nullptr;
@@ -756,8 +758,20 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
pollable* pollable_obj, bool drain) {
GPR_TIMER_SCOPE("pollable_process_events", 0);
static const char* err_desc = "pollset_process_events";
+ // Use a simple heuristic to determine how many fd events to process
+ // per loop iteration. (events/workers)
+ int handle_count = 1;
+ int worker_count = gpr_atm_no_barrier_load(&pollset->worker_count);
+ GPR_ASSERT(worker_count > 0);
+ handle_count =
+ (pollable_obj->event_count - pollable_obj->event_cursor) / worker_count;
+ if (handle_count == 0) {
+ handle_count = 1;
+ } else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) {
+ handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL;
+ }
grpc_error* error = GRPC_ERROR_NONE;
- for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
+ for (int i = 0; (drain || i < handle_count) &&
pollable_obj->event_cursor != pollable_obj->event_count;
i++) {
int n = pollable_obj->event_cursor++;
@@ -882,6 +896,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
GPR_TIMER_SCOPE("begin_worker", 0);
bool do_poll =
(pollset->shutdown_closure == nullptr && !pollset->already_shutdown);
+ gpr_atm_no_barrier_fetch_add(&pollset->worker_count, 1);
if (worker_hdl != nullptr) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kicked = false;
@@ -962,6 +977,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
+ gpr_atm_no_barrier_fetch_add(&pollset->worker_count, -1);
}
#ifndef NDEBUG
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc
index 3b29db2efa..fa0f89c583 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.cc
+++ b/src/core/lib/security/credentials/fake/fake_credentials.cc
@@ -87,11 +87,7 @@ const char* grpc_fake_transport_get_expected_targets(
const grpc_channel_args* args) {
const grpc_arg* expected_target_arg =
grpc_channel_args_find(args, GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
- if (expected_target_arg != nullptr &&
- expected_target_arg->type == GRPC_ARG_STRING) {
- return expected_target_arg->value.string;
- }
- return nullptr;
+ return grpc_channel_arg_get_string(expected_target_arg);
}
/* -- Metadata-only test credentials. -- */
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 79793a710e..7fa7303691 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -173,7 +173,8 @@ class Future(six.with_metaclass(abc.ABCMeta)):
"""Adds a function to be called at completion of the computation.
The callback will be passed this Future object describing the outcome
- of the computation.
+ of the computation. Callbacks will be invoked after the future is
+ terimated, whether successfully or not.
If the computation has already completed, the callback will be called
immediately.