aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-05-23 22:55:34 +0000
committerGravatar Craig Tiller <ctiller@google.com>2017-05-23 22:55:34 +0000
commit674af8f23ea1780ed0803d626df74408299e46c7 (patch)
tree2a95de9044e5d0e5d25f0bf9d1850d33ca3fa72a /src
parentd3ec4aaf6f52387d27d02ae80b18bdcaf65828d6 (diff)
parent0a94f3c8ab55dfd12c14058d57f33121c8d6c411 (diff)
Merge github.com:grpc/grpc into thread_pool
Diffstat (limited to 'src')
-rw-r--r--src/compiler/php_generator.cc21
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c459
-rw-r--r--src/core/ext/filters/client_channel/parse_address.c129
-rw-r--r--src/core/ext/filters/client_channel/parse_address.h6
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c18
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c73
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c41
-rw-r--r--src/core/lib/support/cmdline.c6
-rw-r--r--src/core/lib/support/histogram.c4
-rw-r--r--src/core/lib/support/host_port.c2
-rw-r--r--src/core/lib/support/string.c24
-rw-r--r--src/core/lib/support/string_posix.c2
-rw-r--r--src/core/lib/support/subprocess_posix.c4
-rw-r--r--src/core/lib/support/thd_posix.c2
-rw-r--r--src/core/lib/transport/bdp_estimator.c5
-rw-r--r--src/core/lib/transport/bdp_estimator.h2
-rw-r--r--src/cpp/server/server_builder.cc4
-rw-r--r--src/csharp/Grpc.Core/Internal/CallError.cs5
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs28
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Histogram.cs57
-rw-r--r--src/csharp/Grpc.IntegrationTesting/HistogramTest.cs26
-rw-r--r--src/objective-c/tests/GRPCClientTests.m29
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h3
-rw-r--r--src/ruby/lib/grpc/grpc.rb2
26 files changed, 517 insertions, 443 deletions
diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc
index 7d51d40301..67c4c80a7b 100644
--- a/src/compiler/php_generator.cc
+++ b/src/compiler/php_generator.cc
@@ -67,12 +67,11 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) {
vars["input_type_id"] = MessageIdentifierName(input_type->full_name());
vars["output_type_id"] = MessageIdentifierName(output_type->full_name());
- out->Print("/**\n");
- out->Print(GetPHPComments(method, " *").c_str());
+ out->Print(GetPHPComments(method, " //").c_str());
if (method->client_streaming()) {
out->Print(vars,
- " * @param array $$metadata metadata\n"
- " * @param array $$options call options\n */\n"
+ " // @param array $$metadata metadata\n"
+ " // @param array $$options call options\n"
"public function $name$($$metadata = [], "
"$$options = []) {\n");
out->Indent();
@@ -87,9 +86,9 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) {
"$$metadata, $$options);\n");
} else {
out->Print(vars,
- " * @param \\$input_type_id$ $$argument input argument\n"
- " * @param array $$metadata metadata\n"
- " * @param array $$options call options\n */\n"
+ " // @param \\$input_type_id$ $$argument input argument\n"
+ " // @param array $$metadata metadata\n"
+ " // @param array $$options call options\n"
"public function $name$(\\$input_type_id$ $$argument,\n"
" $$metadata = [], $$options = []) {\n");
out->Indent();
@@ -116,10 +115,10 @@ void PrintService(const ServiceDescriptor *service, Printer *out) {
out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n");
out->Indent();
out->Print(
- "/**\n * @param string $$hostname hostname\n"
- " * @param array $$opts channel options\n"
- " * @param \\Grpc\\Channel $$channel (optional) re-use channel "
- "object\n */\n"
+ " // @param string $$hostname hostname\n"
+ " // @param array $$opts channel options\n"
+ " // @param \\Grpc\\Channel $$channel (optional) re-use channel "
+ "object\n"
"public function __construct($$hostname, $$opts, "
"$$channel = null) {\n");
out->Indent();
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index b4d1565c45..16035208e3 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -99,26 +99,13 @@ typedef struct pending_pick {
grpc_closure *on_complete;
} pending_pick;
-/** List of subchannels in a connectivity READY state */
-typedef struct ready_list {
- grpc_subchannel *subchannel;
- /* references namesake entry in subchannel_data */
- void *user_data;
- struct ready_list *next;
- struct ready_list *prev;
-} ready_list;
-
typedef struct {
- /** index within policy->subchannels */
- size_t index;
/** backpointer to owning policy */
round_robin_lb_policy *policy;
/** subchannel itself */
grpc_subchannel *subchannel;
/** notification that connectivity has changed on subchannel */
grpc_closure connectivity_changed_closure;
- /** this subchannels current position in subchannel->ready_list */
- ready_list *ready_list_node;
/** last observed connectivity. Not updated by
* \a grpc_subchannel_notify_on_state_change. Used to determine the previous
* state while processing the new state in \a rr_connectivity_changed */
@@ -126,6 +113,10 @@ typedef struct {
/** current connectivity state. Updated by \a
* grpc_subchannel_notify_on_state_change */
grpc_connectivity_state curr_connectivity_state;
+ /** connectivity state to be updated by the watcher, not guarded by
+ * the combiner. Will be moved to curr_connectivity_state inside of
+ * the combiner by rr_connectivity_changed_locked(). */
+ grpc_connectivity_state pending_connectivity_state_unsafe;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
@@ -141,182 +132,106 @@ struct round_robin_lb_policy {
/** all our subchannels */
size_t num_subchannels;
- subchannel_data **subchannels;
+ subchannel_data *subchannels;
- /** how many subchannels are in TRANSIENT_FAILURE */
+ /** how many subchannels are in state READY */
+ size_t num_ready;
+ /** how many subchannels are in state TRANSIENT_FAILURE */
size_t num_transient_failures;
- /** how many subchannels are IDLE */
+ /** how many subchannels are in state IDLE */
size_t num_idle;
/** have we started picking? */
- int started_picking;
+ bool started_picking;
/** are we shutting down? */
- int shutdown;
+ bool shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker;
- /** (Dummy) root of the doubly linked list containing READY subchannels */
- ready_list ready_list;
- /** Last pick from the ready list. */
- ready_list *ready_list_last_pick;
+ // Index into subchannels for last pick.
+ size_t last_ready_subchannel_index;
};
-/** Returns the next subchannel from the connected list or NULL if the list is
- * empty.
+/** Returns the index into p->subchannels of the next subchannel in
+ * READY state, or p->num_subchannels if no subchannel is READY.
*
- * Note that this function does *not* advance p->ready_list_last_pick. Use \a
- * advance_last_picked_locked() for that. */
-static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) {
- ready_list *selected;
- selected = p->ready_list_last_pick->next;
-
- while (selected != NULL) {
- if (selected == &p->ready_list) {
- GPR_ASSERT(selected->subchannel == NULL);
- /* skip dummy root */
- selected = selected->next;
- } else {
- GPR_ASSERT(selected->subchannel != NULL);
- return selected;
- }
+ * Note that this function does *not* update p->last_ready_subchannel_index.
+ * The caller must do that if it returns a pick. */
+static size_t get_next_ready_subchannel_index_locked(
+ const round_robin_lb_policy *p) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_INFO,
+ "[RR: %p] getting next ready subchannel, "
+ "last_ready_subchannel_index=%lu",
+ p, (unsigned long)p->last_ready_subchannel_index);
}
- return NULL;
-}
-
-/** Advance the \a ready_list picking head. */
-static void advance_last_picked_locked(round_robin_lb_policy *p) {
- if (p->ready_list_last_pick->next != NULL) { /* non-empty list */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
- if (p->ready_list_last_pick == &p->ready_list) {
- /* skip dummy root */
- p->ready_list_last_pick = p->ready_list_last_pick->next;
+ for (size_t i = 0; i < p->num_subchannels; ++i) {
+ const size_t index =
+ (i + p->last_ready_subchannel_index + 1) % p->num_subchannels;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] checking index %lu: state=%d", p,
+ (unsigned long)index,
+ p->subchannels[index].curr_connectivity_state);
+ }
+ if (p->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) {
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel at index %lu",
+ p, (unsigned long)index);
+ }
+ return index;
}
- } else { /* should be an empty list */
- GPR_ASSERT(p->ready_list_last_pick == &p->ready_list);
}
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG,
- "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, "
- "CSC %p)",
- (void *)p, (void *)p->ready_list_last_pick,
- (void *)p->ready_list_last_pick->subchannel,
- (void *)grpc_subchannel_get_connected_subchannel(
- p->ready_list_last_pick->subchannel));
+ gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", p);
}
+ return p->num_subchannels;
}
-/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
- * csc to the list of ready subchannels. */
-static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- subchannel_data *sd) {
- ready_list *new_elem = gpr_zalloc(sizeof(ready_list));
- new_elem->subchannel = sd->subchannel;
- new_elem->user_data = sd->user_data;
- if (p->ready_list.prev == NULL) {
- /* first element */
- new_elem->next = &p->ready_list;
- new_elem->prev = &p->ready_list;
- p->ready_list.next = new_elem;
- p->ready_list.prev = new_elem;
- } else {
- new_elem->next = &p->ready_list;
- new_elem->prev = p->ready_list.prev;
- p->ready_list.prev->next = new_elem;
- p->ready_list.prev = new_elem;
- }
+// Sets p->last_ready_subchannel_index to last_ready_index.
+static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p,
+ size_t last_ready_index) {
+ GPR_ASSERT(last_ready_index < p->num_subchannels);
+ p->last_ready_subchannel_index = last_ready_index;
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
- (void *)new_elem, (void *)sd->subchannel);
- }
- return new_elem;
-}
-
-/** Removes \a node from the list of connected subchannels */
-static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
- ready_list *node) {
- if (node == NULL) {
- return;
- }
- if (node == p->ready_list_last_pick) {
- p->ready_list_last_pick = p->ready_list_last_pick->prev;
- }
-
- /* removing last item */
- if (node->next == &p->ready_list && node->prev == &p->ready_list) {
- GPR_ASSERT(p->ready_list.next == node);
- GPR_ASSERT(p->ready_list.prev == node);
- p->ready_list.next = NULL;
- p->ready_list.prev = NULL;
- } else {
- node->prev->next = node->next;
- node->next->prev = node->prev;
- }
-
- if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
- (void *)node->subchannel);
+ gpr_log(GPR_DEBUG,
+ "[RR: %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)",
+ (void *)p, (unsigned long)last_ready_index,
+ (void *)p->subchannels[last_ready_index].subchannel,
+ (void *)grpc_subchannel_get_connected_subchannel(
+ p->subchannels[last_ready_index].subchannel));
}
-
- node->next = NULL;
- node->prev = NULL;
- node->subchannel = NULL;
-
- gpr_free(node);
-}
-
-static bool is_ready_list_empty(round_robin_lb_policy *p) {
- return p->ready_list.prev == NULL;
}
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *elem;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol);
}
-
for (size_t i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
- if (sd->user_data != NULL) {
- GPR_ASSERT(sd->user_data_vtable != NULL);
- sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
}
- gpr_free(sd);
}
-
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
-
- elem = p->ready_list.next;
- while (elem != NULL && elem != &p->ready_list) {
- ready_list *tmp;
- tmp = elem->next;
- elem->next = NULL;
- elem->prev = NULL;
- elem->subchannel = NULL;
- gpr_free(elem);
- elem = tmp;
- }
-
gpr_free(p);
}
static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- size_t i;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol);
}
-
- p->shutdown = 1;
+ p->shutdown = true;
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
@@ -328,10 +243,13 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown");
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
- &sd->connectivity_changed_closure);
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL,
+ NULL,
+ &sd->connectivity_changed_closure);
+ }
}
}
@@ -339,8 +257,7 @@ static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -364,8 +281,7 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- pp = p->pending_picks;
+ pending_pick *pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
@@ -387,21 +303,16 @@ static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p) {
- size_t i;
- p->started_picking = 1;
-
- for (i = 0; i < p->num_subchannels; i++) {
- subchannel_data *sd = p->subchannels[i];
- /* use some sentinel value outside of the range of grpc_connectivity_state
- * to signal an undefined previous state. We won't be referring to this
- * value again and it'll be overwritten after the first call to
- * rr_connectivity_changed */
- sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
- sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
- GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ p->started_picking = true;
+ for (size_t i = 0; i < p->num_subchannels; i++) {
+ subchannel_data *sd = &p->subchannels[i];
+ if (sd->subchannel != NULL) {
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
+ }
}
}
@@ -418,36 +329,32 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- pending_pick *pp;
- ready_list *selected;
-
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol);
}
-
- if ((selected = peek_next_connected_locked(p))) {
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->num_subchannels) {
/* readily available, report right away */
+ subchannel_data *sd = &p->subchannels[next_ready_index];
*target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "rr_picked");
-
+ grpc_subchannel_get_connected_subchannel(sd->subchannel), "rr_picked");
if (user_data != NULL) {
- *user_data = selected->user_data;
+ *user_data = sd->user_data;
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
- (void *)*target, (void *)selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (INDEX %lu)",
+ (void *)*target, (unsigned long)next_ready_index);
}
/* only advance the last picked pointer if the selection was used */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
return 1;
} else {
/* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking_locked(exec_ctx, p);
}
- pp = gpr_malloc(sizeof(*pp));
+ pending_pick *pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->target = target;
pp->on_complete = on_complete;
@@ -458,25 +365,31 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
-static void update_state_counters(subchannel_data *sd) {
+static void update_state_counters_locked(subchannel_data *sd) {
round_robin_lb_policy *p = sd->policy;
-
- /* update p->num_transient_failures (resp. p->num_idle): if the previous
- * state was TRANSIENT_FAILURE (resp. IDLE), decrement
- * p->num_transient_failures (resp. p->num_idle). */
- if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) {
+ GPR_ASSERT(p->num_ready > 0);
+ --p->num_ready;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(p->num_transient_failures > 0);
--p->num_transient_failures;
} else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(p->num_idle > 0);
--p->num_idle;
}
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
+ ++p->num_ready;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ ++p->num_transient_failures;
+ } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) {
+ ++p->num_idle;
+ }
}
/* sd is the subchannel_data associted with the updated subchannel.
* shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
* or SHUTDOWN */
-static grpc_connectivity_state update_lb_connectivity_status(
+static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
@@ -498,7 +411,7 @@ static grpc_connectivity_state update_lb_connectivity_status(
* CHECK: p->num_idle == p->num_subchannels.
*/
round_robin_lb_policy *p = sd->policy;
- if (!is_ready_list_empty(p)) { /* 1) READY */
+ if (p->num_ready > 0) { /* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
return GRPC_CHANNEL_READY;
@@ -532,32 +445,62 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
- pending_pick *pp;
-
- GRPC_ERROR_REF(error);
-
+ // Now that we're inside the combiner, copy the pending connectivity
+ // state (which was set by the connectivity state watcher) to
+ // curr_connectivity_state, which is what we use inside of the combiner.
+ sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
+ if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
+ gpr_log(GPR_DEBUG,
+ "[RR %p] connectivity changed for subchannel %p: "
+ "prev_state=%d new_state=%d",
+ p, sd->subchannel, sd->prev_connectivity_state,
+ sd->curr_connectivity_state);
+ }
+ // If we're shutting down, unref and return.
if (p->shutdown) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- GRPC_ERROR_UNREF(error);
return;
}
- switch (sd->curr_connectivity_state) {
- case GRPC_CHANNEL_INIT:
- GPR_UNREACHABLE_CODE(return );
- case GRPC_CHANNEL_READY:
- /* add the newly connected subchannel to the list of connected ones.
- * Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd);
+ // Update state counters and determine new overall state.
+ update_state_counters_locked(sd);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ grpc_connectivity_state new_connectivity_state =
+ update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+ // If the new state is SHUTDOWN, unref the subchannel, and if the new
+ // overall state is SHUTDOWN, clean up.
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
+ sd->subchannel = NULL;
+ if (sd->user_data != NULL) {
+ GPR_ASSERT(sd->user_data_vtable != NULL);
+ sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
+ }
+ if (new_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+ /* the policy is shutting down. Flush all the pending picks... */
+ pending_pick *pp;
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = NULL;
+ grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
+ gpr_free(pp);
+ }
+ }
+ /* unref the "rr_connectivity" weak ref from start_picking */
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ } else {
+ if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
- ready_list *selected = peek_next_connected_locked(p);
- GPR_ASSERT(selected != NULL);
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ GPR_ASSERT(next_ready_index < p->num_subchannels);
+ subchannel_data *selected = &p->subchannels[next_ready_index];
if (p->pending_picks != NULL) {
/* if the selected subchannel is going to be used for the pending
* picks, update the last picked pointer */
- advance_last_picked_locked(p);
+ update_last_ready_subchannel_index_locked(p, next_ready_index);
}
+ pending_pick *pp;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
@@ -568,72 +511,20 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_DEBUG,
- "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- (void *)selected->subchannel, (void *)selected);
+ "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)",
+ (void *)selected->subchannel,
+ (unsigned long)next_ready_index);
}
grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
gpr_free(pp);
}
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_IDLE:
- ++p->num_idle;
- /* fallthrough */
- case GRPC_CHANNEL_CONNECTING:
- update_state_counters(sd);
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- ++p->num_transient_failures;
- /* remove from ready list if still present */
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- update_lb_connectivity_status(exec_ctx, sd, error);
- sd->prev_connectivity_state = sd->curr_connectivity_state;
- /* renew notification: reuses the "rr_connectivity" weak ref */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- update_state_counters(sd);
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- --p->num_subchannels;
- GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
- p->subchannels[sd->index]->index = sd->index;
- if (update_lb_connectivity_status(exec_ctx, sd, error) ==
- GRPC_CHANNEL_SHUTDOWN) {
- /* the policy is shutting down. Flush all the pending picks... */
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
- gpr_free(pp);
- }
- }
- gpr_free(sd);
- /* unref the "rr_connectivity" weak ref from start_picking */
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
- break;
+ }
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->pending_connectivity_state_unsafe,
+ &sd->connectivity_changed_closure);
}
- GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state rr_check_connectivity_locked(
@@ -654,10 +545,10 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- ready_list *selected;
- grpc_connected_subchannel *target;
- if ((selected = peek_next_connected_locked(p))) {
- target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
+ if (next_ready_index < p->num_subchannels) {
+ subchannel_data *selected = &p->subchannels[next_ready_index];
+ grpc_connected_subchannel *target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
"rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
@@ -708,7 +599,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
- size_t subchannel_idx = 0;
+ size_t subchannel_index = 0;
for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
if (addresses->addresses[i].is_balancer) continue;
@@ -727,42 +618,44 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
char *address_uri =
grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s",
- (void *)subchannel, address_uri);
+ gpr_log(GPR_DEBUG, "index %lu: Created subchannel %p for address uri %s",
+ (unsigned long)subchannel_index, (void *)subchannel, address_uri);
gpr_free(address_uri);
}
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
- subchannel_data *sd = gpr_zalloc(sizeof(*sd));
- p->subchannels[subchannel_idx] = sd;
+ subchannel_data *sd = &p->subchannels[subchannel_index];
sd->policy = p;
- sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ /* use some sentinel value outside of the range of grpc_connectivity_state
+ * to signal an undefined previous state. We won't be referring to this
+ * value again and it'll be overwritten after the first call to
+ * rr_connectivity_changed */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
sd->user_data_vtable = addresses->user_data_vtable;
if (sd->user_data_vtable != NULL) {
sd->user_data =
sd->user_data_vtable->copy(addresses->addresses[i].user_data);
}
- ++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed_locked, sd,
grpc_combiner_scheduler(args->combiner));
+ ++subchannel_index;
}
}
- if (subchannel_idx == 0) {
+ if (subchannel_index == 0) {
/* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
}
- p->num_subchannels = subchannel_idx;
+ p->num_subchannels = subchannel_index;
- /* The (dummy node) root of the ready list */
- p->ready_list.subchannel = NULL;
- p->ready_list.prev = NULL;
- p->ready_list.next = NULL;
- p->ready_list_last_pick = &p->ready_list;
+ // Initialize the last pick index to the last subchannel, so that the
+ // first pick will start at the beginning of the list.
+ p->last_ready_subchannel_index = subchannel_index - 1;
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
diff --git a/src/core/ext/filters/client_channel/parse_address.c b/src/core/ext/filters/client_channel/parse_address.c
index edc6ce697d..18381eec55 100644
--- a/src/core/ext/filters/client_channel/parse_address.c
+++ b/src/core/ext/filters/client_channel/parse_address.c
@@ -57,11 +57,11 @@ bool grpc_parse_unix(const grpc_uri *uri,
struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
const size_t maxlen = sizeof(un->sun_path);
const size_t path_len = strnlen(uri->path, maxlen);
- if (path_len == maxlen) return 0;
+ if (path_len == maxlen) return false;
un->sun_family = AF_UNIX;
strcpy(un->sun_path, uri->path);
resolved_addr->len = sizeof(*un);
- return 1;
+ return true;
}
#else /* GRPC_HAVE_UNIX_SOCKET */
@@ -73,74 +73,65 @@ bool grpc_parse_unix(const grpc_uri *uri,
#endif /* GRPC_HAVE_UNIX_SOCKET */
-bool grpc_parse_ipv4(const grpc_uri *uri,
- grpc_resolved_address *resolved_addr) {
- if (strcmp("ipv4", uri->scheme) != 0) {
- gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme);
- return false;
- }
- const char *host_port = uri->path;
+bool grpc_parse_ipv4_hostport(const char *hostport, grpc_resolved_address *addr,
+ bool log_errors) {
+ bool success = false;
+ // Split host and port.
char *host;
char *port;
- int port_num;
- bool result = false;
- struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr;
-
- if (*host_port == '/') ++host_port;
- if (!gpr_split_host_port(host_port, &host, &port)) {
- return false;
- }
-
- memset(resolved_addr, 0, sizeof(grpc_resolved_address));
- resolved_addr->len = sizeof(struct sockaddr_in);
+ if (!gpr_split_host_port(hostport, &host, &port)) return false;
+ // Parse IP address.
+ memset(addr, 0, sizeof(*addr));
+ addr->len = sizeof(struct sockaddr_in);
+ struct sockaddr_in *in = (struct sockaddr_in *)addr->addr;
in->sin_family = AF_INET;
if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
- gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
+ if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
goto done;
}
-
- if (port != NULL) {
- if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
- port_num > 65535) {
- gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
- goto done;
- }
- in->sin_port = htons((uint16_t)port_num);
- } else {
- gpr_log(GPR_ERROR, "no port given for ipv4 scheme");
+ // Parse port.
+ if (port == NULL) {
+ if (log_errors) gpr_log(GPR_ERROR, "no port given for ipv4 scheme");
goto done;
}
-
- result = true;
+ int port_num;
+ if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) {
+ if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
+ goto done;
+ }
+ in->sin_port = htons((uint16_t)port_num);
+ success = true;
done:
gpr_free(host);
gpr_free(port);
- return result;
+ return success;
}
-bool grpc_parse_ipv6(const grpc_uri *uri,
+bool grpc_parse_ipv4(const grpc_uri *uri,
grpc_resolved_address *resolved_addr) {
- if (strcmp("ipv6", uri->scheme) != 0) {
- gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme);
+ if (strcmp("ipv4", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'ipv4' scheme, got '%s'", uri->scheme);
return false;
}
const char *host_port = uri->path;
- char *host;
- char *port;
- int port_num;
- int result = 0;
- struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)resolved_addr->addr;
-
if (*host_port == '/') ++host_port;
- if (!gpr_split_host_port(host_port, &host, &port)) {
- return 0;
- }
+ return grpc_parse_ipv4_hostport(host_port, resolved_addr,
+ true /* log_errors */);
+}
- memset(in6, 0, sizeof(*in6));
- resolved_addr->len = sizeof(*in6);
+bool grpc_parse_ipv6_hostport(const char *hostport, grpc_resolved_address *addr,
+ bool log_errors) {
+ bool success = false;
+ // Split host and port.
+ char *host;
+ char *port;
+ if (!gpr_split_host_port(hostport, &host, &port)) return false;
+ // Parse IP address.
+ memset(addr, 0, sizeof(*addr));
+ addr->len = sizeof(struct sockaddr_in6);
+ struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr->addr;
in6->sin6_family = AF_INET6;
-
- /* Handle the RFC6874 syntax for IPv6 zone identifiers. */
+ // Handle the RFC6874 syntax for IPv6 zone identifiers.
char *host_end = (char *)gpr_memrchr(host, '%', strlen(host));
if (host_end != NULL) {
GPR_ASSERT(host_end >= host);
@@ -159,7 +150,7 @@ bool grpc_parse_ipv6(const grpc_uri *uri,
gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
goto done;
}
- // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027.
+ // Handle "sin6_scope_id" being type "u_long". See grpc issue #10027.
in6->sin6_scope_id = sin6_scope_id;
} else {
if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
@@ -167,24 +158,34 @@ bool grpc_parse_ipv6(const grpc_uri *uri,
goto done;
}
}
-
- if (port != NULL) {
- if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
- port_num > 65535) {
- gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
- goto done;
- }
- in6->sin6_port = htons((uint16_t)port_num);
- } else {
- gpr_log(GPR_ERROR, "no port given for ipv6 scheme");
+ // Parse port.
+ if (port == NULL) {
+ if (log_errors) gpr_log(GPR_ERROR, "no port given for ipv6 scheme");
goto done;
}
-
- result = 1;
+ int port_num;
+ if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || port_num > 65535) {
+ if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
+ goto done;
+ }
+ in6->sin6_port = htons((uint16_t)port_num);
+ success = true;
done:
gpr_free(host);
gpr_free(port);
- return result;
+ return success;
+}
+
+bool grpc_parse_ipv6(const grpc_uri *uri,
+ grpc_resolved_address *resolved_addr) {
+ if (strcmp("ipv6", uri->scheme) != 0) {
+ gpr_log(GPR_ERROR, "Expected 'ipv6' scheme, got '%s'", uri->scheme);
+ return false;
+ }
+ const char *host_port = uri->path;
+ if (*host_port == '/') ++host_port;
+ return grpc_parse_ipv6_hostport(host_port, resolved_addr,
+ true /* log_errors */);
}
bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr) {
diff --git a/src/core/ext/filters/client_channel/parse_address.h b/src/core/ext/filters/client_channel/parse_address.h
index fa7ea33a00..1a203a3b26 100644
--- a/src/core/ext/filters/client_channel/parse_address.h
+++ b/src/core/ext/filters/client_channel/parse_address.h
@@ -54,4 +54,10 @@ bool grpc_parse_ipv6(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
/** Populate \a resolved_addr from \a uri. Returns true upon success. */
bool grpc_parse_uri(const grpc_uri *uri, grpc_resolved_address *resolved_addr);
+/** Parse bare IPv4 or IPv6 "IP:port" strings. */
+bool grpc_parse_ipv4_hostport(const char *hostport, grpc_resolved_address *addr,
+ bool log_errors);
+bool grpc_parse_ipv6_hostport(const char *hostport, grpc_resolved_address *addr,
+ bool log_errors);
+
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_PARSE_ADDRESS_H */
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
index 656c3a5860..861b809b19 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -61,6 +61,8 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
+ /** DNS server to use (if not system default) */
+ char *dns_server;
/** name to resolve (usually the same as target_name) */
char *name_to_resolve;
/** default port to use */
@@ -172,6 +174,8 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(exec_ctx, addresses);
} else {
+ const char *msg = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
gpr_timespec timeout = gpr_time_sub(next_try, now);
@@ -221,9 +225,9 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!r->resolving);
r->resolving = true;
r->addresses = NULL;
- grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port,
- r->interested_parties, &r->dns_ares_on_resolved_locked,
- &r->addresses);
+ grpc_dns_lookup_ares(exec_ctx, r->dns_server, r->name_to_resolve,
+ r->default_port, r->interested_parties,
+ &r->dns_ares_on_resolved_locked, &r->addresses);
}
static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
@@ -246,6 +250,7 @@ static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
grpc_channel_args_destroy(exec_ctx, r->resolved_result);
}
grpc_pollset_set_destroy(exec_ctx, r->interested_parties);
+ gpr_free(r->dns_server);
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
grpc_channel_args_destroy(exec_ctx, r->channel_args);
@@ -257,14 +262,13 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx,
const char *default_port) {
// Get name from args.
const char *path = args->uri->path;
- if (0 != strcmp(args->uri->authority, "")) {
- gpr_log(GPR_ERROR, "authority based dns uri's not supported");
- return NULL;
- }
if (path[0] == '/') ++path;
// Create resolver.
ares_dns_resolver *r = gpr_zalloc(sizeof(ares_dns_resolver));
grpc_resolver_init(&r->base, &dns_ares_resolver_vtable, args->combiner);
+ if (0 != strcmp(args->uri->authority, "")) {
+ r->dns_server = gpr_strdup(args->uri->authority);
+ }
r->name_to_resolve = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
r->channel_args = grpc_channel_args_copy(args->args);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 09c46a66e0..e0cfd8b629 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -48,7 +48,10 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
+
+#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
+#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -58,6 +61,8 @@ static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
typedef struct grpc_ares_request {
+ /** indicates the DNS server to use, if specified */
+ struct ares_addr_port_node dns_server_addr;
/** following members are set in grpc_resolve_address_ares_impl */
/** host to resolve, parsed from the name to resolve */
char *host;
@@ -192,11 +197,12 @@ static void on_done_cb(void *arg, int status, int timeouts,
grpc_ares_request_unref(NULL, r);
}
-void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port,
- grpc_pollset_set *interested_parties,
- grpc_closure *on_done,
- grpc_resolved_addresses **addrs) {
+void grpc_dns_lookup_ares(grpc_exec_ctx *exec_ctx, const char *dns_server,
+ const char *name, const char *default_port,
+ grpc_pollset_set *interested_parties,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) {
+ grpc_error *error = GRPC_ERROR_NONE;
/* TODO(zyc): Enable tracing after #9603 is checked in */
/* if (grpc_dns_trace) {
gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s",
@@ -208,28 +214,23 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
char *port;
gpr_split_host_port(name, &host, &port);
if (host == NULL) {
- grpc_error *err = grpc_error_set_str(
+ error = grpc_error_set_str(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("unparseable host:port"),
GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name));
- grpc_closure_sched(exec_ctx, on_done, err);
goto error_cleanup;
} else if (port == NULL) {
if (default_port == NULL) {
- grpc_error *err = grpc_error_set_str(
+ error = grpc_error_set_str(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no port in name"),
GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name));
- grpc_closure_sched(exec_ctx, on_done, err);
goto error_cleanup;
}
port = gpr_strdup(default_port);
}
grpc_ares_ev_driver *ev_driver;
- grpc_error *err = grpc_ares_ev_driver_create(&ev_driver, interested_parties);
- if (err != GRPC_ERROR_NONE) {
- GRPC_LOG_IF_ERROR("grpc_ares_ev_driver_create() failed", err);
- goto error_cleanup;
- }
+ error = grpc_ares_ev_driver_create(&ev_driver, interested_parties);
+ if (error != GRPC_ERROR_NONE) goto error_cleanup;
grpc_ares_request *r = gpr_malloc(sizeof(grpc_ares_request));
gpr_mu_init(&r->mu);
@@ -242,6 +243,40 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
r->success = false;
r->error = GRPC_ERROR_NONE;
ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver);
+
+ // If dns_server is specified, use it.
+ if (dns_server != NULL) {
+ gpr_log(GPR_INFO, "Using DNS server %s", dns_server);
+ grpc_resolved_address addr;
+ if (grpc_parse_ipv4_hostport(dns_server, &addr, false /* log_errors */)) {
+ r->dns_server_addr.family = AF_INET;
+ memcpy(&r->dns_server_addr.addr.addr4, addr.addr, addr.len);
+ r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
+ r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
+ } else if (grpc_parse_ipv6_hostport(dns_server, &addr,
+ false /* log_errors */)) {
+ r->dns_server_addr.family = AF_INET6;
+ memcpy(&r->dns_server_addr.addr.addr6, addr.addr, addr.len);
+ r->dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
+ r->dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
+ } else {
+ error = grpc_error_set_str(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("cannot parse authority"),
+ GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(name));
+ goto error_cleanup;
+ }
+ int status = ares_set_servers_ports(*channel, &r->dns_server_addr);
+ if (status != ARES_SUCCESS) {
+ char *error_msg;
+ gpr_asprintf(&error_msg, "C-ares status is not ARES_SUCCESS: %s",
+ ares_strerror(status));
+ error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg);
+ gpr_free(error_msg);
+ goto error_cleanup;
+ }
+ }
+ // An extra reference is put here to avoid destroying the request in
+ // on_done_cb before calling grpc_ares_ev_driver_start.
gpr_ref_init(&r->pending_queries, 2);
if (grpc_ipv6_loopback_available()) {
gpr_ref(&r->pending_queries);
@@ -254,10 +289,20 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
return;
error_cleanup:
+ grpc_closure_sched(exec_ctx, on_done, error);
gpr_free(host);
gpr_free(port);
}
+void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port,
+ grpc_pollset_set *interested_parties,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) {
+ grpc_dns_lookup_ares(exec_ctx, NULL /* dns_server */, name, default_port,
+ interested_parties, on_done, addrs);
+}
+
void (*grpc_resolve_address_ares)(
grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
grpc_pollset_set *interested_parties, grpc_closure *on_done,
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index 3dd40ea268..84fd7fcbd6 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -51,6 +51,12 @@ extern void (*grpc_resolve_address_ares)(grpc_exec_ctx *exec_ctx,
grpc_closure *on_done,
grpc_resolved_addresses **addresses);
+void grpc_dns_lookup_ares(grpc_exec_ctx *exec_ctx, const char *dns_server,
+ const char *addr, const char *default_port,
+ grpc_pollset_set *interested_parties,
+ grpc_closure *on_done,
+ grpc_resolved_addresses **addresses);
+
/* Initialize gRPC ares wrapper. Must be called at least once before
grpc_resolve_address_ares(). */
grpc_error *grpc_ares_init(void);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 10b187496e..f112ce1b76 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -2109,15 +2109,8 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
double bdp_dbl) {
- int32_t bdp;
- const int32_t kMinBDP = 128;
- if (bdp_dbl <= kMinBDP) {
- bdp = kMinBDP;
- } else if (bdp_dbl > INT32_MAX) {
- bdp = INT32_MAX;
- } else {
- bdp = (int32_t)(bdp_dbl);
- }
+ // initial window size bounded [1,2^31-1], but we set the min to 128.
+ int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
int64_t delta =
(int64_t)bdp -
(int64_t)t->settings[GRPC_LOCAL_SETTINGS]
@@ -2131,7 +2124,26 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
(uint32_t)bdp);
- push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, (uint32_t)bdp);
+}
+
+static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ double bw_dbl, double bdp_dbl) {
+ int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX);
+ int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp);
+ // frame size is bounded [2^14,2^24-1]
+ int32_t frame_size = GPR_CLAMP(target, 16384, 16777215);
+ int64_t delta = (int64_t)frame_size -
+ (int64_t)t->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
+ if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) {
+ return;
+ }
+ if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) {
+ gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string,
+ (int)frame_size);
+ }
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ (uint32_t)frame_size);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2250,6 +2262,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
int64_t estimate = -1;
+ double bdp_guess = -1;
if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
double target = 1 + log2((double)estimate);
double memory_pressure = grpc_resource_quota_get_memory_pressure(
@@ -2267,9 +2280,15 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
double log2_bdp_guess =
grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
- update_bdp(exec_ctx, t, pow(2, log2_bdp_guess));
+ bdp_guess = pow(2, log2_bdp_guess);
+ update_bdp(exec_ctx, t, bdp_guess);
t->last_pid_update = now;
}
+
+ double bw = -1;
+ if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) {
+ update_frame(exec_ctx, t, bw, bdp_guess);
+ }
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
diff --git a/src/core/lib/support/cmdline.c b/src/core/lib/support/cmdline.c
index 88a65a8e2e..e5c9f3b84b 100644
--- a/src/core/lib/support/cmdline.c
+++ b/src/core/lib/support/cmdline.c
@@ -71,7 +71,7 @@ struct gpr_cmdline {
static int normal_state(gpr_cmdline *cl, char *arg);
gpr_cmdline *gpr_cmdline_create(const char *description) {
- gpr_cmdline *cl = gpr_zalloc(sizeof(gpr_cmdline));
+ gpr_cmdline *cl = (gpr_cmdline *)gpr_zalloc(sizeof(gpr_cmdline));
cl->description = description;
cl->state = normal_state;
@@ -100,7 +100,7 @@ static void add_arg(gpr_cmdline *cl, const char *name, const char *help,
GPR_ASSERT(0 != strcmp(a->name, name));
}
- a = gpr_zalloc(sizeof(arg));
+ a = (arg *)gpr_zalloc(sizeof(arg));
a->name = name;
a->help = help;
a->type = type;
@@ -302,7 +302,7 @@ static int normal_state(gpr_cmdline *cl, char *str) {
eq = strchr(str, '=');
if (eq != NULL) {
/* copy the string into a temp buffer and extract the name */
- tmp = arg_name = gpr_malloc((size_t)(eq - str + 1));
+ tmp = arg_name = (char *)gpr_malloc((size_t)(eq - str + 1));
memcpy(arg_name, str, (size_t)(eq - str));
arg_name[eq - str] = 0;
} else {
diff --git a/src/core/lib/support/histogram.c b/src/core/lib/support/histogram.c
index ba8176bb05..c88695409d 100644
--- a/src/core/lib/support/histogram.c
+++ b/src/core/lib/support/histogram.c
@@ -88,7 +88,7 @@ static double bucket_start(gpr_histogram *h, double x) {
gpr_histogram *gpr_histogram_create(double resolution,
double max_bucket_start) {
- gpr_histogram *h = gpr_malloc(sizeof(gpr_histogram));
+ gpr_histogram *h = (gpr_histogram *)gpr_malloc(sizeof(gpr_histogram));
GPR_ASSERT(resolution > 0.0);
GPR_ASSERT(max_bucket_start > resolution);
h->sum = 0.0;
@@ -102,7 +102,7 @@ gpr_histogram *gpr_histogram_create(double resolution,
h->num_buckets = bucket_for_unchecked(h, max_bucket_start) + 1;
GPR_ASSERT(h->num_buckets > 1);
GPR_ASSERT(h->num_buckets < 100000000);
- h->buckets = gpr_zalloc(sizeof(uint32_t) * h->num_buckets);
+ h->buckets = (uint32_t *)gpr_zalloc(sizeof(uint32_t) * h->num_buckets);
return h;
}
diff --git a/src/core/lib/support/host_port.c b/src/core/lib/support/host_port.c
index f19bdbc835..bbd42c26e0 100644
--- a/src/core/lib/support/host_port.c
+++ b/src/core/lib/support/host_port.c
@@ -98,7 +98,7 @@ int gpr_split_host_port(const char *name, char **host, char **port) {
}
/* Allocate return values. */
- *host = gpr_malloc(host_len + 1);
+ *host = (char *)gpr_malloc(host_len + 1);
memcpy(*host, host_start, host_len);
(*host)[host_len] = '\0';
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index d20b86f7cf..11297c9ddb 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -53,7 +53,7 @@ char *gpr_strdup(const char *src) {
}
len = strlen(src) + 1;
- dst = gpr_malloc(len);
+ dst = (char *)gpr_malloc(len);
memcpy(dst, src, len);
@@ -74,13 +74,13 @@ static dump_out dump_out_create(void) {
static void dump_out_append(dump_out *out, char c) {
if (out->length == out->capacity) {
out->capacity = GPR_MAX(8, 2 * out->capacity);
- out->data = gpr_realloc(out->data, out->capacity);
+ out->data = (char *)gpr_realloc(out->data, out->capacity);
}
out->data[out->length++] = c;
}
static void hexdump(dump_out *out, const char *buf, size_t len) {
- static const char hex[16] = "0123456789abcdef";
+ static const char *hex = "0123456789abcdef";
const uint8_t *const beg = (const uint8_t *)buf;
const uint8_t *const end = beg + len;
@@ -124,16 +124,16 @@ char *gpr_dump(const char *buf, size_t len, uint32_t flags) {
int gpr_parse_bytes_to_uint32(const char *buf, size_t len, uint32_t *result) {
uint32_t out = 0;
- uint32_t new;
+ uint32_t new_val;
size_t i;
if (len == 0) return 0; /* must have some bytes */
for (i = 0; i < len; i++) {
if (buf[i] < '0' || buf[i] > '9') return 0; /* bad char */
- new = 10 * out + (uint32_t)(buf[i] - '0');
- if (new < out) return 0; /* overflow */
- out = new;
+ new_val = 10 * out + (uint32_t)(buf[i] - '0');
+ if (new_val < out) return 0; /* overflow */
+ out = new_val;
}
*result = out;
@@ -201,7 +201,7 @@ int gpr_parse_nonnegative_int(const char *value) {
char *gpr_leftpad(const char *str, char flag, size_t length) {
const size_t str_length = strlen(str);
const size_t out_length = str_length > length ? str_length : length;
- char *out = gpr_malloc(out_length + 1);
+ char *out = (char *)gpr_malloc(out_length + 1);
memset(out, flag, out_length - str_length);
memcpy(out + out_length - str_length, str, str_length);
out[out_length] = 0;
@@ -225,7 +225,7 @@ char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep,
if (nstrs > 0) {
out_length += sep_len * (nstrs - 1); /* separators */
}
- out = gpr_malloc(out_length);
+ out = (char *)gpr_malloc(out_length);
out_length = 0;
for (i = 0; i < nstrs; i++) {
const size_t slen = strlen(strs[i]);
@@ -256,7 +256,7 @@ void gpr_strvec_destroy(gpr_strvec *sv) {
void gpr_strvec_add(gpr_strvec *sv, char *str) {
if (sv->count == sv->capacity) {
sv->capacity = GPR_MAX(sv->capacity + 8, sv->capacity * 2);
- sv->strs = gpr_realloc(sv->strs, sizeof(char *) * sv->capacity);
+ sv->strs = (char **)gpr_realloc(sv->strs, sizeof(char *) * sv->capacity);
}
sv->strs[sv->count++] = str;
}
@@ -278,12 +278,12 @@ int gpr_stricmp(const char *a, const char *b) {
static void add_string_to_split(const char *beg, const char *end, char ***strs,
size_t *nstrs, size_t *capstrs) {
- char *out = gpr_malloc((size_t)(end - beg) + 1);
+ char *out = (char *)gpr_malloc((size_t)(end - beg) + 1);
memcpy(out, beg, (size_t)(end - beg));
out[end - beg] = 0;
if (*nstrs == *capstrs) {
*capstrs = GPR_MAX(8, 2 * *capstrs);
- *strs = gpr_realloc(*strs, sizeof(*strs) * *capstrs);
+ *strs = (char **)gpr_realloc(*strs, sizeof(*strs) * *capstrs);
}
(*strs)[*nstrs] = out;
++*nstrs;
diff --git a/src/core/lib/support/string_posix.c b/src/core/lib/support/string_posix.c
index c804ed5ded..2438b18d21 100644
--- a/src/core/lib/support/string_posix.c
+++ b/src/core/lib/support/string_posix.c
@@ -58,7 +58,7 @@ int gpr_asprintf(char **strp, const char *format, ...) {
/* Allocate a new buffer, with space for the NUL terminator. */
strp_buflen = (size_t)ret + 1;
- if ((*strp = gpr_malloc(strp_buflen)) == NULL) {
+ if ((*strp = (char *)gpr_malloc(strp_buflen)) == NULL) {
/* This shouldn't happen, because gpr_malloc() calls abort(). */
return -1;
}
diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c
index ed653b9c2e..b9d0796b01 100644
--- a/src/core/lib/support/subprocess_posix.c
+++ b/src/core/lib/support/subprocess_posix.c
@@ -67,7 +67,7 @@ gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
if (pid == -1) {
return NULL;
} else if (pid == 0) {
- exec_args = gpr_malloc(((size_t)argc + 1) * sizeof(char *));
+ exec_args = (char **)gpr_malloc(((size_t)argc + 1) * sizeof(char *));
memcpy(exec_args, argv, (size_t)argc * sizeof(char *));
exec_args[argc] = NULL;
execv(exec_args[0], exec_args);
@@ -76,7 +76,7 @@ gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
_exit(1);
return NULL;
} else {
- r = gpr_zalloc(sizeof(gpr_subprocess));
+ r = (gpr_subprocess *)gpr_zalloc(sizeof(gpr_subprocess));
r->pid = pid;
return r;
}
diff --git a/src/core/lib/support/thd_posix.c b/src/core/lib/support/thd_posix.c
index 2fc23bffaf..16e645ad91 100644
--- a/src/core/lib/support/thd_posix.c
+++ b/src/core/lib/support/thd_posix.c
@@ -65,7 +65,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
pthread_t p;
/* don't use gpr_malloc as we may cause an infinite recursion with
* the profiling code */
- struct thd_arg *a = malloc(sizeof(*a));
+ struct thd_arg *a = (struct thd_arg *)malloc(sizeof(*a));
GPR_ASSERT(a != NULL);
a->body = thd_body;
a->arg = arg;
diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c
index da8019d98b..e3a82b492a 100644
--- a/src/core/lib/transport/bdp_estimator.c
+++ b/src/core/lib/transport/bdp_estimator.c
@@ -53,6 +53,11 @@ bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
return true;
}
+bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw) {
+ *bw = estimator->bw_est;
+ return true;
+}
+
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes) {
estimator->accumulator += num_bytes;
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index 135376edd7..b9a7fc84bb 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -64,6 +64,8 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name);
// Returns true if a reasonable estimate could be obtained
bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator,
int64_t *estimate);
+// Returns true if a reasonable estimate could be obtained
+bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw);
// Returns true if the user should schedule a ping
bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator,
int64_t num_bytes);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 6dca6a6862..c849d2e5b4 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -180,6 +180,10 @@ ServerBuilder& ServerBuilder::AddListeningPort(
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin)->UpdateServerBuilder(this);
+ }
+
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
diff --git a/src/csharp/Grpc.Core/Internal/CallError.cs b/src/csharp/Grpc.Core/Internal/CallError.cs
index 541575f5e6..a46f9b3859 100644
--- a/src/csharp/Grpc.Core/Internal/CallError.cs
+++ b/src/csharp/Grpc.Core/Internal/CallError.cs
@@ -72,7 +72,10 @@ namespace Grpc.Core.Internal
/// </summary>
public static void CheckOk(this CallError callError)
{
- GrpcPreconditions.CheckState(callError == CallError.OK, "Call error: " + callError);
+ if (callError != CallError.OK)
+ {
+ throw new InvalidOperationException("Call error: " + callError);
+ }
}
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index c9f7c42b71..8a44f8d68f 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -140,7 +140,8 @@ namespace Grpc.IntegrationTesting
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
- readonly Histogram histogram;
+ readonly Lazy<byte[]> cachedByteBufferRequest;
+ readonly ThreadLocal<Histogram> threadLocalHistogram;
readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
@@ -155,7 +156,8 @@ namespace Grpc.IntegrationTesting
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
- this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
+ this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
+ this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);
this.runnerTasks = new List<Task>();
foreach (var channel in this.channels)
@@ -171,7 +173,12 @@ namespace Grpc.IntegrationTesting
public ClientStats GetStats(bool reset)
{
- var histogramData = histogram.GetSnapshot(reset);
+ var histogramData = new HistogramData();
+ foreach (var hist in threadLocalHistogram.Values)
+ {
+ hist.GetSnapshot(histogramData, reset);
+ }
+
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
if (reset)
@@ -232,7 +239,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext();
}
@@ -251,7 +258,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@@ -273,7 +280,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@@ -286,7 +293,7 @@ namespace Grpc.IntegrationTesting
private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{
- var request = CreateByteBufferRequest();
+ var request = cachedByteBufferRequest.Value;
var stopwatch = new Stopwatch();
var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
@@ -301,7 +308,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
- histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@@ -351,11 +358,6 @@ namespace Grpc.IntegrationTesting
};
}
- private byte[] CreateByteBufferRequest()
- {
- return new byte[payloadConfig.BytebufParams.ReqSize];
- }
-
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
diff --git a/src/csharp/Grpc.IntegrationTesting/Histogram.cs b/src/csharp/Grpc.IntegrationTesting/Histogram.cs
index 28d1f078a9..9d33c497e6 100644
--- a/src/csharp/Grpc.IntegrationTesting/Histogram.cs
+++ b/src/csharp/Grpc.IntegrationTesting/Histogram.cs
@@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting
}
}
-
/// <summary>
- /// Gets snapshot of stats and reset
+ /// Gets snapshot of stats and optionally resets the histogram.
/// </summary>
public HistogramData GetSnapshot(bool reset = false)
{
lock (myLock)
{
- return GetSnapshotUnsafe(reset);
+ var histogramData = new HistogramData();
+ GetSnapshotUnsafe(histogramData, reset);
+ return histogramData;
+ }
+ }
+
+ /// <summary>
+ /// Merges snapshot of stats into <c>mergeTo</c> and optionally resets the histogram.
+ /// </summary>
+ public void GetSnapshot(HistogramData mergeTo, bool reset)
+ {
+ lock (myLock)
+ {
+ GetSnapshotUnsafe(mergeTo, reset);
}
}
@@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting
this.buckets[FindBucket(value)]++;
}
- private HistogramData GetSnapshotUnsafe(bool reset)
+ private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset)
{
- var data = new HistogramData
+ GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length);
+ if (mergeTo.Count == 0)
{
- Count = count,
- Sum = sum,
- SumOfSquares = sumOfSquares,
- MinSeen = min,
- MaxSeen = max,
- Bucket = { buckets }
- };
+ mergeTo.MinSeen = min;
+ mergeTo.MaxSeen = max;
+ }
+ else
+ {
+ mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min);
+ mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max);
+ }
+ mergeTo.Count += count;
+ mergeTo.Sum += sum;
+ mergeTo.SumOfSquares += sumOfSquares;
- if (reset)
+ if (mergeTo.Bucket.Count == 0)
{
- ResetUnsafe();
+ mergeTo.Bucket.AddRange(buckets);
+ }
+ else
+ {
+ for (int i = 0; i < buckets.Length; i++)
+ {
+ mergeTo.Bucket[i] += buckets[i];
+ }
}
- return data;
+ if (reset)
+ {
+ ResetUnsafe();
+ }
}
private void ResetUnsafe()
diff --git a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs
index fa160cbd15..e8a2ed0c5b 100644
--- a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs
@@ -73,7 +73,7 @@ namespace Grpc.IntegrationTesting
{
var hist = new Histogram(0.01, 60e9);
hist.AddObservation(-0.5); // should be in the first bucket
- hist.AddObservation(1e12); // should be in the last bucket
+ hist.AddObservation(1e12); // should be in the last bucket
var data = hist.GetSnapshot();
Assert.AreEqual(1, data.Bucket[0]);
@@ -81,6 +81,30 @@ namespace Grpc.IntegrationTesting
}
[Test]
+ public void MergeSnapshots()
+ {
+ var data = new HistogramData();
+
+ var hist1 = new Histogram(0.01, 60e9);
+ hist1.AddObservation(-0.5); // should be in the first bucket
+ hist1.AddObservation(1e12); // should be in the last bucket
+ hist1.GetSnapshot(data, false);
+
+ var hist2 = new Histogram(0.01, 60e9);
+ hist2.AddObservation(10000);
+ hist2.AddObservation(11000);
+ hist2.GetSnapshot(data, false);
+
+ Assert.AreEqual(4, data.Count);
+ Assert.AreEqual(-0.5, data.MinSeen);
+ Assert.AreEqual(1e12, data.MaxSeen);
+ Assert.AreEqual(1, data.Bucket[0]);
+ Assert.AreEqual(1, data.Bucket[925]);
+ Assert.AreEqual(1, data.Bucket[935]);
+ Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
+ }
+
+ [Test]
public void Reset()
{
var hist = new Histogram(0.01, 60e9);
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index e36f5c3ee9..deaaf17b07 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -293,6 +293,35 @@ static GRPCProtoMethod *kUnaryCallMethod;
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
+- (void)testTrailers {
+ __weak XCTestExpectation *response = [self expectationWithDescription:@"Empty response received."];
+ __weak XCTestExpectation *completion = [self expectationWithDescription:@"Empty RPC completed."];
+
+ GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress
+ path:kEmptyCallMethod.HTTPPath
+ requestsWriter:[GRXWriter writerWithValue:[NSData data]]];
+ // Setting this special key in the header will cause the interop server to echo back the
+ // trailer data.
+ const unsigned char raw_bytes[] = {1,2,3,4};
+ NSData *trailer_data = [NSData dataWithBytes:raw_bytes length:sizeof(raw_bytes)];
+ call.requestHeaders[@"x-grpc-test-echo-trailing-bin"] = trailer_data;
+
+ id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
+ XCTAssertNotNil(value, @"nil value received as response.");
+ XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value);
+ [response fulfill];
+ } completionHandler:^(NSError *errorOrNil) {
+ XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil);
+ XCTAssertEqualObjects((NSData *)call.responseTrailers[@"x-grpc-test-echo-trailing-bin"],
+ trailer_data,
+ @"Did not receive expected trailer");
+ [completion fulfill];
+ }];
+
+ [call startWithWriteable:responsesWriteable];
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
+}
+
// TODO(makarandd): Move to a different file that contains only unit tests
- (void)testExceptions {
// Try to set parameters to nil for GRPCCall. This should cause an exception
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 221a1e14ec..8f76420af6 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -270,6 +270,7 @@ gpr_histogram_get_contents_type gpr_histogram_get_contents_import;
gpr_histogram_merge_contents_type gpr_histogram_merge_contents_import;
gpr_join_host_port_type gpr_join_host_port_import;
gpr_split_host_port_type gpr_split_host_port_import;
+gpr_log_severity_string_type gpr_log_severity_string_import;
gpr_log_type gpr_log_import;
gpr_log_message_type gpr_log_message_import;
gpr_set_log_verbosity_type gpr_set_log_verbosity_import;
@@ -571,6 +572,7 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_histogram_merge_contents_import = (gpr_histogram_merge_contents_type) GetProcAddress(library, "gpr_histogram_merge_contents");
gpr_join_host_port_import = (gpr_join_host_port_type) GetProcAddress(library, "gpr_join_host_port");
gpr_split_host_port_import = (gpr_split_host_port_type) GetProcAddress(library, "gpr_split_host_port");
+ gpr_log_severity_string_import = (gpr_log_severity_string_type) GetProcAddress(library, "gpr_log_severity_string");
gpr_log_import = (gpr_log_type) GetProcAddress(library, "gpr_log");
gpr_log_message_import = (gpr_log_message_type) GetProcAddress(library, "gpr_log_message");
gpr_set_log_verbosity_import = (gpr_set_log_verbosity_type) GetProcAddress(library, "gpr_set_log_verbosity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index f62b31e83d..58467f97e8 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -761,6 +761,9 @@ extern gpr_join_host_port_type gpr_join_host_port_import;
typedef int(*gpr_split_host_port_type)(const char *name, char **host, char **port);
extern gpr_split_host_port_type gpr_split_host_port_import;
#define gpr_split_host_port gpr_split_host_port_import
+typedef const char *(*gpr_log_severity_string_type)(gpr_log_severity severity);
+extern gpr_log_severity_string_type gpr_log_severity_string_import;
+#define gpr_log_severity_string gpr_log_severity_string_import
typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import
diff --git a/src/ruby/lib/grpc/grpc.rb b/src/ruby/lib/grpc/grpc.rb
index f46710dc74..48f2a45d44 100644
--- a/src/ruby/lib/grpc/grpc.rb
+++ b/src/ruby/lib/grpc/grpc.rb
@@ -34,6 +34,6 @@ begin
if File.directory?(distrib_lib_dir)
require_relative "#{distrib_lib_dir}/grpc_c"
else
- require_relative 'grpc_c'
+ require 'grpc/grpc_c'
end
end