aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-10-14 15:15:19 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-10-14 15:15:19 -0700
commit936f1ea9ac37965a6cbe22a04f4fedc9d7facc01 (patch)
tree5b0bff43e5f6356991d123441d058e4d2286854b /src/core
parentfd77de6a8a7617b4cd17d52e65317f7a36b34200 (diff)
Fix some test failures
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c90
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
-rw-r--r--src/core/lib/iomgr/error.c16
-rw-r--r--src/core/lib/iomgr/error.h4
4 files changed, 79 insertions, 37 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8ab26e512d..b1dd974011 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -90,10 +90,6 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value);
-/** Start disconnection chain */
-static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_error *error);
-
static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
@@ -118,6 +114,11 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error);
+static void close_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t, grpc_error *error);
+static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error);
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -367,7 +368,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = tp;
t->destroying = 1;
- drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
+ close_transport_locked(
+ exec_ctx, t,
+ grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state));
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
@@ -382,6 +386,19 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
if (!t->closed) {
+ if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ if (t->close_transport_on_writes_finished == NULL) {
+ t->close_transport_on_writes_finished =
+ GRPC_ERROR_CREATE("Delayed close due to in-progress write");
+ }
+ t->close_transport_on_writes_finished =
+ grpc_error_add_child(t->close_transport_on_writes_finished, error);
+ return;
+ }
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
@@ -392,6 +409,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
+ end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -555,13 +573,19 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-static void set_write_state(grpc_chttp2_transport *t,
+static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_write_state st, const char *reason) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_DEBUG, "W:%p %s state %s -> %s [%s]", t,
t->is_client ? "CLIENT" : "SERVER",
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
+ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE &&
+ t->close_transport_on_writes_finished != NULL) {
+ grpc_error *err = t->close_transport_on_writes_finished;
+ t->close_transport_on_writes_finished = NULL;
+ close_transport_locked(exec_ctx, t, err);
+ }
}
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
@@ -571,7 +595,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
&t->write_action_begin_locked,
@@ -579,7 +603,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state(
- t,
+ exec_ctx, t,
covered_by_poller
? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
: GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
@@ -588,7 +612,8 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
if (covered_by_poller) {
set_write_state(
- t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
+ exec_ctx, t,
+ GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
reason);
}
break;
@@ -614,10 +639,12 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
grpc_chttp2_transport *t = gt;
GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "begin writing");
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ "begin writing");
grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
} else {
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing");
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
+ "begin writing nothing");
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
}
GPR_TIMER_END("write_action_begin_locked", 0);
@@ -645,7 +672,7 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_transport *t = tp;
if (error != GRPC_ERROR_NONE) {
- drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
}
grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -655,11 +682,12 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITE_STATE_WRITING:
GPR_TIMER_MARK("state=writing", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
+ "finish writing");
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
@@ -668,7 +696,7 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
- set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
grpc_combiner_execute_finally(exec_ctx, t->combiner,
@@ -1434,8 +1462,8 @@ static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) {
++*nrefs;
}
-static grpc_error *removal_error(grpc_error *extra_error,
- grpc_chttp2_stream *s) {
+static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
+ const char *master_error_msg) {
grpc_error *refs[3];
size_t nrefs = 0;
add_error(s->read_closed_error, refs, &nrefs);
@@ -1443,8 +1471,7 @@ static grpc_error *removal_error(grpc_error *extra_error,
add_error(extra_error, refs, &nrefs);
grpc_error *error = GRPC_ERROR_NONE;
if (nrefs > 0) {
- error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs,
- nrefs);
+ error = GRPC_ERROR_CREATE_REFERENCING(master_error_msg, refs, nrefs);
}
GRPC_ERROR_UNREF(extra_error);
return error;
@@ -1453,7 +1480,8 @@ static grpc_error *removal_error(grpc_error *extra_error,
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *error) {
- error = removal_error(error, s);
+ error =
+ removal_error(error, s, "Pending writes failed due to stream closure");
s->fetching_send_message = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error),
@@ -1507,7 +1535,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (s->read_closed && s->write_closed) {
if (s->id != 0) {
remove_stream(exec_ctx, t, s->id,
- removal_error(GRPC_ERROR_REF(error), s));
+ removal_error(GRPC_ERROR_REF(error), s, "Stream removed"));
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
}
@@ -1650,16 +1678,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
-static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_error *error) {
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE);
- }
- close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
- end_all_the_calls(exec_ctx, t, error);
-}
-
/** update window from a settings change */
typedef struct {
grpc_chttp2_transport *t;
@@ -1743,6 +1761,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_REF(error);
+ grpc_error *err = error;
+ if (err != GRPC_ERROR_NONE) {
+ err = grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE, t->write_state);
+ }
+ GPR_SWAP(grpc_error *, err, error);
+ GRPC_ERROR_UNREF(err);
if (!t->closed) {
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
@@ -1789,7 +1815,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
error = GRPC_ERROR_CREATE("Transport closed");
}
if (error != GRPC_ERROR_NONE) {
- drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
} else if (!t->closed) {
keep_reading = true;
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 774fed0722..008dda8043 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -316,6 +316,10 @@ struct grpc_chttp2_transport {
gpr_slice goaway_text;
grpc_chttp2_write_cb *write_cb_pool;
+
+ /* if non-NULL, close the transport with this error when writes are finished
+ */
+ grpc_error *close_transport_on_writes_finished;
};
typedef enum {
@@ -509,7 +513,7 @@ extern int grpc_flowctl_trace;
if (!(grpc_http_trace)) \
; \
else \
- stmt
+ stmt
typedef enum {
GRPC_CHTTP2_FLOWCTL_MOVE,
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 38fd1e0960..f6bb3a0477 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -120,6 +120,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "http_status";
case GRPC_ERROR_INT_LIMIT:
return "limit";
+ case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
+ return "occurred_during_write";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -144,6 +146,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "tsi_error";
case GRPC_ERROR_STR_FILENAME:
return "filename";
+ case GRPC_ERROR_STR_QUEUED_BUFFERS:
+ return "queued_buffers";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -523,21 +527,25 @@ static char *fmt_time(void *p) {
return out;
}
-static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap) {
+static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
+ bool *first) {
if (n == NULL) return;
- add_errs(n->left, s, sz, cap);
+ add_errs(n->left, s, sz, cap, first);
+ if (!*first) append_chr(',', s, sz, cap);
+ *first = false;
const char *e = grpc_error_string(n->value);
append_str(e, s, sz, cap);
grpc_error_free_string(e);
- add_errs(n->right, s, sz, cap);
+ add_errs(n->right, s, sz, cap, first);
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
+ bool first = true;
append_chr('[', &s, &sz, &cap);
- add_errs(err->errs.root, &s, &sz, &cap);
+ add_errs(err->errs.root, &s, &sz, &cap, &first);
append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap);
return s;
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index ae6a6cb35e..f3f3b80a09 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -100,6 +100,8 @@ typedef enum {
GRPC_ERROR_INT_HTTP_STATUS,
/// context sensitive limit associated with the error
GRPC_ERROR_INT_LIMIT,
+ /// chttp2: did the error occur while a write was in progress
+ GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
} grpc_error_ints;
typedef enum {
@@ -121,6 +123,8 @@ typedef enum {
GRPC_ERROR_STR_TSI_ERROR,
/// filename that we were trying to read/write when this error occurred
GRPC_ERROR_STR_FILENAME,
+ /// which data was queued for writing when the error occurred
+ GRPC_ERROR_STR_QUEUED_BUFFERS
} grpc_error_strs;
typedef enum {