diff options
Diffstat (limited to 'src')
26 files changed, 517 insertions, 443 deletions
diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc index 7d51d40301..67c4c80a7b 100644 --- a/src/compiler/php_generator.cc +++ b/src/compiler/php_generator.cc @@ -67,12 +67,11 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) { vars["input_type_id"] = MessageIdentifierName(input_type->full_name()); vars["output_type_id"] = MessageIdentifierName(output_type->full_name()); - out->Print("/**\n"); - out->Print(GetPHPComments(method, " *").c_str()); + out->Print(GetPHPComments(method, " //").c_str()); if (method->client_streaming()) { out->Print(vars, - " * @param array $$metadata metadata\n" - " * @param array $$options call options\n */\n" + " // @param array $$metadata metadata\n" + " // @param array $$options call options\n" "public function $name$($$metadata = [], " "$$options = []) {\n"); out->Indent(); @@ -87,9 +86,9 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) { "$$metadata, $$options);\n"); } else { out->Print(vars, - " * @param \\$input_type_id$ $$argument input argument\n" - " * @param array $$metadata metadata\n" - " * @param array $$options call options\n */\n" + " // @param \\$input_type_id$ $$argument input argument\n" + " // @param array $$metadata metadata\n" + " // @param array $$options call options\n" "public function $name$(\\$input_type_id$ $$argument,\n" " $$metadata = [], $$options = []) {\n"); out->Indent(); @@ -116,10 +115,10 @@ void PrintService(const ServiceDescriptor *service, Printer *out) { out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n"); out->Indent(); out->Print( - "/**\n * @param string $$hostname hostname\n" - " * @param array $$opts channel options\n" - " * @param \\Grpc\\Channel $$channel (optional) re-use channel " - "object\n */\n" + " // @param string $$hostname hostname\n" + " // @param array $$opts channel options\n" + " // @param \\Grpc\\Channel $$channel (optional) re-use channel " + "object\n" "public function __construct($$hostname, $$opts, " "$$channel = null) {\n"); out->Indent(); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index b4d1565c45..16035208e3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -99,26 +99,13 @@ typedef struct pending_pick { grpc_closure *on_complete; } pending_pick; -/** List of subchannels in a connectivity READY state */ -typedef struct ready_list { - grpc_subchannel *subchannel; - /* references namesake entry in subchannel_data */ - void *user_data; - struct ready_list *next; - struct ready_list *prev; -} ready_list; - typedef struct { - /** index within policy->subchannels */ - size_t index; /** backpointer to owning policy */ round_robin_lb_policy *policy; /** subchannel itself */ grpc_subchannel *subchannel; /** notification that connectivity has changed on subchannel */ grpc_closure connectivity_changed_closure; - /** this subchannels current position in subchannel->ready_list */ - ready_list *ready_list_node; /** last observed connectivity. Not updated by * \a grpc_subchannel_notify_on_state_change. Used to determine the previous * state while processing the new state in \a rr_connectivity_changed */ @@ -126,6 +113,10 @@ typedef struct { /** current connectivity state. Updated by \a * grpc_subchannel_notify_on_state_change */ grpc_connectivity_state curr_connectivity_state; + /** connectivity state to be updated by the watcher, not guarded by + * the combiner. Will be moved to curr_connectivity_state inside of + * the combiner by rr_connectivity_changed_locked(). */ + grpc_connectivity_state pending_connectivity_state_unsafe; /** the subchannel's target user data */ void *user_data; /** vtable to operate over \a user_data */ @@ -141,182 +132,106 @@ struct round_robin_lb_policy { /** all our subchannels */ size_t num_subchannels; - subchannel_data **subchannels; + subchannel_data *subchannels; - /** how many subchannels are in TRANSIENT_FAILURE */ + /** how many subchannels are in state READY */ + size_t num_ready; + /** how many subchannels are in state TRANSIENT_FAILURE */ size_t num_transient_failures; - /** how many subchannels are IDLE */ + /** how many subchannels are in state IDLE */ size_t num_idle; /** have we started picking? */ - int started_picking; + bool started_picking; /** are we shutting down? */ - int shutdown; + bool shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker; - /** (Dummy) root of the doubly linked list containing READY subchannels */ - ready_list ready_list; - /** Last pick from the ready list. */ - ready_list *ready_list_last_pick; + // Index into subchannels for last pick. + size_t last_ready_subchannel_index; }; -/** Returns the next subchannel from the connected list or NULL if the list is - * empty. +/** Returns the index into p->subchannels of the next subchannel in + * READY state, or p->num_subchannels if no subchannel is READY. * - * Note that this function does *not* advance p->ready_list_last_pick. Use \a - * advance_last_picked_locked() for that. */ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { - ready_list *selected; - selected = p->ready_list_last_pick->next; - - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; - } + * Note that this function does *not* update p->last_ready_subchannel_index. + * The caller must do that if it returns a pick. */ +static size_t get_next_ready_subchannel_index_locked( + const round_robin_lb_policy *p) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, + "[RR: %p] getting next ready subchannel, " + "last_ready_subchannel_index=%lu", + p, (unsigned long)p->last_ready_subchannel_index); } - return NULL; -} - -/** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ - p->ready_list_last_pick = p->ready_list_last_pick->next; + for (size_t i = 0; i < p->num_subchannels; ++i) { + const size_t index = + (i + p->last_ready_subchannel_index + 1) % p->num_subchannels; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p, + (unsigned long)index, + p->subchannels[index].curr_connectivity_state); + } + if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu", + p, (unsigned long)index); + } + return index; } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); } - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " - "CSC %p)", - (void *)p, (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel, - (void *)grpc_subchannel_get_connected_subchannel( - p->ready_list_last_pick->subchannel)); + gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p); } + return p->num_subchannels; } -/** Prepends (relative to the root at p->ready_list) the connected subchannel \a - * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - subchannel_data *sd) { - ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); - new_elem->subchannel = sd->subchannel; - new_elem->user_data = sd->user_data; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } +// Sets p->last_ready_subchannel_index to last_ready_index. +static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, + size_t last_ready_index) { + GPR_ASSERT(last_ready_index < p->num_subchannels); + p->last_ready_subchannel_index = last_ready_index; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", - (void *)new_elem, (void *)sd->subchannel); - } - return new_elem; -} - -/** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - p->ready_list_last_pick = p->ready_list_last_pick->prev; - } - - /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, - (void *)node->subchannel); + gpr_log(GPR_DEBUG, + "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", + (void *)p, (unsigned long)last_ready_index, + (void *)p->subchannels[last_ready_index].subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->subchannels[last_ready_index].subchannel)); } - - node->next = NULL; - node->prev = NULL; - node->subchannel = NULL; - - gpr_free(node); -} - -static bool is_ready_list_empty(round_robin_lb_policy *p) { - return p->ready_list.prev == NULL; } static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *elem; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); } - for (size_t i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); - if (sd->user_data != NULL) { - GPR_ASSERT(sd->user_data_vtable != NULL); - sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } } - gpr_free(sd); } - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - - elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p); } static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - size_t i; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } - - p->shutdown = 1; + p->shutdown = true; + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; @@ -328,10 +243,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, - &sd->connectivity_changed_closure); + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, + NULL, + &sd->connectivity_changed_closure); + } } } @@ -339,8 +257,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -364,8 +281,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_eq, grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - pp = p->pending_picks; + pending_pick *pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; @@ -387,21 +303,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void start_picking_locked(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { - size_t i; - p->started_picking = 1; - - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - /* use some sentinel value outside of the range of grpc_connectivity_state - * to signal an undefined previous state. We won't be referring to this - * value again and it'll be overwritten after the first call to - * rr_connectivity_changed */ - sd->prev_connectivity_state = GRPC_CHANNEL_INIT; - sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); + p->started_picking = true; + for (size_t i = 0; i < p->num_subchannels; i++) { + subchannel_data *sd = &p->subchannels[i]; + if (sd->subchannel != NULL) { + GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); + } } } @@ -418,36 +329,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - ready_list *selected; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); } - - if ((selected = peek_next_connected_locked(p))) { + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { /* readily available, report right away */ + subchannel_data *sd = &p->subchannels[next_ready_index]; *target = GRPC_CONNECTED_SUBCHANNEL_REF( - grpc_subchannel_get_connected_subchannel(selected->subchannel), - "rr_picked"); - + grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked"); if (user_data != NULL) { - *user_data = selected->user_data; + *user_data = sd->user_data; } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", - (void *)*target, (void *)selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)", + (void *)*target, (unsigned long)next_ready_index); } /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking_locked(exec_ctx, p); } - pp = gpr_malloc(sizeof(*pp)); + pending_pick *pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->target = target; pp->on_complete = on_complete; @@ -458,25 +365,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } -static void update_state_counters(subchannel_data *sd) { +static void update_state_counters_locked(subchannel_data *sd) { round_robin_lb_policy *p = sd->policy; - - /* update p->num_transient_failures (resp. p->num_idle): if the previous - * state was TRANSIENT_FAILURE (resp. IDLE), decrement - * p->num_transient_failures (resp. p->num_idle). */ - if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(p->num_ready > 0); + --p->num_ready; + } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(p->num_transient_failures > 0); --p->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(p->num_idle > 0); --p->num_idle; } + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { + ++p->num_ready; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++p->num_transient_failures; + } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { + ++p->num_idle; + } } /* sd is the subchannel_data associted with the updated subchannel. * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE * or SHUTDOWN */ -static grpc_connectivity_state update_lb_connectivity_status( +static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). @@ -498,7 +411,7 @@ static grpc_connectivity_state update_lb_connectivity_status( * CHECK: p->num_idle == p->num_subchannels. */ round_robin_lb_policy *p = sd->policy; - if (!is_ready_list_empty(p)) { /* 1) READY */ + if (p->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); return GRPC_CHANNEL_READY; @@ -532,32 +445,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; - pending_pick *pp; - - GRPC_ERROR_REF(error); - + // Now that we're inside the combiner, copy the pending connectivity + // state (which was set by the connectivity state watcher) to + // curr_connectivity_state, which is what we use inside of the combiner. + sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p: " + "prev_state=%d new_state=%d", + p, sd->subchannel, sd->prev_connectivity_state, + sd->curr_connectivity_state); + } + // If we're shutting down, unref and return. if (p->shutdown) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - GRPC_ERROR_UNREF(error); return; } - switch (sd->curr_connectivity_state) { - case GRPC_CHANNEL_INIT: - GPR_UNREACHABLE_CODE(return ); - case GRPC_CHANNEL_READY: - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd); + // Update state counters and determine new overall state. + update_state_counters_locked(sd); + sd->prev_connectivity_state = sd->curr_connectivity_state; + grpc_connectivity_state new_connectivity_state = + update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error)); + // If the new state is SHUTDOWN, unref the subchannel, and if the new + // overall state is SHUTDOWN, clean up. + if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); + sd->subchannel = NULL; + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } + if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { + /* the policy is shutting down. Flush all the pending picks... */ + pending_pick *pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + *pp->target = NULL; + grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); + gpr_free(pp); + } + } + /* unref the "rr_connectivity" weak ref from start_picking */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); + } else { + if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - ready_list *selected = peek_next_connected_locked(p); - GPR_ASSERT(selected != NULL); + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + GPR_ASSERT(next_ready_index < p->num_subchannels); + subchannel_data *selected = &p->subchannels[next_ready_index]; if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ - advance_last_picked_locked(p); + update_last_ready_subchannel_index_locked(p, next_ready_index); } + pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -568,72 +511,20 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - (void *)selected->subchannel, (void *)selected); + "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)", + (void *)selected->subchannel, + (unsigned long)next_ready_index); } grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_IDLE: - ++p->num_idle; - /* fallthrough */ - case GRPC_CHANNEL_CONNECTING: - update_state_counters(sd); - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - ++p->num_transient_failures; - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - update_lb_connectivity_status(exec_ctx, sd, error); - sd->prev_connectivity_state = sd->curr_connectivity_state; - /* renew notification: reuses the "rr_connectivity" weak ref */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->curr_connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_SHUTDOWN: - update_state_counters(sd); - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - --p->num_subchannels; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown"); - p->subchannels[sd->index]->index = sd->index; - if (update_lb_connectivity_status(exec_ctx, sd, error) == - GRPC_CHANNEL_SHUTDOWN) { - /* the policy is shutting down. Flush all the pending picks... */ - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } - } - gpr_free(sd); - /* unref the "rr_connectivity" weak ref from start_picking */ - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); - break; + } + /* renew notification: reuses the "rr_connectivity" weak ref */ + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, p->base.interested_parties, + &sd->pending_connectivity_state_unsafe, + &sd->connectivity_changed_closure); } - GRPC_ERROR_UNREF(error); } static grpc_connectivity_state rr_check_connectivity_locked( @@ -654,10 +545,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *selected; - grpc_connected_subchannel *target; - if ((selected = peek_next_connected_locked(p))) { - target = GRPC_CONNECTED_SUBCHANNEL_REF( + const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); + if (next_ready_index < p->num_subchannels) { + subchannel_data *selected = &p->subchannels[next_ready_index]; + grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); @@ -708,7 +599,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; + size_t subchannel_index = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { /* Skip balancer addresses, since we only know how to handle backends. */ if (addresses->addresses[i].is_balancer) continue; @@ -727,42 +618,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s", - (void *)subchannel, address_uri); + gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s", + (unsigned long)subchannel_index, (void *)subchannel, address_uri); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { - subchannel_data *sd = gpr_zalloc(sizeof(*sd)); - p->subchannels[subchannel_idx] = sd; + subchannel_data *sd = &p->subchannels[subchannel_index]; sd->policy = p; - sd->index = subchannel_idx; sd->subchannel = subchannel; + /* use some sentinel value outside of the range of grpc_connectivity_state + * to signal an undefined previous state. We won't be referring to this + * value again and it'll be overwritten after the first call to + * rr_connectivity_changed */ + sd->prev_connectivity_state = GRPC_CHANNEL_INIT; + sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; sd->user_data_vtable = addresses->user_data_vtable; if (sd->user_data_vtable != NULL) { sd->user_data = sd->user_data_vtable->copy(addresses->addresses[i].user_data); } - ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, grpc_combiner_scheduler(args->combiner)); + ++subchannel_index; } } - if (subchannel_idx == 0) { + if (subchannel_index == 0) { /* couldn't create any subchannel. Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; } - p->num_subchannels = subchannel_idx; + p->num_subchannels = subchannel_index; - /* The (dummy node) root of the ready list */ - p->ready_list.subchannel = NULL; - p->ready_list.prev = NULL; - p->ready_list.next = NULL; - p->ready_list_last_pick = &p->ready_list; + // Initialize the last pick index to the last subchannel, so that the + // first pick will start at the beginning of the list. + p->last_ready_subchannel_index = subchannel_index - 1; grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, diff --git a/src/core/ext/filters/client_channel/parse_address.c b/src/core/ext/filters/client_channel/parse_address.c index edc6ce697d..18381eec55 100644 --- a/src/core/ext/filters/client_channel/parse_address.c +++ b/src/core/ext/filters/client_channel/parse_address.c @@ -57,11 +57,11 @@ bool grpc_parse_unix(const grpc_uri *uri, struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; const size_t maxlen = sizeof(un->sun_path); const size_t path_len = strnlen(uri->path, maxlen); - if (path_len == maxlen) return 0; + if (path_len == maxlen) return false; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); resolved_addr->len = sizeof(*un); - return 1; + return true; } #else /* GRPC_HAVE_UNIX_SOCKET */ @@ -73,74 +73,65 @@ bool grpc_parse_unix(const grpc_uri *uri, #endif /* GRPC_HAVE_UNIX_SOCKET */ -bool grpc_parse_ipv4(const grpc_uri *uri, - grpc_resolved_address *resolved_addr) { - if (strcmp("ipv4", uri->scheme) != 0) { - gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme); - return false; - } - const char *host_port = uri->path; +bool grpc_parse_ipv4_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors) { + bool success = false; + // Split host and port. char *host; char *port; - int port_num; - bool result = false; - struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr; - - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return false; - } - - memset(resolved_addr, 0, sizeof(grpc_resolved_address)); - resolved_addr->len = sizeof(struct sockaddr_in); + if (!gpr_split_host_port(hostport, &host, &port)) return false; + // Parse IP address. + memset(addr, 0, sizeof(*addr)); + addr->len = sizeof(struct sockaddr_in); + struct sockaddr_in *in = (struct sockaddr_in *)addr->addr; in->sin_family = AF_INET; if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); goto done; } - - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); - goto done; - } - in->sin_port = htons((uint16_t)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); + // Parse port. + if (port == NULL) { + if (log_errors) gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); goto done; } - - result = true; + int port_num; + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) { + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); + goto done; + } + in->sin_port = htons((uint16_t)port_num); + success = true; done: gpr_free(host); gpr_free(port); - return result; + return success; } -bool grpc_parse_ipv6(const grpc_uri *uri, +bool grpc_parse_ipv4(const grpc_uri *uri, grpc_resolved_address *resolved_addr) { - if (strcmp("ipv6", uri->scheme) != 0) { - gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme); + if (strcmp("ipv4", uri->scheme) != 0) { + gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme); return false; } const char *host_port = uri->path; - char *host; - char *port; - int port_num; - int result = 0; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)resolved_addr->addr; - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return 0; - } + return grpc_parse_ipv4_hostport(host_port, resolved_addr, + true /* log_errors */); +} - memset(in6, 0, sizeof(*in6)); - resolved_addr->len = sizeof(*in6); +bool grpc_parse_ipv6_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors) { + bool success = false; + // Split host and port. + char *host; + char *port; + if (!gpr_split_host_port(hostport, &host, &port)) return false; + // Parse IP address. + memset(addr, 0, sizeof(*addr)); + addr->len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr->addr; in6->sin6_family = AF_INET6; - - /* Handle the RFC6874 syntax for IPv6 zone identifiers. */ + // Handle the RFC6874 syntax for IPv6 zone identifiers. char *host_end = (char *)gpr_memrchr(host, '%', strlen(host)); if (host_end != NULL) { GPR_ASSERT(host_end >= host); @@ -159,7 +150,7 @@ bool grpc_parse_ipv6(const grpc_uri *uri, gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); goto done; } - // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027. + // Handle "sin6_scope_id" being type "u_long". See grpc issue #10027. in6->sin6_scope_id = sin6_scope_id; } else { if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { @@ -167,24 +158,34 @@ bool grpc_parse_ipv6(const grpc_uri *uri, goto done; } } - - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); - goto done; - } - in6->sin6_port = htons((uint16_t)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); + // Parse port. + if (port == NULL) { + if (log_errors) gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); goto done; } - - result = 1; + int port_num; + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) { + if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); + goto done; + } + in6->sin6_port = htons((uint16_t)port_num); + success = true; done: gpr_free(host); gpr_free(port); - return result; + return success; +} + +bool grpc_parse_ipv6(const grpc_uri *uri, + grpc_resolved_address *resolved_addr) { + if (strcmp("ipv6", uri->scheme) != 0) { + gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme); + return false; + } + const char *host_port = uri->path; + if (*host_port == '/') ++host_port; + return grpc_parse_ipv6_hostport(host_port, resolved_addr, + true /* log_errors */); } bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr) { diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h index fa7ea33a00..1a203a3b26 100644 --- a/src/core/ext/filters/client_channel/parse_address.h +++ b/src/core/ext/filters/client_channel/parse_address.h @@ -54,4 +54,10 @@ bool grpc_parse_ipv6(const grpc_uri *uri, grpc_resolved_address *resolved_addr); /** Populate \a resolved_addr from \a uri. Returns true upon success. */ bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr); +/** Parse bare IPv4 or IPv6 "IP:port" strings. */ +bool grpc_parse_ipv4_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors); +bool grpc_parse_ipv6_hostport(const char *hostport, grpc_resolved_address *addr, + bool log_errors); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */ diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index 656c3a5860..861b809b19 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -61,6 +61,8 @@ typedef struct { /** base class: must be first */ grpc_resolver base; + /** DNS server to use (if not system default) */ + char *dns_server; /** name to resolve (usually the same as target_name) */ char *name_to_resolve; /** default port to use */ @@ -172,6 +174,8 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { + const char *msg = grpc_error_string(error); + gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); gpr_timespec timeout = gpr_time_sub(next_try, now); @@ -221,9 +225,9 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!r->resolving); r->resolving = true; r->addresses = NULL; - grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port, - r->interested_parties, &r->dns_ares_on_resolved_locked, - &r->addresses); + grpc_dns_lookup_ares(exec_ctx, r->dns_server, r->name_to_resolve, + r->default_port, r->interested_parties, + &r->dns_ares_on_resolved_locked, &r->addresses); } static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -246,6 +250,7 @@ static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } grpc_pollset_set_destroy(exec_ctx, r->interested_parties); + gpr_free(r->dns_server); gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); @@ -257,14 +262,13 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, const char *default_port) { // Get name from args. const char *path = args->uri->path; - if (0 != strcmp(args->uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based dns uri's not supported"); - return NULL; - } if (path[0] == '/') ++path; // Create resolver. ares_dns_resolver *r = gpr_zalloc(sizeof(ares_dns_resolver)); grpc_resolver_init(&r->base, &dns_ares_resolver_vtable, args->combiner); + if (0 != strcmp(args->uri->authority, "")) { + r->dns_server = gpr_strdup(args->uri->authority); + } r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->channel_args = grpc_channel_args_copy(args->args); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 09c46a66e0..e0cfd8b629 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -48,7 +48,10 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> + +#include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -58,6 +61,8 @@ static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; typedef struct grpc_ares_request { + /** indicates the DNS server to use, if specified */ + struct ares_addr_port_node dns_server_addr; /** following members are set in grpc_resolve_address_ares_impl */ /** host to resolve, parsed from the name to resolve */ char *host; @@ -192,11 +197,12 @@ static void on_done_cb(void *arg, int status, int timeouts, grpc_ares_request_unref(NULL, r); } -void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, - const char *default_port, - grpc_pollset_set *interested_parties, - grpc_closure *on_done, - grpc_resolved_addresses **addrs) { +void grpc_dns_lookup_ares(grpc_exec_ctx *exec_ctx, const char *dns_server, + const char *name, const char *default_port, + grpc_pollset_set *interested_parties, + grpc_closure *on_done, + grpc_resolved_addresses **addrs) { + grpc_error *error = GRPC_ERROR_NONE; /* TODO(zyc): Enable tracing after #9603 is checked in */ /* if (grpc_dns_trace) { gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s", @@ -208,28 +214,23 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, char *port; gpr_split_host_port(name, &host, &port); if (host == NULL) { - grpc_error *err = grpc_error_set_str( + error = grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"), GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - grpc_closure_sched(exec_ctx, on_done, err); goto error_cleanup; } else if (port == NULL) { if (default_port == NULL) { - grpc_error *err = grpc_error_set_str( + error = grpc_error_set_str( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"), GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); - grpc_closure_sched(exec_ctx, on_done, err); goto error_cleanup; } port = gpr_strdup(default_port); } grpc_ares_ev_driver *ev_driver; - grpc_error *err = grpc_ares_ev_driver_create(&ev_driver, interested_parties); - if (err != GRPC_ERROR_NONE) { - GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err); - goto error_cleanup; - } + error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); + if (error != GRPC_ERROR_NONE) goto error_cleanup; grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request)); gpr_mu_init(&r->mu); @@ -242,6 +243,40 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, r->success = false; r->error = GRPC_ERROR_NONE; ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + + // If dns_server is specified, use it. + if (dns_server != NULL) { + gpr_log(GPR_INFO, "Using DNS server %s", dns_server); + grpc_resolved_address addr; + if (grpc_parse_ipv4_hostport(dns_server, &addr, false /* log_errors */)) { + r->dns_server_addr.family = AF_INET; + memcpy(&r->dns_server_addr.addr.addr4, addr.addr, addr.len); + r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + } else if (grpc_parse_ipv6_hostport(dns_server, &addr, + false /* log_errors */)) { + r->dns_server_addr.family = AF_INET6; + memcpy(&r->dns_server_addr.addr.addr6, addr.addr, addr.len); + r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr); + r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr); + } else { + error = grpc_error_set_str( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"), + GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name)); + goto error_cleanup; + } + int status = ares_set_servers_ports(*channel, &r->dns_server_addr); + if (status != ARES_SUCCESS) { + char *error_msg; + gpr_asprintf(&error_msg, "C-ares status is not ARES_SUCCESS: %s", + ares_strerror(status)); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); + gpr_free(error_msg); + goto error_cleanup; + } + } + // An extra reference is put here to avoid destroying the request in + // on_done_cb before calling grpc_ares_ev_driver_start. gpr_ref_init(&r->pending_queries, 2); if (grpc_ipv6_loopback_available()) { gpr_ref(&r->pending_queries); @@ -254,10 +289,20 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, return; error_cleanup: + grpc_closure_sched(exec_ctx, on_done, error); gpr_free(host); gpr_free(port); } +void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, + grpc_pollset_set *interested_parties, + grpc_closure *on_done, + grpc_resolved_addresses **addrs) { + grpc_dns_lookup_ares(exec_ctx, NULL /* dns_server */, name, default_port, + interested_parties, on_done, addrs); +} + void (*grpc_resolve_address_ares)( grpc_exec_ctx *exec_ctx, const char *name, const char *default_port, grpc_pollset_set *interested_parties, grpc_closure *on_done, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h index 3dd40ea268..84fd7fcbd6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -51,6 +51,12 @@ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addresses); +void grpc_dns_lookup_ares(grpc_exec_ctx *exec_ctx, const char *dns_server, + const char *addr, const char *default_port, + grpc_pollset_set *interested_parties, + grpc_closure *on_done, + grpc_resolved_addresses **addresses); + /* Initialize gRPC ares wrapper. Must be called at least once before grpc_resolve_address_ares(). */ grpc_error *grpc_ares_init(void); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 10b187496e..f112ce1b76 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -2109,15 +2109,8 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bdp_dbl) { - int32_t bdp; - const int32_t kMinBDP = 128; - if (bdp_dbl <= kMinBDP) { - bdp = kMinBDP; - } else if (bdp_dbl > INT32_MAX) { - bdp = INT32_MAX; - } else { - bdp = (int32_t)(bdp_dbl); - } + // initial window size bounded [1,2^31-1], but we set the min to 128. + int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); int64_t delta = (int64_t)bdp - (int64_t)t->settings[GRPC_LOCAL_SETTINGS] @@ -2131,7 +2124,26 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, (uint32_t)bdp); - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, (uint32_t)bdp); +} + +static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + double bw_dbl, double bdp_dbl) { + int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); + int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp); + // frame size is bounded [2^14,2^24-1] + int32_t frame_size = GPR_CLAMP(target, 16384, 16777215); + int64_t delta = (int64_t)frame_size - + (int64_t)t->settings[GRPC_LOCAL_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; + if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) { + return; + } + if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string, + (int)frame_size); + } + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + (uint32_t)frame_size); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2250,6 +2262,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } int64_t estimate = -1; + double bdp_guess = -1; if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { double target = 1 + log2((double)estimate); double memory_pressure = grpc_resource_quota_get_memory_pressure( @@ -2267,9 +2280,15 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, } double log2_bdp_guess = grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); - update_bdp(exec_ctx, t, pow(2, log2_bdp_guess)); + bdp_guess = pow(2, log2_bdp_guess); + update_bdp(exec_ctx, t, bdp_guess); t->last_pid_update = now; } + + double bw = -1; + if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) { + update_frame(exec_ctx, t, bw, bdp_guess); + } } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { diff --git a/src/core/lib/support/cmdline.c b/src/core/lib/support/cmdline.c index 88a65a8e2e..e5c9f3b84b 100644 --- a/src/core/lib/support/cmdline.c +++ b/src/core/lib/support/cmdline.c @@ -71,7 +71,7 @@ struct gpr_cmdline { static int normal_state(gpr_cmdline *cl, char *arg); gpr_cmdline *gpr_cmdline_create(const char *description) { - gpr_cmdline *cl = gpr_zalloc(sizeof(gpr_cmdline)); + gpr_cmdline *cl = (gpr_cmdline *)gpr_zalloc(sizeof(gpr_cmdline)); cl->description = description; cl->state = normal_state; @@ -100,7 +100,7 @@ static void add_arg(gpr_cmdline *cl, const char *name, const char *help, GPR_ASSERT(0 != strcmp(a->name, name)); } - a = gpr_zalloc(sizeof(arg)); + a = (arg *)gpr_zalloc(sizeof(arg)); a->name = name; a->help = help; a->type = type; @@ -302,7 +302,7 @@ static int normal_state(gpr_cmdline *cl, char *str) { eq = strchr(str, '='); if (eq != NULL) { /* copy the string into a temp buffer and extract the name */ - tmp = arg_name = gpr_malloc((size_t)(eq - str + 1)); + tmp = arg_name = (char *)gpr_malloc((size_t)(eq - str + 1)); memcpy(arg_name, str, (size_t)(eq - str)); arg_name[eq - str] = 0; } else { diff --git a/src/core/lib/support/histogram.c b/src/core/lib/support/histogram.c index ba8176bb05..c88695409d 100644 --- a/src/core/lib/support/histogram.c +++ b/src/core/lib/support/histogram.c @@ -88,7 +88,7 @@ static double bucket_start(gpr_histogram *h, double x) { gpr_histogram *gpr_histogram_create(double resolution, double max_bucket_start) { - gpr_histogram *h = gpr_malloc(sizeof(gpr_histogram)); + gpr_histogram *h = (gpr_histogram *)gpr_malloc(sizeof(gpr_histogram)); GPR_ASSERT(resolution > 0.0); GPR_ASSERT(max_bucket_start > resolution); h->sum = 0.0; @@ -102,7 +102,7 @@ gpr_histogram *gpr_histogram_create(double resolution, h->num_buckets = bucket_for_unchecked(h, max_bucket_start) + 1; GPR_ASSERT(h->num_buckets > 1); GPR_ASSERT(h->num_buckets < 100000000); - h->buckets = gpr_zalloc(sizeof(uint32_t) * h->num_buckets); + h->buckets = (uint32_t *)gpr_zalloc(sizeof(uint32_t) * h->num_buckets); return h; } diff --git a/src/core/lib/support/host_port.c b/src/core/lib/support/host_port.c index f19bdbc835..bbd42c26e0 100644 --- a/src/core/lib/support/host_port.c +++ b/src/core/lib/support/host_port.c @@ -98,7 +98,7 @@ int gpr_split_host_port(const char *name, char **host, char **port) { } /* Allocate return values. */ - *host = gpr_malloc(host_len + 1); + *host = (char *)gpr_malloc(host_len + 1); memcpy(*host, host_start, host_len); (*host)[host_len] = '\0'; diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c index d20b86f7cf..11297c9ddb 100644 --- a/src/core/lib/support/string.c +++ b/src/core/lib/support/string.c @@ -53,7 +53,7 @@ char *gpr_strdup(const char *src) { } len = strlen(src) + 1; - dst = gpr_malloc(len); + dst = (char *)gpr_malloc(len); memcpy(dst, src, len); @@ -74,13 +74,13 @@ static dump_out dump_out_create(void) { static void dump_out_append(dump_out *out, char c) { if (out->length == out->capacity) { out->capacity = GPR_MAX(8, 2 * out->capacity); - out->data = gpr_realloc(out->data, out->capacity); + out->data = (char *)gpr_realloc(out->data, out->capacity); } out->data[out->length++] = c; } static void hexdump(dump_out *out, const char *buf, size_t len) { - static const char hex[16] = "0123456789abcdef"; + static const char *hex = "0123456789abcdef"; const uint8_t *const beg = (const uint8_t *)buf; const uint8_t *const end = beg + len; @@ -124,16 +124,16 @@ char *gpr_dump(const char *buf, size_t len, uint32_t flags) { int gpr_parse_bytes_to_uint32(const char *buf, size_t len, uint32_t *result) { uint32_t out = 0; - uint32_t new; + uint32_t new_val; size_t i; if (len == 0) return 0; /* must have some bytes */ for (i = 0; i < len; i++) { if (buf[i] < '0' || buf[i] > '9') return 0; /* bad char */ - new = 10 * out + (uint32_t)(buf[i] - '0'); - if (new < out) return 0; /* overflow */ - out = new; + new_val = 10 * out + (uint32_t)(buf[i] - '0'); + if (new_val < out) return 0; /* overflow */ + out = new_val; } *result = out; @@ -201,7 +201,7 @@ int gpr_parse_nonnegative_int(const char *value) { char *gpr_leftpad(const char *str, char flag, size_t length) { const size_t str_length = strlen(str); const size_t out_length = str_length > length ? str_length : length; - char *out = gpr_malloc(out_length + 1); + char *out = (char *)gpr_malloc(out_length + 1); memset(out, flag, out_length - str_length); memcpy(out + out_length - str_length, str, str_length); out[out_length] = 0; @@ -225,7 +225,7 @@ char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep, if (nstrs > 0) { out_length += sep_len * (nstrs - 1); /* separators */ } - out = gpr_malloc(out_length); + out = (char *)gpr_malloc(out_length); out_length = 0; for (i = 0; i < nstrs; i++) { const size_t slen = strlen(strs[i]); @@ -256,7 +256,7 @@ void gpr_strvec_destroy(gpr_strvec *sv) { void gpr_strvec_add(gpr_strvec *sv, char *str) { if (sv->count == sv->capacity) { sv->capacity = GPR_MAX(sv->capacity + 8, sv->capacity * 2); - sv->strs = gpr_realloc(sv->strs, sizeof(char *) * sv->capacity); + sv->strs = (char **)gpr_realloc(sv->strs, sizeof(char *) * sv->capacity); } sv->strs[sv->count++] = str; } @@ -278,12 +278,12 @@ int gpr_stricmp(const char *a, const char *b) { static void add_string_to_split(const char *beg, const char *end, char ***strs, size_t *nstrs, size_t *capstrs) { - char *out = gpr_malloc((size_t)(end - beg) + 1); + char *out = (char *)gpr_malloc((size_t)(end - beg) + 1); memcpy(out, beg, (size_t)(end - beg)); out[end - beg] = 0; if (*nstrs == *capstrs) { *capstrs = GPR_MAX(8, 2 * *capstrs); - *strs = gpr_realloc(*strs, sizeof(*strs) * *capstrs); + *strs = (char **)gpr_realloc(*strs, sizeof(*strs) * *capstrs); } (*strs)[*nstrs] = out; ++*nstrs; diff --git a/src/core/lib/support/string_posix.c b/src/core/lib/support/string_posix.c index c804ed5ded..2438b18d21 100644 --- a/src/core/lib/support/string_posix.c +++ b/src/core/lib/support/string_posix.c @@ -58,7 +58,7 @@ int gpr_asprintf(char **strp, const char *format, ...) { /* Allocate a new buffer, with space for the NUL terminator. */ strp_buflen = (size_t)ret + 1; - if ((*strp = gpr_malloc(strp_buflen)) == NULL) { + if ((*strp = (char *)gpr_malloc(strp_buflen)) == NULL) { /* This shouldn't happen, because gpr_malloc() calls abort(). */ return -1; } diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c index ed653b9c2e..b9d0796b01 100644 --- a/src/core/lib/support/subprocess_posix.c +++ b/src/core/lib/support/subprocess_posix.c @@ -67,7 +67,7 @@ gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { if (pid == -1) { return NULL; } else if (pid == 0) { - exec_args = gpr_malloc(((size_t)argc + 1) * sizeof(char *)); + exec_args = (char **)gpr_malloc(((size_t)argc + 1) * sizeof(char *)); memcpy(exec_args, argv, (size_t)argc * sizeof(char *)); exec_args[argc] = NULL; execv(exec_args[0], exec_args); @@ -76,7 +76,7 @@ gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { _exit(1); return NULL; } else { - r = gpr_zalloc(sizeof(gpr_subprocess)); + r = (gpr_subprocess *)gpr_zalloc(sizeof(gpr_subprocess)); r->pid = pid; return r; } diff --git a/src/core/lib/support/thd_posix.c b/src/core/lib/support/thd_posix.c index 2fc23bffaf..16e645ad91 100644 --- a/src/core/lib/support/thd_posix.c +++ b/src/core/lib/support/thd_posix.c @@ -65,7 +65,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, pthread_t p; /* don't use gpr_malloc as we may cause an infinite recursion with * the profiling code */ - struct thd_arg *a = malloc(sizeof(*a)); + struct thd_arg *a = (struct thd_arg *)malloc(sizeof(*a)); GPR_ASSERT(a != NULL); a->body = thd_body; a->arg = arg; diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c index da8019d98b..e3a82b492a 100644 --- a/src/core/lib/transport/bdp_estimator.c +++ b/src/core/lib/transport/bdp_estimator.c @@ -53,6 +53,11 @@ bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, return true; } +bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw) { + *bw = estimator->bw_est; + return true; +} + bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, int64_t num_bytes) { estimator->accumulator += num_bytes; diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 135376edd7..b9a7fc84bb 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -64,6 +64,8 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name); // Returns true if a reasonable estimate could be obtained bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, int64_t *estimate); +// Returns true if a reasonable estimate could be obtained +bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw); // Returns true if the user should schedule a ping bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, int64_t num_bytes); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 6dca6a6862..c849d2e5b4 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -180,6 +180,10 @@ ServerBuilder& ServerBuilder::AddListeningPort( } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { + for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) { + (*plugin)->UpdateServerBuilder(this); + } + ChannelArguments args; for (auto option = options_.begin(); option != options_.end(); ++option) { (*option)->UpdateArguments(&args); diff --git a/src/csharp/Grpc.Core/Internal/CallError.cs b/src/csharp/Grpc.Core/Internal/CallError.cs index 541575f5e6..a46f9b3859 100644 --- a/src/csharp/Grpc.Core/Internal/CallError.cs +++ b/src/csharp/Grpc.Core/Internal/CallError.cs @@ -72,7 +72,10 @@ namespace Grpc.Core.Internal /// </summary> public static void CheckOk(this CallError callError) { - GrpcPreconditions.CheckState(callError == CallError.OK, "Call error: " + callError); + if (callError != CallError.OK) + { + throw new InvalidOperationException("Call error: " + callError); + } } } } diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index c9f7c42b71..8a44f8d68f 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -140,7 +140,8 @@ namespace Grpc.IntegrationTesting readonly ClientType clientType; readonly RpcType rpcType; readonly PayloadConfig payloadConfig; - readonly Histogram histogram; + readonly Lazy<byte[]> cachedByteBufferRequest; + readonly ThreadLocal<Histogram> threadLocalHistogram; readonly List<Task> runnerTasks; readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); @@ -155,7 +156,8 @@ namespace Grpc.IntegrationTesting this.clientType = clientType; this.rpcType = rpcType; this.payloadConfig = payloadConfig; - this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); + this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]); + this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true); this.runnerTasks = new List<Task>(); foreach (var channel in this.channels) @@ -171,7 +173,12 @@ namespace Grpc.IntegrationTesting public ClientStats GetStats(bool reset) { - var histogramData = histogram.GetSnapshot(reset); + var histogramData = new HistogramData(); + foreach (var hist in threadLocalHistogram.Values) + { + hist.GetSnapshot(histogramData, reset); + } + var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; if (reset) @@ -232,7 +239,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); timer.WaitForNext(); } @@ -251,7 +258,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); await timer.WaitForNextAsync(); } @@ -273,7 +280,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); await timer.WaitForNextAsync(); } @@ -286,7 +293,7 @@ namespace Grpc.IntegrationTesting private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer) { - var request = CreateByteBufferRequest(); + var request = cachedByteBufferRequest.Value; var stopwatch = new Stopwatch(); var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions()); @@ -301,7 +308,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); await timer.WaitForNextAsync(); } @@ -351,11 +358,6 @@ namespace Grpc.IntegrationTesting }; } - private byte[] CreateByteBufferRequest() - { - return new byte[payloadConfig.BytebufParams.ReqSize]; - } - private static Payload CreateZerosPayload(int size) { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; diff --git a/src/csharp/Grpc.IntegrationTesting/Histogram.cs b/src/csharp/Grpc.IntegrationTesting/Histogram.cs index 28d1f078a9..9d33c497e6 100644 --- a/src/csharp/Grpc.IntegrationTesting/Histogram.cs +++ b/src/csharp/Grpc.IntegrationTesting/Histogram.cs @@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting } } - /// <summary> - /// Gets snapshot of stats and reset + /// Gets snapshot of stats and optionally resets the histogram. /// </summary> public HistogramData GetSnapshot(bool reset = false) { lock (myLock) { - return GetSnapshotUnsafe(reset); + var histogramData = new HistogramData(); + GetSnapshotUnsafe(histogramData, reset); + return histogramData; + } + } + + /// <summary> + /// Merges snapshot of stats into <c>mergeTo</c> and optionally resets the histogram. + /// </summary> + public void GetSnapshot(HistogramData mergeTo, bool reset) + { + lock (myLock) + { + GetSnapshotUnsafe(mergeTo, reset); } } @@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting this.buckets[FindBucket(value)]++; } - private HistogramData GetSnapshotUnsafe(bool reset) + private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset) { - var data = new HistogramData + GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length); + if (mergeTo.Count == 0) { - Count = count, - Sum = sum, - SumOfSquares = sumOfSquares, - MinSeen = min, - MaxSeen = max, - Bucket = { buckets } - }; + mergeTo.MinSeen = min; + mergeTo.MaxSeen = max; + } + else + { + mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min); + mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max); + } + mergeTo.Count += count; + mergeTo.Sum += sum; + mergeTo.SumOfSquares += sumOfSquares; - if (reset) + if (mergeTo.Bucket.Count == 0) { - ResetUnsafe(); + mergeTo.Bucket.AddRange(buckets); + } + else + { + for (int i = 0; i < buckets.Length; i++) + { + mergeTo.Bucket[i] += buckets[i]; + } } - return data; + if (reset) + { + ResetUnsafe(); + } } private void ResetUnsafe() diff --git a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs index fa160cbd15..e8a2ed0c5b 100644 --- a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs @@ -73,7 +73,7 @@ namespace Grpc.IntegrationTesting { var hist = new Histogram(0.01, 60e9); hist.AddObservation(-0.5); // should be in the first bucket - hist.AddObservation(1e12); // should be in the last bucket + hist.AddObservation(1e12); // should be in the last bucket var data = hist.GetSnapshot(); Assert.AreEqual(1, data.Bucket[0]); @@ -81,6 +81,30 @@ namespace Grpc.IntegrationTesting } [Test] + public void MergeSnapshots() + { + var data = new HistogramData(); + + var hist1 = new Histogram(0.01, 60e9); + hist1.AddObservation(-0.5); // should be in the first bucket + hist1.AddObservation(1e12); // should be in the last bucket + hist1.GetSnapshot(data, false); + + var hist2 = new Histogram(0.01, 60e9); + hist2.AddObservation(10000); + hist2.AddObservation(11000); + hist2.GetSnapshot(data, false); + + Assert.AreEqual(4, data.Count); + Assert.AreEqual(-0.5, data.MinSeen); + Assert.AreEqual(1e12, data.MaxSeen); + Assert.AreEqual(1, data.Bucket[0]); + Assert.AreEqual(1, data.Bucket[925]); + Assert.AreEqual(1, data.Bucket[935]); + Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]); + } + + [Test] public void Reset() { var hist = new Histogram(0.01, 60e9); diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index e36f5c3ee9..deaaf17b07 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -293,6 +293,35 @@ static GRPCProtoMethod *kUnaryCallMethod; [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } +- (void)testTrailers { + __weak XCTestExpectation *response = [self expectationWithDescription:@"Empty response received."]; + __weak XCTestExpectation *completion = [self expectationWithDescription:@"Empty RPC completed."]; + + GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress + path:kEmptyCallMethod.HTTPPath + requestsWriter:[GRXWriter writerWithValue:[NSData data]]]; + // Setting this special key in the header will cause the interop server to echo back the + // trailer data. + const unsigned char raw_bytes[] = {1,2,3,4}; + NSData *trailer_data = [NSData dataWithBytes:raw_bytes length:sizeof(raw_bytes)]; + call.requestHeaders[@"x-grpc-test-echo-trailing-bin"] = trailer_data; + + id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + XCTAssertNotNil(value, @"nil value received as response."); + XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value); + [response fulfill]; + } completionHandler:^(NSError *errorOrNil) { + XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil); + XCTAssertEqualObjects((NSData *)call.responseTrailers[@"x-grpc-test-echo-trailing-bin"], + trailer_data, + @"Did not receive expected trailer"); + [completion fulfill]; + }]; + + [call startWithWriteable:responsesWriteable]; + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; +} + // TODO(makarandd): Move to a different file that contains only unit tests - (void)testExceptions { // Try to set parameters to nil for GRPCCall. This should cause an exception diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 221a1e14ec..8f76420af6 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -270,6 +270,7 @@ gpr_histogram_get_contents_type gpr_histogram_get_contents_import; gpr_histogram_merge_contents_type gpr_histogram_merge_contents_import; gpr_join_host_port_type gpr_join_host_port_import; gpr_split_host_port_type gpr_split_host_port_import; +gpr_log_severity_string_type gpr_log_severity_string_import; gpr_log_type gpr_log_import; gpr_log_message_type gpr_log_message_import; gpr_set_log_verbosity_type gpr_set_log_verbosity_import; @@ -571,6 +572,7 @@ void grpc_rb_load_imports(HMODULE library) { gpr_histogram_merge_contents_import = (gpr_histogram_merge_contents_type) GetProcAddress(library, "gpr_histogram_merge_contents"); gpr_join_host_port_import = (gpr_join_host_port_type) GetProcAddress(library, "gpr_join_host_port"); gpr_split_host_port_import = (gpr_split_host_port_type) GetProcAddress(library, "gpr_split_host_port"); + gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string"); gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log"); gpr_log_message_import = (gpr_log_message_type) GetProcAddress(library, "gpr_log_message"); gpr_set_log_verbosity_import = (gpr_set_log_verbosity_type) GetProcAddress(library, "gpr_set_log_verbosity"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index f62b31e83d..58467f97e8 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -761,6 +761,9 @@ extern gpr_join_host_port_type gpr_join_host_port_import; typedef int(*gpr_split_host_port_type)(const char *name, char **host, char **port); extern gpr_split_host_port_type gpr_split_host_port_import; #define gpr_split_host_port gpr_split_host_port_import +typedef const char *(*gpr_log_severity_string_type)(gpr_log_severity severity); +extern gpr_log_severity_string_type gpr_log_severity_string_import; +#define gpr_log_severity_string gpr_log_severity_string_import typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); extern gpr_log_type gpr_log_import; #define gpr_log gpr_log_import diff --git a/src/ruby/lib/grpc/grpc.rb b/src/ruby/lib/grpc/grpc.rb index f46710dc74..48f2a45d44 100644 --- a/src/ruby/lib/grpc/grpc.rb +++ b/src/ruby/lib/grpc/grpc.rb @@ -34,6 +34,6 @@ begin if File.directory?(distrib_lib_dir) require_relative "#{distrib_lib_dir}/grpc_c" else - require_relative 'grpc_c' + require 'grpc/grpc_c' end end |