aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-10-07 14:26:37 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-10-07 14:26:37 -0700
commit90576b1b67f48a1581f6cf36626118e5035d16d1 (patch)
treeedbb2878a1baee506a1ce0d21dd57bd17fb06a6a /src/core
parent43b817ced14083de585111c2657bbe34040bdcc7 (diff)
parent70c0b32c0fc5db15c764a2e22c2491d19d7cff7b (diff)
Merge remote-tracking branch 'upstream/master' into run_interop_tests_go
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/client_channel.c44
-rw-r--r--src/core/ext/client_config/lb_policy.h27
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c74
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c14
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c86
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c33
-rw-r--r--src/core/lib/channel/message_size_filter.c14
-rw-r--r--src/core/lib/iomgr/error.h8
-rw-r--r--src/core/lib/iomgr/udp_server.c10
-rw-r--r--src/core/lib/surface/byte_buffer.c5
-rw-r--r--src/core/lib/surface/call.c15
11 files changed, 181 insertions, 149 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index feb4cbde7b..a6056c3e8d 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -111,10 +111,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_SHUTDOWN) &&
chand->lb_policy != NULL) {
- /* cancel fail-fast picks */
+ /* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
- /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
@@ -185,10 +185,35 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
lb_policy_args.additional_args =
grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy = grpc_lb_policy_create(
- exec_ctx,
- grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
- &lb_policy_args);
+
+ // Special case: If all of the addresses are balancer addresses,
+ // assume that we should use the grpclb policy, regardless of what the
+ // resolver actually specified.
+ const char *lb_policy_name =
+ grpc_resolver_result_get_lb_policy_name(chand->resolver_result);
+ bool found_backend_address = false;
+ for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) {
+ if (!lb_policy_args.addresses->addresses[i].is_balancer) {
+ found_backend_address = true;
+ break;
+ }
+ }
+ if (!found_backend_address) {
+ if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided only balancer "
+ "addresses, no backend addresses -- forcing use of grpclb LB "
+ "policy",
+ (lb_policy_name == NULL ? "(none)" : lb_policy_name));
+ }
+ lb_policy_name = "grpclb";
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == NULL) lb_policy_name = "pick_first";
+
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
@@ -602,9 +627,10 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
- initial_metadata_flags,
- &calld->lb_token_mdelem};
+ // TODO(dgq): make this deadline configurable somehow.
+ const grpc_lb_policy_pick_args inputs = {
+ calld->pollent, initial_metadata, initial_metadata_flags,
+ &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)};
r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 7fb3e08cb3..110d08fcac 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -59,10 +59,14 @@ typedef struct grpc_lb_policy_pick_args {
grpc_polling_entity *pollent;
/** Initial metadata associated with the picking call. */
grpc_metadata_batch *initial_metadata;
- /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ /** Bitmask used for selective cancelling. See \a
+ * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
+ * grpc_types.h */
uint32_t initial_metadata_flags;
/** Storage for LB token in \a initial_metadata, or NULL if not used */
grpc_linked_mdelem *lb_token_mdelem_storage;
+ /** Deadline for the call to the LB server */
+ gpr_timespec deadline;
} grpc_lb_policy_pick_args;
struct grpc_lb_policy_vtable {
@@ -138,15 +142,18 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Find an appropriate target for this call, based on \a pick_args.
- Picking can be synchronous or asynchronous. In the synchronous case, when a
- pick is readily available, it'll be returned in \a target and a non-zero
- value will be returned.
- In the asynchronous case, zero is returned and \a on_complete will be called
- once \a target and \a user_data have been set. Any IO should be done under
- \a pick_args->pollent. The opaque \a user_data output argument corresponds
- to information that may need be propagated from the LB policy. It may be
- NULL. Errors are signaled by receiving a NULL \a *target. */
+/** Finds an appropriate subchannel for a call, based on \a pick_args.
+
+ \a target will be set to the selected subchannel, or NULL on failure.
+ Upon success, \a user_data will be set to whatever opaque information
+ may need to be propagated from the LB policy, or NULL if not needed.
+
+ If the pick succeeds and a result is known immediately, a non-zero
+ value will be returned. Otherwise, \a on_complete will be invoked
+ once the pick is complete with its error argument set to indicate
+ success or failure.
+
+ Any I/O should be done under \a pick_args->pollent. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 0050489425..ae1f2a3b4c 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -105,6 +105,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy_factory.h"
@@ -199,18 +200,8 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
typedef struct pending_pick {
struct pending_pick *next;
- /* polling entity for the pick()'s async notification */
- grpc_polling_entity *pollent;
-
- /* the initial metadata for the pick. See grpc_lb_policy_pick() */
- grpc_metadata_batch *initial_metadata;
-
- /* storage for the lb token initial metadata mdelem */
- grpc_linked_mdelem *lb_token_mdelem_storage;
-
- /* bitmask passed to pick() and used for selective cancelling. See
- * grpc_lb_policy_cancel_picks() */
- uint32_t initial_metadata_flags;
+ /* original pick()'s arguments */
+ grpc_lb_policy_pick_args pick_args;
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
@@ -232,11 +223,8 @@ static void add_pending_pick(pending_pick **root,
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pick_args->pollent;
+ pp->pick_args = *pick_args;
pp->target = target;
- pp->initial_metadata = pick_args->initial_metadata;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
pp->wrapped_on_complete_arg.target = target;
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
@@ -283,9 +271,13 @@ typedef struct glb_lb_policy {
/** mutex protecting remaining members */
gpr_mu mu;
+ /** who the client is trying to communicate with */
const char *server_name;
grpc_client_channel_factory *cc_factory;
+ /** deadline for the LB's call */
+ gpr_timespec deadline;
+
/** for communicating with the LB server */
grpc_channel *lb_channel;
@@ -486,10 +478,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- const grpc_lb_policy_pick_args pick_args = {
- pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
- pp->lb_token_mdelem_storage};
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args,
+ pp->target,
(void **)&pp->wrapped_on_complete_arg.lb_token,
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
@@ -589,7 +579,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
&addr_strs[addr_index++],
(const struct sockaddr *)&args->addresses->addresses[i]
.address.addr,
- true) == 0);
+ true) > 0);
}
}
}
@@ -660,7 +650,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
NULL);
- gpr_free(pp);
pp = next;
}
@@ -698,12 +687,11 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+ exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -729,14 +717,13 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
glb_policy->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
- if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+ exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -767,8 +754,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
grpc_exec_ctx_sched(
@@ -776,11 +761,13 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"),
NULL);
- return 1;
+ return 0;
}
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
- int r;
+ glb_policy->deadline = pick_args->deadline;
+ bool pick_done;
if (glb_policy->rr_policy != NULL) {
if (grpc_lb_glb_trace) {
@@ -799,10 +786,11 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+ pick_done =
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
(void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
- if (r != 0) {
+ if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
@@ -816,6 +804,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
}
} else {
+ /* else, the pending pick will be registered and taken care of by the
+ * pending pick list inside the RR policy (glb_policy->rr_policy) */
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
add_pending_pick(&glb_policy->pending_picks, pick_args, target,
@@ -824,10 +814,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
}
- r = 0;
+ pick_done = false;
}
gpr_mu_unlock(&glb_policy->mu);
- return r;
+ return pick_done;
}
static grpc_connectivity_state glb_check_connectivity(
@@ -923,6 +913,9 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
+ GPR_ASSERT(glb_policy->server_name != NULL);
+ GPR_ASSERT(glb_policy->server_name[0] != '\0');
+
lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
memset(lb_client, 0, sizeof(lb_client_data));
@@ -934,8 +927,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
- /* TODO(dgq): get the deadline from the parent channel. */
- lb_client->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ lb_client->deadline = glb_policy->deadline;
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
@@ -943,14 +935,14 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
lb_client->lb_call = grpc_channel_create_pollset_set_call(
glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
glb_policy->base.interested_parties,
- "/grpc.lb.v1.LoadBalancer/BalanceLoad", NULL, lb_client->deadline, NULL);
+ "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
+ lb_client->deadline, NULL);
grpc_metadata_array_init(&lb_client->initial_metadata_recv);
grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
- grpc_grpclb_request *request = grpc_grpclb_request_create(
- "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
- balanced service from the resolver */
+ grpc_grpclb_request *request =
+ grpc_grpclb_request_create(glb_policy->server_name);
gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
lb_client->request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index e8ac1b12ae..fa33ffd7bd 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -53,16 +53,12 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
- /** refcount */
- gpr_refcount refs;
/** target name */
char *target_name;
/** name to resolve (usually the same as target_name) */
char *name_to_resolve;
/** default port to use */
char *default_port;
- /** load balancing policy name */
- char *lb_policy_name;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -181,7 +177,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_resolved_addresses_destroy(r->addresses);
result = grpc_resolver_result_create(r->target_name, addresses,
- r->lb_policy_name, NULL);
+ NULL /* lb_policy_name */, NULL);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -245,13 +241,11 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
gpr_free(r->target_name);
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
- gpr_free(r->lb_policy_name);
gpr_free(r);
}
static grpc_resolver *dns_create(grpc_resolver_args *args,
- const char *default_port,
- const char *lb_policy_name) {
+ const char *default_port) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return NULL;
@@ -264,7 +258,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
// Create resolver.
dns_resolver *r = gpr_malloc(sizeof(dns_resolver));
memset(r, 0, sizeof(*r));
- gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->target_name = gpr_strdup(path);
@@ -272,7 +265,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
r->default_port = gpr_strdup(default_port);
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
@@ -286,7 +278,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *dns_factory_create_resolver(
grpc_resolver_factory *factory, grpc_resolver_args *args) {
- return dns_create(args, "https", "pick_first");
+ return dns_create(args, "https");
}
static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 74d2015e5c..5a7a32d7cb 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -49,14 +49,10 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
- /** refcount */
- gpr_refcount refs;
- /** load balancing policy name */
- char *lb_policy_name;
-
+ /** the path component of the uri passed in */
+ char *target_name;
/** the addresses that we've 'resolved' */
grpc_lb_addresses *addresses;
-
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
@@ -121,8 +117,9 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->next_completion != NULL && !r->published) {
r->published = true;
*r->target_result = grpc_resolver_result_create(
- "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
- r->lb_policy_name, NULL);
+ r->target_name,
+ grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
+ NULL /* lb_policy_name */, NULL);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
@@ -132,7 +129,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
- gpr_free(r->lb_policy_name);
+ gpr_free(r->target_name);
gpr_free(r);
}
@@ -161,78 +158,49 @@ char *unix_get_default_authority(grpc_resolver_factory *factory,
static void do_nothing(void *ignored) {}
-static grpc_resolver *sockaddr_create(
- grpc_resolver_args *args, const char *default_lb_policy_name,
- int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
- bool errors_found = false;
- sockaddr_resolver *r;
- gpr_slice path_slice;
- gpr_slice_buffer path_parts;
-
+static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
+ int parse(grpc_uri *uri,
+ struct sockaddr_storage *dst,
+ size_t *len)) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
args->uri->scheme);
return NULL;
}
-
- r = gpr_malloc(sizeof(sockaddr_resolver));
- memset(r, 0, sizeof(*r));
-
- r->lb_policy_name =
- gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
- const char *lb_enabled_qpart =
- grpc_uri_get_query_arg(args->uri, "lb_enabled");
- /* anything other than "0" is interpreted as true */
- const bool lb_enabled =
- (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
-
- if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
- !lb_enabled) {
- /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
- * out, as this is meant mostly for tests. */
- gpr_log(GPR_ERROR,
- "Requested 'grpclb' LB policy but resolved addresses don't "
- "support load balancing.");
- abort();
- }
-
- if (r->lb_policy_name == NULL) {
- r->lb_policy_name = gpr_strdup(default_lb_policy_name);
- }
-
- path_slice =
+ /* Construct addresses. */
+ gpr_slice path_slice =
gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
+ gpr_slice_buffer path_parts;
gpr_slice_buffer_init(&path_parts);
-
gpr_slice_split(path_slice, ",", &path_parts);
- r->addresses = grpc_lb_addresses_create(path_parts.count);
- for (size_t i = 0; i < r->addresses->num_addresses; i++) {
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count);
+ bool errors_found = false;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
- if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addresses[i]
- .address.addr),
- &r->addresses->addresses[i].address.len)) {
+ if (!parse(
+ &ith_uri,
+ (struct sockaddr_storage *)(&addresses->addresses[i].address.addr),
+ &addresses->addresses[i].address.len)) {
errors_found = true;
}
gpr_free(part_str);
- r->addresses->addresses[i].is_balancer = lb_enabled;
if (errors_found) break;
}
-
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
- gpr_free(r->lb_policy_name);
- grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
- gpr_free(r);
+ grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */);
return NULL;
}
-
- gpr_ref_init(&r->refs, 1);
+ /* Instantiate resolver. */
+ sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver));
+ memset(r, 0, sizeof(*r));
+ r->target_name = gpr_strdup(args->uri->path);
+ r->addresses = addresses;
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
-
return &r->base;
}
@@ -247,7 +215,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
#define DECL_FACTORY(name) \
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_resolver_args *args) { \
- return sockaddr_create(args, "pick_first", parse_##name); \
+ return sockaddr_create(args, parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 366690acf2..25ad40b935 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -239,6 +239,14 @@ static const char *op_id_string(enum e_op_id i) {
return "UNKNOWN";
}
+static void free_read_buffer(stream_obj *s) {
+ if (s->state.rs.read_buffer &&
+ s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
+ gpr_free(s->state.rs.read_buffer);
+ s->state.rs.read_buffer = NULL;
+ }
+}
+
/*
Add a new stream op to op storage.
*/
@@ -341,6 +349,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -363,6 +372,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -377,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
+ free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -531,7 +542,8 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
*/
static void convert_metadata_to_cronet_headers(
grpc_linked_mdelem *head, const char *host, char **pp_url,
- cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
+ cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers,
+ const char **method) {
grpc_linked_mdelem *curr = head;
/* Walk the linked list and get number of header fields */
size_t num_headers_available = 0;
@@ -558,11 +570,20 @@ static void convert_metadata_to_cronet_headers(
curr = curr->next;
const char *key = grpc_mdstr_as_c_string(mdelem->key);
const char *value = grpc_mdstr_as_c_string(mdelem->value);
- if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
+ if (mdelem->key == GRPC_MDSTR_SCHEME ||
mdelem->key == GRPC_MDSTR_AUTHORITY) {
/* Cronet populates these fields on its own */
continue;
}
+ if (mdelem->key == GRPC_MDSTR_METHOD) {
+ if (mdelem->value == GRPC_MDSTR_PUT) {
+ *method = "PUT";
+ } else {
+ /* POST method in default*/
+ *method = "POST";
+ }
+ continue;
+ }
if (mdelem->key == GRPC_MDSTR_PATH) {
/* Create URL by appending :path value to the hostname */
gpr_asprintf(pp_url, "https://%s%s", host, value);
@@ -759,15 +780,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
- char *url;
+ char *url = NULL;
+ const char *method = "POST";
s->header_array.headers = NULL;
convert_metadata_to_cronet_headers(
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
- &s->header_array.headers, &s->header_array.count);
+ &s->header_array.headers, &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
url);
- cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
+ cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array,
false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
@@ -901,6 +923,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, stream_state->rs.read_buffer,
(size_t)stream_state->rs.length_field);
+ free_read_buffer(s);
gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 02fc68fc3a..f067a3a51c 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, chand->max_recv_size);
- gpr_slice message = gpr_slice_from_copied_string(message_string);
+ grpc_error* new_error = grpc_error_set_int(
+ GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_INVALID_ARGUMENT);
+ if (error == GRPC_ERROR_NONE) {
+ error = new_error;
+ } else {
+ error = grpc_error_add_child(error, new_error);
+ GRPC_ERROR_UNREF(new_error);
+ }
gpr_free(message_string);
- grpc_call_element_send_close_with_message(
- exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Invoke the next callback.
grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
}
-// Start transport op.
+// Start transport stream op.
static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 7e2fd7a3bd..00ace8a7a9 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -40,6 +40,10 @@
#include <grpc/status.h>
#include <grpc/support/time.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/// Opaque representation of an error.
/// Errors are refcounted objects that represent the result of an operation.
/// Ownership laws:
@@ -204,4 +208,8 @@ bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
#define GRPC_LOG_IF_ERROR(what, error) \
grpc_log_if_error((what), (error), __FILE__, __LINE__)
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 48032412a2..edf7b133e9 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -38,7 +38,6 @@
#include <grpc/support/port_platform.h>
-#ifdef GRPC_NEED_UDP
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/udp_server.h"
@@ -171,6 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. */
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
@@ -197,6 +198,12 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. */
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
}
gpr_mu_unlock(&s->mu);
@@ -439,4 +446,3 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
}
#endif
-#endif
diff --git a/src/core/lib/surface/byte_buffer.c b/src/core/lib/surface/byte_buffer.c
index a093a37af3..054a6e6c58 100644
--- a/src/core/lib/surface/byte_buffer.c
+++ b/src/core/lib/surface/byte_buffer.c
@@ -72,8 +72,9 @@ grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_RAW:
- return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
- bb->data.raw.slice_buffer.count);
+ return grpc_raw_compressed_byte_buffer_create(
+ bb->data.raw.slice_buffer.slices, bb->data.raw.slice_buffer.count,
+ bb->data.raw.compression);
}
GPR_UNREACHABLE_CODE(return NULL);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 5690bcab1e..b0f66f4f61 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- bool success) {
+static void process_data_after_md(grpc_exec_ctx *exec_ctx,
+ batch_control *bctl) {
grpc_call *call = bctl->call;
if (call->receiving_stream == NULL) {
*call->receiving_buffer = NULL;
@@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
bctl);
continue_receiving_slices(exec_ctx, bctl);
- /* early out */
- return;
}
}
@@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
-
+ if (error != GRPC_ERROR_NONE) {
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, &status, &msg);
+ close_with_status(exec_ctx, call, status, msg);
+ }
gpr_mu_lock(&bctl->call->mu);
if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
- process_data_after_md(exec_ctx, bctlp, error);
+ process_data_after_md(exec_ctx, bctlp);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);