aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-29 22:42:33 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-29 22:42:33 -0700
commit49924e0e62d29499874e93f5dcf5976edcf610f4 (patch)
treee0ed5306462b9e38a5887b4389da3f20eb2b1ecb /src/core
parentff3ae687e1e85d4fb29024c20a17595dce05e51f (diff)
Better handling of cancellation, uri parse errors, and disconnection
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c4
-rw-r--r--src/core/client_config/resolver_registry.c7
-rw-r--r--src/core/client_config/subchannel.c14
-rw-r--r--src/core/client_config/uri_parser.c36
-rw-r--r--src/core/client_config/uri_parser.h2
-rw-r--r--src/core/surface/server.c23
-rw-r--r--src/core/transport/chttp2_transport.c4
7 files changed, 52 insertions, 38 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index d465a970b9..e62a262bab 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -242,6 +242,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
+ grpc_transport_stream_op op2;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -263,8 +264,11 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_
if (!continuation) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
+ op2 = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
+ handle_op_after_cancellation(elem, &op2);
} else {
GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
(op->send_ops == NULL));
diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c
index abdb5f9377..16be2da994 100644
--- a/src/core/client_config/resolver_registry.c
+++ b/src/core/client_config/resolver_registry.c
@@ -100,18 +100,21 @@ grpc_resolver *grpc_resolver_create(
grpc_resolver_factory *factory = NULL;
grpc_resolver *resolver;
- uri = grpc_uri_parse(name);
+ uri = grpc_uri_parse(name, 1);
factory = lookup_factory(uri);
if (factory == NULL && g_default_resolver_scheme != NULL) {
grpc_uri_destroy(uri);
gpr_asprintf(&tmp, "%s%s", g_default_resolver_scheme, name);
- uri = grpc_uri_parse(tmp);
+ uri = grpc_uri_parse(tmp, 1);
factory = lookup_factory(uri);
if (factory == NULL) {
+ grpc_uri_destroy(grpc_uri_parse(name, 0));
+ grpc_uri_destroy(grpc_uri_parse(tmp, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", name, tmp);
}
gpr_free(tmp);
} else if (factory == NULL) {
+ grpc_uri_destroy(grpc_uri_parse(name, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s'", name);
}
resolver =
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index c8c562f29d..cae6db0110 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -409,7 +409,6 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
- int do_connect = 0;
gpr_mu_lock(mu);
@@ -436,6 +435,7 @@ static void on_state_changed(void *p, int iomgr_success) {
gpr_mu_unlock(mu);
return;
case GRPC_CHANNEL_FATAL_FAILURE:
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things have gone wrong, deactivate and enter idle */
if (sw->subchannel->active->refs == 0) {
destroy_connection = sw->subchannel->active;
@@ -444,15 +444,6 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_connectivity_state_set(&c->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE);
break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- /* things are starting to go wrong, reconnect but don't deactivate */
- /* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
- grpc_connectivity_state_set(&c->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE);
- do_connect = 1;
- c->connecting = 1;
- break;
}
done:
@@ -460,9 +451,6 @@ done:
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
gpr_mu_unlock(mu);
- if (do_connect) {
- start_connect(c);
- }
if (destroy) {
subchannel_destroy(c);
}
diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c
index 43b5b47f55..c5faab5eba 100644
--- a/src/core/client_config/uri_parser.c
+++ b/src/core/client_config/uri_parser.c
@@ -39,20 +39,22 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section) {
+static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, int suppress_errors) {
char *line_prefix;
int pfx_len;
- gpr_asprintf(&line_prefix, "bad uri.%s: '", section);
- pfx_len = strlen(line_prefix) + pos;
- gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text);
- gpr_free(line_prefix);
+ if (!suppress_errors) {
+ gpr_asprintf(&line_prefix, "bad uri.%s: '", section);
+ pfx_len = strlen(line_prefix) + pos;
+ gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text);
+ gpr_free(line_prefix);
- line_prefix = gpr_malloc(pfx_len + 1);
- memset(line_prefix, ' ', pfx_len);
- line_prefix[pfx_len] = 0;
- gpr_log(GPR_ERROR, "%s^ here", line_prefix);
- gpr_free(line_prefix);
+ line_prefix = gpr_malloc(pfx_len + 1);
+ memset(line_prefix, ' ', pfx_len);
+ line_prefix[pfx_len] = 0;
+ gpr_log(GPR_ERROR, "%s^ here", line_prefix);
+ gpr_free(line_prefix);
+ }
return NULL;
}
@@ -64,7 +66,7 @@ static char *copy_fragment(const char *src, int begin, int end) {
return out;
}
-grpc_uri *grpc_uri_parse(const char *uri_text) {
+grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
grpc_uri *uri;
int scheme_begin = 0;
int scheme_end = -1;
@@ -90,7 +92,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
break;
}
if (scheme_end == -1) {
- return bad_uri(uri_text, i, "scheme");
+ return bad_uri(uri_text, i, "scheme", suppress_errors);
}
if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') {
@@ -100,17 +102,17 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
authority_end = i;
}
if (uri_text[i] == '?') {
- return bad_uri(uri_text, i, "query_not_supported");
+ return bad_uri(uri_text, i, "query_not_supported", suppress_errors);
}
if (uri_text[i] == '#') {
- return bad_uri(uri_text, i, "fragment_not_supported");
+ return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors);
}
}
if (authority_end == -1 && uri_text[i] == 0) {
authority_end = i;
}
if (authority_end == -1) {
- return bad_uri(uri_text, i, "authority");
+ return bad_uri(uri_text, i, "authority", suppress_errors);
}
/* TODO(ctiller): parse the authority correctly */
path_begin = authority_end;
@@ -120,10 +122,10 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
for (i = path_begin; uri_text[i] != 0; i++) {
if (uri_text[i] == '?') {
- return bad_uri(uri_text, i, "query_not_supported");
+ return bad_uri(uri_text, i, "query_not_supported", suppress_errors);
}
if (uri_text[i] == '#') {
- return bad_uri(uri_text, i, "fragment_not_supported");
+ return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors);
}
}
path_end = i;
diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h
index b6821f9621..ce4e6aecb0 100644
--- a/src/core/client_config/uri_parser.h
+++ b/src/core/client_config/uri_parser.h
@@ -41,7 +41,7 @@ typedef struct {
} grpc_uri;
/** parse a uri, return NULL on failure */
-grpc_uri *grpc_uri_parse(const char *uri_text);
+grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors);
/** destroy a uri */
void grpc_uri_destroy(grpc_uri *uri);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 383c3d921d..703eeaf2bd 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -204,7 +204,9 @@ struct call_data {
typedef struct {
grpc_channel **channels;
+ grpc_channel **disconnects;
size_t num_channels;
+ size_t num_disconnects;
} channel_broadcaster;
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -223,18 +225,28 @@ static void maybe_finish_shutdown(grpc_server *server);
static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
channel_data *c;
size_t count = 0;
+ size_t dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data;
c = c->next) {
count ++;
+ if (c->num_calls == 0) {
+ dc_count ++;
+ }
}
cb->num_channels = count;
+ cb->num_disconnects = dc_count;
cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+ cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects);
count = 0;
+ dc_count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data;
c = c->next) {
- cb->channels[count] = c->channel;
+ cb->channels[count++] = c->channel;
GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
- count ++;
+ if (c->num_calls == 0) {
+ cb->disconnects[dc_count++] = c->channel;
+ GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect");
+ }
}
}
@@ -274,10 +286,15 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goawa
size_t i;
for (i = 0; i < cb->num_channels; i++) {
- send_shutdown(cb->channels[i], send_goaway, send_disconnect);
+ send_shutdown(cb->channels[i], 1, 0);
GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
}
+ for (i = 0; i < cb->num_disconnects; i++) {
+ send_shutdown(cb->disconnects[i], 0, 1);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect");
+ }
gpr_free(cb->channels);
+ gpr_free(cb->disconnects);
}
/* call list */
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 08a767f1d5..e32071e692 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -553,8 +553,7 @@ static void maybe_start_some_streams(
transport_global->next_stream_id += 2;
if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
- connectivity_state_set(transport_global,
- GRPC_CHANNEL_TRANSIENT_FAILURE);
+ connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE);
}
stream_global->outgoing_window =
@@ -940,6 +939,7 @@ static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closu
}
static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) {
+ GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set_with_scheduler(
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
state,