aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-01 08:55:28 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-01 08:55:28 -0700
commit5d44c069e6870c0ddf26c1782f035c2835983e31 (patch)
tree280b65c92917be16efd1bf43f881d0731bec1f56
parent95078352a7ed5526f3977c5331a8a510a223517c (diff)
Refcounting fixes and debugging, empty batch stability fixes
-rw-r--r--src/core/channel/client_channel.c72
-rw-r--r--src/core/httpcli/httpcli.c2
-rw-r--r--src/core/security/client_auth_filter.c4
-rw-r--r--src/core/security/secure_transport_setup.c4
-rw-r--r--src/core/security/security_connector.c36
-rw-r--r--src/core/security/security_connector.h23
-rw-r--r--src/core/security/server_auth_filter.c4
-rw-r--r--src/core/security/server_secure_chttp2.c4
-rw-r--r--src/core/surface/secure_channel_create.c4
9 files changed, 109 insertions, 44 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 95afc0d2e3..a4de59efb1 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -77,6 +77,7 @@ typedef struct {
typedef enum {
CALL_CREATED,
+ CALL_WAITING_FOR_SEND,
CALL_WAITING_FOR_CONFIG,
CALL_WAITING_FOR_PICK,
CALL_WAITING_FOR_CALL,
@@ -101,6 +102,9 @@ struct call_data {
grpc_linked_mdelem details;
};
+static grpc_iomgr_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
+
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
@@ -241,12 +245,13 @@ static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
&calld->picked_channel, &calld->async_setup_task);
}
-static void merge_into_waiting_op(grpc_call_element *elem,
+static grpc_iomgr_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
+ grpc_iomgr_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
- GPR_ASSERT((waiting_op->send_ops == NULL) != (new_op->send_ops == NULL));
- GPR_ASSERT((waiting_op->recv_ops == NULL) != (new_op->recv_ops == NULL));
+ GPR_ASSERT((waiting_op->send_ops == NULL) != (new_op->send_ops == NULL) || waiting_op->send_ops == NULL);
+ GPR_ASSERT((waiting_op->recv_ops == NULL) != (new_op->recv_ops == NULL) || waiting_op->recv_ops == NULL);
if (new_op->send_ops != NULL) {
waiting_op->send_ops = new_op->send_ops;
waiting_op->is_last_send = new_op->is_last_send;
@@ -257,13 +262,16 @@ static void merge_into_waiting_op(grpc_call_element *elem,
waiting_op->recv_state = new_op->recv_state;
waiting_op->on_done_recv = new_op->on_done_recv;
}
- if (waiting_op->on_consumed == NULL) {
+ if (new_op->on_consumed != NULL) {
+ if (waiting_op->on_consumed != NULL) {
+ consumed_op = waiting_op->on_consumed;
+ }
waiting_op->on_consumed = new_op->on_consumed;
- new_op->on_consumed = NULL;
}
if (new_op->cancel_with_status != GRPC_STATUS_OK) {
waiting_op->cancel_with_status = new_op->cancel_with_status;
}
+ return consumed_op;
}
static void perform_transport_stream_op(grpc_call_element *elem,
@@ -274,6 +282,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
+ grpc_iomgr_closure *consumed_op = NULL;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -289,6 +298,17 @@ static void perform_transport_stream_op(grpc_call_element *elem,
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
break;
+ case CALL_WAITING_FOR_SEND:
+ GPR_ASSERT(!continuation);
+ consumed_op = merge_into_waiting_op(elem, op);
+ if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
+ gpr_mu_unlock(&calld->mu_state);
+ break;
+ }
+ *op = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ continuation = 1;
+ /* fall through */
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CALL:
@@ -308,7 +328,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
handle_op_after_cancellation(elem, &op2);
} else {
- merge_into_waiting_op(elem, op);
+ consumed_op = merge_into_waiting_op(elem, op);
gpr_mu_unlock(&calld->mu_state);
if (op->on_consumed != NULL) {
op->on_consumed->cb(op->on_consumed->cb_arg, 0);
@@ -325,26 +345,37 @@ static void perform_transport_stream_op(grpc_call_element *elem,
} else {
calld->waiting_op = *op;
- gpr_mu_lock(&chand->mu_config);
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "pick");
- gpr_mu_unlock(&chand->mu_config);
- calld->state = CALL_WAITING_FOR_PICK;
+ if (op->send_ops == NULL) {
+ /* need to have some send ops before we can select the
+ lb target */
+ calld->state = CALL_WAITING_FOR_SEND;
gpr_mu_unlock(&calld->mu_state);
-
- pick_target(lb_policy, calld);
-
- GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else {
- calld->state = CALL_WAITING_FOR_CONFIG;
- add_to_lb_policy_wait_queue_locked_state_config(elem);
- gpr_mu_unlock(&chand->mu_config);
- gpr_mu_unlock(&calld->mu_state);
+ gpr_mu_lock(&chand->mu_config);
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "pick");
+ gpr_mu_unlock(&chand->mu_config);
+ calld->state = CALL_WAITING_FOR_PICK;
+ gpr_mu_unlock(&calld->mu_state);
+
+ pick_target(lb_policy, calld);
+
+ GRPC_LB_POLICY_UNREF(lb_policy, "pick");
+ } else {
+ calld->state = CALL_WAITING_FOR_CONFIG;
+ add_to_lb_policy_wait_queue_locked_state_config(elem);
+ gpr_mu_unlock(&chand->mu_config);
+ gpr_mu_unlock(&calld->mu_state);
+ }
}
}
break;
}
+
+ if (consumed_op != NULL) {
+ consumed_op->cb(consumed_op->cb_arg, 1);
+ }
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
@@ -503,6 +534,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CONFIG:
case CALL_WAITING_FOR_CALL:
+ case CALL_WAITING_FOR_SEND:
gpr_log(GPR_ERROR, "should never reach here");
abort();
break;
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 914355a408..3f5557e08e 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -198,7 +198,7 @@ static void on_connected(void *arg, grpc_endpoint *tcp) {
GRPC_SECURITY_OK);
grpc_setup_secure_transport(&sc->base, tcp, on_secure_transport_setup_done,
req);
- grpc_security_connector_unref(&sc->base);
+ GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
} else {
start_write(req);
}
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 5d04ec49b2..6816fbcfa1 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -297,7 +297,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
/* initialize members */
GPR_ASSERT(sc->is_client_side);
chand->security_connector =
- (grpc_channel_security_connector *)grpc_security_connector_ref(sc);
+ (grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF(sc, "client_auth_filter");
chand->md_ctx = metadata_context;
chand->authority_string = grpc_mdstr_from_string(chand->md_ctx, ":authority");
chand->path_string = grpc_mdstr_from_string(chand->md_ctx, ":path");
@@ -310,7 +310,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
grpc_channel_security_connector *ctx = chand->security_connector;
- if (ctx != NULL) grpc_security_connector_unref(&ctx->base);
+ if (ctx != NULL) GRPC_SECURITY_CONNECTOR_UNREF(&ctx->base, "client_auth_filter");
if (chand->authority_string != NULL) {
grpc_mdstr_unref(chand->authority_string);
}
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 1b39ab141e..becc23bf7f 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -74,7 +74,7 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s,
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
gpr_slice_buffer_destroy(&s->left_overs);
- grpc_security_connector_unref(s->connector);
+ GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
gpr_free(s);
}
@@ -275,7 +275,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
secure_transport_setup_done(s, 0);
return;
}
- s->connector = grpc_security_connector_ref(connector);
+ s->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup");
s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
s->handshake_buffer = gpr_malloc(s->handshake_buffer_size);
s->endpoint = nonsecure_endpoint;
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index 34cb0395a2..f53e005d5b 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -124,24 +124,42 @@ grpc_security_status grpc_channel_security_connector_check_call_host(
return sc->check_call_host(sc, host, cb, user_data);
}
-void grpc_security_connector_unref(grpc_security_connector *sc) {
- if (sc == NULL) return;
- if (gpr_unref(&sc->refcount)) sc->vtable->destroy(sc);
-}
-
-grpc_security_connector *grpc_security_connector_ref(
- grpc_security_connector *sc) {
+#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG
+grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *sc,
+ const char *file, int line,
+ const char *reason) {
+ if (sc == NULL) return NULL;
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "SECURITY_CONNECTOR:%p ref %d -> %d %s", sc, (int)sc->refcount.count,
+ (int)sc->refcount.count + 1, reason);
+#else
+grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *sc) {
if (sc == NULL) return NULL;
+#endif
gpr_ref(&sc->refcount);
return sc;
}
+#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG
+void grpc_security_connector_unref(grpc_security_connector *sc, const char *file, int line,
+ const char *reason) {
+ if (sc == NULL) return;
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "SECURITY_CONNECTOR:%p unref %d -> %d %s", sc, (int)sc->refcount.count,
+ (int)sc->refcount.count - 1, reason);
+#else
+void grpc_security_connector_unref(grpc_security_connector *sc) {
+ if (sc == NULL) return;
+#endif
+ if (gpr_unref(&sc->refcount)) sc->vtable->destroy(sc);
+}
+
static void connector_pointer_arg_destroy(void *p) {
- grpc_security_connector_unref(p);
+ GRPC_SECURITY_CONNECTOR_UNREF(p, "connector_pointer_arg");
}
static void *connector_pointer_arg_copy(void *p) {
- return grpc_security_connector_ref(p);
+ return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg");
}
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) {
diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h
index ee3057b43b..f258b86b28 100644
--- a/src/core/security/security_connector.h
+++ b/src/core/security/security_connector.h
@@ -80,12 +80,23 @@ struct grpc_security_connector {
grpc_auth_context *auth_context; /* Populated after the peer is checked. */
};
-/* Increments the refcount. */
-grpc_security_connector *grpc_security_connector_ref(
- grpc_security_connector *sc);
-
-/* Decrements the refcount and destroys the object if it reaches 0. */
-void grpc_security_connector_unref(grpc_security_connector *sc);
+/* Refcounting. */
+#ifdef GRPC_SECURITY_CONNECTOR_REFCOUNT_DEBUG
+#define GRPC_SECURITY_CONNECTOR_REF(p, r) \
+ grpc_security_connector_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_SECURITY_CONNECTOR_UNREF(p, r) \
+ grpc_security_connector_unref((p), __FILE__, __LINE__, (r))
+grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *policy,
+ const char *file, int line,
+ const char *reason);
+void grpc_security_connector_unref(grpc_security_connector *policy, const char *file,
+ int line, const char *reason);
+#else
+#define GRPC_SECURITY_CONNECTOR_REF(p, r) grpc_security_connector_ref((p))
+#define GRPC_SECURITY_CONNECTOR_UNREF(p, r) grpc_security_connector_unref((p))
+grpc_security_connector *grpc_security_connector_ref(grpc_security_connector *policy);
+void grpc_security_connector_unref(grpc_security_connector *policy);
+#endif
/* Handshake creation. */
grpc_security_status grpc_security_connector_create_handshaker(
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 5675c06402..51a4b32a66 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -107,14 +107,14 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
/* initialize members */
GPR_ASSERT(!sc->is_client_side);
- chand->security_connector = grpc_security_connector_ref(sc);
+ chand->security_connector = GRPC_SECURITY_CONNECTOR_REF(sc, "server_auth_filter");
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *chand = elem->channel_data;
- grpc_security_connector_unref(chand->security_connector);
+ GRPC_SECURITY_CONNECTOR_UNREF(chand->security_connector, "server_auth_filter");
}
const grpc_channel_filter grpc_server_auth_filter = {
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 2e49e370f7..6a99324da6 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -70,7 +70,7 @@ static void state_unref(grpc_server_secure_state *state) {
gpr_mu_lock(&state->mu);
gpr_mu_unlock(&state->mu);
/* clean up */
- grpc_security_connector_unref(state->sc);
+ GRPC_SECURITY_CONNECTOR_UNREF(state->sc, "server");
gpr_free(state);
}
}
@@ -220,7 +220,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
/* Error path: cleanup and return */
error:
if (sc) {
- grpc_security_connector_unref(sc);
+ GRPC_SECURITY_CONNECTOR_UNREF(sc, "server");
}
if (resolved) {
grpc_resolved_addresses_destroy(resolved);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 1dd9e61d0f..927c678c67 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -142,6 +142,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
static void subchannel_factory_unref(grpc_subchannel_factory *scf) {
subchannel_factory *f = (subchannel_factory *)scf;
if (gpr_unref(&f->refs)) {
+ GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory");
grpc_channel_args_destroy(f->merge_args);
grpc_mdctx_unref(f->mdctx);
gpr_free(f);
@@ -218,6 +219,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
gpr_ref_init(&f->refs, 1);
grpc_mdctx_ref(mdctx);
f->mdctx = mdctx;
+ GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory");
f->security_connector = connector;
f->merge_args = grpc_channel_args_copy(args_copy);
resolver = grpc_resolver_create(target, &f->base);
@@ -229,6 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel),
resolver);
GRPC_RESOLVER_UNREF(resolver, "create");
+ grpc_subchannel_factory_unref(&f->base);
+ GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create");
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {