aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-06-07 11:51:51 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-06-07 11:51:51 -0700
commitd30d4e279c4a63effaa6e912fc00bd4ad96054c7 (patch)
tree4a5f475f5a7dae22eb60bd482b1a182bb43d7db0 /src/core/ext
parent570408fd4c508e2726fdff3e1c2541465ebc74a7 (diff)
parent4461da702ce8c23afae565201a60cea054421f49 (diff)
Merge pull request #6190 from dgquintas/lb_pollset_propagation
Enable use of pollset_set for a channel call
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_config/client_channel.c11
-rw-r--r--src/core/ext/client_config/lb_policy.c4
-rw-r--r--src/core/ext/client_config/lb_policy.h6
-rw-r--r--src/core/ext/client_config/subchannel.c4
-rw-r--r--src/core/ext/client_config/subchannel.h3
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c5
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h3
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c26
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c22
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c8
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c18
13 files changed, 72 insertions, 44 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 8c4c17ad9a..86ec2ae87a 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -180,7 +180,7 @@ const grpc_channel_filter grpc_client_census_filter = {
grpc_channel_next_op,
sizeof(call_data),
client_init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
client_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
@@ -193,7 +193,7 @@ const grpc_channel_filter grpc_server_census_filter = {
grpc_channel_next_op,
sizeof(call_data),
server_init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
server_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 713c2c7814..e297dfa0ef 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -376,7 +376,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
int r;
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
initial_metadata, initial_metadata_flags,
connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
@@ -461,10 +461,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_config);
}
-static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_pollset *pollset) {
+static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_polling_entity *pollent) {
call_data *calld = elem->call_data;
- calld->pollset = pollset;
+ calld->pollent = pollent;
}
const grpc_channel_filter grpc_client_channel_filter = {
@@ -472,7 +473,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_op,
sizeof(call_data),
init_call_elem,
- cc_set_pollset,
+ cc_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index a7ad9842dc..20535398d6 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -99,12 +99,12 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
+ return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
initial_metadata_flags, target, on_complete);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 0384e0b2eb..56fa11198b 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -59,7 +60,8 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -124,7 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
\a target.
Picking can be asynchronous. Any IO should be done under \a pollset. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 3a635be02f..85183b0564 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -683,7 +683,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_pollset *pollset) {
+ grpc_polling_entity *pollent) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
@@ -692,7 +692,7 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
NULL, NULL, callstk);
- grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
return call;
}
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 0765a544e8..525f854a44 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -36,6 +36,7 @@
#include "src/core/ext/client_config/connector.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A (sub-)channel that knows how to connect to exactly one target
@@ -109,7 +110,7 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_pollset *pollset);
+ grpc_polling_entity *pollent);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 91fa917661..3df1f254d6 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -68,6 +68,7 @@ void grpc_subchannel_call_holder_init(
holder->waiting_ops_capacity = 0;
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
holder->owning_call = owning_call;
+ holder->pollent = NULL;
}
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
@@ -157,7 +158,7 @@ retry:
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollset));
+ exec_ctx, holder->connected_subchannel, holder->pollent));
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
@@ -183,7 +184,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollset));
+ exec_ctx, holder->connected_subchannel, holder->pollent));
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 9299908788..8d2deb02f3 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H
#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/lib/iomgr/polling_entity.h"
/** Pick a subchannel for grpc_subchannel_call_holder;
Return 1 if subchannel is available immediately (in which case on_ready
@@ -71,7 +72,7 @@ typedef struct grpc_subchannel_call_holder {
grpc_subchannel_call_holder_creation_phase creation_phase;
grpc_connected_subchannel *connected_subchannel;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
grpc_transport_stream_op *waiting_ops;
size_t waiting_ops_count;
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index b5797da832..cc559eb2da 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -39,7 +39,7 @@
typedef struct pending_pick {
struct pending_pick *next;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -118,8 +118,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
pp = next;
@@ -136,8 +136,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
@@ -162,8 +162,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
@@ -196,7 +196,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
@@ -221,10 +222,11 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollset = pollset;
+ pp->pollent = pollent;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
@@ -304,8 +306,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7ab913b645..8645333c8e 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -48,7 +48,7 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -262,8 +262,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
@@ -288,8 +288,8 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*pp->target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
@@ -331,7 +331,8 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
@@ -354,10 +355,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollset = pollset;
+ pp->pollent = pollent;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
@@ -406,8 +408,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index f49730fac3..f372f88c3a 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -142,7 +142,7 @@ const grpc_channel_filter grpc_load_reporting_filter = {
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 1abc49f435..046b395001 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1734,6 +1734,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
add_to_pollset_locked, pollset, 0);
}
+static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset_set *pollset_set) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_set_locked, pollset_set, 0);
+}
+
/*******************************************************************************
* BYTE STREAM
*/
@@ -2055,6 +2062,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
set_pollset,
+ set_pollset_set,
perform_stream_op,
perform_transport_op,
destroy_stream,
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index df160aaaf6..1bb53b756c 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -152,6 +152,10 @@ static void next_recv_step(stream_obj *s, enum e_caller caller);
static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {}
+static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset_set *pollset_set) {}
+
static void enqueue_callbacks(grpc_closure *callback_list[]) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (callback_list[0]) {
@@ -646,7 +650,13 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
}
}
-const grpc_transport_vtable grpc_cronet_vtable = {
- sizeof(stream_obj), "cronet_http", init_stream,
- set_pollset_do_nothing, perform_stream_op, NULL,
- destroy_stream, destroy_transport, NULL};
+const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
+ "cronet_http",
+ init_stream,
+ set_pollset_do_nothing,
+ set_pollset_set_do_nothing,
+ perform_stream_op,
+ NULL,
+ destroy_stream,
+ destroy_transport,
+ NULL};