aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/call.c4
-rw-r--r--src/core/surface/channel_connectivity.c2
-rw-r--r--src/core/surface/channel_ping.c2
-rw-r--r--src/core/surface/completion_queue.c40
-rw-r--r--src/core/surface/completion_queue.h5
-rw-r--r--src/core/surface/server.c6
-rw-r--r--src/core/transport/chttp2/writing.c1
-rw-r--r--src/core/transport/chttp2_transport.c23
8 files changed, 72 insertions, 11 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5d064ef00d..f857b66e5e 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1119,7 +1119,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
bctl->success = 1;
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq);
+ grpc_cq_begin_op(call->cq, notify_tag);
}
gpr_mu_unlock(&call->mu);
post_batch_completion(exec_ctx, bctl);
@@ -1333,7 +1333,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq);
+ grpc_cq_begin_op(call->cq, notify_tag);
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 57529ff903..10f5c4da4d 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -189,7 +189,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, (long long)deadline.tv_sec,
(int)deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
- grpc_cq_begin_op(cq);
+ grpc_cq_begin_op(cq, tag);
gpr_mu_init(&w->mu);
grpc_closure_init(&w->on_complete, watch_complete, w);
diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c
index 1b6f06ded1..b4ce282787 100644
--- a/src/core/surface/channel_ping.c
+++ b/src/core/surface/channel_ping.c
@@ -73,7 +73,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
grpc_closure_init(&pr->closure, ping_done, pr);
op.send_ping = &pr->closure;
op.bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq);
+ grpc_cq_begin_op(cq, tag);
top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c71b9b02b0..0b333258ba 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -73,6 +73,12 @@ struct grpc_completion_queue {
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
grpc_closure pollset_shutdown_done;
+#ifndef NDEBUG
+ void **outstanding_tags;
+ size_t outstanding_tag_count;
+ size_t outstanding_tag_capacity;
+#endif
+
grpc_completion_queue *next_free;
};
@@ -89,6 +95,9 @@ void grpc_cq_global_shutdown(void) {
while (g_freelist) {
grpc_completion_queue *next = g_freelist->next_free;
grpc_pollset_destroy(&g_freelist->pollset);
+#ifndef NDEBUG
+ gpr_free(g_freelist->outstanding_tags);
+#endif
gpr_free(g_freelist);
g_freelist = next;
}
@@ -117,6 +126,10 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc = gpr_malloc(sizeof(grpc_completion_queue));
grpc_pollset_init(&cc->pollset);
+#ifndef NDEBUG
+ cc->outstanding_tags = NULL;
+ cc->outstanding_tag_capacity = 0;
+#endif
} else {
cc = g_freelist;
g_freelist = g_freelist->next_free;
@@ -134,6 +147,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->shutdown_called = 0;
cc->is_server_cq = 0;
cc->num_pluckers = 0;
+#ifndef NDEBUG
+ cc->outstanding_tag_count = 0;
+#endif
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc);
GPR_TIMER_END("grpc_completion_queue_create", 0);
@@ -176,10 +192,17 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
}
}
-void grpc_cq_begin_op(grpc_completion_queue *cc) {
+void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
#ifndef NDEBUG
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
GPR_ASSERT(!cc->shutdown_called);
+ if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
+ cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
+ cc->outstanding_tags =
+ gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) *
+ cc->outstanding_tag_capacity);
+ }
+ cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
#endif
gpr_ref(&cc->pending_events);
@@ -196,6 +219,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
int shutdown;
int i;
grpc_pollset_worker *pluck_worker;
+#ifndef NDEBUG
+ int found = 0;
+#endif
GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
@@ -206,6 +232,18 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+#ifndef NDEBUG
+ for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
+ if (cc->outstanding_tags[i] == tag) {
+ cc->outstanding_tag_count--;
+ GPR_SWAP(void *, cc->outstanding_tags[i],
+ cc->outstanding_tags[cc->outstanding_tag_count]);
+ found = 1;
+ break;
+ }
+ }
+ GPR_ASSERT(found);
+#endif
shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
cc->completed_tail->next =
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index a40bb048ac..d58fb1eb87 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -69,9 +69,10 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc);
+void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
-/* Queue a GRPC_OP_COMPLETED operation */
+/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
+ grpc_cq_begin_op */
void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
void *tag, int success,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index cdbd542d9a..1e1cde3648 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1007,7 +1007,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
- grpc_cq_begin_op(cq);
+ grpc_cq_begin_op(cq, tag);
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1176,7 +1176,7 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification);
+ grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL;
rc->type = BATCH_CALL;
rc->server = server;
@@ -1213,7 +1213,7 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification);
+ grpc_cq_begin_op(cq_for_notification, tag);
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 805d05222d..265448641a 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -342,7 +342,6 @@ void grpc_chttp2_cleanup_writing(
}
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
- GPR_ASSERT(stream_global->send_message_finished);
grpc_chttp2_complete_closure_step(
exec_ctx, &stream_global->send_message_finished, 1);
stream_writing->sent_message = 0;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index aa459c8bac..91c1a27cff 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -115,6 +115,11 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_status_code status,
gpr_slice *optional_message);
+/** Fail any outstanding ops */
+static void fail_all_outstanding_ops(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global);
+
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
@@ -748,6 +753,21 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
*pclosure = NULL;
}
+static void fail_all_outstanding_ops(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_initial_metadata_finished, 0);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
+ grpc_chttp2_complete_closure_step(exec_ctx,
+ &stream_global->send_message_finished, 0);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->recv_initial_metadata_finished, 0);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, &stream_global->recv_trailing_metadata_finished, 0);
+}
+
static int contains_non_ok_status(
grpc_chttp2_transport_global *transport_global,
grpc_metadata_batch *batch) {
@@ -1022,6 +1042,9 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
exec_ctx, &stream_global->recv_trailing_metadata_finished, 1);
}
}
+ if (stream_global->finished_close) {
+ fail_all_outstanding_ops(exec_ctx, transport_global, stream_global);
+ }
}
}