aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/connected_channel.c4
-rw-r--r--src/core/surface/completion_queue.c7
-rw-r--r--src/core/transport/chttp2_transport.c42
3 files changed, 42 insertions, 11 deletions
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index fa18655164..62611e08f3 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -48,12 +48,12 @@
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
-typedef struct {
+typedef struct connected_channel_channel_data {
grpc_transport *transport;
gpr_uint32 max_message_length;
} channel_data;
-typedef struct {
+typedef struct connected_channel_call_data {
grpc_call_element *elem;
grpc_stream_op_buffer outgoing_sopb;
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c4b8d60782..6a1d83ce5d 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -71,6 +71,7 @@ struct grpc_completion_queue {
grpc_pollset pollset;
/* 0 initially, 1 once we've begun shutting down */
int shutdown;
+ int shutdown_called;
/* Head of a linked list of queued events (prev points to the last element) */
event *queue;
/* Fixed size chained hash table of events for pluck() */
@@ -107,7 +108,6 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
grpc_event_finish_func on_finish, void *user_data) {
event *ev = gpr_malloc(sizeof(event));
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
- GPR_ASSERT(!cc->shutdown);
ev->base.type = type;
ev->base.tag = tag;
ev->base.call = call;
@@ -150,6 +150,7 @@ static void end_op_locked(grpc_completion_queue *cc,
#endif
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
+ GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
}
@@ -380,6 +381,10 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ cc->shutdown_called = 1;
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+
if (gpr_unref(&cc->refs)) {
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 476cc4b226..6b7273e3e4 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -309,6 +309,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
+static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t);
static void perform_write(transport *t, grpc_endpoint *ep);
@@ -516,13 +517,29 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
static void destroy_transport(grpc_transport *gt) {
transport *t = (transport *)gt;
- gpr_mu_lock(&t->mu);
+ lock(t);
t->destroying = 1;
- while (t->calling_back) {
+ /* Wait for pending stuff to finish.
+ We need to be not calling back to ensure that closed() gets a chance to
+ trigger if needed during unlock() before we die.
+ We need to be not writing as cancellation finalization may produce some
+ callbacks that NEED to be made to close out some streams when t->writing
+ becomes 0. */
+ while (t->calling_back || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
- t->cb = NULL;
- gpr_mu_unlock(&t->mu);
+ drop_connection(t);
+ unlock(t);
+
+ /* The drop_connection() above puts the transport into an error state, and
+ the follow-up unlock should then (as part of the cleanup work it does)
+ ensure that cb is NULL, and therefore not call back anything further.
+ This check validates this very subtle behavior.
+ It's shutdown path, so I don't believe an extra lock pair is going to be
+ problematic for performance. */
+ lock(t);
+ GPR_ASSERT(!t->cb);
+ unlock(t);
unref_transport(t);
}
@@ -680,6 +697,7 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
+ if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -738,7 +756,7 @@ static void unlock(transport *t) {
if (perform_callbacks) {
t->calling_back = 1;
}
- if (t->error_state == ERROR_STATE_SEEN) {
+ if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
t->cb = NULL; /* no more callbacks */
@@ -772,7 +790,7 @@ static void unlock(transport *t) {
}
if (call_closed) {
- cb->closed(t->cb_user_data, &t->base);
+ call_cb_closed(t, cb);
}
/* write some bytes if necessary */
@@ -903,13 +921,16 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->sent_write_closed = 1;
- stream_list_join(t, s, PENDING_CALLBACKS);
+ if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t->writing = 0;
+ if (t->destroying) {
+ gpr_cv_signal(&t->cv);
+ }
if (!t->reading) {
grpc_endpoint_destroy(t->ep);
t->ep = NULL;
@@ -979,7 +1000,8 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
} else {
grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
- if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) {
+ if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
+ !s->published_close) {
stream_list_join(t, s, PENDING_CALLBACKS);
}
@@ -1765,6 +1787,10 @@ static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
}
}
+static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
+ cb->closed(t->cb_user_data, &t->base);
+}
+
static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
transport *t = (transport *)gt;
lock(t);