aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c54
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c46
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c10
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c10
-rw-r--r--src/core/ext/resolver/zookeeper/zookeeper_resolver.c11
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c89
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c101
-rw-r--r--src/core/lib/channel/client_channel.c40
-rw-r--r--src/core/lib/channel/http_client_filter.c10
-rw-r--r--src/core/lib/channel/subchannel_call_holder.c5
-rw-r--r--src/core/lib/channel/subchannel_call_holder.h1
-rw-r--r--src/core/lib/client_config/README.md2
-rw-r--r--src/core/lib/client_config/client_channel_factory.c (renamed from src/core/lib/client_config/subchannel_factory.c)20
-rw-r--r--src/core/lib/client_config/client_channel_factory.h (renamed from src/core/lib/client_config/subchannel_factory.h)51
-rw-r--r--src/core/lib/client_config/lb_policy.c11
-rw-r--r--src/core/lib/client_config/lb_policy.h13
-rw-r--r--src/core/lib/client_config/lb_policy_factory.h4
-rw-r--r--src/core/lib/client_config/resolver_factory.h4
-rw-r--r--src/core/lib/client_config/resolver_registry.c4
-rw-r--r--src/core/lib/client_config/resolver_registry.h2
-rw-r--r--src/core/lib/client_config/subchannel.c21
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.h2
-rw-r--r--src/core/lib/surface/call.c10
-rw-r--r--src/core/lib/surface/channel.h2
-rw-r--r--src/core/lib/transport/transport.h6
27 files changed, 360 insertions, 173 deletions
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index cb5c40501e..43197c1191 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -40,6 +40,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -149,6 +150,31 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
@@ -171,6 +197,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -199,6 +226,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->next = p->pending_picks;
pp->pollset = pollset;
pp->target = target;
+ pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -286,11 +314,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
+ if (p->checking_subchannel == 0) {
+ /* only trigger transient failure when we've tried all alternatives */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ "connecting_transient_failure");
+ }
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
@@ -378,14 +409,9 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy,
- pf_shutdown,
- pf_pick,
- pf_cancel_pick,
- pf_ping_one,
- pf_exit_idle,
- pf_check_connectivity,
- pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick,
+ pf_cancel_pick, pf_cancel_picks, pf_ping_one,
+ pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change};
static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -395,7 +421,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
- GPR_ASSERT(args->subchannel_factory != NULL);
+ GPR_ASSERT(args->client_channel_factory != NULL);
if (args->addresses->naddrs == 0) return NULL;
@@ -412,8 +438,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, args->subchannel_factory, &sc_args);
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index d94c081494..eac6971099 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -49,6 +49,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -275,6 +276,32 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
}
+static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ pp = p->pending_picks;
+ p->pending_picks = NULL;
+ while (pp != NULL) {
+ pending_pick *next = pp->next;
+ if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq) {
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
+ pp->pollset);
+ *pp->target = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ gpr_free(pp);
+ } else {
+ pp->next = p->pending_picks;
+ p->pending_picks = pp;
+ }
+ pp = next;
+ }
+ gpr_mu_unlock(&p->mu);
+}
+
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
@@ -303,6 +330,7 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -330,6 +358,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp->pollset = pollset;
pp->target = target;
pp->on_complete = on_complete;
+ pp->initial_metadata_flags = initial_metadata_flags;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -485,14 +514,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
- rr_destroy,
- rr_shutdown,
- rr_pick,
- rr_cancel_pick,
- rr_ping_one,
- rr_exit_idle,
- rr_check_connectivity,
- rr_notify_on_state_change};
+ rr_destroy, rr_shutdown, rr_pick,
+ rr_cancel_pick, rr_cancel_picks, rr_ping_one,
+ rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change};
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
@@ -502,7 +526,7 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
- GPR_ASSERT(args->subchannel_factory != NULL);
+ GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
@@ -518,8 +542,8 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, args->subchannel_factory, &sc_args);
+ grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
+ exec_ctx, args->client_channel_factory, &sc_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 70d8a3fe2d..1a0bf305d6 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -59,7 +59,7 @@ typedef struct {
/** default port to use */
char *default_port;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -170,7 +170,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
config = grpc_client_config_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@@ -228,7 +228,7 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@@ -255,10 +255,10 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
- r->subchannel_factory = args->subchannel_factory;
+ r->client_channel_factory = args->client_channel_factory;
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 69595ca3db..b50b4ee480 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -52,7 +52,7 @@ typedef struct {
/** refcount */
gpr_refcount refs;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -125,7 +125,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.addresses = r->addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(cfg, lb_policy);
@@ -140,7 +140,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -318,8 +318,8 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
- r->subchannel_factory = args->subchannel_factory;
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->client_channel_factory = args->client_channel_factory;
+ grpc_client_channel_factory_ref(r->client_channel_factory);
return &r->base;
}
diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
index 5acb0940c6..91153f14b5 100644
--- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
+++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c
@@ -57,7 +57,7 @@ typedef struct {
/** name to resolve */
char *name;
/** subchannel factory */
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -187,9 +187,8 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
-
lb_policy_args.addresses = addresses;
- lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
@@ -424,7 +423,7 @@ static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_config != NULL) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
+ grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -454,8 +453,8 @@ static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
- r->subchannel_factory = args->subchannel_factory;
- grpc_subchannel_factory_ref(r->subchannel_factory);
+ r->client_channel_factory = args->client_channel_factory;
+ grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 606fff5fb4..ee2ac44978 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -136,31 +136,35 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
- grpc_subchannel_factory base;
+ grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel *master;
-} subchannel_factory;
+} client_channel_factory;
-static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_channel_factory");
+ }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
+static grpc_subchannel *client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- subchannel_factory *f = (subchannel_factory *)scf;
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -175,9 +179,33 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
-static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- subchannel_factory_ref, subchannel_factory_unref,
- subchannel_factory_create_subchannel};
+static grpc_channel *client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
+ grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(final_args);
+ grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ if (!resolver) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
+ "client_channel_factory_create_channel");
+ return NULL;
+ }
+
+ grpc_client_channel_set_resolver(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
+
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
+ {client_channel_factory_ref, client_channel_factory_unref,
+ client_channel_factory_create_subchannel,
+ client_channel_factory_create_channel};
/* Create a client channel:
Asynchronously: - resolve target
@@ -186,38 +214,27 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_channel *channel = NULL;
- grpc_resolver *resolver;
- subchannel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
GPR_ASSERT(!reserved);
- channel =
- grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
-
- f = gpr_malloc(sizeof(*f));
- f->base.vtable = &subchannel_factory_vtable;
+ client_channel_factory *f = gpr_malloc(sizeof(*f));
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, f->master, "subchannel_factory");
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
- grpc_exec_ctx_finish(&exec_ctx);
- return NULL;
- }
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
+ grpc_channel *channel = client_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
+ }
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
- return channel;
+ return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 3465d2b6c4..5d30805639 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -192,34 +192,38 @@ static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
- grpc_subchannel_factory base;
+ grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
grpc_channel *master;
-} subchannel_factory;
+} client_channel_factory;
-static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *scf) {
- subchannel_factory *f = (subchannel_factory *)scf;
+static void client_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
- "subchannel_factory");
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
+ "client_channel_factory");
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_channel_factory");
+ }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
+static grpc_subchannel *client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- subchannel_factory *f = (subchannel_factory *)scf;
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -236,9 +240,37 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
return s;
}
-static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- subchannel_factory_ref, subchannel_factory_unref,
- subchannel_factory_create_subchannel};
+static grpc_channel *client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_channel_factory *f = (client_channel_factory *)cc_factory;
+
+ grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(final_args);
+
+ grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ if (resolver != NULL) {
+ grpc_client_channel_set_resolver(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
+ } else {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
+ "client_channel_factory_create_channel");
+ channel = NULL;
+ }
+
+ GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ "client_channel_factory_create_channel");
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
+ {client_channel_factory_ref, client_channel_factory_unref,
+ client_channel_factory_create_subchannel,
+ client_channel_factory_create_channel};
/* Create a secure client channel:
Asynchronously: - resolve target
@@ -248,13 +280,11 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_channel *channel;
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *security_connector;
- grpc_resolver *resolver;
- subchannel_factory *f;
+ client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
@@ -284,35 +314,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
- channel = grpc_channel_create(&exec_ctx, target, args_copy,
- GRPC_CLIENT_CHANNEL, NULL);
-
f = gpr_malloc(sizeof(*f));
- f->base.vtable = &subchannel_factory_vtable;
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
- GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory");
- f->security_connector = security_connector;
+
f->merge_args = grpc_channel_args_copy(args_copy);
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
- resolver = grpc_resolver_create(target, &f->base);
- if (resolver) {
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
- }
- grpc_subchannel_factory_unref(&exec_ctx, &f->base);
- GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "subchannel_factory");
- channel = NULL;
+ GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
+ "grpc_secure_channel_create");
+ f->security_connector = security_connector;
+
+ grpc_channel *channel = client_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create");
}
+
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
- return channel;
+ return channel; /* may be NULL */
}
diff --git a/src/core/lib/channel/client_channel.c b/src/core/lib/channel/client_channel.c
index 3f7cf1cf97..55867ba80c 100644
--- a/src/core/lib/channel/client_channel.c
+++ b/src/core/lib/channel/client_channel.c
@@ -114,6 +114,22 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
+static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
+ channel_data *chand,
+ grpc_connectivity_state state,
+ const char *reason) {
+ if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
+ state == GRPC_CHANNEL_FATAL_FAILURE) &&
+ chand->lb_policy != NULL) {
+ /* cancel fail-fast picks */
+ grpc_lb_policy_cancel_picks(
+ exec_ctx, chand->lb_policy,
+ /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* check= */ 0);
+ }
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason);
+}
+
static void on_lb_policy_state_changed_locked(
grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
grpc_connectivity_state publish_state = w->state;
@@ -127,8 +143,8 @@ static void on_lb_policy_state_changed_locked(
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, publish_state,
- "lb_changed");
+ set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+ "lb_changed");
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
}
@@ -200,8 +216,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (iomgr_success && chand->resolver) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
- "new_lb+resolver");
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ "new_lb+resolver");
if (lb_policy != NULL) {
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
@@ -216,8 +232,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
}
@@ -272,8 +288,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ set_channel_connectivity_state_locked(
+ exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
chand->resolver = NULL;
@@ -290,6 +306,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
+ uint32_t initial_metadata_flags;
grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
@@ -298,6 +315,7 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
@@ -308,6 +326,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
@@ -316,6 +335,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
@@ -349,7 +369,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
- initial_metadata, connected_subchannel, on_ready);
+ initial_metadata, initial_metadata_flags,
+ connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
return r;
}
@@ -362,6 +383,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
+ cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index deba23e21f..211f537c69 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -111,10 +111,12 @@ static void hc_mutate_op(grpc_call_element *elem,
elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
- grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method,
- op->send_idempotent_request
- ? GRPC_MDELEM_METHOD_PUT
- : GRPC_MDELEM_METHOD_POST);
+ grpc_metadata_batch_add_head(
+ op->send_initial_metadata, &calld->method,
+ op->send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
+ ? GRPC_MDELEM_METHOD_PUT
+ : GRPC_MDELEM_METHOD_POST);
grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme,
channeld->static_scheme);
grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers,
diff --git a/src/core/lib/channel/subchannel_call_holder.c b/src/core/lib/channel/subchannel_call_holder.c
index 22f3679bf5..a2686a380a 100644
--- a/src/core/lib/channel/subchannel_call_holder.c
+++ b/src/core/lib/channel/subchannel_call_holder.c
@@ -127,7 +127,7 @@ retry:
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->connected_subchannel, NULL);
+ 0, &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -145,7 +145,8 @@ retry:
GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel");
if (holder->pick_subchannel(
exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata,
- &holder->connected_subchannel, &holder->next_step)) {
+ op->send_initial_metadata_flags, &holder->connected_subchannel,
+ &holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel");
}
diff --git a/src/core/lib/channel/subchannel_call_holder.h b/src/core/lib/channel/subchannel_call_holder.h
index 5cf291a266..769e22b061 100644
--- a/src/core/lib/channel/subchannel_call_holder.h
+++ b/src/core/lib/channel/subchannel_call_holder.h
@@ -42,6 +42,7 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
diff --git a/src/core/lib/client_config/README.md b/src/core/lib/client_config/README.md
index fff7a5af5b..7024fd540d 100644
--- a/src/core/lib/client_config/README.md
+++ b/src/core/lib/client_config/README.md
@@ -40,7 +40,7 @@ decisions (for example, by avoiding disconnected backends).
Configured sub-channels are fully setup to participate in the grpc data plane.
Their behavior is specified by a set of grpc channel filters defined at their
construction. To customize this behavior, resolvers build
-grpc_subchannel_factory objects, which use the decorator pattern to customize
+grpc_client_channel_factory objects, which use the decorator pattern to customize
construction arguments for concrete grpc_subchannel instances.
diff --git a/src/core/lib/client_config/subchannel_factory.c b/src/core/lib/client_config/client_channel_factory.c
index 541368ec96..d27b38d9f2 100644
--- a/src/core/lib/client_config/subchannel_factory.c
+++ b/src/core/lib/client_config/client_channel_factory.c
@@ -31,19 +31,27 @@
*
*/
-#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/lib/client_config/client_channel_factory.h"
-void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
+void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory) {
factory->vtable->ref(factory);
}
-void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx,
- grpc_subchannel_factory* factory) {
+void grpc_client_channel_factory_unref(grpc_exec_ctx* exec_ctx,
+ grpc_client_channel_factory* factory) {
factory->vtable->unref(exec_ctx, factory);
}
-grpc_subchannel* grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory,
+grpc_subchannel* grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
grpc_subchannel_args* args) {
return factory->vtable->create_subchannel(exec_ctx, factory, args);
}
+
+grpc_channel* grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
+ const char* target, grpc_client_channel_type type,
+ grpc_channel_args* args) {
+ return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
+ args);
+}
diff --git a/src/core/lib/client_config/subchannel_factory.h b/src/core/lib/client_config/client_channel_factory.h
index 96d68a2079..0b71cd9bdb 100644
--- a/src/core/lib/client_config/subchannel_factory.h
+++ b/src/core/lib/client_config/client_channel_factory.h
@@ -31,36 +31,55 @@
*
*/
-#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-#define GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
+#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
+#define GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H
+
+#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/client_config/subchannel.h"
-typedef struct grpc_subchannel_factory grpc_subchannel_factory;
-typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
+typedef struct grpc_client_channel_factory grpc_client_channel_factory;
+typedef struct grpc_client_channel_factory_vtable
+ grpc_client_channel_factory_vtable;
+
+typedef enum {
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, /** for the user-level regular calls */
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, /** for communication with a load
+ balancing service */
+} grpc_client_channel_type;
/** Constructor for new configured channels.
Creating decorators around this type is encouraged to adapt behavior. */
-struct grpc_subchannel_factory {
- const grpc_subchannel_factory_vtable *vtable;
+struct grpc_client_channel_factory {
+ const grpc_client_channel_factory_vtable *vtable;
};
-struct grpc_subchannel_factory_vtable {
- void (*ref)(grpc_subchannel_factory *factory);
- void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory);
+struct grpc_client_channel_factory_vtable {
+ void (*ref)(grpc_client_channel_factory *factory);
+ void (*unref)(grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory);
grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory,
+ grpc_client_channel_factory *factory,
grpc_subchannel_args *args);
+ grpc_channel *(*create_client_channel)(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory,
+ const char *target,
+ grpc_client_channel_type type,
+ grpc_channel_args *args);
};
-void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
-void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory);
+void grpc_client_channel_factory_ref(grpc_client_channel_factory *factory);
+void grpc_client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
+ grpc_client_channel_factory *factory);
/** Create a new grpc_subchannel */
-grpc_subchannel *grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
+grpc_subchannel *grpc_client_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
grpc_subchannel_args *args);
-#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
+/** Create a new grpc_channel */
+grpc_channel *grpc_client_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
+ const char *target, grpc_client_channel_type type, grpc_channel_args *args);
+
+#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H */
diff --git a/src/core/lib/client_config/lb_policy.c b/src/core/lib/client_config/lb_policy.c
index 3d23669ec2..6174162d49 100644
--- a/src/core/lib/client_config/lb_policy.c
+++ b/src/core/lib/client_config/lb_policy.c
@@ -101,10 +101,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
- target, on_complete);
+ initial_metadata_flags, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -112,6 +113,14 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq) {
+ policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
+ initial_metadata_flags_eq);
+}
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
policy->vtable->exit_idle(exec_ctx, policy);
}
diff --git a/src/core/lib/client_config/lb_policy.h b/src/core/lib/client_config/lb_policy.h
index a63e8e68df..60729ef8f9 100644
--- a/src/core/lib/client_config/lb_policy.h
+++ b/src/core/lib/client_config/lb_policy.h
@@ -60,9 +60,13 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -122,6 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
+ uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
@@ -131,6 +136,14 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+/** Cancel all pending picks which have:
+ (initial_metadata_flags & initial_metadata_flags_mask) ==
+ initial_metadata_flags_eq */
+void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy *policy,
+ uint32_t initial_metadata_flags_mask,
+ uint32_t initial_metadata_flags_eq);
+
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/client_config/lb_policy_factory.h b/src/core/lib/client_config/lb_policy_factory.h
index 6f21912821..4931d4df58 100644
--- a/src/core/lib/client_config/lb_policy_factory.h
+++ b/src/core/lib/client_config/lb_policy_factory.h
@@ -34,8 +34,8 @@
#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
+#include "src/core/lib/client_config/client_channel_factory.h"
#include "src/core/lib/client_config/lb_policy.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -51,7 +51,7 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/lib/client_config/resolver_factory.h b/src/core/lib/client_config/resolver_factory.h
index a5bca06475..18c7fd7d62 100644
--- a/src/core/lib/client_config/resolver_factory.h
+++ b/src/core/lib/client_config/resolver_factory.h
@@ -34,8 +34,8 @@
#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
#define GRPC_CORE_LIB_CLIENT_CONFIG_RESOLVER_FACTORY_H
+#include "src/core/lib/client_config/client_channel_factory.h"
#include "src/core/lib/client_config/resolver.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
#include "src/core/lib/client_config/uri_parser.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
@@ -49,7 +49,7 @@ struct grpc_resolver_factory {
typedef struct grpc_resolver_args {
grpc_uri *uri;
- grpc_subchannel_factory *subchannel_factory;
+ grpc_client_channel_factory *client_channel_factory;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
diff --git a/src/core/lib/client_config/resolver_registry.c b/src/core/lib/client_config/resolver_registry.c
index 5f3db273b5..5584f6692e 100644
--- a/src/core/lib/client_config/resolver_registry.c
+++ b/src/core/lib/client_config/resolver_registry.c
@@ -123,14 +123,14 @@ static grpc_resolver_factory *resolve_factory(const char *target,
}
grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory) {
+ const char *target, grpc_client_channel_factory *client_channel_factory) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
- args.subchannel_factory = subchannel_factory;
+ args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/lib/client_config/resolver_registry.h b/src/core/lib/client_config/resolver_registry.h
index 36c4f2fe03..b207313084 100644
--- a/src/core/lib/client_config/resolver_registry.h
+++ b/src/core/lib/client_config/resolver_registry.h
@@ -56,7 +56,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
return it.
If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(
- const char *target, grpc_subchannel_factory *subchannel_factory);
+ const char *target, grpc_client_channel_factory *client_channel_factory);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/lib/client_config/subchannel.c b/src/core/lib/client_config/subchannel.c
index 47c53a16ba..f44e2f7404 100644
--- a/src/core/lib/client_config/subchannel.c
+++ b/src/core/lib/client_config/subchannel.c
@@ -54,7 +54,7 @@
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
-#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 2
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -352,6 +352,25 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->args->args[i].value.integer,
c->args->args[i].value.integer);
}
+ if (0 ==
+ strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ if (c->args->args[i].type == GRPC_ARG_INTEGER) {
+ if (c->args->args[i].value.integer >= 0) {
+ gpr_backoff_init(
+ &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
+ GRPC_SUBCHANNEL_RECONNECT_JITTER,
+ GPR_MIN(c->args->args[i].value.integer,
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000),
+ c->args->args[i].value.integer);
+ } else {
+ gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
+ " : must be non-negative");
+ }
+ } else {
+ gpr_log(GPR_ERROR,
+ GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer");
+ }
+ }
}
}
gpr_mu_init(&c->mu);
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index dba335490b..3c8127e1a8 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 235a7df08d..0eb95a2e09 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 9d27b2bcda..1fa9f5ef2d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 37cc724b53..ba94852f9a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -81,11 +81,11 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
- /* Status was created by some internal channel stack operation */
- STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from the server sending status */
STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
@@ -1110,6 +1110,9 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
gpr_mu_lock(&call->mu);
if (bctl->send_initial_metadata) {
+ if (!success) {
+ set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ }
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
@@ -1232,8 +1235,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call->metadata_batch[0][0].deadline = call->send_deadline;
stream_op.send_initial_metadata =
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
- stream_op.send_idempotent_request =
- (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0;
+ stream_op.send_initial_metadata_flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index 640fd7e137..68afac6fac 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -35,7 +35,7 @@
#define GRPC_CORE_LIB_SURFACE_CHANNEL_H
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/lib/client_config/client_channel_factory.h"
#include "src/core/lib/surface/channel_stack_type.h"
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 460e4dcedc..bb1120ee3a 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -101,9 +101,9 @@ typedef struct grpc_transport_stream_op {
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
- /** Iff send_initial_metadata != NULL, flags if this is an idempotent request
- or not */
- bool send_idempotent_request;
+ /** Iff send_initial_metadata != NULL, flags associated with
+ send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
+ uint32_t send_initial_metadata_flags;
/** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;