aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_channel/client_channel.c81
-rw-r--r--src/core/ext/client_channel/lb_policy.c89
-rw-r--r--src/core/ext/client_channel/lb_policy.h85
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.h1
-rw-r--r--src/core/ext/client_channel/resolver.c25
-rw-r--r--src/core/ext/client_channel/resolver.h35
-rw-r--r--src/core/ext/client_channel/resolver_factory.h1
-rw-r--r--src/core/ext/client_channel/resolver_registry.c4
-rw-r--r--src/core/ext/client_channel/resolver_registry.h3
-rw-r--r--src/core/ext/client_channel/subchannel.c2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c170
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c181
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c100
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c2
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c59
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c44
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c7
-rw-r--r--src/core/lib/channel/channel_stack.c15
-rw-r--r--src/core/lib/channel/channel_stack.h2
-rw-r--r--src/core/lib/channel/compress_filter.c2
-rw-r--r--src/core/lib/channel/connected_channel.c2
-rw-r--r--src/core/lib/channel/deadline_filter.c104
-rw-r--r--src/core/lib/channel/deadline_filter.h12
-rw-r--r--src/core/lib/channel/http_client_filter.c2
-rw-r--r--src/core/lib/channel/http_server_filter.c10
-rw-r--r--src/core/lib/channel/message_size_filter.c2
-rw-r--r--src/core/lib/iomgr/error.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c338
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c5
-rw-r--r--src/core/lib/iomgr/socket_utils_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c14
-rw-r--r--src/core/lib/iomgr/timer_generic.c18
-rw-r--r--src/core/lib/iomgr/timer_generic.h2
-rw-r--r--src/core/lib/iomgr/timer_uv.c12
-rw-r--r--src/core/lib/iomgr/timer_uv.h2
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c2
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c2
-rw-r--r--src/core/lib/surface/call.c145
-rw-r--r--src/core/lib/surface/lame_client.c2
-rw-r--r--src/core/lib/surface/server.c6
-rw-r--r--src/core/plugin_registry/grpc_cronet_plugin_registry.c4
42 files changed, 896 insertions, 708 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 65cfe1fa90..b80d831557 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -127,7 +127,7 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
@@ -146,7 +146,7 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 62fb061bcf..21ba4301ff 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -183,7 +183,7 @@ typedef struct client_channel_channel_data {
grpc_pollset_set *interested_parties;
/* the following properties are guarded by a mutex since API's require them
- to be instantaniously available */
+ to be instantaneously available */
gpr_mu info_mu;
char *info_lb_policy_name;
/** service config in JSON form */
@@ -200,9 +200,9 @@ typedef struct {
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
-static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
- grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state);
+static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state);
static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand,
@@ -213,7 +213,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
state == GRPC_CHANNEL_SHUTDOWN) &&
chand->lb_policy != NULL) {
/* cancel picks with wait_for_ready=false */
- grpc_lb_policy_cancel_picks(
+ grpc_lb_policy_cancel_picks_locked(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
@@ -230,14 +230,14 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
if (w->lb_policy == w->chand->lb_policy) {
if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver);
+ grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
- watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
+ watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
}
}
@@ -245,9 +245,9 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx,
gpr_free(w);
}
-static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
- grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state) {
+static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
+ grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
@@ -256,8 +256,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_combiner_scheduler(chand->combiner, false));
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
- &w->on_changed);
+ grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
+ &w->on_changed);
}
static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
@@ -313,13 +313,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy_args.combiner = chand->combiner;
lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
- state =
- grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
+ state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
+ &state_error);
}
// Find service config.
channel_arg =
@@ -383,14 +384,15 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
set_channel_connectivity_state_locked(
exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
if (lb_policy != NULL) {
- watch_lb_policy(exec_ctx, chand, lb_policy, state);
+ watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
} else {
if (chand->resolver != NULL) {
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
@@ -403,7 +405,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
}
if (exit_idle) {
- grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
+ grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
}
@@ -440,7 +442,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_closure_sched(exec_ctx, op->send_ping,
GRPC_ERROR_CREATE("Ping with no load balancing"));
} else {
- grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
+ grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
}
op->send_ping = NULL;
@@ -451,7 +453,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
set_channel_connectivity_state_locked(
exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ grpc_resolver_shutdown_locked(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
if (!chand->started_resolving) {
@@ -550,7 +552,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
chand->resolver = grpc_resolver_create(
exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
new_args != NULL ? new_args : args->channel_args,
- chand->interested_parties);
+ chand->interested_parties, chand->combiner);
if (proxy_name != NULL) gpr_free(proxy_name);
if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
if (chand->resolver == NULL) {
@@ -559,13 +561,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
+static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_resolver *resolver = arg;
+ grpc_resolver_shutdown_locked(exec_ctx, resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel");
+}
+
/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
- grpc_resolver_shutdown(exec_ctx, chand->resolver);
- GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(shutdown_resolver_locked, chand->resolver,
+ grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_ERROR_NONE);
}
if (chand->client_channel_factory != NULL) {
grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
@@ -797,8 +809,9 @@ static bool pick_subchannel_locked(
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
- connected_subchannel, GRPC_ERROR_REF(error));
+ grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
+ connected_subchannel,
+ GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
@@ -837,7 +850,7 @@ static bool pick_subchannel_locked(
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
- const bool result = grpc_lb_policy_pick(
+ const bool result = grpc_lb_policy_pick_locked(
exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
@@ -846,8 +859,9 @@ static bool pick_subchannel_locked(
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
}
if (chand->resolver != NULL) {
cpa = gpr_malloc(sizeof(*cpa));
@@ -1123,7 +1137,7 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
// Initialize data members.
@@ -1204,14 +1218,15 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
channel_data *chand = arg;
if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
+ grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy);
} else {
chand->exit_idle_when_lb_policy_arrives = true;
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
chand->started_resolving = true;
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
- &chand->on_resolver_result_changed);
+ grpc_resolver_next_locked(exec_ctx, chand->resolver,
+ &chand->resolver_result,
+ &chand->on_resolver_result_changed);
}
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c
index 90401b586f..aba51add53 100644
--- a/src/core/ext/client_channel/lb_policy.c
+++ b/src/core/ext/client_channel/lb_policy.c
@@ -32,14 +32,17 @@
*/
#include "src/core/ext/client_channel/lb_policy.h"
+#include "src/core/lib/iomgr/combiner.h"
#define WEAK_REF_BITS 16
void grpc_lb_policy_init(grpc_lb_policy *policy,
- const grpc_lb_policy_vtable *vtable) {
+ const grpc_lb_policy_vtable *vtable,
+ grpc_combiner *combiner) {
policy->vtable = vtable;
gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
policy->interested_parties = grpc_pollset_set_create();
+ policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy");
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -71,6 +74,13 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF"));
}
+static void shutdown_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_lb_policy *policy = arg;
+ policy->vtable->shutdown_locked(exec_ctx, policy);
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, policy, "strong-unref");
+}
+
void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
gpr_atm old_val =
@@ -79,10 +89,15 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx,
gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1);
gpr_atm check = 1 << WEAK_REF_BITS;
if ((old_val & mask) == check) {
- policy->vtable->shutdown(exec_ctx, policy);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_create(shutdown_locked, policy,
+ grpc_combiner_scheduler(policy->combiner, false)),
+ GRPC_ERROR_NONE);
+ } else {
+ grpc_lb_policy_weak_unref(exec_ctx,
+ policy REF_FUNC_PASS_ARGS("strong-unref"));
}
- grpc_lb_policy_weak_unref(exec_ctx,
- policy REF_FUNC_PASS_ARGS("strong-unref"));
}
void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) {
@@ -95,52 +110,58 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
grpc_pollset_set_destroy(exec_ctx, policy->interested_parties);
+ grpc_combiner *combiner = policy->combiner;
policy->vtable->destroy(exec_ctx, policy);
+ GRPC_COMBINER_UNREF(exec_ctx, combiner, "lb_policy");
}
}
-int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
- on_complete);
+int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target,
+ void **user_data, grpc_closure *on_complete) {
+ return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target,
+ user_data, on_complete);
}
-void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target,
- grpc_error *error) {
- policy->vtable->cancel_pick(exec_ctx, policy, target, error);
+void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
+ policy->vtable->cancel_pick_locked(exec_ctx, policy, target, error);
}
-void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error *error) {
- policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
- initial_metadata_flags_eq, error);
+void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
+ policy->vtable->cancel_picks_locked(exec_ctx, policy,
+ initial_metadata_flags_mask,
+ initial_metadata_flags_eq, error);
}
-void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
- policy->vtable->exit_idle(exec_ctx, policy);
+void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy) {
+ policy->vtable->exit_idle_locked(exec_ctx, policy);
}
-void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_closure *closure) {
- policy->vtable->ping_one(exec_ctx, policy, closure);
+void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_closure *closure) {
+ policy->vtable->ping_one_locked(exec_ctx, policy, closure);
}
-void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_closure *closure) {
- policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure);
+void grpc_lb_policy_notify_on_state_change_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_connectivity_state *state, grpc_closure *closure) {
+ policy->vtable->notify_on_state_change_locked(exec_ctx, policy, state,
+ closure);
}
-grpc_connectivity_state grpc_lb_policy_check_connectivity(
+grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error) {
- return policy->vtable->check_connectivity(exec_ctx, policy,
- connectivity_error);
+ return policy->vtable->check_connectivity_locked(exec_ctx, policy,
+ connectivity_error);
}
diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h
index 120c641edc..3405709c2c 100644
--- a/src/core/ext/client_channel/lb_policy.h
+++ b/src/core/ext/client_channel/lb_policy.h
@@ -51,6 +51,8 @@ struct grpc_lb_policy {
gpr_atm ref_pair;
/* owned pointer to interested parties in load balancing decisions */
grpc_pollset_set *interested_parties;
+ /* combiner under which lb_policy actions take place */
+ grpc_combiner *combiner;
};
/** Extra arguments for an LB pick */
@@ -69,42 +71,44 @@ typedef struct grpc_lb_policy_pick_args {
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+ void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** \see grpc_lb_policy_pick */
- int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete);
+ int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete);
/** \see grpc_lb_policy_cancel_pick */
- void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target, grpc_error *error);
+ void (*cancel_pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_connected_subchannel **target,
+ grpc_error *error);
/** \see grpc_lb_policy_cancel_picks */
- void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq, grpc_error *error);
+ void (*cancel_picks_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error);
/** \see grpc_lb_policy_ping_one */
- void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_closure *closure);
+ void (*ping_one_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_closure *closure);
/** Try to enter a READY connectivity state */
- void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+ void (*exit_idle_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** check the current connectivity of the lb_policy */
- grpc_connectivity_state (*check_connectivity)(
+ grpc_connectivity_state (*check_connectivity_locked)(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
state cancels the subscription. */
- void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_closure *closure);
+ void (*notify_on_state_change_locked)(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_closure *closure);
};
/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/
@@ -144,7 +148,8 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
/** called by concrete implementations to initialize the base struct */
void grpc_lb_policy_init(grpc_lb_policy *policy,
- const grpc_lb_policy_vtable *vtable);
+ const grpc_lb_policy_vtable *vtable,
+ grpc_combiner *combiner);
/** Finds an appropriate subchannel for a call, based on \a pick_args.
@@ -159,43 +164,45 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
Any IO should be done under the \a interested_parties \a grpc_pollset_set
in the \a grpc_lb_policy struct. */
-int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete);
+int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target,
+ void **user_data, grpc_closure *on_complete);
/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
against one of the connected subchannels managed by \a policy. */
-void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_closure *closure);
+void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_closure *closure);
/** Cancel picks for \a target.
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
-void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target,
- grpc_error *error);
+void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ grpc_connected_subchannel **target,
+ grpc_error *error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
when AND'd with \a initial_metadata_flags_mask */
-void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error *error);
+void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error);
/** Try to enter a READY connectivity state */
-void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
+void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy);
/* Call notify when the connectivity state of a channel changes from \a *state.
* Updates \a *state with the new state of the policy */
-void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_closure *closure);
+void grpc_lb_policy_notify_on_state_change_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ grpc_connectivity_state *state, grpc_closure *closure);
-grpc_connectivity_state grpc_lb_policy_check_connectivity(
+grpc_connectivity_state grpc_lb_policy_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_error **connectivity_error);
diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h
index 9b8b03f982..27c12c0d73 100644
--- a/src/core/ext/client_channel/lb_policy_factory.h
+++ b/src/core/ext/client_channel/lb_policy_factory.h
@@ -107,6 +107,7 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
typedef struct grpc_lb_policy_args {
grpc_client_channel_factory *client_channel_factory;
grpc_channel_args *args;
+ grpc_combiner *combiner;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c
index 2ae4fe862e..b1a1faa6c9 100644
--- a/src/core/ext/client_channel/resolver.c
+++ b/src/core/ext/client_channel/resolver.c
@@ -32,10 +32,13 @@
*/
#include "src/core/ext/client_channel/resolver.h"
+#include "src/core/lib/iomgr/combiner.h"
void grpc_resolver_init(grpc_resolver *resolver,
- const grpc_resolver_vtable *vtable) {
+ const grpc_resolver_vtable *vtable,
+ grpc_combiner *combiner) {
resolver->vtable = vtable;
+ resolver->combiner = GRPC_COMBINER_REF(combiner, "resolver");
gpr_ref_init(&resolver->refs, 1);
}
@@ -62,20 +65,24 @@ void grpc_resolver_unref(grpc_resolver *resolver,
void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
#endif
if (gpr_unref(&resolver->refs)) {
+ grpc_combiner *combiner = resolver->combiner;
resolver->vtable->destroy(exec_ctx, resolver);
+ GRPC_COMBINER_UNREF(exec_ctx, combiner, "resolver");
}
}
-void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
- resolver->vtable->shutdown(exec_ctx, resolver);
+void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ resolver->vtable->shutdown_locked(exec_ctx, resolver);
}
-void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
- resolver->vtable->channel_saw_error(exec_ctx, resolver);
+void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
+ resolver->vtable->channel_saw_error_locked(exec_ctx, resolver);
}
-void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **result, grpc_closure *on_complete) {
- resolver->vtable->next(exec_ctx, resolver, result, on_complete);
+void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result,
+ grpc_closure *on_complete) {
+ resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete);
}
diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h
index 96ece92b9d..bbba424ca5 100644
--- a/src/core/ext/client_channel/resolver.h
+++ b/src/core/ext/client_channel/resolver.h
@@ -44,14 +44,16 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable;
struct grpc_resolver {
const grpc_resolver_vtable *vtable;
gpr_refcount refs;
+ grpc_combiner *combiner;
};
struct grpc_resolver_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
- void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **result, grpc_closure *on_complete);
+ void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+ void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
+ void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result, grpc_closure *on_complete);
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
@@ -70,21 +72,30 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy);
#endif
void grpc_resolver_init(grpc_resolver *resolver,
- const grpc_resolver_vtable *vtable);
+ const grpc_resolver_vtable *vtable,
+ grpc_combiner *combiner);
-void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
+void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
/** Notification that the channel has seen an error on some address.
- Can be used as a hint that re-resolution is desirable soon. */
-void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver);
+ Can be used as a hint that re-resolution is desirable soon.
+
+ Must be called from the combiner passed as a resolver_arg at construction
+ time.*/
+void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver);
/** Get the next result from the resolver. Expected to set \a *result with
new channel args and then schedule \a on_complete for execution.
If resolution is fatally broken, set \a *result to NULL and
- schedule \a on_complete. */
-void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **result, grpc_closure *on_complete);
+ schedule \a on_complete.
+
+ Must be called from the combiner passed as a resolver_arg at construction
+ time.*/
+void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **result,
+ grpc_closure *on_complete);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */
diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h
index 3792ddca18..e3cd99ec5a 100644
--- a/src/core/ext/client_channel/resolver_factory.h
+++ b/src/core/ext/client_channel/resolver_factory.h
@@ -50,6 +50,7 @@ typedef struct grpc_resolver_args {
grpc_uri *uri;
const grpc_channel_args *args;
grpc_pollset_set *pollset_set;
+ grpc_combiner *combiner;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c
index 5110a7cad9..f8e8bc9c39 100644
--- a/src/core/ext/client_channel/resolver_registry.c
+++ b/src/core/ext/client_channel/resolver_registry.c
@@ -133,7 +133,8 @@ static grpc_resolver_factory *resolve_factory(const char *target,
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args,
- grpc_pollset_set *pollset_set) {
+ grpc_pollset_set *pollset_set,
+ grpc_combiner *combiner) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
grpc_resolver_factory *factory =
@@ -144,6 +145,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
resolver_args.uri = uri;
resolver_args.args = args;
resolver_args.pollset_set = pollset_set;
+ resolver_args.combiner = combiner;
resolver =
grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args);
grpc_uri_destroy(uri);
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
index a4606463eb..e2c189cf0c 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
should not be NULL. */
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args,
- grpc_pollset_set *pollset_set);
+ grpc_pollset_set *pollset_set,
+ grpc_combiner *combiner);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 09c68a91dd..f2da148e49 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -438,7 +438,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
gpr_free(w);
- follow_up->cb(exec_ctx, follow_up->cb_arg, error);
+ grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}
static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 8a2af48328..b5210cb046 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -115,6 +115,7 @@
#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
@@ -285,9 +286,6 @@ typedef struct glb_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
- /** mutex protecting remaining members */
- gpr_mu mu;
-
/** who the client is trying to communicate with */
const char *server_name;
grpc_client_channel_factory *cc_factory;
@@ -557,9 +555,9 @@ static bool pick_from_internal_rr_locked(
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
GPR_ASSERT(rr_policy != NULL);
- const bool pick_done =
- grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
- (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
+ const bool pick_done = grpc_lb_policy_pick_locked(
+ exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token,
+ &wc_arg->wrapper_closure);
if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
@@ -590,6 +588,7 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
+ args.combiner = glb_policy->base.combiner;
grpc_lb_addresses *addresses =
process_serverlist_locked(exec_ctx, serverlist);
@@ -608,8 +607,8 @@ static grpc_lb_policy *create_rr_locked(
return rr;
}
-static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error);
/* glb_policy->rr_policy may be NULL (initial handover) */
static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
@@ -633,8 +632,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
grpc_error *new_rr_state_error = NULL;
const grpc_connectivity_state new_rr_state =
- grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy,
- &new_rr_state_error);
+ grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy,
+ &new_rr_state_error);
/* Connectivity state is a function of the new RR policy just created */
const bool replace_old_rr = update_lb_connectivity_status_locked(
exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
@@ -677,17 +676,18 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
- grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
- rr_connectivity, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&rr_connectivity->on_change,
+ glb_rr_connectivity_changed_locked, rr_connectivity,
+ grpc_combiner_scheduler(glb_policy->base.combiner, false));
rr_connectivity->glb_policy = glb_policy;
rr_connectivity->state = new_rr_state;
/* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
- grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
- &rr_connectivity->state,
- &rr_connectivity->on_change);
- grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
+ grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
+ &rr_connectivity->state,
+ &rr_connectivity->on_change);
+ grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
/* Update picks and pings in wait */
pending_pick *pp;
@@ -713,17 +713,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
- &pping->wrapped_notify_arg.wrapper_closure);
+ grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
+ &pping->wrapped_notify_arg.wrapper_closure);
}
}
-static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
rr_connectivity_data *rr_connectivity = arg;
glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
- gpr_mu_lock(&glb_policy->mu);
const bool shutting_down = glb_policy->shutting_down;
bool unref_needed = false;
GRPC_ERROR_REF(error);
@@ -740,11 +739,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
update_lb_connectivity_status_locked(exec_ctx, glb_policy,
rr_connectivity->state, error);
/* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
- grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
- &rr_connectivity->state,
- &rr_connectivity->on_change);
+ grpc_lb_policy_notify_on_state_change_locked(
+ exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
+ &rr_connectivity->on_change);
}
- gpr_mu_unlock(&glb_policy->mu);
if (unref_needed) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb");
@@ -899,8 +897,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
gpr_free(glb_policy);
return NULL;
}
- grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
- gpr_mu_init(&glb_policy->mu);
+ grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
return &glb_policy->base;
@@ -918,13 +915,11 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->serverlist != NULL) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
- gpr_mu_destroy(&glb_policy->mu);
gpr_free(glb_policy);
}
-static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
glb_policy->shutting_down = true;
pending_pick *pp = glb_policy->pending_picks;
@@ -941,7 +936,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
grpc_call *lb_call = glb_policy->lb_call;
- gpr_mu_unlock(&glb_policy->mu);
/* glb_policy->lb_call and this local lb_call must be consistent at this point
* because glb_policy->lb_call is only assigned in lb_call_init_locked as part
@@ -967,11 +961,10 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
}
-static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target,
- grpc_error *error) {
+static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL;
while (pp != NULL) {
@@ -987,16 +980,15 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error);
}
-static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error *error) {
+static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks;
glb_policy->pending_picks = NULL;
while (pp != NULL) {
@@ -1012,7 +1004,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error);
}
@@ -1025,19 +1016,17 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx,
query_for_backends_locked(exec_ctx, glb_policy);
}
-static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy);
}
- gpr_mu_unlock(&glb_policy->mu);
}
-static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete) {
+static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete) {
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
grpc_closure_sched(
@@ -1048,7 +1037,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
glb_policy->deadline = pick_args->deadline;
bool pick_done;
@@ -1087,53 +1075,43 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pick_done = false;
}
- gpr_mu_unlock(&glb_policy->mu);
return pick_done;
}
-static grpc_connectivity_state glb_check_connectivity(
+static grpc_connectivity_state glb_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_error **connectivity_error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- grpc_connectivity_state st;
- gpr_mu_lock(&glb_policy->mu);
- st = grpc_connectivity_state_get(&glb_policy->state_tracker,
- connectivity_error);
- gpr_mu_unlock(&glb_policy->mu);
- return st;
+ return grpc_connectivity_state_get(&glb_policy->state_tracker,
+ connectivity_error);
}
-static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
+static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
if (glb_policy->rr_policy) {
- grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
+ grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
} else {
add_pending_ping(&glb_policy->pending_pings, closure);
if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy);
}
}
- gpr_mu_unlock(&glb_policy->mu);
}
-static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
+static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
- gpr_mu_lock(&glb_policy->mu);
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &glb_policy->state_tracker, current, notify);
-
- gpr_mu_unlock(&glb_policy->mu);
}
-static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
-static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error);
+static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
@@ -1162,11 +1140,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_grpclb_request_destroy(request);
grpc_closure_init(&glb_policy->lb_on_server_status_received,
- lb_on_server_status_received, glb_policy,
- grpc_schedule_on_exec_ctx);
+ lb_on_server_status_received_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_closure_init(&glb_policy->lb_on_response_received,
- lb_on_response_received, glb_policy,
- grpc_schedule_on_exec_ctx);
+ lb_on_response_received_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner, false));
gpr_backoff_init(&glb_policy->lb_call_backoff_state,
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
@@ -1261,14 +1239,13 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
-static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
glb_lb_policy *glb_policy = arg;
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
- gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_response_payload != NULL) {
gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
/* Received data from the LB server. Look inside
@@ -1342,20 +1319,17 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
- gpr_mu_unlock(&glb_policy->mu);
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
- gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload");
}
}
-static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
glb_lb_policy *glb_policy = arg;
- gpr_mu_lock(&glb_policy->mu);
if (!glb_policy->shutting_down) {
if (grpc_lb_glb_trace) {
@@ -1365,15 +1339,13 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(glb_policy->lb_call == NULL);
query_for_backends_locked(exec_ctx, glb_policy);
}
- gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"grpclb_on_retry_timer");
}
-static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
+ void *arg, grpc_error *error) {
glb_lb_policy *glb_policy = arg;
- gpr_mu_lock(&glb_policy->mu);
GPR_ASSERT(glb_policy->lb_call != NULL);
@@ -1408,21 +1380,27 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
- grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer,
- glb_policy, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(
+ &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked,
+ glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false));
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
&glb_policy->lb_on_call_retry, now);
}
- gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_server_status_received");
}
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
- glb_destroy, glb_shutdown, glb_pick,
- glb_cancel_pick, glb_cancel_picks, glb_ping_one,
- glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
+ glb_destroy,
+ glb_shutdown_locked,
+ glb_pick_locked,
+ glb_cancel_pick_locked,
+ glb_cancel_picks_locked,
+ glb_ping_one_locked,
+ glb_exit_idle_locked,
+ glb_check_connectivity_locked,
+ glb_notify_on_state_change_locked};
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 1b965183f6..501cb6d94d 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -38,6 +38,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -57,11 +58,11 @@ typedef struct {
grpc_closure connectivity_changed;
- /** the selected channel (a grpc_connected_subchannel) */
- gpr_atm selected;
+ /** remaining members are protected by the combiner */
+
+ /** the selected channel */
+ grpc_connected_subchannel *selected;
- /** mutex protecting remaining members */
- gpr_mu mu;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -77,32 +78,24 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
-
static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connected_subchannel *selected = GET_SELECTED(p);
size_t i;
GPR_ASSERT(p->pending_picks == NULL);
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
- if (selected != NULL) {
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first");
+ if (p->selected != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
- gpr_mu_destroy(&p->mu);
gpr_free(p);
}
-static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- grpc_connected_subchannel *selected;
- gpr_mu_lock(&p->mu);
- selected = GET_SELECTED(p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
@@ -110,15 +103,14 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
/* cancel subscription */
- if (selected != NULL) {
+ if (p->selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, NULL, NULL, &p->connectivity_changed);
+ exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
} else if (p->num_subchannels > 0) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
&p->connectivity_changed);
}
- gpr_mu_unlock(&p->mu);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
@@ -128,12 +120,11 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
}
-static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target,
- grpc_error *error) {
+static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
@@ -150,17 +141,15 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
-static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error *error) {
+static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
@@ -177,7 +166,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
@@ -192,63 +180,48 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
&p->connectivity_changed);
}
-static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- gpr_mu_unlock(&p->mu);
}
-static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete) {
+static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
/* Check atomically for a selected channel */
- grpc_connected_subchannel *selected = GET_SELECTED(p);
- if (selected != NULL) {
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
+ if (p->selected != NULL) {
+ *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
return 1;
}
- /* No subchannel selected yet, so acquire lock and then attempt again */
- gpr_mu_lock(&p->mu);
- selected = GET_SELECTED(p);
- if (selected) {
- gpr_mu_unlock(&p->mu);
- *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
- return 1;
- } else {
- if (!p->started_picking) {
- start_picking(exec_ctx, p);
- }
- pp = gpr_malloc(sizeof(*pp));
- pp->next = p->pending_picks;
- pp->target = target;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->on_complete = on_complete;
- p->pending_picks = pp;
- gpr_mu_unlock(&p->mu);
- return 0;
+ /* No subchannel selected yet, so try again */
+ if (!p->started_picking) {
+ start_picking(exec_ctx, p);
}
+ pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->target = target;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->on_complete = on_complete;
+ p->pending_picks = pp;
+ return 0;
}
-static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- pick_first_lb_policy *p = arg;
+static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
+ pick_first_lb_policy *p) {
size_t i;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
- gpr_mu_lock(&p->mu);
subchannels = p->subchannels;
p->num_subchannels = 0;
p->subchannels = NULL;
- gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
@@ -258,25 +231,19 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(subchannels);
}
-static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
- grpc_connected_subchannel *selected;
GRPC_ERROR_REF(error);
- gpr_mu_lock(&p->mu);
-
- selected = GET_SELECTED(p);
-
if (p->shutdown) {
- gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
GRPC_ERROR_UNREF(error);
return;
- } else if (selected != NULL) {
+ } else if (p->selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */
p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
@@ -286,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, p->base.interested_parties,
+ exec_ctx, p->selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -301,26 +268,21 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
"connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
- selected =
- grpc_subchannel_get_connected_subchannel(selected_subchannel);
- GPR_ASSERT(selected != NULL);
- GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
+ p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected_subchannel),
+ "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
- gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(destroy_subchannels, p,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ destroy_subchannels_locked(exec_ctx, p);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked");
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, p->base.interested_parties,
+ exec_ctx, p->selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -387,48 +349,44 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
}
- gpr_mu_unlock(&p->mu);
-
GRPC_ERROR_UNREF(error);
}
-static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_error **error) {
+static grpc_connectivity_state pf_check_connectivity_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state st;
- gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_get(&p->state_tracker, error);
- gpr_mu_unlock(&p->mu);
- return st;
+ return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
+static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
current, notify);
- gpr_mu_unlock(&p->mu);
}
-static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
+static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connected_subchannel *selected = GET_SELECTED(p);
- if (selected) {
- grpc_connected_subchannel_ping(exec_ctx, selected, closure);
+ if (p->selected) {
+ grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
} else {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"));
}
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy, pf_shutdown, pf_pick,
- pf_cancel_pick, pf_cancel_picks, pf_ping_one,
- pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
+ pf_destroy,
+ pf_shutdown_locked,
+ pf_pick_locked,
+ pf_cancel_pick_locked,
+ pf_cancel_picks_locked,
+ pf_ping_one_locked,
+ pf_exit_idle_locked,
+ pf_check_connectivity_locked,
+ pf_notify_on_state_change_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -489,10 +447,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
p->num_subchannels = subchannel_idx;
- grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
- grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p,
- grpc_schedule_on_exec_ctx);
- gpr_mu_init(&p->mu);
+ grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
+ grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
+ grpc_combiner_scheduler(args->combiner, false));
return &p->base;
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 63e3d033ad..687df170ad 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -67,6 +67,7 @@
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -134,7 +135,6 @@ typedef struct {
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
- gpr_mu mu;
/** total number of addresses received at creation time */
size_t num_addresses;
@@ -293,7 +293,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
- gpr_mu_destroy(&p->mu);
elem = p->ready_list.next;
while (elem != NULL && elem != &p->ready_list) {
@@ -309,12 +308,11 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(p);
}
-static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
size_t i;
- gpr_mu_lock(&p->mu);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
}
@@ -335,15 +333,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
&sd->connectivity_changed_closure);
}
- gpr_mu_unlock(&p->mu);
}
-static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target,
- grpc_error *error) {
+static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
@@ -360,17 +356,15 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
-static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq,
- grpc_error *error) {
+static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
- gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
@@ -388,11 +382,11 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
pp = next;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
-static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
+static void start_picking_locked(grpc_exec_ctx *exec_ctx,
+ round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@@ -411,23 +405,20 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
}
}
-static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
+static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
- gpr_mu_unlock(&p->mu);
}
-static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- const grpc_lb_policy_pick_args *pick_args,
- grpc_connected_subchannel **target, void **user_data,
- grpc_closure *on_complete) {
+static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
- gpr_mu_lock(&p->mu);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
@@ -449,12 +440,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
- gpr_mu_unlock(&p->mu);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
- start_picking(exec_ctx, p);
+ start_picking_locked(exec_ctx, p);
}
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
@@ -463,7 +453,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->user_data = user_data;
p->pending_picks = pp;
- gpr_mu_unlock(&p->mu);
return 0;
}
}
@@ -538,17 +527,15 @@ static grpc_connectivity_state update_lb_connectivity_status(
return sd->curr_connectivity_state;
}
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
GRPC_ERROR_REF(error);
- gpr_mu_lock(&p->mu);
if (p->shutdown) {
- gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
GRPC_ERROR_UNREF(error);
return;
@@ -645,56 +632,51 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
break;
}
- gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
-static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_error **error) {
+static grpc_connectivity_state rr_check_connectivity_locked(
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- grpc_connectivity_state st;
- gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_get(&p->state_tracker, error);
- gpr_mu_unlock(&p->mu);
- return st;
+ return grpc_connectivity_state_get(&p->state_tracker, error);
}
-static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol,
- grpc_connectivity_state *current,
- grpc_closure *notify) {
+static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_closure *notify) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
current, notify);
- gpr_mu_unlock(&p->mu);
}
-static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_closure *closure) {
+static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
ready_list *selected;
grpc_connected_subchannel *target;
- gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
- gpr_mu_unlock(&p->mu);
target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
- gpr_mu_unlock(&p->mu);
grpc_closure_sched(exec_ctx, closure,
GRPC_ERROR_CREATE("Round Robin not connected"));
}
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy, rr_shutdown, rr_pick,
- rr_cancel_pick, rr_cancel_picks, rr_ping_one,
- rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
+ rr_destroy,
+ rr_shutdown_locked,
+ rr_pick_locked,
+ rr_cancel_pick_locked,
+ rr_cancel_picks_locked,
+ rr_ping_one_locked,
+ rr_exit_idle_locked,
+ rr_check_connectivity_locked,
+ rr_notify_on_state_change_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -762,7 +744,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
}
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx);
+ rr_connectivity_changed_locked, sd,
+ grpc_combiner_scheduler(args->combiner, false));
}
}
if (subchannel_idx == 0) {
@@ -779,7 +762,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list;
- grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
+ grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
@@ -787,7 +770,6 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
(void *)p, (unsigned long)p->num_subchannels);
}
- gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 7fc88db603..c6386a8942 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -100,7 +100,7 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index c08b53ea04..96ac521a91 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -40,6 +40,7 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h"
@@ -63,8 +64,6 @@ typedef struct {
/** pollset_set to drive the name resolution process */
grpc_pollset_set *interested_parties;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
/** are we currently resolving? */
bool resolving;
/** which version of the result have we published? */
@@ -95,18 +94,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r);
-static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_channel_args **target_result,
- grpc_closure *on_complete);
+static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete);
static const grpc_resolver_vtable dns_resolver_vtable = {
- dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
+ dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked,
+ dns_next_locked};
-static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
+static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
- gpr_mu_lock(&r->mu);
if (r->have_retry_timer) {
grpc_timer_cancel(exec_ctx, &r->retry_timer);
}
@@ -116,25 +117,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
GRPC_ERROR_CREATE("Resolver Shutdown"));
r->next_completion = NULL;
}
- gpr_mu_unlock(&r->mu);
}
-static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
+static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
- gpr_mu_lock(&r->mu);
if (!r->resolving) {
gpr_backoff_reset(&r->backoff_state);
dns_start_resolving_locked(exec_ctx, r);
}
- gpr_mu_unlock(&r->mu);
}
-static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **target_result,
- grpc_closure *on_complete) {
+static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_result = target_result;
@@ -144,30 +141,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
} else {
dns_maybe_finish_next_locked(exec_ctx, r);
}
- gpr_mu_unlock(&r->mu);
}
-static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
dns_resolver *r = arg;
- gpr_mu_lock(&r->mu);
r->have_retry_timer = false;
if (error == GRPC_ERROR_NONE) {
if (!r->resolving) {
dns_start_resolving_locked(exec_ctx, r);
}
}
- gpr_mu_unlock(&r->mu);
GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
}
-static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
dns_resolver *r = arg;
grpc_channel_args *result = NULL;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = false;
if (r->addresses != NULL) {
@@ -198,8 +191,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
- grpc_closure_init(&r->on_retry, dns_on_retry_timer, r,
- grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r,
+ grpc_combiner_scheduler(r->base.combiner, false));
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
}
if (r->resolved_result != NULL) {
@@ -208,7 +201,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
r->resolved_result = result;
r->resolved_version++;
dns_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving");
}
@@ -221,7 +213,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
r->addresses = NULL;
grpc_resolve_address(
exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
- grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx),
+ grpc_closure_create(dns_on_resolved_locked, r,
+ grpc_combiner_scheduler(r->base.combiner, false)),
&r->addresses);
}
@@ -240,7 +233,6 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
dns_resolver *r = (dns_resolver *)gr;
- gpr_mu_destroy(&r->mu);
if (r->resolved_result != NULL) {
grpc_channel_args_destroy(exec_ctx, r->resolved_result);
}
@@ -264,8 +256,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx,
// Create resolver.
dns_resolver *r = gpr_malloc(sizeof(dns_resolver));
memset(r, 0, sizeof(*r));
- gpr_mu_init(&r->mu);
- grpc_resolver_init(&r->base, &dns_resolver_vtable);
+ grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner);
r->name_to_resolve = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
r->channel_args = grpc_channel_args_copy(args->args);
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index a1365f6465..e7f66649b5 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -45,6 +45,7 @@
#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -58,8 +59,6 @@ typedef struct {
grpc_lb_addresses *addresses;
/** channel args */
grpc_channel_args *channel_args;
- /** mutex guarding the rest of the state */
- gpr_mu mu;
/** have we published? */
bool published;
/** pending next completion, or NULL */
@@ -73,48 +72,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r);
-static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
-static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *r);
-static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_channel_args **target_result,
- grpc_closure *on_complete);
+static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r);
+static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
- sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
- sockaddr_next};
+ sockaddr_destroy, sockaddr_shutdown_locked,
+ sockaddr_channel_saw_error_locked, sockaddr_next_locked};
-static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
+static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_result = NULL;
grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE);
r->next_completion = NULL;
}
- gpr_mu_unlock(&r->mu);
}
-static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
- grpc_resolver *resolver) {
+static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- gpr_mu_lock(&r->mu);
r->published = false;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
}
-static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_channel_args **target_result,
- grpc_closure *on_complete) {
+static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver,
+ grpc_channel_args **target_result,
+ grpc_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_result = target_result;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
- gpr_mu_unlock(&r->mu);
}
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -131,7 +125,6 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
- gpr_mu_destroy(&r->mu);
grpc_lb_addresses_destroy(exec_ctx, r->addresses);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
gpr_free(r);
@@ -201,8 +194,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx,
memset(r, 0, sizeof(*r));
r->addresses = addresses;
r->channel_args = grpc_channel_args_copy(args->args);
- gpr_mu_init(&r->mu);
- grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
+ grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner);
return &r->base;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index d1fab25478..28a3166832 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1114,8 +1114,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(exec_ctx, t);
} else {
- grpc_chttp2_cancel_stream(exec_ctx, t, s,
- GRPC_ERROR_CREATE("Transport closed"));
+ grpc_chttp2_cancel_stream(
+ exec_ctx, t, s,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE));
}
} else {
GPR_ASSERT(s->id != 0);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index ec973d4e7f..3fb2a60ac7 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -173,7 +173,6 @@ grpc_error *grpc_call_stack_init(
grpc_slice path, gpr_timespec start_time, gpr_timespec deadline,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
- grpc_call_element_args args;
size_t count = channel_stack->count;
grpc_call_element *call_elems;
char *user_data;
@@ -188,13 +187,15 @@ grpc_error *grpc_call_stack_init(
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
- args.start_time = start_time;
+ const grpc_call_element_args args = {
+ .start_time = start_time,
+ .call_stack = call_stack,
+ .server_transport_data = transport_server_data,
+ .context = context,
+ .path = path,
+ .deadline = deadline,
+ };
for (i = 0; i < count; i++) {
- args.call_stack = call_stack;
- args.server_transport_data = transport_server_data;
- args.context = context;
- args.path = path;
- args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 1cf07d43c2..1e943dc2e5 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -131,7 +131,7 @@ typedef struct {
argument. */
grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args);
+ const grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 22781c7839..aa41014a21 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -274,7 +274,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 068c61c92a..29796f7ca7 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -83,7 +83,7 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r = grpc_transport_init_stream(
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index bc9a2effc2..f9668be0fa 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -52,9 +52,6 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = arg;
grpc_deadline_state* deadline_state = elem->call_data;
- gpr_mu_lock(&deadline_state->timer_mu);
- deadline_state->timer_pending = false;
- gpr_mu_unlock(&deadline_state->timer_mu);
if (error != GRPC_ERROR_CANCELLED) {
grpc_call_element_signal_error(
exec_ctx, elem,
@@ -66,53 +63,64 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
}
// Starts the deadline timer.
-static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- gpr_timespec deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- // Note: We do not start the timer if there is already a timer
- // pending. This should be okay, because this is only called from two
- // functions exported by this module: grpc_deadline_state_start(), which
- // starts the initial timer, and grpc_deadline_state_reset(), which
- // cancels any pre-existing timer before starting a new one. In
- // particular, we want to ensure that if grpc_deadline_state_start()
- // winds up trying to start the timer after grpc_deadline_state_reset()
- // has already done so, we ignore the value from the former.
- if (!deadline_state->timer_pending &&
- gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
- // Take a reference to the call stack, to be owned by the timer.
- GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
- deadline_state->timer_pending = true;
- grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem,
- grpc_schedule_on_exec_ctx);
- grpc_timer_init(exec_ctx, &deadline_state->timer, deadline,
- &deadline_state->timer_callback,
- gpr_now(GPR_CLOCK_MONOTONIC));
- }
-}
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
+ return;
+ }
grpc_deadline_state* deadline_state = elem->call_data;
- gpr_mu_lock(&deadline_state->timer_mu);
- start_timer_if_needed_locked(exec_ctx, elem, deadline);
- gpr_mu_unlock(&deadline_state->timer_mu);
+ grpc_deadline_timer_state cur_state;
+ grpc_closure* closure = NULL;
+retry:
+ cur_state =
+ (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
+ switch (cur_state) {
+ case GRPC_DEADLINE_STATE_PENDING:
+ // Note: We do not start the timer if there is already a timer
+ return;
+ case GRPC_DEADLINE_STATE_FINISHED:
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_FINISHED,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure = grpc_closure_create(timer_callback, elem,
+ grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
+ break;
+ case GRPC_DEADLINE_STATE_INITIAL:
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ closure =
+ grpc_closure_init(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
+ break;
+ }
+ GPR_ASSERT(closure);
+ GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+ grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
+ gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
-static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
- grpc_deadline_state* deadline_state) {
- if (deadline_state->timer_pending) {
- grpc_timer_cancel(exec_ctx, &deadline_state->timer);
- deadline_state->timer_pending = false;
- }
-}
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
- gpr_mu_lock(&deadline_state->timer_mu);
- cancel_timer_if_needed_locked(exec_ctx, deadline_state);
- gpr_mu_unlock(&deadline_state->timer_mu);
+ if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED)) {
+ grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+ } else {
+ // timer was either in STATE_INITAL (nothing to cancel)
+ // OR in STATE_FINISHED (again nothing to cancel)
+ }
}
// Callback run when the call is complete.
@@ -120,8 +128,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = arg;
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback.
- deadline_state->next_on_complete->cb(
- exec_ctx, deadline_state->next_on_complete->cb_arg, error);
+ grpc_closure_run(exec_ctx, deadline_state->next_on_complete,
+ GRPC_ERROR_REF(error));
}
// Inject our own on_complete callback into op.
@@ -138,14 +146,12 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_deadline_state* deadline_state = elem->call_data;
memset(deadline_state, 0, sizeof(*deadline_state));
deadline_state->call_stack = call_stack;
- gpr_mu_init(&deadline_state->timer_mu);
}
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem) {
grpc_deadline_state* deadline_state = elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
- gpr_mu_destroy(&deadline_state->timer_mu);
}
// Callback and associated state for starting the timer after call stack
@@ -187,10 +193,8 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
- gpr_mu_lock(&deadline_state->timer_mu);
- cancel_timer_if_needed_locked(exec_ctx, deadline_state);
- start_timer_if_needed_locked(exec_ctx, elem, new_deadline);
- gpr_mu_unlock(&deadline_state->timer_mu);
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ start_timer_if_needed(exec_ctx, elem, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op(
@@ -244,7 +248,7 @@ typedef struct server_call_data {
// Constructor for call_data. Used for both client and server filters.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
- grpc_call_element_args* args) {
+ const grpc_call_element_args* args) {
// Note: size of call data is different between client and server.
memset(elem->call_data, 0, elem->filter->sizeof_call_data);
grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
index bd2b84f79e..94717f6bc7 100644
--- a/src/core/lib/channel/deadline_filter.h
+++ b/src/core/lib/channel/deadline_filter.h
@@ -35,16 +35,18 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
+typedef enum grpc_deadline_timer_state {
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED
+} grpc_deadline_timer_state;
+
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
- // Guards access to timer_pending and timer.
- gpr_mu timer_mu;
- // True if the timer callback is currently pending.
- bool timer_pending;
- // The deadline timer.
+ gpr_atm timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 49a2a980e0..c031533dd8 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -386,7 +386,7 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv_initial_metadata = NULL;
calld->on_done_recv_trailing_metadata = NULL;
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index bb185351a8..ce519f9c92 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -252,12 +252,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
- err);
+ grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
- calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, err);
+ grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err));
}
static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
@@ -268,8 +267,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
/* do nothing. This is probably a GET request, and payload will be returned
in hs_on_complete callback. */
} else {
- calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
- err);
+ grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
@@ -343,7 +341,7 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 5e22860cfb..22938c64b5 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -166,7 +166,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
// Constructor for call_data.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
- grpc_call_element_args* args) {
+ const grpc_call_element_args* args) {
channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index ffacdac393..2613512acb 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -137,8 +137,8 @@ typedef enum {
} grpc_error_times;
/// The following "special" errors can be propagated without allocating memory.
-/// They are always even so that other code (particularly combiner locks) can
-/// safely use the lower bit for themselves.
+/// They are always even so that other code (particularly combiner locks,
+/// polling engines) can safely use the lower bit for themselves.
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
#define GRPC_ERROR_OOM ((grpc_error *)2)
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index fac3705142..11208b9ad1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -68,7 +68,7 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
}
-/* Uncomment the following enable extra checks on poll_object operations */
+/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */
static int grpc_wakeup_signal = -1;
@@ -140,24 +140,61 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- /* Indicates that the fd is shutdown and that any pending read/write closures
- should fail */
- bool shutdown;
- grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
+ /* Internally stores data of type (grpc_error *). If the FD is shutdown, this
+ contains reason for shutdown (i.e a pointer to grpc_error) ORed with
+ FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit
+ of (grpc_error *) addresses is guaranteed to be zero. Even if the
+ (grpc_error *), is of special types like GRPC_ERROR_NONE, GRPC_ERROR_OOM
+ etc, the lower bit is guaranteed to be zero.
- /* The fd is either closed or we relinquished control of it. In either cases,
- this indicates that the 'fd' on this structure is no longer valid */
+ Once an fd is shutdown, any pending or future read/write closures on the
+ fd should fail */
+ gpr_atm shutdown_error;
+
+ /* The fd is either closed or we relinquished control of it. In either
+ cases, this indicates that the 'fd' on this structure is no longer
+ valid */
bool orphaned;
- /* TODO: sreek - Move this to a lockfree implementation */
- grpc_closure *read_closure;
- grpc_closure *write_closure;
+ /* Closures to call when the fd is readable or writable respectively. These
+ fields contain one of the following values:
+ CLOSURE_READY : The fd has an I/O event of interest but there is no
+ closure yet to execute
+
+ CLOSURE_NOT_READY : The fd has no I/O event of interest
+
+ closure ptr : The closure to be executed when the fd has an I/O
+ event of interest
+
+ shutdown_error | FD_SHUTDOWN_BIT :
+ 'shutdown_error' field ORed with FD_SHUTDOWN_BIT.
+ This indicates that the fd is shutdown. Since all
+ memory allocations are word-aligned, the lower two
+ bits of the shutdown_error pointer are always 0. So
+ it is safe to OR these with FD_SHUTDOWN_BIT
+
+ Valid state transitions:
+
+ <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY
+ | | ^ | ^ | |
+ | | | | | | |
+ | +--------------4----------+ 6 +---------2---------------+ |
+ | | |
+ | v |
+ +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+
+
+ For 1, 4 : See set_ready() function
+ For 2, 3 : See notify_on() function
+ For 5,6,7: See set_shutdown() function */
+ gpr_atm read_closure;
+ gpr_atm write_closure;
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
- /* The pollset that last noticed that the fd is readable */
- grpc_pollset *read_notifier_pollset;
+ /* The pollset that last noticed that the fd is readable. The actual type
+ * stored in this is (grpc_pollset *) */
+ gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
};
@@ -180,8 +217,10 @@ static void fd_unref(grpc_fd *fd);
static void fd_global_init(void);
static void fd_global_shutdown(void);
-#define CLOSURE_NOT_READY ((grpc_closure *)0)
-#define CLOSURE_READY ((grpc_closure *)1)
+#define CLOSURE_NOT_READY ((gpr_atm)0)
+#define CLOSURE_READY ((gpr_atm)2)
+
+#define FD_SHUTDOWN_BIT 1
/*******************************************************************************
* Polling island Declarations
@@ -908,7 +947,11 @@ static void unref_by(grpc_fd *fd, int n) {
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
- if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
+
+ grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
+ /* Clear the least significant bit if it set (in case fd was shutdown) */
+ err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT);
+ GRPC_ERROR_UNREF(err);
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -972,13 +1015,14 @@ static grpc_fd *fd_create(int fd, const char *name) {
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
- new_fd->shutdown = false;
+ gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE);
new_fd->orphaned = false;
- new_fd->read_closure = CLOSURE_NOT_READY;
- new_fd->write_closure = CLOSURE_NOT_READY;
+ gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY);
+ gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY);
+ gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
+
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
- new_fd->read_notifier_pollset = NULL;
gpr_mu_unlock(&new_fd->po.mu);
@@ -1060,101 +1104,206 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_ERROR_UNREF(error);
}
-static grpc_error *fd_shutdown_error(grpc_fd *fd) {
- if (!fd->shutdown) {
- return GRPC_ERROR_NONE;
- } else {
- return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
+static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+ grpc_closure *closure) {
+ while (true) {
+ /* Fast-path: CLOSURE_NOT_READY -> <closure>.
+ The 'release' cas here matches the 'acquire' load in set_ready and
+ set_shutdown ensuring that the closure (scheduled by set_ready or
+ set_shutdown) happens-after the I/O event on the fd */
+ if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
+ return; /* Fast-path successful. Return */
+ }
+
+ /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
+ set_shutdown */
+ gpr_atm curr = gpr_atm_acq_load(state);
+ switch (curr) {
+ case CLOSURE_NOT_READY: {
+ break; /* retry */
+ }
+
+ case CLOSURE_READY: {
+ /* Change the state to CLOSURE_NOT_READY. Schedule the closure if
+ successful. If not, the state most likely transitioned to shutdown.
+ We should retry.
+
+ This can be a no-barrier cas since the state is being transitioned to
+ CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any
+ closure when transitioning out of CLOSURE_NO_READY state (i.e there
+ is no other code that needs to 'happen-after' this) */
+ if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ return; /* Slow-path successful. Return */
+ }
+
+ break; /* retry */
+ }
+
+ default: {
+ /* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
+ contains a pointer to the shutdown-error). If the fd is shutdown,
+ schedule the closure with the shutdown error */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
+ grpc_closure_sched(
+ exec_ctx, closure,
+ GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
+ return;
+ }
+
+ /* There is already a closure!. This indicates a bug in the code */
+ gpr_log(GPR_ERROR,
+ "notify_on called with a previous callback still pending");
+ abort();
+ }
+ }
}
+
+ GPR_UNREACHABLE_CODE(return );
}
-static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st, grpc_closure *closure) {
- if (fd->shutdown) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready ==> switch to a waiting state by setting the closure */
- *st = closure;
- } else if (*st == CLOSURE_READY) {
- /* already ready ==> queue the closure to run immediately */
- *st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
- } else {
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
+static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+ grpc_error *shutdown_err) {
+ /* Try the fast-path first (i.e expect the current value to be
+ CLOSURE_NOT_READY */
+ gpr_atm curr = CLOSURE_NOT_READY;
+ gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
+
+ while (true) {
+ /* The 'release' cas here matches the 'acquire' load in notify_on to ensure
+ that the closure it schedules 'happens-after' the set_shutdown is called
+ on the fd */
+ if (gpr_atm_rel_cas(state, curr, new_state)) {
+ return; /* Fast-path successful. Return */
+ }
+
+ /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
+ notify_on and set_ready */
+ curr = gpr_atm_acq_load(state);
+ switch (curr) {
+ case CLOSURE_READY: {
+ break; /* retry */
+ }
+
+ case CLOSURE_NOT_READY: {
+ break; /* retry */
+ }
+
+ default: {
+ /* 'curr' is either a closure or the fd is already shutdown */
+
+ /* If fd is already shutdown, we are done */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ return;
+ }
+
+ /* Fd is not shutdown. Schedule the closure and move the state to
+ shutdown state. The 'release' cas here matches the 'acquire' load in
+ notify_on to ensure that the closure it schedules 'happens-after'
+ the set_shutdown is called on the fd */
+ if (gpr_atm_rel_cas(state, curr, new_state)) {
+ grpc_closure_sched(
+ exec_ctx, (grpc_closure *)curr,
+ GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
+ return;
+ }
+
+ /* 'curr' was a closure but now changed to a different state. We will
+ have to retry */
+ break;
+ }
+ }
}
+
+ GPR_UNREACHABLE_CODE(return );
}
-/* returns 1 if state becomes not ready */
-static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st) {
- if (*st == CLOSURE_READY) {
- /* duplicate ready ==> ignore */
- return 0;
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready, and not waiting ==> flag ready */
- *st = CLOSURE_READY;
- return 0;
- } else {
- /* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
- *st = CLOSURE_NOT_READY;
- return 1;
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
+ /* Try an optimistic case first (i.e assume current state is
+ CLOSURE_NOT_READY).
+
+ This 'release' cas matches the 'acquire' load in notify_on ensuring that
+ any closure (scheduled by notify_on) 'happens-after' the return from
+ epoll_pwait */
+ if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
+ return; /* early out */
+ }
+
+ /* The 'acquire' load here matches the 'release' cas in notify_on and
+ set_shutdown */
+ gpr_atm curr = gpr_atm_acq_load(state);
+ switch (curr) {
+ case CLOSURE_READY: {
+ /* Already ready. We are done here */
+ break;
+ }
+
+ case CLOSURE_NOT_READY: {
+ /* The state was not CLOSURE_NOT_READY when we checked initially at the
+ beginning of this function but now it is CLOSURE_NOT_READY again.
+ This is only possible if the state transitioned out of
+ CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
+ back to CLOSURE_NOT_READY again (i.e after we entered this function,
+ the fd became "ready" and the necessary actions were already done).
+ So there is no need to make the state CLOSURE_READY now */
+ break;
+ }
+
+ default: {
+ /* 'curr' is either a closure or the fd is shutdown */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ /* The fd is shutdown. Do nothing */
+ } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) {
+ /* The cas above was no-barrier since the state is being transitioned to
+ CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any
+ closures when transitioning out of CLOSURE_NO_READY state (i.e there
+ is no other code that needs to 'happen-after' this) */
+
+ grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+ }
+ /* else the state changed again (only possible by either a racing
+ set_ready or set_shutdown functions. In both these cases, the closure
+ would have been scheduled for execution. So we are done here */
+ break;
+ }
}
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
- grpc_pollset *notifier = NULL;
-
- gpr_mu_lock(&fd->po.mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->po.mu);
-
- return notifier;
+ gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
+ return (grpc_pollset *)notifier;
}
static bool fd_is_shutdown(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- const bool r = fd->shutdown;
- gpr_mu_unlock(&fd->po.mu);
- return r;
+ grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
+ return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0);
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
- gpr_mu_lock(&fd->po.mu);
- /* Do the actual shutdown only once */
- if (!fd->shutdown) {
- fd->shutdown = true;
- fd->shutdown_error = why;
-
+ /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */
+ if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE,
+ (gpr_atm)why | FD_SHUTDOWN_BIT)) {
shutdown(fd->fd, SHUT_RDWR);
- /* Flush any pending read and write closures. Since fd->shutdown is 'true'
- at this point, the closures would be called with 'success = false' */
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
+
+ set_shutdown(exec_ctx, fd, &fd->read_closure, why);
+ set_shutdown(exec_ctx, fd, &fd->write_closure, why);
} else {
+ /* Shutdown already called */
GRPC_ERROR_UNREF(why);
}
- gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->po.mu);
- notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
- gpr_mu_unlock(&fd->po.mu);
+ notify_on(exec_ctx, fd, &fd->read_closure, closure);
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->po.mu);
- notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
- gpr_mu_unlock(&fd->po.mu);
+ notify_on(exec_ctx, fd, &fd->write_closure, closure);
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
@@ -1343,18 +1492,19 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
- gpr_mu_lock(&fd->po.mu);
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- fd->read_notifier_pollset = notifier;
- gpr_mu_unlock(&fd->po.mu);
+ set_ready(exec_ctx, fd, &fd->read_closure);
+
+ /* Note, it is possible that fd_become_readable might be called twice with
+ different 'notifier's when an fd becomes readable and it is in two epoll
+ sets (This can happen briefly during polling island merges). In such cases
+ it does not really matter which notifer is set as the read_notifier_pollset
+ (They would both point to the same polling island anyway) */
+ /* Use release store to match with acquire load in fd_get_read_notifier */
+ gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
- gpr_mu_lock(&fd->po.mu);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
- gpr_mu_unlock(&fd->po.mu);
+ set_ready(exec_ctx, fd, &fd->write_closure);
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
@@ -1737,7 +1887,7 @@ retry:
(void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
(void *)bag);
/* No need to lock 'pi_new' here since this is a new polling island
- * and no one has a reference to it yet */
+ and no one has a reference to it yet */
polling_island_remove_all_fds_locked(pi_new, true, &error);
/* Ref and unref so that the polling island gets deleted during unref
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 9b5f3209f0..79ff910738 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -113,14 +113,15 @@ static grpc_error *try_split_host_port(const char *name,
/* parse name, splitting it into host and port parts */
grpc_error *error;
gpr_split_host_port(name, host, port);
- if (host == NULL) {
+ if (*host == NULL) {
char *msg;
gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return error;
}
- if (port == NULL) {
+ if (*port == NULL) {
+ // TODO(murgatroid99): add tests for this case
if (default_port == NULL) {
char *msg;
gpr_asprintf(&msg, "no port in name '%s'", name);
diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c
index 628ad4a45b..5bbe8fa34c 100644
--- a/src/core/lib/iomgr/socket_utils_windows.c
+++ b/src/core/lib/iomgr/socket_utils_windows.c
@@ -41,8 +41,12 @@
#include <grpc/support/log.h>
const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) {
+#ifdef GPR_WIN_INET_NTOP
+ return inet_ntop(af, src, dst, size);
+#else
/* Windows InetNtopA wants a mutable ip pointer */
return InetNtopA(af, (void *)src, dst, size);
+#endif /* GPR_WIN_INET_NTOP */
}
#endif /* GRPC_WINDOWS_SOCKETUTILS */
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 5225a5402b..3de0795187 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -46,6 +46,8 @@
#include "src/core/lib/iomgr/tcp_uv.h"
#include "src/core/lib/iomgr/timer.h"
+extern int grpc_tcp_trace;
+
typedef struct grpc_uv_tcp_connect {
uv_connect_t connect_req;
grpc_timer alarm;
@@ -70,6 +72,12 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
grpc_error *error) {
int done;
grpc_uv_tcp_connect *connect = acp;
+ if (grpc_tcp_trace) {
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
+ connect->addr_name, str);
+ grpc_error_free_string(str);
+ }
if (error == GRPC_ERROR_NONE) {
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
it was cancelled, then the handler that cancelled it also should close
@@ -145,6 +153,12 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
connect->resource_quota = resource_quota;
uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect;
+
+ if (grpc_tcp_trace) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
+ connect->addr_name);
+ }
+
// TODO(murgatroid99): figure out what the return value here means
uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
(const struct sockaddr *)resolved_addr->addr,
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 8a5617e7c1..6d638bcbaa 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -180,25 +180,25 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
GPR_ASSERT(now.clock_type == g_clock_type);
timer->closure = closure;
timer->deadline = deadline;
- timer->triggered = 0;
if (!g_initialized) {
- timer->triggered = 1;
+ timer->pending = false;
grpc_closure_sched(
exec_ctx, timer->closure,
GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
return;
}
+ gpr_mu_lock(&shard->mu);
+ timer->pending = true;
if (gpr_time_cmp(deadline, now) <= 0) {
- timer->triggered = 1;
+ timer->pending = false;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&shard->mu);
+ /* early out */
return;
}
- /* TODO(ctiller): check deadline expired */
-
- gpr_mu_lock(&shard->mu);
grpc_time_averaged_stats_add_sample(&shard->stats,
ts_to_dbl(gpr_time_sub(deadline, now)));
if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
@@ -243,9 +243,9 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
gpr_mu_lock(&shard->mu);
- if (!timer->triggered) {
+ if (timer->pending) {
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
- timer->triggered = 1;
+ timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
} else {
@@ -296,7 +296,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
}
timer = grpc_timer_heap_top(&shard->heap);
if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
- timer->triggered = 1;
+ timer->pending = false;
grpc_timer_heap_pop(&shard->heap);
return timer;
}
diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h
index 9d901c7e68..1608dce9fb 100644
--- a/src/core/lib/iomgr/timer_generic.h
+++ b/src/core/lib/iomgr/timer_generic.h
@@ -40,7 +40,7 @@
struct grpc_timer {
gpr_timespec deadline;
uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
- int triggered;
+ bool pending;
struct grpc_timer *next;
struct grpc_timer *prev;
grpc_closure *closure;
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index fa2cdee964..f28a14405d 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -53,8 +53,8 @@ static void stop_uv_timer(uv_timer_t *handle) {
void run_expired_timer(uv_timer_t *handle) {
grpc_timer *timer = (grpc_timer *)handle->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GPR_ASSERT(!timer->triggered);
- timer->triggered = 1;
+ GPR_ASSERT(timer->pending);
+ timer->pending = 0;
grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
@@ -67,11 +67,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
uv_timer_t *uv_timer;
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
- timer->triggered = 1;
+ timer->pending = 0;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
- timer->triggered = 0;
+ timer->pending = 1;
timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now));
uv_timer = gpr_malloc(sizeof(uv_timer_t));
uv_timer_init(uv_default_loop(), uv_timer);
@@ -81,8 +81,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
- if (!timer->triggered) {
- timer->triggered = 1;
+ if (timer->pending) {
+ timer->pending = 0;
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h
index 13cf8bd4fa..9870cd4a5c 100644
--- a/src/core/lib/iomgr/timer_uv.h
+++ b/src/core/lib/iomgr/timer_uv.h
@@ -41,7 +41,7 @@ struct grpc_timer {
/* This is actually a uv_timer_t*, but we want to keep platform-specific
types out of headers */
void *uv_timer;
- int triggered;
+ int pending;
};
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index b9bbe1b304..a23082a866 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -302,7 +302,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 36e81d6501..14619d97ca 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -197,7 +197,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 48a1e586e1..3c563bcc6f 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -101,6 +101,17 @@ typedef struct {
grpc_error *error;
} received_status;
+static gpr_atm pack_received_status(received_status r) {
+ return r.is_set ? (1 | (gpr_atm)r.error) : 0;
+}
+
+static received_status unpack_received_status(gpr_atm atm) {
+ return (atm & 1) == 0
+ ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE}
+ : (received_status){.is_set = true,
+ .error = (grpc_error *)(atm & ~(gpr_atm)1)};
+}
+
#define MAX_ERRORS_PER_BATCH 3
typedef struct batch_control {
@@ -142,8 +153,6 @@ struct grpc_call {
bool destroy_called;
/** flag indicating that cancellation is inherited */
bool cancellation_is_inherited;
- /** bitmask of live batches */
- uint8_t used_batches;
/** which ops are in-flight */
bool sent_initial_metadata;
bool sending_message;
@@ -165,8 +174,8 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
- /* Received call statuses from various sources */
- received_status status[STATUS_SOURCE_COUNT];
+ /* Packed received call statuses from various sources */
+ gpr_atm status[STATUS_SOURCE_COUNT];
/* Call data useful used for reporting. Only valid after the call has
* completed */
@@ -446,7 +455,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- GRPC_ERROR_UNREF(c->status[i].error);
+ GRPC_ERROR_UNREF(
+ unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error);
}
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
@@ -614,13 +624,12 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
*/
static bool get_final_status_from(
- grpc_call *call, status_source from_source, bool allow_ok_status,
+ grpc_call *call, grpc_error *error, bool allow_ok_status,
void (*set_value)(grpc_status_code code, void *user_data),
void *set_value_user_data, grpc_slice *details) {
grpc_status_code code;
const char *msg = NULL;
- grpc_error_get_status(call->status[from_source].error, call->send_deadline,
- &code, &msg, NULL);
+ grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL);
if (code == GRPC_STATUS_OK && !allow_ok_status) {
return false;
}
@@ -638,12 +647,15 @@ static void get_final_status(grpc_call *call,
void *user_data),
void *set_value_user_data, grpc_slice *details) {
int i;
+ received_status status[STATUS_SOURCE_COUNT];
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
+ }
if (grpc_call_error_trace) {
gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR");
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- gpr_log(GPR_DEBUG, " %d: %s", i,
- grpc_error_string(call->status[i].error));
+ if (status[i].is_set) {
+ gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error));
}
}
}
@@ -653,9 +665,9 @@ static void get_final_status(grpc_call *call,
/* search for the best status we can present: ideally the error we use has a
clearly defined grpc-status, and we'll prefer that. */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set &&
- grpc_error_has_clear_grpc_status(call->status[i].error)) {
- if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
+ if (status[i].is_set &&
+ grpc_error_has_clear_grpc_status(status[i].error)) {
+ if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details)) {
return;
}
@@ -663,8 +675,8 @@ static void get_final_status(grpc_call *call,
}
/* If no clearly defined status exists, search for 'anything' */
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- if (get_final_status_from(call, (status_source)i, allow_ok_status != 0,
+ if (status[i].is_set) {
+ if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
set_value, set_value_user_data, details)) {
return;
}
@@ -681,12 +693,13 @@ static void get_final_status(grpc_call *call,
static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
status_source source, grpc_error *error) {
- if (call->status[source].is_set) {
+ if (!gpr_atm_rel_cas(&call->status[source],
+ pack_received_status((received_status){
+ .is_set = false, .error = GRPC_ERROR_NONE}),
+ pack_received_status((received_status){
+ .is_set = true, .error = error}))) {
GRPC_ERROR_UNREF(error);
- return;
}
- call->status[source].is_set = true;
- call->status[source].error = error;
}
/*******************************************************************************
@@ -997,25 +1010,48 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
-static batch_control *allocate_batch_control(grpc_call *call) {
- size_t i;
- for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
- if ((call->used_batches & (1 << i)) == 0) {
- call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i));
- return &call->active_batches[i];
- }
+static int batch_slot_for_op(grpc_op_type type) {
+ switch (type) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ return 0;
+ case GRPC_OP_SEND_MESSAGE:
+ return 1;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ return 2;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ return 3;
+ case GRPC_OP_RECV_MESSAGE:
+ return 4;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ return 5;
+ }
+ GPR_UNREACHABLE_CODE(return 123456789);
+}
+
+static batch_control *allocate_batch_control(grpc_call *call,
+ const grpc_op *ops,
+ size_t num_ops) {
+ int slot = batch_slot_for_op(ops[0].op);
+ for (size_t i = 1; i < num_ops; i++) {
+ int op_slot = batch_slot_for_op(ops[i].op);
+ slot = GPR_MIN(slot, op_slot);
+ }
+ batch_control *bctl = &call->active_batches[slot];
+ if (bctl->call != NULL) {
+ return NULL;
}
- return NULL;
+ memset(bctl, 0, sizeof(*bctl));
+ bctl->call = call;
+ return bctl;
}
static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_cq_completion *storage) {
batch_control *bctl = user_data;
grpc_call *call = bctl->call;
- gpr_mu_lock(&call->mu);
- call->used_batches = (uint8_t)(
- call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches)));
- gpr_mu_unlock(&call->mu);
+ bctl->call = NULL;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
}
@@ -1098,12 +1134,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
if (bctl->is_notify_tag_closure) {
/* unrefs bctl->error */
+ bctl->call = NULL;
grpc_closure_run(exec_ctx, bctl->notify_tag, error);
- gpr_mu_lock(&call->mu);
- bctl->call->used_batches =
- (uint8_t)(bctl->call->used_batches &
- ~(uint8_t)(1 << (bctl - bctl->call->active_batches)));
- gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
@@ -1315,6 +1347,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
finish_batch_step(exec_ctx, bctl);
}
+static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_cq_completion *completion) {
+ gpr_free(completion);
+}
+
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
@@ -1329,32 +1366,34 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_metadata compression_md;
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
-
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
- /* TODO(ctiller): this feels like it could be made lock-free */
- gpr_mu_lock(&call->mu);
- bctl = allocate_batch_control(call);
- memset(bctl, 0, sizeof(*bctl));
- bctl->call = call;
- bctl->notify_tag = notify_tag;
- bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
-
- grpc_transport_stream_op *stream_op = &bctl->op;
- memset(stream_op, 0, sizeof(*stream_op));
- stream_op->covered_by_poller = true;
-
if (nops == 0) {
- GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
+ grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
+ free_no_op_completion, NULL,
+ gpr_malloc(sizeof(grpc_cq_completion)));
+ } else {
+ grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE);
}
- gpr_mu_unlock(&call->mu);
- post_batch_completion(exec_ctx, bctl);
error = GRPC_CALL_OK;
goto done;
}
+ /* TODO(ctiller): this feels like it could be made lock-free */
+ bctl = allocate_batch_control(call, ops, nops);
+ if (bctl == NULL) {
+ return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
+ }
+ bctl->notify_tag = notify_tag;
+ bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0);
+
+ gpr_mu_lock(&call->mu);
+ grpc_transport_stream_op *stream_op = &bctl->op;
+ memset(stream_op, 0, sizeof(*stream_op));
+ stream_op->covered_by_poller = true;
+
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
op = &ops[i];
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 48de0e1d5b..49bc4c114b 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -122,7 +122,7 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
gpr_atm_no_barrier_store(&calld->filled_metadata, 0);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 6ab1c0d94d..7210c69fb0 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -879,7 +879,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -1198,7 +1198,9 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
crm->server_registered_method = rm;
crm->flags = rm->flags;
crm->has_host = has_host;
- crm->host = host;
+ if (has_host) {
+ crm->host = host;
+ }
crm->method = method;
}
GPR_ASSERT(slots <= UINT32_MAX);
diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
index d339ed327f..c97f47b397 100644
--- a/src/core/plugin_registry/grpc_cronet_plugin_registry.c
+++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
@@ -37,10 +37,14 @@ extern void grpc_chttp2_plugin_init(void);
extern void grpc_chttp2_plugin_shutdown(void);
extern void grpc_client_channel_init(void);
extern void grpc_client_channel_shutdown(void);
+extern void grpc_load_reporting_plugin_init(void);
+extern void grpc_load_reporting_plugin_shutdown(void);
void grpc_register_built_in_plugins(void) {
grpc_register_plugin(grpc_chttp2_plugin_init,
grpc_chttp2_plugin_shutdown);
grpc_register_plugin(grpc_client_channel_init,
grpc_client_channel_shutdown);
+ grpc_register_plugin(grpc_load_reporting_plugin_init,
+ grpc_load_reporting_plugin_shutdown);
}