diff options
author | 2017-05-23 14:10:10 -0700 | |
---|---|---|
committer | 2017-05-23 14:10:10 -0700 | |
commit | d74dbd3889d4cbd3f756d0d6392569bf358a88d8 (patch) | |
tree | d64647b9fa65c5996ce56203f3698cfbefc4ab46 /src/core/ext | |
parent | a194aab223af6558713b6482976a407b816ce15a (diff) | |
parent | 0a94f3c8ab55dfd12c14058d57f33121c8d6c411 (diff) |
Merge branch 'master' of https://github.com/Vizerai/grpc into intrusive_hash_map
Diffstat (limited to 'src/core/ext')
40 files changed, 1853 insertions, 687 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index 62f58fb278..f83670db82 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -132,7 +132,7 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, gpr_mu_lock(&w->mu); if (due_to_completion) { - if (grpc_trace_operation_failures) { + if (GRPC_TRACER_ON(grpc_trace_operation_failures)) { GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); } GRPC_ERROR_UNREF(error); diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 0463b25412..f2f27b9175 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -760,12 +760,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, #define CANCELLED_CALL ((grpc_subchannel_call *)1) -typedef enum { - /* zero so that it can be default-initialized */ - GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0, - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL -} subchannel_creation_phase; - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -793,8 +787,9 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call; gpr_arena *arena; - subchannel_creation_phase creation_phase; + bool pick_pending; grpc_connected_subchannel *connected_subchannel; + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; grpc_transport_stream_op_batch **waiting_ops; @@ -914,16 +909,18 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->creation_phase == - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + GPR_ASSERT(calld->pick_pending); + calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, 1); + gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL); fail_locked(exec_ctx, calld, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1)); + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1)); } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ grpc_error *cancellation_error = @@ -944,7 +941,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -973,6 +971,7 @@ typedef struct { grpc_metadata_batch *initial_metadata; uint32_t initial_metadata_flags; grpc_connected_subchannel **connected_subchannel; + grpc_call_context_element *subchannel_call_context; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -984,8 +983,8 @@ typedef struct { static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, - grpc_error *error); + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready); static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -997,49 +996,49 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready, - GRPC_ERROR_NONE)) { + cpa->connected_subchannel, + cpa->subchannel_call_context, cpa->on_ready)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } gpr_free(cpa); } +static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->lb_policy != NULL) { + grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, + &calld->connected_subchannel, + GRPC_ERROR_REF(error)); + } + for (grpc_closure *closure = chand->waiting_for_config_closures.head; + closure != NULL; closure = closure->next_data.next) { + continue_picking_args *cpa = closure->cb_arg; + if (cpa->connected_subchannel == &calld->connected_subchannel) { + cpa->connected_subchannel = NULL; + grpc_closure_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); + } + } + GRPC_ERROR_UNREF(error); +} + static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, - grpc_error *error) { + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, + grpc_closure *on_ready) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - continue_picking_args *cpa; - grpc_closure *closure; GPR_ASSERT(connected_subchannel); - if (initial_metadata == NULL) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - connected_subchannel, - GRPC_ERROR_REF(error)); - } - for (closure = chand->waiting_for_config_closures.head; closure != NULL; - closure = closure->next_data.next) { - cpa = closure->cb_arg; - if (cpa->connected_subchannel == connected_subchannel) { - cpa->connected_subchannel = NULL; - grpc_closure_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - } - GPR_TIMER_END("pick_subchannel", 0); - GRPC_ERROR_UNREF(error); - return true; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { apply_final_configuration_locked(exec_ctx, elem); grpc_lb_policy *lb_policy = chand->lb_policy; @@ -1062,8 +1061,7 @@ static bool pick_subchannel_locked( } } const grpc_lb_policy_pick_args inputs = { - initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, - gpr_inf_future(GPR_CLOCK_MONOTONIC)}; + initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; // Wrap the user-provided callback in order to hold a strong reference to // the LB policy for the duration of the pick. @@ -1076,8 +1074,8 @@ static bool pick_subchannel_locked( GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); w_on_pick_arg->lb_policy = lb_policy; const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, - &w_on_pick_arg->wrapper_closure); + exec_ctx, lb_policy, &inputs, connected_subchannel, + subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, @@ -1096,10 +1094,11 @@ static bool pick_subchannel_locked( &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { - cpa = gpr_malloc(sizeof(*cpa)); + continue_picking_args *cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; cpa->initial_metadata_flags = initial_metadata_flags; cpa->connected_subchannel = connected_subchannel; + cpa->subchannel_call_context = subchannel_call_context; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, @@ -1151,16 +1150,13 @@ static void start_transport_stream_op_batch_locked_inner( error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - switch (calld->creation_phase) { - case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - break; - case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel_locked( - exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - break; + if (calld->pick_pending) { + cancel_pick_locked( + exec_ctx, elem, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); + } else { + fail_locked(exec_ctx, calld, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); } grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, @@ -1170,9 +1166,9 @@ static void start_transport_stream_op_batch_locked_inner( } } /* if we don't have a subchannel, try to get one */ - if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel == NULL && op->send_initial_metadata) { - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + if (!calld->pick_pending && calld->connected_subchannel == NULL && + op->send_initial_metadata) { + calld->pick_pending = true; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); @@ -1183,24 +1179,34 @@ static void start_transport_stream_op_batch_locked_inner( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) { - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + &calld->connected_subchannel, calld->subchannel_call_context, + &calld->next_step)) { + calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + if (calld->connected_subchannel == NULL) { + gpr_atm_no_barrier_store(&calld->subchannel_call, + (gpr_atm)CANCELLED_CALL); + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy"); + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; // Early out. + } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); } } /* if we've got a subchannel, then let's ask it to create a call */ - if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel != NULL) { + if (!calld->pick_pending && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -1349,12 +1355,18 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } - GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, "picked"); } + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { + if (calld->subchannel_call_context[i].value != NULL) { + calld->subchannel_call_context[i].destroy( + calld->subchannel_call_context[i].value); + } + } gpr_free(calld->waiting_ops); grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } @@ -1450,12 +1462,12 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete) { + grpc_connectivity_state *state, grpc_closure *closure) { channel_data *chand = elem->channel_data; external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); w->chand = chand; w->pollset = pollset; - w->on_complete = on_complete; + w->on_complete = closure; w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 2d31499d13..112ba40658 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -119,9 +119,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target, - user_data, on_complete); + context, user_data, on_complete); } void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 25427666ae..fb4aa084a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -43,9 +43,6 @@ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, - grpc_status_code status, const char *errmsg); - struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; @@ -65,8 +62,6 @@ typedef struct grpc_lb_policy_pick_args { uint32_t initial_metadata_flags; /** Storage for LB token in \a initial_metadata, or NULL if not used */ grpc_linked_mdelem *lb_token_mdelem_storage; - /** Deadline for the call to the LB server */ - gpr_timespec deadline; } grpc_lb_policy_pick_args; struct grpc_lb_policy_vtable { @@ -76,7 +71,8 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_pick */ int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ @@ -153,9 +149,13 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Finds an appropriate subchannel for a call, based on \a pick_args. - \a target will be set to the selected subchannel, or NULL on failure. + \a target will be set to the selected subchannel, or NULL on failure + or when the LB policy decides to drop the call. + Upon success, \a user_data will be set to whatever opaque information may need to be propagated from the LB policy, or NULL if not needed. + \a context will be populated with context to pass to the subchannel + call, if needed. If the pick succeeds and a result is known immediately, a non-zero value will be returned. Otherwise, \a on_complete will be invoked @@ -167,6 +167,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c new file mode 100644 index 0000000000..67baa46de7 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -0,0 +1,153 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" + +#include <grpc/support/atm.h> +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" + +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) {} + +typedef struct { + // Stats object to update. + grpc_grpclb_client_stats *client_stats; + // State for intercepting send_initial_metadata. + grpc_closure on_complete_for_send; + grpc_closure *original_on_complete_for_send; + bool send_initial_metadata_succeeded; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure *original_recv_initial_metadata_ready; + bool recv_initial_metadata_succeeded; +} call_data; + +static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->send_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_on_complete_for_send, + GRPC_ERROR_REF(error)); +} + +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->recv_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + call_data *calld = elem->call_data; + // Get stats object from context and take a ref. + GPR_ASSERT(args->context != NULL); + GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); + calld->client_stats = grpc_grpclb_client_stats_ref( + args->context[GRPC_GRPCLB_CLIENT_STATS].value); + // Record call started. + grpc_grpclb_client_stats_add_call_started(calld->client_stats); + return GRPC_ERROR_NONE; +} + +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + grpc_closure *ignored) { + call_data *calld = elem->call_data; + // Record call finished, optionally setting client_failed_to_send and + // received. + grpc_grpclb_client_stats_add_call_finished( + false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */, + !calld->send_initial_metadata_succeeded /* client_failed_to_send */, + calld->recv_initial_metadata_succeeded /* known_received */, + calld->client_stats); + // All done, so unref the stats object. + grpc_grpclb_client_stats_unref(calld->client_stats); +} + +static void start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); + // Intercept send_initial_metadata. + if (batch->send_initial_metadata) { + calld->original_on_complete_for_send = batch->on_complete; + grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld, + grpc_schedule_on_exec_ctx); + batch->on_complete = &calld->on_complete_for_send; + } + // Intercept recv_initial_metadata. + if (batch->recv_initial_metadata) { + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, calld, + grpc_schedule_on_exec_ctx); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + } + // Chain to next filter. + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("clr_start_transport_stream_op_batch", 0); +} + +const grpc_channel_filter grpc_client_load_reporting_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client_load_reporting"}; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h new file mode 100644 index 0000000000..28b313d874 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_client_load_reporting_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 9e158a94ad..d2a2856a18 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -95,8 +95,7 @@ headers. Therefore, sockaddr.h must always be included first */ #include "src/core/lib/iomgr/sockaddr.h" -#include <errno.h> - +#include <limits.h> #include <string.h> #include <grpc/byte_buffer_reader.h> @@ -108,13 +107,16 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -126,6 +128,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" #define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 @@ -134,7 +137,7 @@ #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 -int grpc_lb_glb_trace = 0; +grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false); /* add lb_token of selected subchannel (address) to the call's initial * metadata */ @@ -147,6 +150,10 @@ static grpc_error *initial_metadata_add_lb_token( lb_token_mdelem_storage, lb_token); } +static void destroy_client_stats(void *arg) { + grpc_grpclb_client_stats_unref(arg); +} + typedef struct wrapped_rr_closure_arg { /* the closure instance using this struct as argument */ grpc_closure wrapper_closure; @@ -163,6 +170,13 @@ typedef struct wrapped_rr_closure_arg { * initial metadata */ grpc_connected_subchannel **target; + /* the context to be populated for the subchannel call */ + grpc_call_context_element *context; + + /* Stats for client-side load reporting. Note that this holds a + * reference, which must be either passed on via context or unreffed. */ + grpc_grpclb_client_stats *client_stats; + /* the LB token associated with the pick */ grpc_mdelem lb_token; @@ -202,8 +216,14 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (void *)*wc_arg->target, (void *)wc_arg->rr_policy); abort(); } + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + } else { + grpc_grpclb_client_stats_unref(wc_arg->client_stats); } - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); @@ -237,6 +257,7 @@ typedef struct pending_pick { static void add_pending_pick(pending_pick **root, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, grpc_closure *on_complete) { pending_pick *pp = gpr_zalloc(sizeof(*pp)); pp->next = *root; @@ -244,6 +265,7 @@ static void add_pending_pick(pending_pick **root, pp->target = target; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.context = context; pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; @@ -287,8 +309,8 @@ typedef struct glb_lb_policy { grpc_client_channel_factory *cc_factory; grpc_channel_args *args; - /** deadline for the LB's call */ - gpr_timespec deadline; + /** timeout in milliseconds for the LB call. 0 means no deadline. */ + int lb_call_timeout_ms; /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -305,6 +327,11 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** Index into serverlist for next pick. + * If the server at this index is a drop, we return a drop. + * Otherwise, we delegate to the RR policy. */ + size_t serverlist_index; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -316,6 +343,10 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ + + /* Finished sending initial request. */ + grpc_closure lb_on_sent_initial_request; + /* Status from the LB server has been received. This signals the end of the LB * call. */ grpc_closure lb_on_server_status_received; @@ -348,6 +379,23 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; + + bool initial_request_sent; + bool seen_initial_response; + + /* Stats for client-side load reporting. Should be unreffed and + * recreated whenever lb_call is replaced. */ + grpc_grpclb_client_stats *client_stats; + /* Interval and timer for next client load report. */ + gpr_timespec client_stats_report_interval; + grpc_timer client_load_report_timer; + bool client_load_report_timer_pending; + bool last_client_load_report_counters_were_zero; + /* Closure used for either the load report timer or the callback for + * completion of sending the load report. */ + grpc_closure client_load_report_closure; + /* Client load report message payload. */ + grpc_byte_buffer *client_load_report_payload; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -359,6 +407,9 @@ struct rr_connectivity_data { static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, bool log) { + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + return false; + } const grpc_grpclb_ip_address *ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { @@ -368,7 +419,6 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, } return false; } - if (ip->size != 4 && ip->size != 16) { if (log) { gpr_log(GPR_ERROR, @@ -402,11 +452,12 @@ static const grpc_lb_user_data_vtable lb_token_vtable = { static void parse_server(const grpc_grpclb_server *server, grpc_resolved_address *addr) { + memset(addr, 0, sizeof(*addr)); + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return; const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - memset(addr, 0, sizeof(*addr)); if (ip->size == 4) { addr->len = sizeof(struct sockaddr_in); struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; @@ -531,7 +582,7 @@ static bool update_lb_connectivity_status_locked( GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE); } - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.", grpc_connectivity_state_name(new_rr_state), @@ -543,31 +594,74 @@ static bool update_lb_connectivity_status_locked( return true; } -/* perform a pick over \a rr_policy. Given that a pick can return immediately - * (ignoring its completion callback) we need to perform the cleanups this - * callback would be otherwise resposible for */ +/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return + * immediately (ignoring its completion callback), we need to perform the + * cleanups this callback would otherwise be resposible for. + * If \a force_async is true, then we will manually schedule the + * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy, - const grpc_lb_policy_pick_args *pick_args, + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - GPR_ASSERT(rr_policy != NULL); + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. + } + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + // Not using the RR policy, so unref it. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats); + grpc_grpclb_client_stats_add_call_finished( + server->drop_for_rate_limiting, server->drop_for_load_balancing, + false /* failed_to_send */, false /* known_received */, + wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } + gpr_free(wc_arg->free_when_done); + return true; + } + // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token, - &wc_arg->wrapper_closure); + exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context, + (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - /* add the load reporting initial metadata */ initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - - gpr_free(wc_arg); + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } + gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -637,7 +731,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, if (!replace_old_rr) { /* dispose of the new RR policy that won't be used after all */ GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace"); - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Keeping old RR policy (%p) despite new serverlist: new RR " "policy was in %s connectivity state.", @@ -647,7 +741,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, return; } - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)", (void *)new_rr_policy, (void *)glb_policy->rr_policy); } @@ -690,12 +784,14 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; - if (grpc_lb_glb_trace) { + pp->wrapped_on_complete_arg.client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - &pp->pick_args, pp->target, + pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, + true /* force_async */, pp->target, &pp->wrapped_on_complete_arg); } @@ -704,7 +800,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_pings = pping->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } @@ -857,16 +953,29 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", glb_policy->server_name); } grpc_uri_destroy(uri); glb_policy->cc_factory = args->client_channel_factory; - glb_policy->args = grpc_channel_args_copy(args->args); GPR_ASSERT(glb_policy->cc_factory != NULL); + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + glb_policy->lb_call_timeout_ms = + grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg; + new_arg.key = GRPC_ARG_LB_POLICY_NAME; + new_arg.type = GRPC_ARG_STRING; + new_arg.value.string = "grpclb"; + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ char *lb_service_target_addresses = @@ -880,6 +989,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, lb_channel_args); gpr_free(lb_service_target_addresses); if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } @@ -895,6 +1006,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -1011,7 +1125,8 @@ static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; @@ -1023,11 +1138,10 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - glb_policy->deadline = pick_args->deadline; bool pick_done; if (glb_policy->rr_policy != NULL) { - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", (void *)glb_policy, (void *)glb_policy->rr_policy); } @@ -1039,20 +1153,25 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; + wc_arg->context = context; + GPR_ASSERT(glb_policy->client_stats != NULL); + wc_arg->client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); wc_arg->wrapped_closure = on_complete; wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - pick_args, target, wc_arg); + pick_done = + pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, + false /* force_async */, target, wc_arg); } else { - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "No RR policy in grpclb instance %p. Adding to grpclb's pending " "picks", (void *)(glb_policy)); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, + add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, on_complete); if (!glb_policy->started_picking) { @@ -1093,6 +1212,104 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &glb_policy->state_tracker, current, notify); } +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + +static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec next_client_load_report_time = + gpr_time_add(now, glb_policy->client_stats_report_interval); + grpc_closure_init(&glb_policy->client_load_report_closure, + send_client_load_report_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, + next_client_load_report_time, + &glb_policy->client_load_report_closure, now); +} + +static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); + glb_policy->client_load_report_payload = NULL; + if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + schedule_next_client_load_report(exec_ctx, glb_policy); +} + +static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = glb_policy->client_load_report_payload; + grpc_closure_init(&glb_policy->client_load_report_closure, + client_load_report_done_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, &op, 1, + &glb_policy->client_load_report_closure); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static bool load_report_counters_are_zero(grpc_grpclb_request *request) { + return request->client_stats.num_calls_started == 0 && + request->client_stats.num_calls_finished == 0 && + request->client_stats.num_calls_finished_with_drop_for_rate_limiting == + 0 && + request->client_stats + .num_calls_finished_with_drop_for_load_balancing == 0 && + request->client_stats.num_calls_finished_with_client_failed_to_send == + 0 && + request->client_stats.num_calls_finished_known_received == 0; +} + +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + // Construct message payload. + GPR_ASSERT(glb_policy->client_load_report_payload == NULL); + grpc_grpclb_request *request = + grpc_grpclb_load_report_request_create(glb_policy->client_stats); + // Skip client load report if the counters were all zero in the last + // report and they are still zero in this one. + if (load_report_counters_are_zero(request)) { + if (glb_policy->last_client_load_report_counters_were_zero) { + grpc_grpclb_request_destroy(request); + schedule_next_client_load_report(exec_ctx, glb_policy); + return; + } + glb_policy->last_client_load_report_counters_were_zero = true; + } else { + glb_policy->last_client_load_report_counters_were_zero = false; + } + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); + glb_policy->client_load_report_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_grpclb_request_destroy(request); + // If we've already sent the initial request, then we can go ahead and + // sent the load report. Otherwise, we need to wait until the initial + // request has been sent to send this + // (see lb_on_sent_initial_request_locked() below). + if (glb_policy->initial_request_sent) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } +} + +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1107,13 +1324,24 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name); + gpr_timespec deadline = + glb_policy->lb_call_timeout_ms == 0 + ? gpr_inf_future(GPR_CLOCK_MONOTONIC) + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(glb_policy->lb_call_timeout_ms, + GPR_TIMESPAN)); glb_policy->lb_call = grpc_channel_create_pollset_set_call( exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, - &host, glb_policy->deadline, NULL); + &host, deadline, NULL); grpc_slice_unref_internal(exec_ctx, host); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } + glb_policy->client_stats = grpc_grpclb_client_stats_create(); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); @@ -1125,6 +1353,9 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); + grpc_closure_init(&glb_policy->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); @@ -1138,6 +1369,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, GRPC_GRPCLB_RECONNECT_JITTER, GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->initial_request_sent = false; + glb_policy->seen_initial_response = false; + glb_policy->last_client_load_report_counters_were_zero = false; } static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -1151,6 +1386,10 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_byte_buffer_destroy(glb_policy->lb_request_payload); grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details); + + if (!glb_policy->client_load_report_timer_pending) { + grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer); + } } /* @@ -1163,7 +1402,7 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, lb_call_init_locked(exec_ctx, glb_policy); - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", (void *)glb_policy, (void *)glb_policy->lb_call); } @@ -1179,21 +1418,27 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_sent_initial_request); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &glb_policy->lb_trailing_metadata_recv; @@ -1225,6 +1470,19 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + glb_lb_policy *glb_policy = arg; + glb_policy->initial_request_sent = true; + // If we attempted to send a client load report before the initial + // request was sent, send the load report now. + if (glb_policy->client_load_report_payload != NULL) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_locked"); +} + static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; @@ -1240,58 +1498,91 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_destroy(glb_policy->lb_response_payload); - grpc_grpclb_serverlist *serverlist = - grpc_grpclb_response_parse_serverlist(response_slice); - if (serverlist != NULL) { - GPR_ASSERT(glb_policy->lb_call != NULL); - grpc_slice_unref_internal(exec_ctx, response_slice); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Serverlist with %lu servers received", - (unsigned long)serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_resolved_address addr; - parse_server(serverlist->servers[i], &addr); - char *ipport; - grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); - gpr_free(ipport); + + grpc_grpclb_initial_response *response = NULL; + if (!glb_policy->seen_initial_response && + (response = grpc_grpclb_initial_response_parse(response_slice)) != + NULL) { + if (response->has_client_stats_report_interval) { + glb_policy->client_stats_report_interval = + gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN), + grpc_grpclb_duration_to_timespec( + &response->client_stats_report_interval)); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting interval = %" PRId64 ".%09d sec", + glb_policy->client_stats_report_interval.tv_sec, + glb_policy->client_stats_report_interval.tv_nsec); } + /* take a weak ref (won't prevent calling of \a glb_shutdown() if the + * strong ref count goes to zero) to be unref'd in + * send_client_load_report() */ + glb_policy->client_load_report_timer_pending = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); + schedule_next_client_load_report(exec_ctx, glb_policy); + } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting NOT enabled"); } + grpc_grpclb_initial_response_destroy(response); + glb_policy->seen_initial_response = true; + } else { + grpc_grpclb_serverlist *serverlist = + grpc_grpclb_response_parse_serverlist(response_slice); + if (serverlist != NULL) { + GPR_ASSERT(glb_policy->lb_call != NULL); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Serverlist with %lu servers received", + (unsigned long)serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } + } - /* update serverlist */ - if (serverlist->num_servers > 0) { - if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { - if (grpc_lb_glb_trace) { + /* update serverlist */ + if (serverlist->num_servers > 0) { + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, + serverlist)) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "Incoming server list identical to current, ignoring."); + } + grpc_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (glb_policy->serverlist != NULL) { + /* dispose of the old serverlist */ + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } + /* and update the copy in the glb_lb_policy instance. This + * serverlist instance will be destroyed either upon the next + * update or in glb_destroy() */ + glb_policy->serverlist = serverlist; + glb_policy->serverlist_index = 0; + rr_handover_locked(exec_ctx, glb_policy); + } + } else { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, - "Incoming server list identical to current, ignoring."); + "Received empty server list. Picks will stay pending until " + "a response with > 0 servers is received"); } grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (glb_policy->serverlist != NULL) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } - /* and update the copy in the glb_lb_policy instance. This serverlist - * instance will be destroyed either upon the next update or in - * glb_destroy() */ - glb_policy->serverlist = serverlist; - - rr_handover_locked(exec_ctx, glb_policy); - } - } else { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Received empty server list. Picks will stay pending until a " - "response with > 0 servers is received"); } - grpc_grpclb_destroy_serverlist(serverlist); + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } - } else { /* serverlist == NULL */ - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - grpc_slice_unref_internal(exec_ctx, response_slice); } + grpc_slice_unref_internal(exec_ctx, response_slice); + if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1319,7 +1610,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, glb_lb_policy *glb_policy = arg; if (!glb_policy->shutting_down) { - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", (void *)glb_policy); } @@ -1336,7 +1627,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(glb_policy->lb_call != NULL); - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { char *status_details = grpc_slice_to_c_string(glb_policy->lb_call_status_details); gpr_log(GPR_DEBUG, @@ -1355,7 +1646,7 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&glb_policy->lb_call_backoff_state, now); - if (grpc_lb_glb_trace) { + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", (void *)glb_policy); gpr_timespec timeout = gpr_time_sub(next_try, now); @@ -1403,9 +1694,29 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() { } /* Plugin registration */ + +// Only add client_load_reporting filter if the grpclb LB policy is used. +static bool maybe_add_client_load_reporting_filter( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg *channel_arg = + grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING && + strcmp(channel_arg->value.string, "grpclb") == 0) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter *)arg, NULL, NULL); + } + return true; +} + void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); grpc_register_tracer("glb", &grpc_lb_glb_trace); + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_client_load_reporting_filter, + (void *)&grpc_client_load_reporting_filter); } void grpc_lb_policy_grpclb_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c new file mode 100644 index 0000000000..444c03b9aa --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -0,0 +1,133 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/channel/channel_args.h" + +#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats" + +struct grpc_grpclb_client_stats { + gpr_refcount refs; + gpr_atm num_calls_started; + gpr_atm num_calls_finished; + gpr_atm num_calls_finished_with_drop_for_rate_limiting; + gpr_atm num_calls_finished_with_drop_for_load_balancing; + gpr_atm num_calls_finished_with_client_failed_to_send; + gpr_atm num_calls_finished_known_received; +}; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() { + grpc_grpclb_client_stats* client_stats = gpr_zalloc(sizeof(*client_stats)); + gpr_ref_init(&client_stats->refs, 1); + return client_stats; +} + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats) { + gpr_ref(&client_stats->refs); + return client_stats; +} + +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) { + if (gpr_unref(&client_stats->refs)) { + gpr_free(client_stats); + } +} + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1); +} + +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); + if (finished_with_drop_for_rate_limiting) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_rate_limiting, + (gpr_atm)1); + } + if (finished_with_drop_for_load_balancing) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_load_balancing, + (gpr_atm)1); + } + if (finished_with_client_failed_to_send) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_client_failed_to_send, + (gpr_atm)1); + } + if (finished_known_received) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received, + (gpr_atm)1); + } +} + +static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) { + *value = (int64_t)gpr_atm_acq_load(counter); + gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value)); +} + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received) { + atomic_get_and_reset_counter(num_calls_started, + &client_stats->num_calls_started); + atomic_get_and_reset_counter(num_calls_finished, + &client_stats->num_calls_finished); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_rate_limiting, + &client_stats->num_calls_finished_with_drop_for_rate_limiting); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_load_balancing, + &client_stats->num_calls_finished_with_drop_for_load_balancing); + atomic_get_and_reset_counter( + num_calls_finished_with_client_failed_to_send, + &client_stats->num_calls_finished_with_client_failed_to_send); + atomic_get_and_reset_counter( + num_calls_finished_known_received, + &client_stats->num_calls_finished_known_received); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h new file mode 100644 index 0000000000..0af4a919f8 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H + +#include <stdbool.h> + +#include <grpc/impl/codegen/grpc_types.h> + +typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create(); +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 87549b78f0..90e7c2efe5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -37,58 +37,83 @@ #include <grpc/support/alloc.h> +/* invoked once for every Server in ServerList */ +static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field, + void **arg) { + grpc_grpclb_serverlist *sl = *arg; + grpc_grpclb_server server; + if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; + } + ++sl->num_servers; + return true; +} + typedef struct decode_serverlist_arg { - /* The first pass counts the number of servers in the server list. The second - * one allocates and decodes. */ - bool first_pass; /* The decoding callback is invoked once per server in serverlist. Remember * which index of the serverlist are we currently decoding */ size_t decoding_idx; - /* Populated after the first pass. Number of server in the input serverlist */ - size_t num_servers; /* The decoded serverlist */ - grpc_grpclb_server **servers; + grpc_grpclb_serverlist *serverlist; } decode_serverlist_arg; /* invoked once for every Server in ServerList */ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, void **arg) { decode_serverlist_arg *dec_arg = *arg; - if (dec_arg->first_pass) { /* count how many server do we have */ - grpc_grpclb_server server; - if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->num_servers++; - } else { /* second pass. Actually decode. */ - grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); - GPR_ASSERT(dec_arg->num_servers > 0); - if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */ - dec_arg->servers = - gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); - } - if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->servers[dec_arg->decoding_idx++] = server; + GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); + grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); + if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { + gpr_free(server); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; } - + dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; return true; } grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); - - req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */ - req->has_initial_request = 1; - req->initial_request.has_name = 1; + req->has_client_stats = false; + req->has_initial_request = true; + req->initial_request.has_name = true; strncpy(req->initial_request.name, lb_service_name, GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); return req; } +static void populate_timestamp(gpr_timespec timestamp, + struct _grpc_lb_v1_Timestamp *timestamp_pb) { + timestamp_pb->has_seconds = true; + timestamp_pb->seconds = timestamp.tv_sec; + timestamp_pb->has_nanos = true; + timestamp_pb->nanos = timestamp.tv_nsec; +} + +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats) { + grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); + req->has_client_stats = true; + req->client_stats.has_timestamp = true; + populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); + req->client_stats.has_num_calls_started = true; + req->client_stats.has_num_calls_finished = true; + req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; + req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_known_received = true; + grpc_grpclb_client_stats_get( + client_stats, &req->client_stats.num_calls_started, + &req->client_stats.num_calls_finished, + &req->client_stats.num_calls_finished_with_drop_for_rate_limiting, + &req->client_stats.num_calls_finished_with_drop_for_load_balancing, + &req->client_stats.num_calls_finished_with_client_failed_to_send, + &req->client_stats.num_calls_finished_known_received); + return req; +} + grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { size_t encoded_length; pb_ostream_t sizestream; @@ -122,6 +147,9 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } + + if (!res.has_initial_response) return NULL; + grpc_grpclb_initial_response *initial_res = gpr_malloc(sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, @@ -132,36 +160,38 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( grpc_slice encoded_grpc_grpclb_response) { - bool status; - decode_serverlist_arg arg; pb_istream_t stream = pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; + grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); - memset(&arg, 0, sizeof(decode_serverlist_arg)); - - res.server_list.servers.funcs.decode = decode_serverlist; - res.server_list.servers.arg = &arg; - arg.first_pass = true; - status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); + // First pass: count number of servers. + res.server_list.servers.funcs.decode = count_serverlist; + res.server_list.servers.arg = sl; + bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { + gpr_free(sl); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } - - arg.first_pass = false; - status = - pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res); - if (!status) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); - return NULL; + // Second pass: populate servers. + if (sl->num_servers > 0) { + sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers); + decode_serverlist_arg decode_arg; + memset(&decode_arg, 0, sizeof(decode_arg)); + decode_arg.serverlist = sl; + res.server_list.servers.funcs.decode = decode_serverlist; + res.server_list.servers.arg = &decode_arg; + status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, + &res); + if (!status) { + grpc_grpclb_destroy_serverlist(sl); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return NULL; + } } - - grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); - sl->num_servers = arg.num_servers; - sl->servers = arg.servers; if (res.server_list.has_expiration_interval) { sl->expiration_interval = res.server_list.expiration_interval; } @@ -195,7 +225,7 @@ grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, const grpc_grpclb_serverlist *rhs) { - if ((lhs == NULL) || (rhs == NULL)) { + if (lhs == NULL || rhs == NULL) { return false; } if (lhs->num_servers != rhs->num_servers) { @@ -243,6 +273,15 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, return 0; } +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb) { + gpr_timespec duration; + duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0; + duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0; + duration.clock_type = GPR_TIMESPAN; + return duration; +} + void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response) { gpr_free(response); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index d014b8800c..7f596ce1f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -36,6 +36,7 @@ #include <grpc/slice_buffer.h> +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" @@ -50,7 +51,7 @@ typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; typedef grpc_lb_v1_Duration grpc_grpclb_duration; -typedef struct grpc_grpclb_serverlist { +typedef struct { grpc_grpclb_server **servers; size_t num_servers; grpc_grpclb_duration expiration_interval; @@ -58,6 +59,8 @@ typedef struct grpc_grpclb_serverlist { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats); /** Protocol Buffers v3-encode \a request */ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); @@ -93,6 +96,9 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs); +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb); + /** Destroy \a initial_response */ void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 2b77cd39b8..b1c5dfc61c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -189,7 +189,8 @@ static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index ff41e61b3e..7ee6ffb787 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -74,7 +74,7 @@ typedef struct round_robin_lb_policy round_robin_lb_policy; -int grpc_lb_round_robin_trace = 0; +grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false); /** List of entities waiting for a pick. * @@ -99,26 +99,13 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; -/** List of subchannels in a connectivity READY state */ -typedef struct ready_list { - grpc_subchannel *subchannel; - /* references namesake entry in subchannel_data */ - void *user_data; - struct ready_list *next; - struct ready_list *prev; -} ready_list; - typedef struct { - /** index within policy->subchannels */ - size_t index; /** backpointer to owning policy */ round_robin_lb_policy *policy; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ grpc_closure connectivity_changed_closure; - /** this subchannels current position in subchannel->ready_list */ - ready_list *ready_list_node; /** last observed connectivity. Not updated by * \a grpc_subchannel_notify_on_state_change. Used to determine the previous * state while processing the new state in \a rr_connectivity_changed */ @@ -126,6 +113,10 @@ typedef struct { /** current connectivity state. Updated by \a * grpc_subchannel_notify_on_state_change */ grpc_connectivity_state curr_connectivity_state; + /** connectivity state to be updated by the watcher, not guarded by + * the combiner. Will be moved to curr_connectivity_state inside of + * the combiner by rr_connectivity_changed_locked(). */ + grpc_connectivity_state pending_connectivity_state_unsafe; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -141,182 +132,106 @@ struct round_robin_lb_policy { /** all our subchannels */ size_t num_subchannels; - subchannel_data **subchannels; + subchannel_data *subchannels; - /** how many subchannels are in TRANSIENT_FAILURE */ + /** how many subchannels are in state READY */ + size_t num_ready; + /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are IDLE */ + /** how many subchannels are in state IDLE */ size_t num_idle; /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shutting down? */ - int shutdown; + bool shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; - /** (Dummy) root of the doubly linked list containing READY subchannels */ - ready_list ready_list; - /** Last pick from the ready list. */ - ready_list *ready_list_last_pick; + // Index into subchannels for last pick. + size_t last_ready_subchannel_index; }; -/** Returns the next subchannel from the connected list or NULL if the list is - * empty. +/** Returns the index into p->subchannels of the next subchannel in + * READY state, or p->num_subchannels if no subchannel is READY. * - * Note that this function does *not* advance p->ready_list_last_pick. Use \a - * advance_last_picked_locked() for that. */ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { - ready_list *selected; - selected = p->ready_list_last_pick->next; - - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; - } + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +static size_t get_next_ready_subchannel_index_locked( + const round_robin_lb_policy *p) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR: %p] getting next ready subchannel, " + "last_ready_subchannel_index=%lu", + p, (unsigned long)p->last_ready_subchannel_index); } - return NULL; -} - -/** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ - p->ready_list_last_pick = p->ready_list_last_pick->next; + for (size_t i = 0; i < p->num_subchannels; ++i) { + const size_t index = + (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, + (unsigned long)index, + p->subchannels[index].curr_connectivity_state); + } + if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", + p, (unsigned long)index); + } + return index; } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " - "CSC %p)", - (void *)p, (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->ready_list_last_pick->subchannel)); - } -} - -/** Prepends (relative to the root at p->ready_list) the connected subchannel \a - * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - subchannel_data *sd) { - ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); - new_elem->subchannel = sd->subchannel; - new_elem->user_data = sd->user_data; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", - (void *)new_elem, (void *)sd->subchannel); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); } - return new_elem; + return p->num_subchannels; } -/** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - p->ready_list_last_pick = p->ready_list_last_pick->prev; - } - - /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, - (void *)node->subchannel); +// Sets p->last_ready_subchannel_index to last_ready_index. +static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < p->num_subchannels); + p->last_ready_subchannel_index = last_ready_index; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannels[last_ready_index].subchannel)); } - - node->next = NULL; - node->prev = NULL; - node->subchannel = NULL; - - gpr_free(node); -} - -static bool is_ready_list_empty(round_robin_lb_policy *p) { - return p->ready_list.prev == NULL; } static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *elem; - - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } } - gpr_free(sd); } - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - - elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p); } static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - size_t i; - - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } - - p->shutdown = 1; + p->shutdown = true; + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; @@ -328,10 +243,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, - &sd->connectivity_changed_closure); + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } } } @@ -339,8 +257,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -364,8 +281,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -387,21 +303,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { - size_t i; - p->started_picking = 1; - - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + p->started_picking = true; + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); + } } } @@ -414,39 +325,36 @@ static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - ready_list *selected; - - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - - if ((selected = peek_next_connected_locked(p))) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { /* readily available, report right away */ + subchannel_data *sd = &p->subchannels[next_ready_index]; *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); - + grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); if (user_data != NULL) { - *user_data = selected->user_data; + *user_data = sd->user_data; } - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", - (void *)*target, (void *)selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", + (void *)*target, (unsigned long)next_ready_index); } /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = gpr_malloc(sizeof(*pp)); + pending_pick *pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->on_complete = on_complete; @@ -457,25 +365,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static void update_state_counters(subchannel_data *sd) { +static void update_state_counters_locked(subchannel_data *sd) { round_robin_lb_policy *p = sd->policy; - - /* update p->num_transient_failures (resp. p->num_idle): if the previous - * state was TRANSIENT_FAILURE (resp. IDLE), decrement - * p->num_transient_failures (resp. p->num_idle). */ - if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(p->num_ready > 0); + --p->num_ready; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(p->num_transient_failures > 0); --p->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(p->num_idle > 0); --p->num_idle; } + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + ++p->num_ready; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++p->num_transient_failures; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { + ++p->num_idle; + } } /* sd is the subchannel_data associted with the updated subchannel. * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE * or SHUTDOWN */ -static grpc_connectivity_state update_lb_connectivity_status( +static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). @@ -497,7 +411,7 @@ static grpc_connectivity_state update_lb_connectivity_status( * CHECK: p->num_idle == p->num_subchannels. */ round_robin_lb_policy *p = sd->policy; - if (!is_ready_list_empty(p)) { /* 1) READY */ + if (p->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -531,32 +445,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; - pending_pick *pp; - - GRPC_ERROR_REF(error); - + // Now that we're inside the combiner, copy the pending connectivity + // state (which was set by the connectivity state watcher) to + // curr_connectivity_state, which is what we use inside of the combiner. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p: " + "prev_state=%d new_state=%d", + p, sd->subchannel, sd->prev_connectivity_state, + sd->curr_connectivity_state); + } + // If we're shutting down, unref and return. if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - GRPC_ERROR_UNREF(error); return; } - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); + // Update state counters and determine new overall state. + update_state_counters_locked(sd); + sd->prev_connectivity_state = sd->curr_connectivity_state; + grpc_connectivity_state new_connectivity_state = + update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + // If the new state is SHUTDOWN, unref the subchannel, and if the new + // overall state is SHUTDOWN, clean up. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ + pending_pick *pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); + } + } + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + } else { + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + GPR_ASSERT(next_ready_index < p->num_subchannels); + subchannel_data *selected = &p->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); } + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -565,74 +509,22 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (pp->user_data != NULL) { *pp->user_data = selected->user_data; } - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)", + (void *)selected->subchannel, + (unsigned long)next_ready_index); } grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_IDLE: - ++p->num_idle; - /* fallthrough */ - case GRPC_CHANNEL_CONNECTING: - update_state_counters(sd); - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - ++p->num_transient_failures; - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_SHUTDOWN: - update_state_counters(sd); - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - --p->num_subchannels; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); - p->subchannels[sd->index]->index = sd->index; - if (update_lb_connectivity_status(exec_ctx, sd, error) == - GRPC_CHANNEL_SHUTDOWN) { - /* the policy is shutting down. Flush all the pending picks... */ - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } - } - gpr_free(sd); - /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - break; + } + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } - GRPC_ERROR_UNREF(error); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -653,10 +545,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *selected; - grpc_connected_subchannel *target; - if ((selected = peek_next_connected_locked(p))) { - target = GRPC_CONNECTED_SUBCHANNEL_REF( + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { + subchannel_data *selected = &p->subchannels[next_ready_index]; + grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); @@ -707,7 +599,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; + size_t subchannel_index = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; @@ -723,51 +615,53 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sc_args.args = new_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s", - (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", + (unsigned long)subchannel_index, (void *)subchannel, address_uri); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { - subchannel_data *sd = gpr_zalloc(sizeof(*sd)); - p->subchannels[subchannel_idx] = sd; + subchannel_data *sd = &p->subchannels[subchannel_index]; sd->policy = p; - sd->index = subchannel_idx; sd->subchannel = subchannel; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = sd->user_data_vtable->copy(addresses->addresses[i].user_data); } - ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, grpc_combiner_scheduler(args->combiner, false)); + ++subchannel_index; } } - if (subchannel_idx == 0) { + if (subchannel_index == 0) { /* couldn't create any subchannel. Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; } - p->num_subchannels = subchannel_idx; + p->num_subchannels = subchannel_index; - /* The (dummy node) root of the ready list */ - p->ready_list.subchannel = NULL; - p->ready_list.prev = NULL; - p->ready_list.next = NULL; - p->ready_list_last_pick = &p->ready_list; + // Initialize the last pick index to the last subchannel, so that the + // first pick will start at the beginning of the list. + p->last_ready_subchannel_index = subchannel_index - 1; grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); - if (grpc_lb_round_robin_trace) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", (void *)p, (unsigned long)p->num_subchannels); } diff --git a/src/core/ext/filters/client_channel/parse_address.c b/src/core/ext/filters/client_channel/parse_address.c index edc6ce697d..18381eec55 100644 --- a/src/core/ext/filters/client_channel/parse_address.c +++ b/src/core/ext/filters/client_channel/parse_address.c @@ -57,11 +57,11 @@ bool grpc_parse_unix(const grpc_uri *uri, struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; const size_t maxlen = sizeof(un->sun_path); const size_t path_len = strnlen(uri->path, maxlen); - if (path_len == maxlen) return 0; + if (path_len == maxlen) return false; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); resolved_addr->len = sizeof(*un); - return 1; + return true; } #else /* GRPC_HAVE_UNIX_SOCKET */ @@ -73,74 +73,65 @@ bool grpc_parse_unix(const grpc_uri *uri, #endif /* GRPC_HAVE_UNIX_SOCKET */ -bool grpc_parse_ipv4(const grpc_uri *uri, - grpc_resolved_address *resolved_addr) { - if (strcmp("ipv4", uri->scheme) != 0) { - gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme); - return false; - } - const char *host_port = uri->path; +bool grpc_parse_ipv4_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors) { + bool success = false; + // Split host and port. char *host; char *port; - int port_num; - bool result = false; - struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr; - - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return false; - } - - memset(resolved_addr, 0, sizeof(grpc_resolved_address)); - resolved_addr->len = sizeof(struct sockaddr_in); + if (!gpr_split_host_port(hostport, &host, &port)) return false; + // Parse IP address. + memset(addr, 0, sizeof(*addr)); + addr->len = sizeof(struct sockaddr_in); + struct sockaddr_in *in = (struct sockaddr_in *)addr->addr; in->sin_family = AF_INET; if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); goto done; } - - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); - goto done; - } - in->sin_port = htons((uint16_t)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); + // Parse port. + if (port == NULL) { + if (log_errors) gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); goto done; } - - result = true; + int port_num; + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) { + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); + goto done; + } + in->sin_port = htons((uint16_t)port_num); + success = true; done: gpr_free(host); gpr_free(port); - return result; + return success; } -bool grpc_parse_ipv6(const grpc_uri *uri, +bool grpc_parse_ipv4(const grpc_uri *uri, grpc_resolved_address *resolved_addr) { - if (strcmp("ipv6", uri->scheme) != 0) { - gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme); + if (strcmp("ipv4", uri->scheme) != 0) { + gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme); return false; } const char *host_port = uri->path; - char *host; - char *port; - int port_num; - int result = 0; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)resolved_addr->addr; - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return 0; - } + return grpc_parse_ipv4_hostport(host_port, resolved_addr, + true /* log_errors */); +} - memset(in6, 0, sizeof(*in6)); - resolved_addr->len = sizeof(*in6); +bool grpc_parse_ipv6_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors) { + bool success = false; + // Split host and port. + char *host; + char *port; + if (!gpr_split_host_port(hostport, &host, &port)) return false; + // Parse IP address. + memset(addr, 0, sizeof(*addr)); + addr->len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr->addr; in6->sin6_family = AF_INET6; - - /* Handle the RFC6874 syntax for IPv6 zone identifiers. */ + // Handle the RFC6874 syntax for IPv6 zone identifiers. char *host_end = (char *)gpr_memrchr(host, '%', strlen(host)); if (host_end != NULL) { GPR_ASSERT(host_end >= host); @@ -159,7 +150,7 @@ bool grpc_parse_ipv6(const grpc_uri *uri, gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); goto done; } - // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027. + // Handle "sin6_scope_id" being type "u_long". See grpc issue #10027. in6->sin6_scope_id = sin6_scope_id; } else { if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { @@ -167,24 +158,34 @@ bool grpc_parse_ipv6(const grpc_uri *uri, goto done; } } - - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); - goto done; - } - in6->sin6_port = htons((uint16_t)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); + // Parse port. + if (port == NULL) { + if (log_errors) gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); goto done; } - - result = 1; + int port_num; + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) { + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); + goto done; + } + in6->sin6_port = htons((uint16_t)port_num); + success = true; done: gpr_free(host); gpr_free(port); - return result; + return success; +} + +bool grpc_parse_ipv6(const grpc_uri *uri, + grpc_resolved_address *resolved_addr) { + if (strcmp("ipv6", uri->scheme) != 0) { + gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme); + return false; + } + const char *host_port = uri->path; + if (*host_port == '/') ++host_port; + return grpc_parse_ipv6_hostport(host_port, resolved_addr, + true /* log_errors */); } bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr) { diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h index fa7ea33a00..1a203a3b26 100644 --- a/src/core/ext/filters/client_channel/parse_address.h +++ b/src/core/ext/filters/client_channel/parse_address.h @@ -54,4 +54,10 @@ bool grpc_parse_ipv6(const grpc_uri *uri, grpc_resolved_address *resolved_addr); /** Populate \a resolved_addr from \a uri. Returns true upon success. */ bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr); +/** Parse bare IPv4 or IPv6 "IP:port" strings. */ +bool grpc_parse_ipv4_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors); +bool grpc_parse_ipv6_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index ffaeeed324..578e8d697f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -61,6 +61,8 @@ typedef struct { /** base class: must be first */ grpc_resolver base; + /** DNS server to use (if not system default) */ + char *dns_server; /** name to resolve (usually the same as target_name) */ char *name_to_resolve; /** default port to use */ @@ -172,6 +174,8 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { + const char *msg = grpc_error_string(error); + gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); gpr_timespec timeout = gpr_time_sub(next_try, now); @@ -221,9 +225,9 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!r->resolving); r->resolving = true; r->addresses = NULL; - grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port, - r->interested_parties, &r->dns_ares_on_resolved_locked, - &r->addresses); + grpc_dns_lookup_ares(exec_ctx, r->dns_server, r->name_to_resolve, + r->default_port, r->interested_parties, + &r->dns_ares_on_resolved_locked, &r->addresses); } static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -246,6 +250,7 @@ static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } grpc_pollset_set_destroy(exec_ctx, r->interested_parties); + gpr_free(r->dns_server); gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); @@ -257,14 +262,13 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, const char *default_port) { // Get name from args. const char *path = args->uri->path; - if (0 != strcmp(args->uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based dns uri's not supported"); - return NULL; - } if (path[0] == '/') ++path; // Create resolver. ares_dns_resolver *r = gpr_zalloc(sizeof(ares_dns_resolver)); grpc_resolver_init(&r->base, &dns_ares_resolver_vtable, args->combiner); + if (0 != strcmp(args->uri->authority, "")) { + r->dns_server = gpr_strdup(args->uri->authority); + } r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->channel_args = grpc_channel_args_copy(args->args); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 09c46a66e0..e0cfd8b629 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -48,7 +48,10 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> + +#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -58,6 +61,8 @@ static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; typedef struct grpc_ares_request { + /** indicates the DNS server to use, if specified */ + struct ares_addr_port_node dns_server_addr; /** following members are set in grpc_resolve_address_ares_impl */ /** host to resolve, parsed from the name to resolve */ char *host; @@ -192,11 +197,12 @@ static void on_done_cb(void *arg, int status, int timeouts, grpc_ares_request_unref(NULL, r); } -void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, - const char *default_port, - grpc_pollset_set *interested_parties, - grpc_closure *on_done, - grpc_resolved_addresses **addrs) { +void grpc_dns_lookup_ares(grpc_exec_ctx *exec_ctx, const char *dns_server, + const char *name, const char *default_port, + grpc_pollset_set *interested_parties, + grpc_closure *on_done, + grpc_resolved_addresses **addrs) { + grpc_error *error = GRPC_ERROR_NONE; /* TODO(zyc): Enable tracing after #9603 is checked in */ /* if (grpc_dns_trace) { gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s", @@ -208,28 +214,23 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, char *port; gpr_split_host_port(name, &host, &port); if (host == NULL) { - grpc_error *err = grpc_error_set_str( + error = grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"), GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - grpc_closure_sched(exec_ctx, on_done, err); goto error_cleanup; } else if (port == NULL) { if (default_port == NULL) { - grpc_error *err = grpc_error_set_str( + error = grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"), GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - grpc_closure_sched(exec_ctx, on_done, err); goto error_cleanup; } port = gpr_strdup(default_port); } grpc_ares_ev_driver *ev_driver; - grpc_error *err = grpc_ares_ev_driver_create(&ev_driver, interested_parties); - if (err != GRPC_ERROR_NONE) { - GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err); - goto error_cleanup; - } + error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); + if (error != GRPC_ERROR_NONE) goto error_cleanup; grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request)); gpr_mu_init(&r->mu); @@ -242,6 +243,40 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, r->success = false; r->error = GRPC_ERROR_NONE; ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + + // If dns_server is specified, use it. + if (dns_server != NULL) { + gpr_log(GPR_INFO, "Using DNS server %s", dns_server); + grpc_resolved_address addr; + if (grpc_parse_ipv4_hostport(dns_server, &addr, false /* log_errors */)) { + r->dns_server_addr.family = AF_INET; + memcpy(&r->dns_server_addr.addr.addr4, addr.addr, addr.len); + r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + } else if (grpc_parse_ipv6_hostport(dns_server, &addr, + false /* log_errors */)) { + r->dns_server_addr.family = AF_INET6; + memcpy(&r->dns_server_addr.addr.addr6, addr.addr, addr.len); + r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + } else { + error = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); + goto error_cleanup; + } + int status = ares_set_servers_ports(*channel, &r->dns_server_addr); + if (status != ARES_SUCCESS) { + char *error_msg; + gpr_asprintf(&error_msg, "C-ares status is not ARES_SUCCESS: %s", + ares_strerror(status)); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); + gpr_free(error_msg); + goto error_cleanup; + } + } + // An extra reference is put here to avoid destroying the request in + // on_done_cb before calling grpc_ares_ev_driver_start. gpr_ref_init(&r->pending_queries, 2); if (grpc_ipv6_loopback_available()) { gpr_ref(&r->pending_queries); @@ -254,10 +289,20 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, return; error_cleanup: + grpc_closure_sched(exec_ctx, on_done, error); gpr_free(host); gpr_free(port); } +void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, + grpc_pollset_set *interested_parties, + grpc_closure *on_done, + grpc_resolved_addresses **addrs) { + grpc_dns_lookup_ares(exec_ctx, NULL /* dns_server */, name, default_port, + interested_parties, on_done, addrs); +} + void (*grpc_resolve_address_ares)( grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, grpc_pollset_set *interested_parties, grpc_closure *on_done, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 3dd40ea268..84fd7fcbd6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -51,6 +51,12 @@ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addresses); +void grpc_dns_lookup_ares(grpc_exec_ctx *exec_ctx, const char *dns_server, + const char *addr, const char *default_port, + grpc_pollset_set *interested_parties, + grpc_closure *on_done, + grpc_resolved_addresses **addresses); + /* Initialize gRPC ares wrapper. Must be called at least once before grpc_resolve_address_ares(). */ grpc_error *grpc_ares_init(void); diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index b2de85c4a1..dd14bf1d02 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -283,6 +283,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; + // add a weak ref and subtract a strong ref (atomically) old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { @@ -656,7 +657,6 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, gpr_free(sw_subchannel); grpc_channel_stack_destroy(exec_ctx, stk); gpr_free(con); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); return false; } @@ -781,7 +781,7 @@ grpc_error *grpc_connected_subchannel_create_call( (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); const grpc_call_element_args call_args = {.call_stack = callstk, .server_transport_data = NULL, - .context = NULL, + .context = args->context, .path = args->path, .start_time = args->start_time, .deadline = args->deadline, diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 6473de49b0..e433c33e40 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -119,6 +119,7 @@ typedef struct { gpr_timespec start_time; gpr_timespec deadline; gpr_arena *arena; + grpc_call_context_element *context; } grpc_connected_subchannel_call_args; grpc_error *grpc_connected_subchannel_create_call( diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index f6ef4a845e..b25dbfcf51 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -183,8 +183,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, enter_ctx(exec_ctx); grpc_subchannel *c = NULL; + bool need_to_unref_constructed; while (c == NULL) { + need_to_unref_constructed = false; + // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); @@ -194,8 +197,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, // - Check to see if a subchannel already exists c = gpr_avl_get(index, key); if (c != NULL) { + c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); + } + if (c != NULL) { // yes -> we're done - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register"); + need_to_unref_constructed = true; } else { // no -> update the avl and compare/swap gpr_avl updated = @@ -219,6 +225,10 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, leave_ctx(exec_ctx); + if (need_to_unref_constructed) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, constructed, "index_register"); + } + return c; } diff --git a/src/core/ext/filters/http/http_filters_plugin.c b/src/core/ext/filters/http/http_filters_plugin.c index 195a1a8119..856a7dbd91 100644 --- a/src/core/ext/filters/http/http_filters_plugin.c +++ b/src/core/ext/filters/http/http_filters_plugin.c @@ -37,6 +37,7 @@ #include "src/core/ext/filters/http/message_compress/message_compress_filter.h" #include "src/core/ext/filters/http/server/http_server_filter.h" #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/transport_impl.h" diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 1da8cf69cb..5a54a6ed15 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -47,6 +47,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/transport/static_metadata.h" #define INITIAL_METADATA_UNSEEN 0 @@ -197,7 +198,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { - if (grpc_compression_trace) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; @@ -211,7 +212,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { - if (grpc_compression_trace) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.h b/src/core/ext/filters/http/message_compress/message_compress_filter.h index 75bfa17fba..135da4da62 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.h +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.h @@ -38,8 +38,6 @@ #include "src/core/lib/channel/channel_stack.h" -extern int grpc_compression_trace; - /** Compression filter for outgoing data. * * See <grpc/compression.h> for the available compression settings. diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index ff857878e4..9e495f4d42 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -46,8 +46,6 @@ #define EXPECTED_CONTENT_TYPE "application/grpc" #define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1 -extern int grpc_http_trace; - typedef struct call_data { grpc_linked_mdelem status; grpc_linked_mdelem content_type; diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c new file mode 100644 index 0000000000..7fb75e3a4f --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -0,0 +1,223 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h" + +#include <string.h> + +#include <grpc/support/alloc.h> + +#include "src/core/ext/filters/workarounds/workaround_utils.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/metadata.h" + +typedef struct call_data { + // Receive closures are chained: we inject this closure as the + // recv_initial_metadata_ready up-call on transport_stream_op, and remember to + // call our next_recv_initial_metadata_ready member after handling it. + grpc_closure recv_initial_metadata_ready; + // Used by recv_initial_metadata_ready. + grpc_metadata_batch* recv_initial_metadata; + // Original recv_initial_metadata_ready callback, invoked after our own. + grpc_closure* next_recv_initial_metadata_ready; + + // Marks whether the workaround is active + bool workaround_active; +} call_data; + +// Find the user agent metadata element in the batch +static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, + grpc_mdelem* md) { + if (batch->idx.named.user_agent != NULL) { + *md = batch->idx.named.user_agent->md; + return true; + } + return false; +} + +// Callback invoked when we receive an initial metadata. +static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, + void* user_data, grpc_error* error) { + grpc_call_element* elem = user_data; + call_data* calld = elem->call_data; + + if (GRPC_ERROR_NONE == error) { + grpc_mdelem md; + if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) { + grpc_workaround_user_agent_md* user_agent_md = grpc_parse_user_agent(md); + if (user_agent_md + ->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) { + calld->workaround_active = true; + } + } + } + + // Invoke the next callback. + grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +// Start transport stream op. +static void start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { + call_data* calld = elem->call_data; + + // Inject callback for receiving initial metadata + if (op->recv_initial_metadata) { + calld->next_recv_initial_metadata_ready = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + } + + if (op->send_message) { + /* Send message happens after client's user-agent (initial metadata) is + * received, so workaround_active must be set already */ + if (calld->workaround_active) { + op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS; + } + } + + // Chain to the next filter. + grpc_call_next_op(exec_ctx, elem, op); +} + +// Constructor for call_data. +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = elem->call_data; + calld->next_recv_initial_metadata_ready = NULL; + calld->workaround_active = false; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; +} + +// Destructor for call_data. +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) {} + +// Constructor for channel_data. +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + return GRPC_ERROR_NONE; +} + +// Destructor for channel_data. +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} + +// Parse the user agent +static bool parse_user_agent(grpc_mdelem md) { + const char grpc_objc_specifier[] = "grpc-objc/"; + const size_t grpc_objc_specifier_len = sizeof(grpc_objc_specifier) - 1; + const char cronet_specifier[] = "cronet_http"; + const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1; + + char* user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + bool grpc_objc_specifier_seen = false; + bool cronet_specifier_seen = false; + char *major_version_str = user_agent_str, *minor_version_str; + long major_version, minor_version; + + char* head = strtok(user_agent_str, " "); + while (head != NULL) { + if (!grpc_objc_specifier_seen && + 0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) { + major_version_str = head + grpc_objc_specifier_len; + grpc_objc_specifier_seen = true; + } else if (grpc_objc_specifier_seen && + 0 == strncmp(head, cronet_specifier, cronet_specifier_len)) { + cronet_specifier_seen = true; + break; + } + + head = strtok(NULL, " "); + } + if (grpc_objc_specifier_seen) { + major_version_str = strtok(major_version_str, "."); + minor_version_str = strtok(NULL, "."); + major_version = atol(major_version_str); + minor_version = atol(minor_version_str); + } + + gpr_free(user_agent_str); + return (grpc_objc_specifier_seen && cronet_specifier_seen && + (major_version < 1 || (major_version == 1 && minor_version <= 3))); +} + +const grpc_channel_filter grpc_workaround_cronet_compression_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "workaround_cronet_compression"}; + +static bool register_workaround_cronet_compression( + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) { + const grpc_channel_args* channel_args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg* a = grpc_channel_args_find( + channel_args, GRPC_ARG_WORKAROUND_CRONET_COMPRESSION); + if (a == NULL) { + return true; + } + if (grpc_channel_arg_get_bool(a, false) == false) { + return true; + } + return grpc_channel_stack_builder_prepend_filter( + builder, &grpc_workaround_cronet_compression_filter, NULL, NULL); +} + +void grpc_workaround_cronet_compression_filter_init(void) { + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_WORKAROUND_PRIORITY_HIGH, + register_workaround_cronet_compression, NULL); + grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION, + parse_user_agent); +} + +void grpc_workaround_cronet_compression_filter_shutdown(void) {} diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h new file mode 100644 index 0000000000..58c79a0c00 --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h @@ -0,0 +1,40 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_CRONET_COMPRESSION_FILTER_H +#define GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_CRONET_COMPRESSION_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_workaround_cronet_compression_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_CRONET_COMPRESSION_FILTER_H \ + */ diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c new file mode 100644 index 0000000000..1c565388e1 --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_utils.c @@ -0,0 +1,65 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/ext/filters/workarounds/workaround_utils.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +user_agent_parser ua_parser[GRPC_MAX_WORKAROUND_ID]; + +static void destroy_user_agent_md(void *user_agent_md) { + gpr_free(user_agent_md); +} + +grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) { + grpc_workaround_user_agent_md *user_agent_md = + (grpc_workaround_user_agent_md *)grpc_mdelem_get_user_data( + md, destroy_user_agent_md); + + if (NULL != user_agent_md) { + return user_agent_md; + } + user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md)); + for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) { + if (ua_parser[i]) { + user_agent_md->workaround_active[i] = ua_parser[i](md); + } + } + grpc_mdelem_set_user_data(md, destroy_user_agent_md, (void *)user_agent_md); + + return user_agent_md; +} + +void grpc_register_workaround(uint32_t id, user_agent_parser parser) { + GPR_ASSERT(id < GRPC_MAX_WORKAROUND_ID); + ua_parser[id] = parser; +} diff --git a/src/core/ext/filters/workarounds/workaround_utils.h b/src/core/ext/filters/workarounds/workaround_utils.h new file mode 100644 index 0000000000..7cd70c12d8 --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_utils.h @@ -0,0 +1,52 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H +#define GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H + +#include <grpc/support/workaround_list.h> + +#include "src/core/lib/transport/metadata.h" + +#define GRPC_WORKAROUND_PRIORITY_HIGH 10001 +#define GRPC_WORKAROUND_PROIRITY_LOW 9999 + +typedef struct grpc_workaround_user_agent_md { + bool workaround_active[GRPC_MAX_WORKAROUND_ID]; +} grpc_workaround_user_agent_md; + +grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md); + +typedef bool (*user_agent_parser)(grpc_mdelem); + +void grpc_register_workaround(uint32_t id, user_agent_parser parser); + +#endif diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 9c8505ddfa..ad674b8eb4 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -101,7 +101,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, void *reserved) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE( - "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3, + "grpc_insecure_channel_create(target=%s, args=%p, reserved=%p)", 3, (target, args, reserved)); GPR_ASSERT(reserved == NULL); // Add channel arg containing the client channel factory. diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d6b79bd492..f3268bcfca 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -89,8 +89,8 @@ static bool g_default_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; #define MAX_CLIENT_STREAM_ID 0x7fffffffu -int grpc_http_trace = 0; -int grpc_flowctl_trace = 0; +grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false); static const grpc_transport_vtable vtable; @@ -884,14 +884,23 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, GPR_TIMER_BEGIN("write_action_begin_locked", 0); grpc_chttp2_transport *t = gt; GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); - if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) { - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, - "begin writing"); - grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); - } else { - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, - "begin writing nothing"); - GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); + switch (t->closed ? GRPC_CHTTP2_NOTHING_TO_WRITE + : grpc_chttp2_begin_write(exec_ctx, t)) { + case GRPC_CHTTP2_NOTHING_TO_WRITE: + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, + "begin writing nothing"); + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); + break; + case GRPC_CHTTP2_PARTIAL_WRITE: + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, + "begin writing partial"); + grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); + break; + case GRPC_CHTTP2_FULL_WRITE: + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + "begin writing"); + grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE); + break; } GPR_TIMER_END("write_action_begin_locked", 0); } @@ -988,7 +997,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, t->seen_goaway = 1; /* When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug - * data equal to “too_many_pings”, it should log the occurrence at a log level + * data equal to "too_many_pings", it should log the occurrence at a log level * that is enabled by default and double the configured KEEPALIVE_TIME used * for new connections on that channel. */ if (t->is_client && goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM && @@ -1095,7 +1104,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, return; } closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { const char *errstr = grpc_error_string(error); gpr_log(GPR_DEBUG, "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s", @@ -1240,7 +1249,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_transport_stream_op_batch_payload *op_payload = op->payload; grpc_chttp2_transport *t = s->t; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str, op->on_complete); @@ -1483,9 +1492,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *str = grpc_transport_stream_op_batch_string(op); - gpr_log(GPR_DEBUG, "perform_stream_op[s=%p/%d]: %s", s, s->id, str); + gpr_log(GPR_DEBUG, "perform_stream_op[s=%p]: %s", s, str); gpr_free(str); } @@ -2130,27 +2139,41 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bdp_dbl) { - uint32_t bdp; - if (bdp_dbl <= 0) { - bdp = 0; - } else if (bdp_dbl > UINT32_MAX) { - bdp = UINT32_MAX; - } else { - bdp = (uint32_t)(bdp_dbl); - } + // initial window size bounded [1,2^31-1], but we set the min to 128. + int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); int64_t delta = (int64_t)bdp - (int64_t)t->settings[GRPC_LOCAL_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { + if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) { return; } - if (grpc_bdp_estimator_trace) { + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string, (int)bdp); } - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, bdp); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + (uint32_t)bdp); +} + +static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + double bw_dbl, double bdp_dbl) { + int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); + int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp); + // frame size is bounded [2^14,2^24-1] + int32_t frame_size = GPR_CLAMP(target, 16384, 16777215); + int64_t delta = (int64_t)frame_size - + (int64_t)t->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) { + return; + } + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string, + (int)frame_size); + } + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + (uint32_t)frame_size); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2269,6 +2292,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } int64_t estimate = -1; + double bdp_guess = -1; if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { double target = 1 + log2((double)estimate); double memory_pressure = grpc_resource_quota_get_memory_pressure( @@ -2286,9 +2310,15 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } double log2_bdp_guess = grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); - update_bdp(exec_ctx, t, pow(2, log2_bdp_guess)); + bdp_guess = pow(2, log2_bdp_guess); + update_bdp(exec_ctx, t, bdp_guess); t->last_pid_update = now; } + + double bw = -1; + if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) { + update_frame(exec_ctx, t, bw, bdp_guess); + } } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { @@ -2305,7 +2335,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = tp; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string); } /* Reset the keepalive ping timer */ @@ -2318,7 +2348,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = tp; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); } grpc_bdp_estimator_complete_ping(&t->bdp_estimator); @@ -2779,7 +2809,7 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_stream_map_size(&t->stream_map) == 0) { /* Channel with no active streams: send a goaway to try and make it * disconnect cleanly */ - if (grpc_resource_quota_trace) { + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "HTTP2: %s - send goaway to free memory", t->peer_string); } @@ -2787,7 +2817,8 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Buffers full"), GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_ENHANCE_YOUR_CALM)); - } else if (error == GRPC_ERROR_NONE && grpc_resource_quota_trace) { + } else if (error == GRPC_ERROR_NONE && + GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR " streams", @@ -2808,7 +2839,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, t->destructive_reclaimer_registered = false; if (error == GRPC_ERROR_NONE && n > 0) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_rand(&t->stream_map); - if (grpc_resource_quota_trace) { + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "HTTP2: %s - abandon stream id %d", t->peer_string, s->id); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index c372174f2d..83b17d1936 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -34,11 +34,12 @@ #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H +#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/transport/transport.h" -extern int grpc_http_trace; -extern int grpc_flowctl_trace; +extern grpc_tracer_flag grpc_http_trace; +extern grpc_tracer_flag grpc_flowctl_trace; grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index e3cd70d3f3..dbaafb5929 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -218,18 +218,18 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p, parser->incoming_settings[id] != parser->value) { t->initial_window_update += (int64_t)parser->value - parser->incoming_settings[id]; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "adding %d for initial_window change", (int)t->initial_window_update); } } parser->incoming_settings[id] = parser->value; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %s = %d", t->is_client ? "CLI" : "SVR", t->peer_string, sp->name, parser->value); } - } else if (grpc_http_trace) { + } else if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)", parser->id, parser->value); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 8fdd4ee77c..126e012aac 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -69,7 +69,7 @@ static grpc_slice_refcount terminal_slice_refcount = {NULL, NULL}; static const grpc_slice terminal_slice = {&terminal_slice_refcount, .data.refcounted = {0, 0}}; -extern int grpc_http_trace; +extern grpc_tracer_flag grpc_http_trace; typedef struct { int is_first_frame; @@ -425,7 +425,7 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, "Reserved header (colon-prefixed) happening after regular ones."); } - if (grpc_http_trace && !GRPC_MDELEM_IS_INTERNED(elem)) { + if (GRPC_TRACER_ON(grpc_http_trace) && !GRPC_MDELEM_IS_INTERNED(elem)) { char *k = grpc_slice_to_c_string(GRPC_MDKEY(elem)); char *v = grpc_slice_to_c_string(GRPC_MDVALUE(elem)); gpr_log( @@ -616,7 +616,7 @@ void grpc_chttp2_hpack_compressor_set_max_table_size( } } c->advertise_table_size_change = 1; - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size); } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 1846a85fc6..bb98bc4a79 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -50,8 +50,6 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/http2_errors.h" -extern int grpc_http_trace; - typedef enum { NOT_BINARY, BINARY_BEGIN, @@ -666,7 +664,7 @@ static const uint8_t inverse_base64[256] = { /* emission helpers */ static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_mdelem md, int add_to_table) { - if (grpc_http_trace && !GRPC_MDELEM_IS_INTERNED(md)) { + if (GRPC_TRACER_ON(grpc_http_trace) && !GRPC_MDELEM_IS_INTERNED(md)) { char *k = grpc_slice_to_c_string(GRPC_MDKEY(md)); char *v = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log( @@ -1052,7 +1050,7 @@ static grpc_error *parse_lithdr_nvridx_v(grpc_exec_ctx *exec_ctx, static grpc_error *finish_max_tbl_size(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index); } grpc_error *err = diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c index 9dd41fdbe1..7aaff55339 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_table.c +++ b/src/core/ext/transport/chttp2/transport/hpack_table.c @@ -40,9 +40,10 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/debug/trace.h" #include "src/core/lib/support/murmur_hash.h" -extern int grpc_http_trace; +extern grpc_tracer_flag grpc_http_trace; static struct { const char *key; @@ -260,7 +261,7 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx, if (tbl->max_bytes == max_bytes) { return; } - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes); } while (tbl->mem_used > max_bytes) { @@ -284,7 +285,7 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx, gpr_free(msg); return err; } - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes); } while (tbl->mem_used > bytes) { diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0aaa4aebe5..8d66e396ee 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -552,9 +552,14 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, bool covered_by_poller, const char *reason); -/** Someone is unlocking the transport mutex: check to see if writes - are required, and frame them if so */ -bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +typedef enum { + GRPC_CHTTP2_NOTHING_TO_WRITE, + GRPC_CHTTP2_PARTIAL_WRITE, + GRPC_CHTTP2_FULL_WRITE, +} grpc_chttp2_begin_write_result; + +grpc_chttp2_begin_write_result grpc_chttp2_begin_write( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); @@ -629,13 +634,13 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) -extern int grpc_http_trace; -extern int grpc_flowctl_trace; +extern grpc_tracer_flag grpc_http_trace; +extern grpc_tracer_flag grpc_flowctl_trace; -#define GRPC_CHTTP2_IF_TRACING(stmt) \ - if (!(grpc_http_trace)) \ - ; \ - else \ +#define GRPC_CHTTP2_IF_TRACING(stmt) \ + if (!(GRPC_TRACER_ON(grpc_http_trace))) \ + ; \ + else \ stmt typedef enum { @@ -648,7 +653,7 @@ typedef enum { dst_var, src_context, src_var) \ do { \ assert(id1 == id2); \ - if (grpc_flowctl_trace) { \ + if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \ grpc_chttp2_flowctl_trace( \ __FILE__, __LINE__, phase, GRPC_CHTTP2_FLOWCTL_MOVE, #dst_context, \ #dst_var, #src_context, #src_var, transport->is_client, id1, \ @@ -671,7 +676,7 @@ typedef enum { #define GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, transport, id, dst_context, \ dst_var, amount) \ do { \ - if (grpc_flowctl_trace) { \ + if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \ grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ GRPC_CHTTP2_FLOWCTL_CREDIT, #dst_context, \ #dst_var, NULL, #amount, transport->is_client, \ @@ -729,7 +734,7 @@ typedef enum { #define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ dst_var, amount) \ do { \ - if (grpc_flowctl_trace) { \ + if (GRPC_TRACER_ON(grpc_flowctl_trace)) { \ grpc_chttp2_flowctl_trace(__FILE__, __LINE__, phase, \ GRPC_CHTTP2_FLOWCTL_DEBIT, #dst_context, \ #dst_var, NULL, #amount, transport->is_client, \ @@ -815,7 +820,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /** Add a new ping strike to ping_recv_state.ping_strikes. If ping_recv_state.ping_strikes > ping_policy.max_ping_strikes, it sends GOAWAY with error code ENHANCE_YOUR_CALM and additional debug data resembling - “too_many_pings” followed by immediately closing the connection. */ + "too_many_pings" followed by immediately closing the connection. */ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 638b137316..1a676e2259 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -324,7 +324,7 @@ static grpc_error *init_frame_parser(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_FRAME_GOAWAY: return init_goaway_parser(exec_ctx, t); default: - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type); } return init_skip_frame_parser(exec_ctx, t, 0); @@ -418,11 +418,9 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, incoming_frame_size); - if ((int64_t)t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + - (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= - (int64_t)t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / + if ((int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= + -(int64_t)t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / 2) { grpc_chttp2_become_writable(exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED, @@ -494,7 +492,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, GPR_ASSERT(s != NULL); - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *key = grpc_slice_to_c_string(GRPC_MDKEY(md)); char *value = grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -574,7 +572,7 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, GPR_ASSERT(s != NULL); - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { char *key = grpc_slice_to_c_string(GRPC_MDKEY(md)); char *value = grpc_dump_slice(GRPC_MDVALUE(md), GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -807,7 +805,7 @@ static grpc_error *parse_frame_slice(grpc_exec_ctx *exec_ctx, if (err == GRPC_ERROR_NONE) { return err; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { - if (grpc_http_trace) { + if (GRPC_TRACER_ON(grpc_http_trace)) { const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "%s", msg); } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 069780ae5a..5be1092946 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -74,7 +74,8 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, } if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { /* ping already in-flight: wait */ - if (grpc_http_trace || grpc_bdp_estimator_trace) { + if (GRPC_TRACER_ON(grpc_http_trace) || + GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string); } return; @@ -82,7 +83,8 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, if (t->ping_state.pings_before_data_required == 0 && t->ping_policy.max_pings_without_data != 0) { /* need to send something of substance before sending a ping again */ - if (grpc_http_trace || grpc_bdp_estimator_trace) { + if (GRPC_TRACER_ON(grpc_http_trace) || + GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d", t->peer_string, t->ping_state.pings_before_data_required, t->ping_policy.max_pings_without_data); @@ -96,7 +98,8 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, (int)t->ping_policy.min_time_between_pings.tv_nsec);*/ if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) { /* not enough elapsed time between successive pings */ - if (grpc_http_trace || grpc_bdp_estimator_trace) { + if (GRPC_TRACER_ON(grpc_http_trace) || + GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "Ping delayed [%p]: not enough time elapsed since last ping", t->peer_string); @@ -160,19 +163,22 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) { return true; } +/* How many bytes of incoming flow control would we like to advertise */ uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) { - return (uint32_t)GPR_MAX( + return (uint32_t)GPR_MIN( (int64_t)((1u << 31) - 1), t->stream_total_over_incoming_window + - (int64_t)GPR_MAX( - t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - - t->stream_total_under_incoming_window, - 0)); + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); } -bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { +/* How many bytes would we like to put on the wire during a single syscall */ +static uint32_t target_write_size(grpc_chttp2_transport *t) { + return 1024 * 1024; +} + +grpc_chttp2_begin_write_result grpc_chttp2_begin_write( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); @@ -206,9 +212,20 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, } } + bool partial_write = false; + /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ - while (grpc_chttp2_list_pop_writable_stream(t, &s)) { + while (true) { + if (t->outbuf.length > target_write_size(t)) { + partial_write = true; + break; + } + + if (!grpc_chttp2_list_pop_writable_stream(t, &s)) { + break; + } + bool sent_initial_metadata = s->sent_initial_metadata; bool now_writing = false; @@ -395,7 +412,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_begin_write", 0); - return t->outbuf.count > 0; + return t->outbuf.count > 0 ? (partial_write ? GRPC_CHTTP2_PARTIAL_WRITE + : GRPC_CHTTP2_FULL_WRITE) + : GRPC_CHTTP2_NOTHING_TO_WRITE; } void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index d4e89d6a6c..67974b0b6a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -886,6 +886,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, !stream_state->state_op_done[OP_RECV_MESSAGE]) { CRONET_LOG(GPR_DEBUG, "Because"); result = false; + } else if (curr_op->cancel_stream && + !stream_state->state_callback_received[OP_CANCELED]) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; } else if (curr_op->recv_trailing_metadata) { /* We aren't done with trailing metadata yet */ if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { |