diff options
Diffstat (limited to 'src')
24 files changed, 821 insertions, 533 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 6cbc333b83..21ba4301ff 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -183,7 +183,7 @@ typedef struct client_channel_channel_data { grpc_pollset_set *interested_parties; /* the following properties are guarded by a mutex since API's require them - to be instantaniously available */ + to be instantaneously available */ gpr_mu info_mu; char *info_lb_policy_name; /** service config in JSON form */ @@ -200,9 +200,9 @@ typedef struct { grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state); +static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state); static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, @@ -213,7 +213,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, state == GRPC_CHANNEL_SHUTDOWN) && chand->lb_policy != NULL) { /* cancel picks with wait_for_ready=false */ - grpc_lb_policy_cancel_picks( + grpc_lb_policy_cancel_picks_locked( exec_ctx, chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, /* check= */ 0, GRPC_ERROR_REF(error)); @@ -237,7 +237,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, GRPC_ERROR_REF(error), "lb_changed"); if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); } } @@ -245,9 +245,9 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, gpr_free(w); } -static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state) { +static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); @@ -256,8 +256,8 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_combiner_scheduler(chand->combiner, false)); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, - &w->on_changed); + grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, + &w->on_changed); } static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, @@ -313,13 +313,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args lb_policy_args; lb_policy_args.args = chand->resolver_result; lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy_args.combiner = chand->combiner; lb_policy = grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); - state = - grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); + state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, + &state_error); } // Find service config. channel_arg = @@ -383,7 +384,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, set_channel_connectivity_state_locked( exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); if (lb_policy != NULL) { - watch_lb_policy(exec_ctx, chand, lb_policy, state); + watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next_locked(exec_ctx, chand->resolver, @@ -404,7 +405,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, } if (exit_idle) { - grpc_lb_policy_exit_idle(exec_ctx, lb_policy); + grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); } @@ -441,7 +442,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("Ping with no load balancing")); } else { - grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); + grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; } op->send_ping = NULL; @@ -808,8 +809,9 @@ static bool pick_subchannel_locked( if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, - connected_subchannel, GRPC_ERROR_REF(error)); + grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, + connected_subchannel, + GRPC_ERROR_REF(error)); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = closure->next_data.next) { @@ -848,7 +850,7 @@ static bool pick_subchannel_locked( const grpc_lb_policy_pick_args inputs = { initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; - const bool result = grpc_lb_policy_pick( + const bool result = grpc_lb_policy_pick_locked( exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); @@ -1216,7 +1218,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { channel_data *chand = arg; if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy); } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != NULL) { diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c index 90401b586f..aba51add53 100644 --- a/src/core/ext/client_channel/lb_policy.c +++ b/src/core/ext/client_channel/lb_policy.c @@ -32,14 +32,17 @@ */ #include "src/core/ext/client_channel/lb_policy.h" +#include "src/core/lib/iomgr/combiner.h" #define WEAK_REF_BITS 16 void grpc_lb_policy_init(grpc_lb_policy *policy, - const grpc_lb_policy_vtable *vtable) { + const grpc_lb_policy_vtable *vtable, + grpc_combiner *combiner) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); policy->interested_parties = grpc_pollset_set_create(); + policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy"); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -71,6 +74,13 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); } +static void shutdown_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_lb_policy *policy = arg; + policy->vtable->shutdown_locked(exec_ctx, policy); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, policy, "strong-unref"); +} + void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { gpr_atm old_val = @@ -79,10 +89,15 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { - policy->vtable->shutdown(exec_ctx, policy); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_locked, policy, + grpc_combiner_scheduler(policy->combiner, false)), + GRPC_ERROR_NONE); + } else { + grpc_lb_policy_weak_unref(exec_ctx, + policy REF_FUNC_PASS_ARGS("strong-unref")); } - grpc_lb_policy_weak_unref(exec_ctx, - policy REF_FUNC_PASS_ARGS("strong-unref")); } void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { @@ -95,52 +110,58 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { grpc_pollset_set_destroy(exec_ctx, policy->interested_parties); + grpc_combiner *combiner = policy->combiner; policy->vtable->destroy(exec_ctx, policy); + GRPC_COMBINER_UNREF(exec_ctx, combiner, "lb_policy"); } } -int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data, - on_complete); +int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, + void **user_data, grpc_closure *on_complete) { + return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target, + user_data, on_complete); } -void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target, - grpc_error *error) { - policy->vtable->cancel_pick(exec_ctx, policy, target, error); +void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connected_subchannel **target, + grpc_error *error) { + policy->vtable->cancel_pick_locked(exec_ctx, policy, target, error); } -void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { - policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, - initial_metadata_flags_eq, error); +void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { + policy->vtable->cancel_picks_locked(exec_ctx, policy, + initial_metadata_flags_mask, + initial_metadata_flags_eq, error); } -void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { - policy->vtable->exit_idle(exec_ctx, policy); +void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy) { + policy->vtable->exit_idle_locked(exec_ctx, policy); } -void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_closure *closure) { - policy->vtable->ping_one(exec_ctx, policy, closure); +void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_closure *closure) { + policy->vtable->ping_one_locked(exec_ctx, policy, closure); } -void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure) { - policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure); +void grpc_lb_policy_notify_on_state_change_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_connectivity_state *state, grpc_closure *closure) { + policy->vtable->notify_on_state_change_locked(exec_ctx, policy, state, + closure); } -grpc_connectivity_state grpc_lb_policy_check_connectivity( +grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error) { - return policy->vtable->check_connectivity(exec_ctx, policy, - connectivity_error); + return policy->vtable->check_connectivity_locked(exec_ctx, policy, + connectivity_error); } diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h index 120c641edc..3405709c2c 100644 --- a/src/core/ext/client_channel/lb_policy.h +++ b/src/core/ext/client_channel/lb_policy.h @@ -51,6 +51,8 @@ struct grpc_lb_policy { gpr_atm ref_pair; /* owned pointer to interested parties in load balancing decisions */ grpc_pollset_set *interested_parties; + /* combiner under which lb_policy actions take place */ + grpc_combiner *combiner; }; /** Extra arguments for an LB pick */ @@ -69,42 +71,44 @@ typedef struct grpc_lb_policy_pick_args { struct grpc_lb_policy_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** \see grpc_lb_policy_pick */ - int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete); + int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ - void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target, grpc_error *error); + void (*cancel_pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_connected_subchannel **target, + grpc_error *error); /** \see grpc_lb_policy_cancel_picks */ - void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, grpc_error *error); + void (*cancel_picks_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** \see grpc_lb_policy_ping_one */ - void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_closure *closure); + void (*ping_one_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_closure *closure); /** Try to enter a READY connectivity state */ - void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); + void (*exit_idle_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)( + grpc_connectivity_state (*check_connectivity_locked)( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy. Calling with a NULL \a state cancels the subscription. */ - void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure); + void (*notify_on_state_change_locked)(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_closure *closure); }; /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ @@ -144,7 +148,8 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** called by concrete implementations to initialize the base struct */ void grpc_lb_policy_init(grpc_lb_policy *policy, - const grpc_lb_policy_vtable *vtable); + const grpc_lb_policy_vtable *vtable, + grpc_combiner *combiner); /** Finds an appropriate subchannel for a call, based on \a pick_args. @@ -159,43 +164,45 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, Any IO should be done under the \a interested_parties \a grpc_pollset_set in the \a grpc_lb_policy struct. */ -int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete); +int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, + void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) against one of the connected subchannels managed by \a policy. */ -void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_closure *closure); +void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_closure *closure); /** Cancel picks for \a target. The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ -void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target, - grpc_error *error); +void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connected_subchannel **target, + grpc_error *error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq when AND'd with \a initial_metadata_flags_mask */ -void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error); +void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** Try to enter a READY connectivity state */ -void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy); /* Call notify when the connectivity state of a channel changes from \a *state. * Updates \a *state with the new state of the policy */ -void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure); +void grpc_lb_policy_notify_on_state_change_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_connectivity_state *state, grpc_closure *closure); -grpc_connectivity_state grpc_lb_policy_check_connectivity( +grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h index 9b8b03f982..27c12c0d73 100644 --- a/src/core/ext/client_channel/lb_policy_factory.h +++ b/src/core/ext/client_channel/lb_policy_factory.h @@ -107,6 +107,7 @@ grpc_arg grpc_lb_addresses_create_channel_arg( typedef struct grpc_lb_policy_args { grpc_client_channel_factory *client_channel_factory; grpc_channel_args *args; + grpc_combiner *combiner; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 09c68a91dd..f2da148e49 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -438,7 +438,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 8a2af48328..b5210cb046 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -115,6 +115,7 @@ #include "src/core/ext/lb_policy/grpclb/grpclb_channel.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" @@ -285,9 +286,6 @@ typedef struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; - /** mutex protecting remaining members */ - gpr_mu mu; - /** who the client is trying to communicate with */ const char *server_name; grpc_client_channel_factory *cc_factory; @@ -557,9 +555,9 @@ static bool pick_from_internal_rr_locked( const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { GPR_ASSERT(rr_policy != NULL); - const bool pick_done = - grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target, - (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); + const bool pick_done = grpc_lb_policy_pick_locked( + exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token, + &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { @@ -590,6 +588,7 @@ static grpc_lb_policy *create_rr_locked( grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + args.combiner = glb_policy->base.combiner; grpc_lb_addresses *addresses = process_serverlist_locked(exec_ctx, serverlist); @@ -608,8 +607,8 @@ static grpc_lb_policy *create_rr_locked( return rr; } -static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { @@ -633,8 +632,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, grpc_error *new_rr_state_error = NULL; const grpc_connectivity_state new_rr_state = - grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy, - &new_rr_state_error); + grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy, + &new_rr_state_error); /* Connectivity state is a function of the new RR policy just created */ const bool replace_old_rr = update_lb_connectivity_status_locked( exec_ctx, glb_policy, new_rr_state, new_rr_state_error); @@ -677,17 +676,18 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, - rr_connectivity, grpc_schedule_on_exec_ctx); + grpc_closure_init(&rr_connectivity->on_change, + glb_rr_connectivity_changed_locked, rr_connectivity, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); rr_connectivity->glb_policy = glb_policy; rr_connectivity->state = new_rr_state; /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); - grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); + grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); + grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy); /* Update picks and pings in wait */ pending_pick *pp; @@ -713,17 +713,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, - &pping->wrapped_notify_arg.wrapper_closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, + &pping->wrapped_notify_arg.wrapper_closure); } } -static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; glb_lb_policy *glb_policy = rr_connectivity->glb_policy; - gpr_mu_lock(&glb_policy->mu); const bool shutting_down = glb_policy->shutting_down; bool unref_needed = false; GRPC_ERROR_REF(error); @@ -740,11 +739,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_connectivity->state, error); /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); + grpc_lb_policy_notify_on_state_change_locked( + exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, + &rr_connectivity->on_change); } - gpr_mu_unlock(&glb_policy->mu); if (unref_needed) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "rr_connectivity_cb"); @@ -899,8 +897,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, gpr_free(glb_policy); return NULL; } - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable); - gpr_mu_init(&glb_policy->mu); + grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); return &glb_policy->base; @@ -918,13 +915,11 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } - gpr_mu_destroy(&glb_policy->mu); gpr_free(glb_policy); } -static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); glb_policy->shutting_down = true; pending_pick *pp = glb_policy->pending_picks; @@ -941,7 +936,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { * while holding glb_policy->mu: lb_on_server_status_received, invoked due to * the cancel, needs to acquire that same lock */ grpc_call *lb_call = glb_policy->lb_call; - gpr_mu_unlock(&glb_policy->mu); /* glb_policy->lb_call and this local lb_call must be consistent at this point * because glb_policy->lb_call is only assigned in lb_call_init_locked as part @@ -967,11 +961,10 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } -static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target, - grpc_error *error) { +static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -987,16 +980,15 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&glb_policy->mu); GRPC_ERROR_UNREF(error); } -static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { +static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -1012,7 +1004,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&glb_policy->mu); GRPC_ERROR_UNREF(error); } @@ -1025,19 +1016,17 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, query_for_backends_locked(exec_ctx, glb_policy); } -static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); if (!glb_policy->started_picking) { start_picking_locked(exec_ctx, glb_policy); } - gpr_mu_unlock(&glb_policy->mu); } -static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { +static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; grpc_closure_sched( @@ -1048,7 +1037,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); glb_policy->deadline = pick_args->deadline; bool pick_done; @@ -1087,53 +1075,43 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pick_done = false; } - gpr_mu_unlock(&glb_policy->mu); return pick_done; } -static grpc_connectivity_state glb_check_connectivity( +static grpc_connectivity_state glb_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **connectivity_error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&glb_policy->mu); - st = grpc_connectivity_state_get(&glb_policy->state_tracker, - connectivity_error); - gpr_mu_unlock(&glb_policy->mu); - return st; + return grpc_connectivity_state_get(&glb_policy->state_tracker, + connectivity_error); } -static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure); } else { add_pending_ping(&glb_policy->pending_pings, closure); if (!glb_policy->started_picking) { start_picking_locked(exec_ctx, glb_policy); } } - gpr_mu_unlock(&glb_policy->mu); } -static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); grpc_connectivity_state_notify_on_state_change( exec_ctx, &glb_policy->state_tracker, current, notify); - - gpr_mu_unlock(&glb_policy->mu); } -static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); +static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); @@ -1162,11 +1140,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_grpclb_request_destroy(request); grpc_closure_init(&glb_policy->lb_on_server_status_received, - lb_on_server_status_received, glb_policy, - grpc_schedule_on_exec_ctx); + lb_on_server_status_received_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_closure_init(&glb_policy->lb_on_response_received, - lb_on_response_received, glb_policy, - grpc_schedule_on_exec_ctx); + lb_on_response_received_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, @@ -1261,14 +1239,13 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { glb_lb_policy *glb_policy = arg; grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_response_payload != NULL) { gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside @@ -1342,20 +1319,17 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } - gpr_mu_unlock(&glb_policy->mu); } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ - gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_response_received_empty_payload"); } } -static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { glb_lb_policy *glb_policy = arg; - gpr_mu_lock(&glb_policy->mu); if (!glb_policy->shutting_down) { if (grpc_lb_glb_trace) { @@ -1365,15 +1339,13 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); } - gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_on_retry_timer"); } -static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - gpr_mu_lock(&glb_policy->mu); GPR_ASSERT(glb_policy->lb_call != NULL); @@ -1408,21 +1380,27 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer, - glb_policy, grpc_schedule_on_exec_ctx); + grpc_closure_init( + &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, + glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); } - gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_server_status_received"); } /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { - glb_destroy, glb_shutdown, glb_pick, - glb_cancel_pick, glb_cancel_picks, glb_ping_one, - glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change}; + glb_destroy, + glb_shutdown_locked, + glb_pick_locked, + glb_cancel_pick_locked, + glb_cancel_picks_locked, + glb_ping_one_locked, + glb_exit_idle_locked, + glb_check_connectivity_locked, + glb_notify_on_state_change_locked}; static void glb_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 1b965183f6..501cb6d94d 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -38,6 +38,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -57,11 +58,11 @@ typedef struct { grpc_closure connectivity_changed; - /** the selected channel (a grpc_connected_subchannel) */ - gpr_atm selected; + /** remaining members are protected by the combiner */ + + /** the selected channel */ + grpc_connected_subchannel *selected; - /** mutex protecting remaining members */ - gpr_mu mu; /** have we started picking? */ int started_picking; /** are we shut down? */ @@ -77,32 +78,24 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -#define GET_SELECTED(p) \ - ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected)) - static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connected_subchannel *selected = GET_SELECTED(p); size_t i; GPR_ASSERT(p->pending_picks == NULL); for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } - if (selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first"); + if (p->selected != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); gpr_free(p); } -static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - grpc_connected_subchannel *selected; - gpr_mu_lock(&p->mu); - selected = GET_SELECTED(p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -110,15 +103,14 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE("Channel shutdown"), "shutdown"); /* cancel subscription */ - if (selected != NULL) { + if (p->selected != NULL) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, NULL, NULL, &p->connectivity_changed); + exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); } else if (p->num_subchannels > 0) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); } - gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -128,12 +120,11 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } -static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target, - grpc_error *error) { +static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -150,17 +141,15 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { +static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -177,7 +166,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } @@ -192,63 +180,48 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { &p->connectivity_changed); } -static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); if (!p->started_picking) { start_picking(exec_ctx, p); } - gpr_mu_unlock(&p->mu); } -static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { +static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; /* Check atomically for a selected channel */ - grpc_connected_subchannel *selected = GET_SELECTED(p); - if (selected != NULL) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); + if (p->selected != NULL) { + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); return 1; } - /* No subchannel selected yet, so acquire lock and then attempt again */ - gpr_mu_lock(&p->mu); - selected = GET_SELECTED(p); - if (selected) { - gpr_mu_unlock(&p->mu); - *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); - return 1; - } else { - if (!p->started_picking) { - start_picking(exec_ctx, p); - } - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - return 0; + /* No subchannel selected yet, so try again */ + if (!p->started_picking) { + start_picking(exec_ctx, p); } + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->on_complete = on_complete; + p->pending_picks = pp; + return 0; } -static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - pick_first_lb_policy *p = arg; +static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { size_t i; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; - gpr_mu_lock(&p->mu); subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; - gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { @@ -258,25 +231,19 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(subchannels); } -static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; - grpc_connected_subchannel *selected; GRPC_ERROR_REF(error); - gpr_mu_lock(&p->mu); - - selected = GET_SELECTED(p); - if (p->shutdown) { - gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_ERROR_UNREF(error); return; - } else if (selected != NULL) { + } else if (p->selected != NULL) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { /* if the selected channel goes bad, we're done */ p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; @@ -286,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, + exec_ctx, p->selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -301,26 +268,21 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); selected_subchannel = p->subchannels[p->checking_subchannel]; - selected = - grpc_subchannel_get_connected_subchannel(selected_subchannel); - GPR_ASSERT(selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first"); + p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected_subchannel), + "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_closure_sched(exec_ctx, - grpc_closure_create(destroy_subchannels, p, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + destroy_subchannels_locked(exec_ctx, p); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, + exec_ctx, p->selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -387,48 +349,44 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } } - gpr_mu_unlock(&p->mu); - GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_error **error) { +static grpc_connectivity_state pf_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_get(&p->state_tracker, error); - gpr_mu_unlock(&p->mu); - return st; + return grpc_connectivity_state_get(&p->state_tracker, error); } -static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, current, notify); - gpr_mu_unlock(&p->mu); } -static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connected_subchannel *selected = GET_SELECTED(p); - if (selected) { - grpc_connected_subchannel_ping(exec_ctx, selected, closure); + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); } else { grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected")); } } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, - pf_cancel_pick, pf_cancel_picks, pf_ping_one, - pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, + pf_shutdown_locked, + pf_pick_locked, + pf_cancel_pick_locked, + pf_cancel_picks_locked, + pf_ping_one_locked, + pf_exit_idle_locked, + pf_check_connectivity_locked, + pf_notify_on_state_change_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} @@ -489,10 +447,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } p->num_subchannels = subchannel_idx; - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p, - grpc_schedule_on_exec_ctx); - gpr_mu_init(&p->mu); + grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); + grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, + grpc_combiner_scheduler(args->combiner, false)); return &p->base; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 63e3d033ad..687df170ad 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -67,6 +67,7 @@ #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -134,7 +135,6 @@ typedef struct { struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; - gpr_mu mu; /** total number of addresses received at creation time */ size_t num_addresses; @@ -293,7 +293,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); elem = p->ready_list.next; while (elem != NULL && elem != &p->ready_list) { @@ -309,12 +308,11 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(p); } -static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; size_t i; - gpr_mu_lock(&p->mu); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } @@ -335,15 +333,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } - gpr_mu_unlock(&p->mu); } -static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target, - grpc_error *error) { +static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -360,17 +356,15 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { +static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -388,11 +382,11 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + round_robin_lb_policy *p) { size_t i; p->started_picking = 1; @@ -411,23 +405,20 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { } } -static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } - gpr_mu_unlock(&p->mu); } -static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { +static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; - gpr_mu_lock(&p->mu); if (grpc_lb_round_robin_trace) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); @@ -449,12 +440,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); - gpr_mu_unlock(&p->mu); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -463,7 +453,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->user_data = user_data; p->pending_picks = pp; - gpr_mu_unlock(&p->mu); return 0; } } @@ -538,17 +527,15 @@ static grpc_connectivity_state update_lb_connectivity_status( return sd->curr_connectivity_state; } -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; GRPC_ERROR_REF(error); - gpr_mu_lock(&p->mu); if (p->shutdown) { - gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); GRPC_ERROR_UNREF(error); return; @@ -645,56 +632,51 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); break; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_error **error) { +static grpc_connectivity_state rr_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_get(&p->state_tracker, error); - gpr_mu_unlock(&p->mu); - return st; + return grpc_connectivity_state_get(&p->state_tracker, error); } -static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, current, notify); - gpr_mu_unlock(&p->mu); } -static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *selected; grpc_connected_subchannel *target; - gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { - gpr_mu_unlock(&p->mu); target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { - gpr_mu_unlock(&p->mu); grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Round Robin not connected")); } } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, - rr_cancel_pick, rr_cancel_picks, rr_ping_one, - rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change}; + rr_destroy, + rr_shutdown_locked, + rr_pick_locked, + rr_cancel_pick_locked, + rr_cancel_picks_locked, + rr_ping_one_locked, + rr_exit_idle_locked, + rr_check_connectivity_locked, + rr_notify_on_state_change_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} @@ -762,7 +744,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx); + rr_connectivity_changed_locked, sd, + grpc_combiner_scheduler(args->combiner, false)); } } if (subchannel_idx == 0) { @@ -779,7 +762,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); @@ -787,7 +770,6 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", (void *)p, (unsigned long)p->num_subchannels); } - gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 63df8e135f..84586cd998 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -173,6 +173,7 @@ static void add_header_data(framer_state *st, grpc_slice slice) { static uint8_t *add_tiny_header_data(framer_state *st, size_t len) { ensure_space(st, len); + st->stats->header_bytes += len; return grpc_slice_buffer_tiny_add(st->output, len); } diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index a6946ef9f3..ce519f9c92 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -252,12 +252,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, *calld->pp_recv_message = calld->payload_bin_delivered ? NULL : (grpc_byte_stream *)&calld->read_stream; - calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg, - err); + grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); calld->recv_message_ready = NULL; calld->payload_bin_delivered = true; } - calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, err); + grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err)); } static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, @@ -268,8 +267,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data, /* do nothing. This is probably a GET request, and payload will be returned in hs_on_complete callback. */ } else { - calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg, - err); + grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err)); } } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index ffacdac393..2613512acb 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -137,8 +137,8 @@ typedef enum { } grpc_error_times; /// The following "special" errors can be propagated without allocating memory. -/// They are always even so that other code (particularly combiner locks) can -/// safely use the lower bit for themselves. +/// They are always even so that other code (particularly combiner locks, +/// polling engines) can safely use the lower bit for themselves. #define GRPC_ERROR_NONE ((grpc_error *)NULL) #define GRPC_ERROR_OOM ((grpc_error *)2) diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index fac3705142..11208b9ad1 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -68,7 +68,7 @@ static int grpc_polling_trace = 0; /* Disabled by default */ gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \ } -/* Uncomment the following enable extra checks on poll_object operations */ +/* Uncomment the following to enable extra checks on poll_object operations */ /* #define PO_DEBUG */ static int grpc_wakeup_signal = -1; @@ -140,24 +140,61 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* Indicates that the fd is shutdown and that any pending read/write closures - should fail */ - bool shutdown; - grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */ + /* Internally stores data of type (grpc_error *). If the FD is shutdown, this + contains reason for shutdown (i.e a pointer to grpc_error) ORed with + FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit + of (grpc_error *) addresses is guaranteed to be zero. Even if the + (grpc_error *), is of special types like GRPC_ERROR_NONE, GRPC_ERROR_OOM + etc, the lower bit is guaranteed to be zero. - /* The fd is either closed or we relinquished control of it. In either cases, - this indicates that the 'fd' on this structure is no longer valid */ + Once an fd is shutdown, any pending or future read/write closures on the + fd should fail */ + gpr_atm shutdown_error; + + /* The fd is either closed or we relinquished control of it. In either + cases, this indicates that the 'fd' on this structure is no longer + valid */ bool orphaned; - /* TODO: sreek - Move this to a lockfree implementation */ - grpc_closure *read_closure; - grpc_closure *write_closure; + /* Closures to call when the fd is readable or writable respectively. These + fields contain one of the following values: + CLOSURE_READY : The fd has an I/O event of interest but there is no + closure yet to execute + + CLOSURE_NOT_READY : The fd has no I/O event of interest + + closure ptr : The closure to be executed when the fd has an I/O + event of interest + + shutdown_error | FD_SHUTDOWN_BIT : + 'shutdown_error' field ORed with FD_SHUTDOWN_BIT. + This indicates that the fd is shutdown. Since all + memory allocations are word-aligned, the lower two + bits of the shutdown_error pointer are always 0. So + it is safe to OR these with FD_SHUTDOWN_BIT + + Valid state transitions: + + <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY + | | ^ | ^ | | + | | | | | | | + | +--------------4----------+ 6 +---------2---------------+ | + | | | + | v | + +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+ + + For 1, 4 : See set_ready() function + For 2, 3 : See notify_on() function + For 5,6,7: See set_shutdown() function */ + gpr_atm read_closure; + gpr_atm write_closure; struct grpc_fd *freelist_next; grpc_closure *on_done_closure; - /* The pollset that last noticed that the fd is readable */ - grpc_pollset *read_notifier_pollset; + /* The pollset that last noticed that the fd is readable. The actual type + * stored in this is (grpc_pollset *) */ + gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; }; @@ -180,8 +217,10 @@ static void fd_unref(grpc_fd *fd); static void fd_global_init(void); static void fd_global_shutdown(void); -#define CLOSURE_NOT_READY ((grpc_closure *)0) -#define CLOSURE_READY ((grpc_closure *)1) +#define CLOSURE_NOT_READY ((gpr_atm)0) +#define CLOSURE_READY ((gpr_atm)2) + +#define FD_SHUTDOWN_BIT 1 /******************************************************************************* * Polling island Declarations @@ -908,7 +947,11 @@ static void unref_by(grpc_fd *fd, int n) { fd->freelist_next = fd_freelist; fd_freelist = fd; grpc_iomgr_unregister_object(&fd->iomgr_object); - if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error); + + grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); + /* Clear the least significant bit if it set (in case fd was shutdown) */ + err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT); + GRPC_ERROR_UNREF(err); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -972,13 +1015,14 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - new_fd->shutdown = false; + gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE); new_fd->orphaned = false; - new_fd->read_closure = CLOSURE_NOT_READY; - new_fd->write_closure = CLOSURE_NOT_READY; + gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY); + gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY); + gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); + new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; - new_fd->read_notifier_pollset = NULL; gpr_mu_unlock(&new_fd->po.mu); @@ -1060,101 +1104,206 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_ERROR_UNREF(error); } -static grpc_error *fd_shutdown_error(grpc_fd *fd) { - if (!fd->shutdown) { - return GRPC_ERROR_NONE; - } else { - return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1); +static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, + grpc_closure *closure) { + while (true) { + /* Fast-path: CLOSURE_NOT_READY -> <closure>. + The 'release' cas here matches the 'acquire' load in set_ready and + set_shutdown ensuring that the closure (scheduled by set_ready or + set_shutdown) happens-after the I/O event on the fd */ + if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { + return; /* Fast-path successful. Return */ + } + + /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and + set_shutdown */ + gpr_atm curr = gpr_atm_acq_load(state); + switch (curr) { + case CLOSURE_NOT_READY: { + break; /* retry */ + } + + case CLOSURE_READY: { + /* Change the state to CLOSURE_NOT_READY. Schedule the closure if + successful. If not, the state most likely transitioned to shutdown. + We should retry. + + This can be a no-barrier cas since the state is being transitioned to + CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any + closure when transitioning out of CLOSURE_NO_READY state (i.e there + is no other code that needs to 'happen-after' this) */ + if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + return; /* Slow-path successful. Return */ + } + + break; /* retry */ + } + + default: { + /* 'curr' is either a closure or the fd is shutdown(in which case 'curr' + contains a pointer to the shutdown-error). If the fd is shutdown, + schedule the closure with the shutdown error */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); + grpc_closure_sched( + exec_ctx, closure, + GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); + return; + } + + /* There is already a closure!. This indicates a bug in the code */ + gpr_log(GPR_ERROR, + "notify_on called with a previous callback still pending"); + abort(); + } + } } + + GPR_UNREACHABLE_CODE(return ); } -static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st, grpc_closure *closure) { - if (fd->shutdown) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown")); - } else if (*st == CLOSURE_NOT_READY) { - /* not ready ==> switch to a waiting state by setting the closure */ - *st = closure; - } else if (*st == CLOSURE_READY) { - /* already ready ==> queue the closure to run immediately */ - *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); - } else { - /* upcallptr was set to a different closure. This is an error! */ - gpr_log(GPR_ERROR, - "User called a notify_on function with a previous callback still " - "pending"); - abort(); +static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, + grpc_error *shutdown_err) { + /* Try the fast-path first (i.e expect the current value to be + CLOSURE_NOT_READY */ + gpr_atm curr = CLOSURE_NOT_READY; + gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; + + while (true) { + /* The 'release' cas here matches the 'acquire' load in notify_on to ensure + that the closure it schedules 'happens-after' the set_shutdown is called + on the fd */ + if (gpr_atm_rel_cas(state, curr, new_state)) { + return; /* Fast-path successful. Return */ + } + + /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in + notify_on and set_ready */ + curr = gpr_atm_acq_load(state); + switch (curr) { + case CLOSURE_READY: { + break; /* retry */ + } + + case CLOSURE_NOT_READY: { + break; /* retry */ + } + + default: { + /* 'curr' is either a closure or the fd is already shutdown */ + + /* If fd is already shutdown, we are done */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + return; + } + + /* Fd is not shutdown. Schedule the closure and move the state to + shutdown state. The 'release' cas here matches the 'acquire' load in + notify_on to ensure that the closure it schedules 'happens-after' + the set_shutdown is called on the fd */ + if (gpr_atm_rel_cas(state, curr, new_state)) { + grpc_closure_sched( + exec_ctx, (grpc_closure *)curr, + GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1)); + return; + } + + /* 'curr' was a closure but now changed to a different state. We will + have to retry */ + break; + } + } } + + GPR_UNREACHABLE_CODE(return ); } -/* returns 1 if state becomes not ready */ -static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure **st) { - if (*st == CLOSURE_READY) { - /* duplicate ready ==> ignore */ - return 0; - } else if (*st == CLOSURE_NOT_READY) { - /* not ready, and not waiting ==> flag ready */ - *st = CLOSURE_READY; - return 0; - } else { - /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); - *st = CLOSURE_NOT_READY; - return 1; +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { + /* Try an optimistic case first (i.e assume current state is + CLOSURE_NOT_READY). + + This 'release' cas matches the 'acquire' load in notify_on ensuring that + any closure (scheduled by notify_on) 'happens-after' the return from + epoll_pwait */ + if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { + return; /* early out */ + } + + /* The 'acquire' load here matches the 'release' cas in notify_on and + set_shutdown */ + gpr_atm curr = gpr_atm_acq_load(state); + switch (curr) { + case CLOSURE_READY: { + /* Already ready. We are done here */ + break; + } + + case CLOSURE_NOT_READY: { + /* The state was not CLOSURE_NOT_READY when we checked initially at the + beginning of this function but now it is CLOSURE_NOT_READY again. + This is only possible if the state transitioned out of + CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then + back to CLOSURE_NOT_READY again (i.e after we entered this function, + the fd became "ready" and the necessary actions were already done). + So there is no need to make the state CLOSURE_READY now */ + break; + } + + default: { + /* 'curr' is either a closure or the fd is shutdown */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + /* The fd is shutdown. Do nothing */ + } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) { + /* The cas above was no-barrier since the state is being transitioned to + CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any + closures when transitioning out of CLOSURE_NO_READY state (i.e there + is no other code that needs to 'happen-after' this) */ + + grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); + } + /* else the state changed again (only possible by either a racing + set_ready or set_shutdown functions. In both these cases, the closure + would have been scheduled for execution. So we are done here */ + break; + } } } static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_pollset *notifier = NULL; - - gpr_mu_lock(&fd->po.mu); - notifier = fd->read_notifier_pollset; - gpr_mu_unlock(&fd->po.mu); - - return notifier; + gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); + return (grpc_pollset *)notifier; } static bool fd_is_shutdown(grpc_fd *fd) { - gpr_mu_lock(&fd->po.mu); - const bool r = fd->shutdown; - gpr_mu_unlock(&fd->po.mu); - return r; + grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); + return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0); } /* Might be called multiple times */ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { - gpr_mu_lock(&fd->po.mu); - /* Do the actual shutdown only once */ - if (!fd->shutdown) { - fd->shutdown = true; - fd->shutdown_error = why; - + /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */ + if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE, + (gpr_atm)why | FD_SHUTDOWN_BIT)) { shutdown(fd->fd, SHUT_RDWR); - /* Flush any pending read and write closures. Since fd->shutdown is 'true' - at this point, the closures would be called with 'success = false' */ - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + + set_shutdown(exec_ctx, fd, &fd->read_closure, why); + set_shutdown(exec_ctx, fd, &fd->write_closure, why); } else { + /* Shutdown already called */ GRPC_ERROR_UNREF(why); } - gpr_mu_unlock(&fd->po.mu); } static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - gpr_mu_lock(&fd->po.mu); - notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); - gpr_mu_unlock(&fd->po.mu); + notify_on(exec_ctx, fd, &fd->read_closure, closure); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - gpr_mu_lock(&fd->po.mu); - notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); - gpr_mu_unlock(&fd->po.mu); + notify_on(exec_ctx, fd, &fd->write_closure, closure); } static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { @@ -1343,18 +1492,19 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - /* Need the fd->po.mu since we might be racing with fd_notify_on_read */ - gpr_mu_lock(&fd->po.mu); - set_ready_locked(exec_ctx, fd, &fd->read_closure); - fd->read_notifier_pollset = notifier; - gpr_mu_unlock(&fd->po.mu); + set_ready(exec_ctx, fd, &fd->read_closure); + + /* Note, it is possible that fd_become_readable might be called twice with + different 'notifier's when an fd becomes readable and it is in two epoll + sets (This can happen briefly during polling island merges). In such cases + it does not really matter which notifer is set as the read_notifier_pollset + (They would both point to the same polling island anyway) */ + /* Use release store to match with acquire load in fd_get_read_notifier */ + gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - /* Need the fd->po.mu since we might be racing with fd_notify_on_write */ - gpr_mu_lock(&fd->po.mu); - set_ready_locked(exec_ctx, fd, &fd->write_closure); - gpr_mu_unlock(&fd->po.mu); + set_ready(exec_ctx, fd, &fd->write_closure); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, @@ -1737,7 +1887,7 @@ retry: (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type), (void *)bag); /* No need to lock 'pi_new' here since this is a new polling island - * and no one has a reference to it yet */ + and no one has a reference to it yet */ polling_island_remove_all_fds_locked(pi_new, true, &error); /* Ref and unref so that the polling island gets deleted during unref diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 5fb398c50b..5541c62068 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -79,8 +79,6 @@ typedef struct { grpc_pollset *pollset; } grpc_tcp; -static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } - static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp); @@ -119,6 +117,13 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif +static void uv_close_callback(uv_handle_t *handle) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp *tcp = handle->data; + TCP_UNREF(&exec_ctx, tcp, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); +} + static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -313,7 +318,6 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; uv_close((uv_handle_t *)tcp->handle, uv_close_callback); - TCP_UNREF(exec_ctx, tcp, "destroy"); } static char *uv_get_peer(grpc_endpoint *ep) { diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c index d339ed327f..c97f47b397 100644 --- a/src/core/plugin_registry/grpc_cronet_plugin_registry.c +++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c @@ -37,10 +37,14 @@ extern void grpc_chttp2_plugin_init(void); extern void grpc_chttp2_plugin_shutdown(void); extern void grpc_client_channel_init(void); extern void grpc_client_channel_shutdown(void); +extern void grpc_load_reporting_plugin_init(void); +extern void grpc_load_reporting_plugin_shutdown(void); void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_chttp2_plugin_init, grpc_chttp2_plugin_shutdown); grpc_register_plugin(grpc_client_channel_init, grpc_client_channel_shutdown); + grpc_register_plugin(grpc_load_reporting_plugin_init, + grpc_load_reporting_plugin_shutdown); } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 68a8f4879b..32a52eee6b 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -45,6 +45,7 @@ using Google.Apis.Auth.OAuth2; using Google.Protobuf; using Grpc.Auth; using Grpc.Core; +using Grpc.Core.Logging; using Grpc.Core.Utils; using Grpc.Testing; using Newtonsoft.Json.Linq; @@ -95,6 +96,7 @@ namespace Grpc.IntegrationTesting public static void Run(string[] args) { + GrpcEnvironment.SetLogger(new ConsoleLogger()); var parserResult = Parser.Default.ParseArguments<ClientOptions>(args) .WithNotParsed(errors => Environment.Exit(1)) .WithParsed(options => diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs index 4118f99c2b..be99f92d0b 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs @@ -41,6 +41,7 @@ using System.Threading.Tasks; using CommandLine; using CommandLine.Text; using Grpc.Core; +using Grpc.Core.Logging; using Grpc.Core.Utils; using Grpc.Testing; using NUnit.Framework; @@ -68,6 +69,7 @@ namespace Grpc.IntegrationTesting public static void Run(string[] args) { + GrpcEnvironment.SetLogger(new ConsoleLogger()); var parserResult = Parser.Default.ParseArguments<ServerOptions>(args) .WithNotParsed(errors => Environment.Exit(1)) .WithParsed(options => diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs index 62a7347d42..b17b2c2183 100644 --- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs +++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs @@ -42,6 +42,7 @@ using System.Threading.Tasks; using CommandLine; using CommandLine.Text; using Grpc.Core; +using Grpc.Core.Logging; using Grpc.Core.Utils; using Grpc.Testing; using NUnit.Framework; @@ -65,6 +66,7 @@ namespace Grpc.IntegrationTesting public static void Run(string[] args) { + GrpcEnvironment.SetLogger(new ConsoleLogger()); var parserResult = Parser.Default.ParseArguments<ServerOptions>(args) .WithNotParsed((x) => Environment.Exit(1)) .WithParsed(options => diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs index 750613b078..dbf8844265 100644 --- a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs @@ -92,6 +92,7 @@ namespace Grpc.IntegrationTesting public static void Run(string[] args) { + GrpcEnvironment.SetLogger(new ConsoleLogger()); var parserResult = Parser.Default.ParseArguments<ClientOptions>(args) .WithNotParsed((x) => Environment.Exit(1)) .WithParsed(options => { diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc index 4881542f2d..1917074dc2 100644 --- a/src/node/ext/completion_queue_threadpool.cc +++ b/src/node/ext/completion_queue_threadpool.cc @@ -174,7 +174,6 @@ grpc_completion_queue *GetCompletionQueue() { } void CompletionQueueNext() { - gpr_log(GPR_DEBUG, "Called CompletionQueueNext"); CompletionQueueAsyncWorker::Next(); } diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 4761b2867d..ccb55aa54c 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -78,8 +78,6 @@ using v8::Value; Nan::Callback *Server::constructor; Persistent<FunctionTemplate> Server::fun_tpl; -static Callback *shutdown_callback; - class NewCallOp : public Op { public: NewCallOp() { @@ -128,51 +126,6 @@ class NewCallOp : public Op { std::string GetTypeString() const { return "new_call"; } }; -class ServerShutdownOp : public Op { - public: - ServerShutdownOp(grpc_server *server): server(server) { - } - - ~ServerShutdownOp() { - } - - Local<Value> GetNodeValue() const { - return Nan::New<External>(reinterpret_cast<void *>(server)); - } - - bool ParseOp(Local<Value> value, grpc_op *out) { - return true; - } - bool IsFinalOp() { - return false; - } - - grpc_server *server; - - protected: - std::string GetTypeString() const { return "shutdown"; } -}; - -NAN_METHOD(ServerShutdownCallback) { - if (!info[0]->IsNull()) { - return Nan::ThrowError("forceShutdown failed somehow"); - } - MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]); - Local<Object> result = maybe_result.ToLocalChecked(); - Local<Value> server_val = Nan::Get( - result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked(); - Local<External> server_extern = server_val.As<External>(); - grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value()); - grpc_server_destroy(server); -} - -Server::Server(grpc_server *server) : wrapped_server(server) { -} - -Server::~Server() { - this->ShutdownServer(); -} - void Server::Init(Local<Object> exports) { HandleScope scope; Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New); @@ -187,11 +140,6 @@ void Server::Init(Local<Object> exports) { Local<Function> ctr = Nan::GetFunction(tpl).ToLocalChecked(); Nan::Set(exports, Nan::New("Server").ToLocalChecked(), ctr); constructor = new Callback(ctr); - - Local<FunctionTemplate>callback_tpl = - Nan::New<FunctionTemplate>(ServerShutdownCallback); - shutdown_callback = new Callback( - Nan::GetFunction(callback_tpl).ToLocalChecked()); } bool Server::HasInstance(Local<Value> val) { @@ -199,21 +147,6 @@ bool Server::HasInstance(Local<Value> val) { return Nan::New(fun_tpl)->HasInstance(val); } -void Server::ShutdownServer() { - if (this->wrapped_server != NULL) { - ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server); - unique_ptr<OpVec> ops(new OpVec()); - ops->push_back(unique_ptr<Op>(op)); - - grpc_server_shutdown_and_notify( - this->wrapped_server, GetCompletionQueue(), - new struct tag(new Callback(**shutdown_callback), ops.release(), NULL)); - grpc_server_cancel_all_calls(this->wrapped_server); - CompletionQueueNext(); - this->wrapped_server = NULL; - } -} - NAN_METHOD(Server::New) { /* If this is not a constructor call, make a constructor call and return the result */ diff --git a/src/node/ext/server.h b/src/node/ext/server.h index 9e6a7bd1e0..ab5fc210e8 100644 --- a/src/node/ext/server.h +++ b/src/node/ext/server.h @@ -73,6 +73,7 @@ class Server : public Nan::ObjectWrap { static Nan::Persistent<v8::FunctionTemplate> fun_tpl; grpc_server *wrapped_server; + grpc_completion_queue *shutdown_queue; }; } // namespace node diff --git a/src/node/ext/server_generic.cc b/src/node/ext/server_generic.cc new file mode 100644 index 0000000000..0cf20f754a --- /dev/null +++ b/src/node/ext/server_generic.cc @@ -0,0 +1,73 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_UV + +#include "server.h" + +#include <node.h> +#include <nan.h> +#include "grpc/grpc.h" +#include "grpc/support/time.h" + +namespace grpc { +namespace node { + +Server::Server(grpc_server *server) : wrapped_server(server) { + shutdown_queue = grpc_completion_queue_create(NULL); + grpc_server_register_non_listening_completion_queue(server, shutdown_queue, + NULL); +} + +Server::~Server() { + this->ShutdownServer(); + grpc_completion_queue_shutdown(this->shutdown_queue); + grpc_completion_queue_destroy(this->shutdown_queue); +} + +void Server::ShutdownServer() { + if (this->wrapped_server != NULL) { + grpc_server_shutdown_and_notify(this->wrapped_server, this->shutdown_queue, + NULL); + grpc_server_cancel_all_calls(this->wrapped_server); + grpc_completion_queue_pluck(this->shutdown_queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + grpc_server_destroy(this->wrapped_server); + this->wrapped_server = NULL; + } +} + +} // namespace grpc +} // namespace node + +#endif /* GRPC_UV */ diff --git a/src/node/ext/server_uv.cc b/src/node/ext/server_uv.cc new file mode 100644 index 0000000000..bf8b609a63 --- /dev/null +++ b/src/node/ext/server_uv.cc @@ -0,0 +1,131 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifdef GRPC_UV + +#include "server.h" + +#include <node.h> +#include <nan.h> +#include "grpc/grpc.h" +#include "grpc/support/time.h" + +#include "call.h" +#include "completion_queue.h" + +namespace grpc { +namespace node { + +using Nan::Callback; + +using v8::External; +using v8::Function; +using v8::FunctionTemplate; +using v8::Local; +using v8::MaybeLocal; +using v8::Object; +using v8::Value; + +static Callback *shutdown_callback = NULL; + +class ServerShutdownOp : public Op { + public: + ServerShutdownOp(grpc_server *server): server(server) { + } + + ~ServerShutdownOp() { + } + + Local<Value> GetNodeValue() const { + return Nan::New<External>(reinterpret_cast<void *>(server)); + } + + bool ParseOp(Local<Value> value, grpc_op *out) { + return true; + } + bool IsFinalOp() { + return false; + } + + grpc_server *server; + + protected: + std::string GetTypeString() const { return "shutdown"; } +}; + +Server::Server(grpc_server *server) : wrapped_server(server) { +} + +Server::~Server() { + this->ShutdownServer(); +} + +NAN_METHOD(ServerShutdownCallback) { + if (!info[0]->IsNull()) { + return Nan::ThrowError("forceShutdown failed somehow"); + } + MaybeLocal<Object> maybe_result = Nan::To<Object>(info[1]); + Local<Object> result = maybe_result.ToLocalChecked(); + Local<Value> server_val = Nan::Get( + result, Nan::New("shutdown").ToLocalChecked()).ToLocalChecked(); + Local<External> server_extern = server_val.As<External>(); + grpc_server *server = reinterpret_cast<grpc_server *>(server_extern->Value()); + grpc_server_destroy(server); +} + +void Server::ShutdownServer() { + if (this->wrapped_server != NULL) { + if (shutdown_callback == NULL) { + Local<FunctionTemplate>callback_tpl = + Nan::New<FunctionTemplate>(ServerShutdownCallback); + shutdown_callback = new Callback( + Nan::GetFunction(callback_tpl).ToLocalChecked()); + } + + ServerShutdownOp *op = new ServerShutdownOp(this->wrapped_server); + unique_ptr<OpVec> ops(new OpVec()); + ops->push_back(unique_ptr<Op>(op)); + + grpc_server_shutdown_and_notify( + this->wrapped_server, GetCompletionQueue(), + new struct tag(new Callback(**shutdown_callback), ops.release(), NULL)); + grpc_server_cancel_all_calls(this->wrapped_server); + CompletionQueueNext(); + this->wrapped_server = NULL; + } +} + +} // namespace grpc +} // namespace node + +#endif /* GRPC_UV */ diff --git a/src/proto/grpc/reflection/v1alpha/BUILD b/src/proto/grpc/reflection/v1alpha/BUILD new file mode 100644 index 0000000000..92dd3d7f68 --- /dev/null +++ b/src/proto/grpc/reflection/v1alpha/BUILD @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +licenses(["notice"]) # 3-clause BSD + +package(default_visibility = ["//visibility:public"]) + +load("//bazel:grpc_build_system.bzl", "grpc_proto_library") + +grpc_proto_library( + name = "reflection_proto", + srcs = ["reflection.proto"], +) |