diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-29 22:42:33 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-29 22:42:33 -0700 |
commit | 49924e0e62d29499874e93f5dcf5976edcf610f4 (patch) | |
tree | e0ed5306462b9e38a5887b4389da3f20eb2b1ecb /src/core | |
parent | ff3ae687e1e85d4fb29024c20a17595dce05e51f (diff) |
Better handling of cancellation, uri parse errors, and disconnection
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 4 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.c | 7 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 14 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.c | 36 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.h | 2 | ||||
-rw-r--r-- | src/core/surface/server.c | 23 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 4 |
7 files changed, 52 insertions, 38 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index d465a970b9..e62a262bab 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -242,6 +242,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_ channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; + grpc_transport_stream_op op2; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -263,8 +264,11 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_ if (!continuation) { if (op->cancel_with_status != GRPC_STATUS_OK) { calld->state = CALL_CANCELLED; + op2 = calld->waiting_op; + memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); gpr_mu_unlock(&calld->mu_state); handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(elem, &op2); } else { GPR_ASSERT((calld->waiting_op.send_ops == NULL) != (op->send_ops == NULL)); diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index abdb5f9377..16be2da994 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -100,18 +100,21 @@ grpc_resolver *grpc_resolver_create( grpc_resolver_factory *factory = NULL; grpc_resolver *resolver; - uri = grpc_uri_parse(name); + uri = grpc_uri_parse(name, 1); factory = lookup_factory(uri); if (factory == NULL && g_default_resolver_scheme != NULL) { grpc_uri_destroy(uri); gpr_asprintf(&tmp, "%s%s", g_default_resolver_scheme, name); - uri = grpc_uri_parse(tmp); + uri = grpc_uri_parse(tmp, 1); factory = lookup_factory(uri); if (factory == NULL) { + grpc_uri_destroy(grpc_uri_parse(name, 0)); + grpc_uri_destroy(grpc_uri_parse(tmp, 0)); gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", name, tmp); } gpr_free(tmp); } else if (factory == NULL) { + grpc_uri_destroy(grpc_uri_parse(name, 0)); gpr_log(GPR_ERROR, "don't know how to resolve '%s'", name); } resolver = diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index c8c562f29d..cae6db0110 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -409,7 +409,6 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; - int do_connect = 0; gpr_mu_lock(mu); @@ -436,6 +435,7 @@ static void on_state_changed(void *p, int iomgr_success) { gpr_mu_unlock(mu); return; case GRPC_CHANNEL_FATAL_FAILURE: + case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things have gone wrong, deactivate and enter idle */ if (sw->subchannel->active->refs == 0) { destroy_connection = sw->subchannel->active; @@ -444,15 +444,6 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE); break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - /* things are starting to go wrong, reconnect but don't deactivate */ - /* released by connection */ - SUBCHANNEL_REF_LOCKED(c, "connecting"); - grpc_connectivity_state_set(&c->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE); - do_connect = 1; - c->connecting = 1; - break; } done: @@ -460,9 +451,6 @@ done: destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); gpr_mu_unlock(mu); - if (do_connect) { - start_connect(c); - } if (destroy) { subchannel_destroy(c); } diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 43b5b47f55..c5faab5eba 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -39,20 +39,22 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section) { +static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, int suppress_errors) { char *line_prefix; int pfx_len; - gpr_asprintf(&line_prefix, "bad uri.%s: '", section); - pfx_len = strlen(line_prefix) + pos; - gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text); - gpr_free(line_prefix); + if (!suppress_errors) { + gpr_asprintf(&line_prefix, "bad uri.%s: '", section); + pfx_len = strlen(line_prefix) + pos; + gpr_log(GPR_ERROR, "%s%s'", line_prefix, uri_text); + gpr_free(line_prefix); - line_prefix = gpr_malloc(pfx_len + 1); - memset(line_prefix, ' ', pfx_len); - line_prefix[pfx_len] = 0; - gpr_log(GPR_ERROR, "%s^ here", line_prefix); - gpr_free(line_prefix); + line_prefix = gpr_malloc(pfx_len + 1); + memset(line_prefix, ' ', pfx_len); + line_prefix[pfx_len] = 0; + gpr_log(GPR_ERROR, "%s^ here", line_prefix); + gpr_free(line_prefix); + } return NULL; } @@ -64,7 +66,7 @@ static char *copy_fragment(const char *src, int begin, int end) { return out; } -grpc_uri *grpc_uri_parse(const char *uri_text) { +grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { grpc_uri *uri; int scheme_begin = 0; int scheme_end = -1; @@ -90,7 +92,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text) { break; } if (scheme_end == -1) { - return bad_uri(uri_text, i, "scheme"); + return bad_uri(uri_text, i, "scheme", suppress_errors); } if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') { @@ -100,17 +102,17 @@ grpc_uri *grpc_uri_parse(const char *uri_text) { authority_end = i; } if (uri_text[i] == '?') { - return bad_uri(uri_text, i, "query_not_supported"); + return bad_uri(uri_text, i, "query_not_supported", suppress_errors); } if (uri_text[i] == '#') { - return bad_uri(uri_text, i, "fragment_not_supported"); + return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors); } } if (authority_end == -1 && uri_text[i] == 0) { authority_end = i; } if (authority_end == -1) { - return bad_uri(uri_text, i, "authority"); + return bad_uri(uri_text, i, "authority", suppress_errors); } /* TODO(ctiller): parse the authority correctly */ path_begin = authority_end; @@ -120,10 +122,10 @@ grpc_uri *grpc_uri_parse(const char *uri_text) { for (i = path_begin; uri_text[i] != 0; i++) { if (uri_text[i] == '?') { - return bad_uri(uri_text, i, "query_not_supported"); + return bad_uri(uri_text, i, "query_not_supported", suppress_errors); } if (uri_text[i] == '#') { - return bad_uri(uri_text, i, "fragment_not_supported"); + return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors); } } path_end = i; diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h index b6821f9621..ce4e6aecb0 100644 --- a/src/core/client_config/uri_parser.h +++ b/src/core/client_config/uri_parser.h @@ -41,7 +41,7 @@ typedef struct { } grpc_uri; /** parse a uri, return NULL on failure */ -grpc_uri *grpc_uri_parse(const char *uri_text); +grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors); /** destroy a uri */ void grpc_uri_destroy(grpc_uri *uri); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 383c3d921d..703eeaf2bd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -204,7 +204,9 @@ struct call_data { typedef struct { grpc_channel **channels; + grpc_channel **disconnects; size_t num_channels; + size_t num_disconnects; } channel_broadcaster; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -223,18 +225,28 @@ static void maybe_finish_shutdown(grpc_server *server); static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { channel_data *c; size_t count = 0; + size_t dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { count ++; + if (c->num_calls == 0) { + dc_count ++; + } } cb->num_channels = count; + cb->num_disconnects = dc_count; cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels); + cb->disconnects = gpr_malloc(sizeof(*cb->channels) * cb->num_disconnects); count = 0; + dc_count = 0; for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) { - cb->channels[count] = c->channel; + cb->channels[count++] = c->channel; GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast"); - count ++; + if (c->num_calls == 0) { + cb->disconnects[dc_count++] = c->channel; + GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast-disconnect"); + } } } @@ -274,10 +286,15 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, int send_goawa size_t i; for (i = 0; i < cb->num_channels; i++) { - send_shutdown(cb->channels[i], send_goaway, send_disconnect); + send_shutdown(cb->channels[i], 1, 0); GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast"); } + for (i = 0; i < cb->num_disconnects; i++) { + send_shutdown(cb->disconnects[i], 0, 1); + GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast-disconnect"); + } gpr_free(cb->channels); + gpr_free(cb->disconnects); } /* call list */ diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 08a767f1d5..e32071e692 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -553,8 +553,7 @@ static void maybe_start_some_streams( transport_global->next_stream_id += 2; if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) { - connectivity_state_set(transport_global, - GRPC_CHANNEL_TRANSIENT_FAILURE); + connectivity_state_set(transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE); } stream_global->outgoing_window = @@ -940,6 +939,7 @@ static void schedule_closure_for_connectivity(void *a, grpc_iomgr_closure *closu } static void connectivity_state_set(grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state) { + GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "set connectivity_state=%d", state)); grpc_connectivity_state_set_with_scheduler( &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker, state, |