diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/call.c | 4 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 2 | ||||
-rw-r--r-- | src/core/surface/channel_ping.c | 2 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 40 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 5 | ||||
-rw-r--r-- | src/core/surface/server.c | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 1 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 23 |
8 files changed, 72 insertions, 11 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5d064ef00d..f857b66e5e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1119,7 +1119,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); bctl->success = 1; if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq); + grpc_cq_begin_op(call->cq, notify_tag); } gpr_mu_unlock(&call->mu); post_batch_completion(exec_ctx, bctl); @@ -1333,7 +1333,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_REF(call, "completion"); if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq); + grpc_cq_begin_op(call->cq, notify_tag); } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 57529ff903..10f5c4da4d 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -189,7 +189,7 @@ void grpc_channel_watch_connectivity_state( 7, (channel, (int)last_observed_state, (long long)deadline.tv_sec, (int)deadline.tv_nsec, (int)deadline.clock_type, cq, tag)); - grpc_cq_begin_op(cq); + grpc_cq_begin_op(cq, tag); gpr_mu_init(&w->mu); grpc_closure_init(&w->on_complete, watch_complete, w); diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c index 1b6f06ded1..b4ce282787 100644 --- a/src/core/surface/channel_ping.c +++ b/src/core/surface/channel_ping.c @@ -73,7 +73,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, grpc_closure_init(&pr->closure, ping_done, pr); op.send_ping = &pr->closure; op.bind_pollset = grpc_cq_pollset(cq); - grpc_cq_begin_op(cq); + grpc_cq_begin_op(cq, tag); top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c71b9b02b0..0b333258ba 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -73,6 +73,12 @@ struct grpc_completion_queue { plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; +#ifndef NDEBUG + void **outstanding_tags; + size_t outstanding_tag_count; + size_t outstanding_tag_capacity; +#endif + grpc_completion_queue *next_free; }; @@ -89,6 +95,9 @@ void grpc_cq_global_shutdown(void) { while (g_freelist) { grpc_completion_queue *next = g_freelist->next_free; grpc_pollset_destroy(&g_freelist->pollset); +#ifndef NDEBUG + gpr_free(g_freelist->outstanding_tags); +#endif gpr_free(g_freelist); g_freelist = next; } @@ -117,6 +126,10 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc = gpr_malloc(sizeof(grpc_completion_queue)); grpc_pollset_init(&cc->pollset); +#ifndef NDEBUG + cc->outstanding_tags = NULL; + cc->outstanding_tag_capacity = 0; +#endif } else { cc = g_freelist; g_freelist = g_freelist->next_free; @@ -134,6 +147,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->shutdown_called = 0; cc->is_server_cq = 0; cc->num_pluckers = 0; +#ifndef NDEBUG + cc->outstanding_tag_count = 0; +#endif grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc); GPR_TIMER_END("grpc_completion_queue_create", 0); @@ -176,10 +192,17 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { } } -void grpc_cq_begin_op(grpc_completion_queue *cc) { +void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); GPR_ASSERT(!cc->shutdown_called); + if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { + cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); + cc->outstanding_tags = + gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) * + cc->outstanding_tag_capacity); + } + cc->outstanding_tags[cc->outstanding_tag_count++] = tag; gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); #endif gpr_ref(&cc->pending_events); @@ -196,6 +219,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, int shutdown; int i; grpc_pollset_worker *pluck_worker; +#ifndef NDEBUG + int found = 0; +#endif GPR_TIMER_BEGIN("grpc_cq_end_op", 0); @@ -206,6 +232,18 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); +#ifndef NDEBUG + for (i = 0; i < (int)cc->outstanding_tag_count; i++) { + if (cc->outstanding_tags[i] == tag) { + cc->outstanding_tag_count--; + GPR_SWAP(void *, cc->outstanding_tags[i], + cc->outstanding_tags[cc->outstanding_tag_count]); + found = 1; + break; + } + } + GPR_ASSERT(found); +#endif shutdown = gpr_unref(&cc->pending_events); if (!shutdown) { cc->completed_tail->next = diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index a40bb048ac..d58fb1eb87 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -69,9 +69,10 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ -void grpc_cq_begin_op(grpc_completion_queue *cc); +void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); -/* Queue a GRPC_OP_COMPLETED operation */ +/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to + grpc_cq_begin_op */ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, int success, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index cdbd542d9a..1e1cde3648 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1007,7 +1007,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); - grpc_cq_begin_op(cq); + grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1176,7 +1176,7 @@ grpc_call_error grpc_server_request_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification); + grpc_cq_begin_op(cq_for_notification, tag); details->reserved = NULL; rc->type = BATCH_CALL; rc->server = server; @@ -1213,7 +1213,7 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification); + grpc_cq_begin_op(cq_for_notification, tag); rc->type = REGISTERED_CALL; rc->server = server; rc->tag = tag; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 805d05222d..265448641a 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -342,7 +342,6 @@ void grpc_chttp2_cleanup_writing( } if (stream_writing->sent_message) { GPR_ASSERT(stream_writing->send_message == NULL); - GPR_ASSERT(stream_global->send_message_finished); grpc_chttp2_complete_closure_step( exec_ctx, &stream_global->send_message_finished, 1); stream_writing->sent_message = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index aa459c8bac..91c1a27cff 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -115,6 +115,11 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_status_code status, gpr_slice *optional_message); +/** Fail any outstanding ops */ +static void fail_all_outstanding_ops( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); + /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -748,6 +753,21 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx, *pclosure = NULL; } +static void fail_all_outstanding_ops( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_initial_metadata_finished, 0); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->send_trailing_metadata_finished, 0); + grpc_chttp2_complete_closure_step(exec_ctx, + &stream_global->send_message_finished, 0); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->recv_initial_metadata_finished, 0); + grpc_chttp2_complete_closure_step( + exec_ctx, &stream_global->recv_trailing_metadata_finished, 0); +} + static int contains_non_ok_status( grpc_chttp2_transport_global *transport_global, grpc_metadata_batch *batch) { @@ -1022,6 +1042,9 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, exec_ctx, &stream_global->recv_trailing_metadata_finished, 1); } } + if (stream_global->finished_close) { + fail_all_outstanding_ops(exec_ctx, transport_global, stream_global); + } } } |