aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-12-02 17:10:06 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-12-02 17:10:06 -0800
commitc494c7b104faf393839ff33dc02f0994dec70ba1 (patch)
tree163c0332d0ddde75022214a0db96349c301be534 /src/core/ext/transport
parent0627af2b4820e3d73b1c894ab9b269ab8e3bd81e (diff)
Fix some things:
- use after free for resource quota - stuck-in-combiner-lock bug for End2endTest.ClientCancelsBidi/1 under asan/epoll - fixes clang-format stuff for a few files
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c16
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h3
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c2
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c61
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h9
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c3
7 files changed, 38 insertions, 62 deletions
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index 7795606e73..8ee7e29316 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -58,8 +58,8 @@ void grpc_chttp2_server_handshaker_factory_create_handshakers(
grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr) {
if (handshaker_factory != NULL) {
- handshaker_factory->vtable->create_handshakers(
- exec_ctx, handshaker_factory, handshake_mgr);
+ handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory,
+ handshake_mgr);
}
}
@@ -71,7 +71,6 @@ void grpc_chttp2_server_handshaker_factory_destroy(
}
}
-
typedef struct pending_handshake_manager_node {
grpc_handshake_manager *handshake_mgr;
struct pending_handshake_manager_node *next;
@@ -196,9 +195,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
// args instead of hard-coding it.
const gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
- grpc_handshake_manager_do_handshake(
- exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline,
- acceptor, on_handshake_done, connection_state);
+ grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
+ tcp, state->args, deadline, acceptor,
+ on_handshake_done, connection_state);
}
/* Server callback: start listening on our ports */
@@ -275,9 +274,8 @@ grpc_error *grpc_chttp2_server_add_port(
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, state);
- err =
- grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
- args, &tcp_server);
+ err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
+ args, &tcp_server);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
index b1ff04bcbb..3073399267 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.h
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -73,7 +73,6 @@ void grpc_chttp2_server_handshaker_factory_destroy(
grpc_error *grpc_chttp2_server_add_port(
grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr,
grpc_channel_args *args,
- grpc_chttp2_server_handshaker_factory *handshaker_factory,
- int *port_num);
+ grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 366312bd72..7e286d4e46 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -45,7 +45,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
int port_num = 0;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
- grpc_error* err = grpc_chttp2_server_add_port(
+ grpc_error *err = grpc_chttp2_server_add_port(
&exec_ctx, server, addr,
grpc_channel_args_copy(grpc_server_get_channel_args(server)),
NULL /* handshaker_factory */, &port_num);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 5f41728132..85c21f0ca2 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -64,7 +64,7 @@ static void server_security_handshaker_factory_create_handshakers(
}
static void server_security_handshaker_factory_destroy(
- grpc_exec_ctx* exec_ctx, grpc_chttp2_server_handshaker_factory *hf) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) {
server_security_handshaker_factory *handshaker_factory =
(server_security_handshaker_factory *)hf;
GRPC_SECURITY_CONNECTOR_UNREF(&handshaker_factory->security_connector->base,
@@ -106,8 +106,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
goto done;
}
// Create handshaker factory.
- server_security_handshaker_factory* handshaker_factory =
- gpr_malloc(sizeof(*handshaker_factory));
+ server_security_handshaker_factory *handshaker_factory =
+ gpr_malloc(sizeof(*handshaker_factory));
memset(handshaker_factory, 0, sizeof(*handshaker_factory));
handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable;
handshaker_factory->security_connector = sc;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 3e7c078d3c..d449300969 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -425,7 +425,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -521,10 +520,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
}
- if (s->fail_pending_writes_on_writes_finished_error != NULL) {
- GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error);
- }
-
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
GPR_ASSERT(s->fetching_send_message == NULL);
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
@@ -826,6 +821,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
}
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
+#define CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE (1 << 1)
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
@@ -852,6 +848,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
return;
}
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (grpc_http_trace) {
+ const char *errstr = grpc_error_string(error);
+ gpr_log(GPR_DEBUG,
+ "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
+ closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
+ desc, errstr);
+ grpc_error_free_string(errstr);
+ }
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error =
@@ -868,7 +874,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = NULL;
}
- grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ if (t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE ||
+ (closure->next_data.scratch & CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE) ==
+ 0) {
+ grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ } else {
+ grpc_closure_list_append(&t->run_after_write, closure,
+ closure->error_data.error);
+ }
}
}
@@ -1013,6 +1026,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_initial_metadata != NULL) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = op->send_initial_metadata;
const size_t metadata_size =
@@ -1066,6 +1080,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->send_message != NULL) {
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
@@ -1103,6 +1118,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_trailing_metadata != NULL) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_CANNOT_RUN_WITH_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
const size_t metadata_size =
@@ -1406,7 +1422,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
if (grpc_chttp2_list_remove_writable_stream(t, s)) {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
}
@@ -1537,41 +1552,9 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
-void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s) {
- if (s->need_fail_pending_writes_on_writes_finished) {
- grpc_error *error = s->fail_pending_writes_on_writes_finished_error;
- s->fail_pending_writes_on_writes_finished_error = NULL;
- s->need_fail_pending_writes_on_writes_finished = false;
- grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error);
- }
-}
-
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error) {
- if (s->need_fail_pending_writes_on_writes_finished ||
- (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE &&
- (s->included[GRPC_CHTTP2_LIST_WRITABLE] ||
- s->included[GRPC_CHTTP2_LIST_WRITING]))) {
- /* If a write is in progress, and it involves this stream, wait for the
- * write to complete before cancelling things out. If we don't do this, then
- * our combiner lock might think that some operation on its queue might be
- * covering a completion even though there is none, in which case we might
- * offload to another thread, which isn't guarateed to exist */
- if (error != GRPC_ERROR_NONE) {
- if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) {
- s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE(
- "Post-poned fail writes due to in-progress write");
- }
- s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child(
- s->fail_pending_writes_on_writes_finished_error, error);
- }
- s->need_fail_pending_writes_on_writes_finished = true;
- return; /* early out */
- }
-
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = NULL;
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 6cba1e7fd2..31eb1e01ac 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -327,6 +327,9 @@ struct grpc_chttp2_transport {
*/
grpc_error *close_transport_on_writes_finished;
+ /* a list of closures to run after writes are finished */
+ grpc_closure_list run_after_write;
+
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
@@ -409,9 +412,6 @@ struct grpc_chttp2_stream {
grpc_error *read_closed_error;
/** the error that resulted in this stream being write-closed */
grpc_error *write_closed_error;
- /** should any writes be cleared once this stream becomes non-writable */
- bool need_fail_pending_writes_on_writes_finished;
- grpc_error *fail_pending_writes_on_writes_finished_error;
grpc_published_metadata_method published_metadata[2];
bool final_metadata_requested;
@@ -692,9 +692,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s);
void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_error *error);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 769b229a0d..ddd75ee574 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -208,7 +208,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
}
} else {
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
}
}
@@ -253,9 +252,9 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
GRPC_ERROR_REF(error));
}
- grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
}
+ grpc_exec_ctx_enqueue_list(exec_ctx, &t->run_after_write, NULL);
grpc_slice_buffer_reset_and_unref(&t->outbuf);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_chttp2_end_write", 0);