aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_channel.c94
-rw-r--r--src/core/channel/connectivity_state.c3
-rw-r--r--src/core/client_config/lb_policies/pick_first.c6
-rw-r--r--src/core/client_config/lb_policy.c4
-rw-r--r--src/core/client_config/lb_policy.h2
-rw-r--r--src/core/client_config/subchannel.c94
-rw-r--r--src/core/client_config/subchannel.h22
-rw-r--r--src/core/surface/channel.c28
-rw-r--r--src/core/surface/channel.h2
9 files changed, 149 insertions, 106 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 4d082aceb8..19700a90a6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,6 +38,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/connectivity_state.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
@@ -68,8 +69,7 @@ typedef struct {
/** resolver callback */
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
- grpc_iomgr_closure *on_connectivity_state_change;
- grpc_connectivity_state *connectivity_state;
+ grpc_connectivity_state_tracker state_tracker;
} channel_data;
typedef enum {
@@ -98,60 +98,6 @@ struct call_data {
grpc_linked_mdelem details;
};
-#if 0
-static int prepare_activate(grpc_call_element *elem,
- grpc_child_channel *on_child) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (calld->state == CALL_CANCELLED) return 0;
-
- /* no more access to calld->s.waiting allowed */
- GPR_ASSERT(calld->state == CALL_WAITING);
-
- if (calld->waiting_op.bind_pollset) {
- grpc_transport_setup_del_interested_party(chand->transport_setup,
- calld->waiting_op.bind_pollset);
- }
-
- calld->state = CALL_ACTIVE;
-
- /* create a child call */
- /* TODO(ctiller): pass the waiting op down here */
- calld->s.active.child_call =
- grpc_child_channel_create_call(on_child, elem, NULL);
-
- return 1;
-}
-
-static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) {
- call_data *calld = elem->call_data;
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
-
- GPR_ASSERT(calld->state == CALL_ACTIVE);
-
- /* continue the start call down the stack, this nees to happen after metadata
- are flushed*/
- child_elem->filter->start_transport_op(child_elem, op);
-}
-
-static void remove_waiting_child(channel_data *chand, call_data *calld) {
- size_t new_count;
- size_t i;
- for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
- if (chand->waiting_children[i] == calld) {
- grpc_transport_setup_del_interested_party(
- chand->transport_setup, calld->waiting_op.bind_pollset);
- continue;
- }
- chand->waiting_children[new_count++] = chand->waiting_children[i];
- }
- GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
- new_count == chand->waiting_child_count);
- chand->waiting_child_count = new_count;
-}
-#endif
-
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
@@ -426,7 +372,39 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
}
-static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {}
+static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {
+ grpc_lb_policy *lb_policy = NULL;
+ channel_data *chand = elem->channel_data;
+ grpc_iomgr_closure *on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+
+ GPR_ASSERT(op->set_accept_stream == NULL);
+ GPR_ASSERT(op->bind_pollset == NULL);
+
+ gpr_mu_lock(&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ grpc_lb_policy_ref(lb_policy);
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (lb_policy) {
+ grpc_lb_policy_broadcast(lb_policy, op);
+ grpc_lb_policy_unref(lb_policy);
+ }
+
+ if (on_consumed) {
+ grpc_iomgr_add_callback(on_consumed);
+ }
+}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
@@ -458,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_unref(subchannel_call);
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel");
break;
case CALL_CREATED:
case CALL_CANCELLED:
diff --git a/src/core/channel/connectivity_state.c b/src/core/channel/connectivity_state.c
index 566a2c3344..0ee268ee59 100644
--- a/src/core/channel/connectivity_state.c
+++ b/src/core/channel/connectivity_state.c
@@ -75,6 +75,9 @@ int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_track
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) {
grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w;
+ if (tracker->current_state == state) {
+ return;
+ }
tracker->current_state = state;
while ((w = tracker->watchers)) {
tracker->watchers = w->next;
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 4fda07a63f..e6e743fa7a 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -173,7 +173,7 @@ loop:
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
p->num_subchannels--;
- grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
add_interested_parties_locked(p);
if (p->num_subchannels == 0) {
abort();
@@ -199,13 +199,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
subchannels = gpr_malloc(n * sizeof(*subchannels));
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
- grpc_subchannel_ref(subchannels[i]);
+ GRPC_SUBCHANNEL_REF(subchannels[i], "broadcast");
}
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
grpc_subchannel_process_transport_op(subchannels[i], op);
- grpc_subchannel_unref(subchannels[i]);
+ GRPC_SUBCHANNEL_UNREF(subchannels[i], "broadcast");
}
gpr_free(subchannels);
}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 5a2b70b94e..2daba14c2a 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -49,3 +49,7 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_iomgr_closure *on_complete) {
policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete);
}
+
+void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
+ policy->vtable->broadcast(policy, op);
+}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 42be9152cb..dcefa6e27e 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -85,4 +85,6 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_subchannel **target,
grpc_iomgr_closure *on_complete);
+void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
+
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 5cbb5d9971..e441befc0c 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -106,13 +106,67 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
/*
+ * connection implementation
+ */
+
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+#define CONNECTION_REF(c, r) connection_ref((c), __FILE__, __LINE__, (r))
+#define CONNECTION_UNREF(c, r) connection_unref((c), __FILE__, __LINE__, (r))
+#else
+#define CONNECTION_REF(c, r) connection_ref((c))
+#define CONNECTION_UNREF(c, r) connection_unref((c))
+#endif
+
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+static void connection_ref(connection *c, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p ref %d -> %d %s",
+ c, (int)c->refs.count, (int)c->refs.count + 1,
+ reason);
+#else
+static void connection_ref(connection *c) {
+#endif
+ gpr_ref(&c->refs);
+}
+
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+static void connection_unref(connection *c, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p unref %d -> %d %s",
+ c, (int)c->refs.count, (int)c->refs.count - 1,
+ reason);
+#else
+static void connection_unref(connection *c) {
+#endif
+ if (gpr_unref(&c->refs)) {
+ GRPC_SUBCHANNEL_UNREF(c->subchannel, "connection");
+ gpr_free(c);
+ }
+}
+
+/*
* grpc_subchannel implementation
*/
-void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); }
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+void grpc_subchannel_ref(grpc_subchannel *c, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p ref %d -> %d %s",
+ c, (int)c->refs.count, (int)c->refs.count + 1,
+ reason);
+#else
+void grpc_subchannel_ref(grpc_subchannel *c) {
+#endif
+ gpr_ref(&c->refs);
+}
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+void grpc_subchannel_unref(grpc_subchannel *c, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p unref %d -> %d %s",
+ c, (int)c->refs.count, (int)c->refs.count - 1,
+ reason);
+#else
void grpc_subchannel_unref(grpc_subchannel *c) {
+#endif
if (gpr_unref(&c->refs)) {
+ if (c->active != NULL) CONNECTION_UNREF(c->active, "subchannel");
gpr_free(c->filters);
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
@@ -178,7 +232,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
- gpr_ref(&con->refs);
+ CONNECTION_REF(con, "call");
gpr_mu_unlock(&c->mu);
*target = create_call(con, initial_op);
@@ -194,7 +248,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
if (!c->connecting) {
c->connecting = 1;
connectivity_state_changed_locked(c);
- grpc_subchannel_ref(c);
+ GRPC_SUBCHANNEL_REF(c, "connection");
gpr_mu_unlock(&c->mu);
start_connect(c);
@@ -220,7 +274,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
do_connect = 1;
c->connecting = 1;
- grpc_subchannel_ref(c);
+ GRPC_SUBCHANNEL_REF(c, "connection");
grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
}
gpr_mu_unlock(&c->mu);
@@ -275,7 +329,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
if (c->connecting_result.transport) {
publish_transport(c);
} else {
- grpc_subchannel_unref(c);
+ GRPC_SUBCHANNEL_UNREF(c, "connection");
/* TODO(ctiller): retry after sleeping */
abort();
}
@@ -304,17 +358,29 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation
*/
-void grpc_subchannel_call_ref(grpc_subchannel_call *call) {
- gpr_ref(&call->refs);
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+void grpc_subchannel_call_ref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p ref %d -> %d %s",
+ c, (int)c->refs.count, (int)c->refs.count + 1,
+ reason);
+#else
+void grpc_subchannel_call_ref(grpc_subchannel_call *c) {
+#endif
+ gpr_ref(&c->refs);
}
-void grpc_subchannel_call_unref(grpc_subchannel_call *call) {
- if (gpr_unref(&call->refs)) {
- grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call));
- if (gpr_unref(&call->connection->refs)) {
- gpr_free(call->connection);
- }
- gpr_free(call);
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+void grpc_subchannel_call_unref(grpc_subchannel_call *c, const char *file, int line, const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p unref %d -> %d %s",
+ c, (int)c->refs.count, (int)c->refs.count - 1,
+ reason);
+#else
+void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
+#endif
+ if (gpr_unref(&c->refs)) {
+ grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
+ CONNECTION_UNREF(c->connection, "call");
+ gpr_free(c);
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 60b95d3d8f..7cdcccce8f 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -37,14 +37,33 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/connector.h"
+#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
+#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
+#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c), __FILE__, __LINE__, (r))
+void grpc_subchannel_ref(grpc_subchannel *channel, const char *file, int line, const char *reason);
+void grpc_subchannel_unref(grpc_subchannel *channel, const char *file, int line, const char *reason);
+void grpc_subchannel_call_ref(grpc_subchannel_call *call, const char *file, int line, const char *reason);
+void grpc_subchannel_call_unref(grpc_subchannel_call *call, const char *file, int line, const char *reason);
+#else
+#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c))
+#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c))
+#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c))
+#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c))
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
+void grpc_subchannel_call_ref(grpc_subchannel_call *call);
+void grpc_subchannel_call_unref(grpc_subchannel_call *call);
+#endif
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
@@ -55,9 +74,6 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel,
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op);
-void grpc_subchannel_call_ref(grpc_subchannel_call *call);
-void grpc_subchannel_call_unref(grpc_subchannel_call *call);
-
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel *channel);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 8c136af841..4857912b4f 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -93,9 +93,8 @@ grpc_channel *grpc_channel_create_from_filters(
grpc_channel *channel = gpr_malloc(size);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
- /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if
- * is_client */
- gpr_ref_init(&channel->refs, 1 + is_client);
+ /* decremented by grpc_channel_destroy */
+ gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_level_string =
@@ -237,33 +236,10 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
}
}
-static void default_consumed(void *arg, int iomgr_success) {
- grpc_channel *channel = arg;
- GRPC_CHANNEL_INTERNAL_UNREF(channel, "op");
-}
-
-static void execute_op(grpc_channel *channel, grpc_transport_op *op) {
- grpc_channel_element *elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
- if (op->on_consumed == NULL) {
- GRPC_CHANNEL_INTERNAL_REF(channel, "op");
- op->on_consumed = gpr_malloc(sizeof(*op->on_consumed));
- grpc_iomgr_closure_init(op->on_consumed, default_consumed, channel);
- }
- elem->filter->start_transport_op(elem, op);
-}
-
void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op op;
- memset(&op, 0, sizeof(op));
- op.disconnect = 1;
- execute_op(channel, &op);
GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
}
-void grpc_client_channel_closed(grpc_channel_element *elem) {
- GRPC_CHANNEL_INTERNAL_UNREF(CHANNEL_FROM_TOP_ELEM(elem), "closed");
-}
-
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel);
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 516c0ac559..71f8a55731 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -58,8 +58,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
-void grpc_client_channel_closed(grpc_channel_element *elem);
-
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);