diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 123 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 32 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 18 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 167 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 33 |
7 files changed, 283 insertions, 96 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 119f5e82ab..97bfd587d2 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -230,33 +230,33 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error); -grpc_call *grpc_call_create( - grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, - grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative, - const void *server_transport_data, grpc_mdelem **add_initial_metadata, - size_t add_initial_metadata_count, gpr_timespec send_deadline) { +grpc_error *grpc_call_create(const grpc_call_create_args *args, + grpc_call **out_call) { size_t i, j; - grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); + grpc_channel_stack *channel_stack = + grpc_channel_get_channel_stack(args->channel); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); + *out_call = call = + gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); - call->channel = channel; - call->cq = cq; - call->parent = parent_call; + call->channel = args->channel; + call->cq = args->cq; + call->parent = args->parent_call; /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); - call->is_client = server_transport_data == NULL; + call->is_client = args->server_transport_data == NULL; if (call->is_client) { - GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); - for (i = 0; i < add_initial_metadata_count; i++) { - call->send_extra_metadata[i].md = add_initial_metadata[i]; + GPR_ASSERT(args->add_initial_metadata_count < + MAX_SEND_EXTRA_METADATA_COUNT); + for (i = 0; i < args->add_initial_metadata_count; i++) { + call->send_extra_metadata[i].md = args->add_initial_metadata[i]; } - call->send_extra_metadata_count = (int)add_initial_metadata_count; + call->send_extra_metadata_count = (int)args->add_initial_metadata_count; } else { - GPR_ASSERT(add_initial_metadata_count == 0); + GPR_ASSERT(args->add_initial_metadata_count == 0); call->send_extra_metadata_count = 0; } for (i = 0; i < 2; i++) { @@ -265,78 +265,79 @@ grpc_call *grpc_call_create( } } call->send_deadline = - gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC); - GRPC_CHANNEL_INTERNAL_REF(channel, "call"); + gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); + GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ grpc_error *error = grpc_call_stack_init( &exec_ctx, channel_stack, 1, destroy_call, call, call->context, - server_transport_data, CALL_STACK_FROM_CALL(call)); + args->server_transport_data, CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { intptr_t status; - if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { status = GRPC_STATUS_UNKNOWN; + } const char *error_str = grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION); close_with_status(&exec_ctx, call, (grpc_status_code)status, error_str == NULL ? "unknown error" : error_str); - GRPC_ERROR_UNREF(error); } - if (cq != NULL) { + if (args->cq != NULL) { GPR_ASSERT( - pollset_set_alternative == NULL && + args->pollset_set_alternative == NULL && "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); - GRPC_CQ_INTERNAL_REF(cq, "bind"); + GRPC_CQ_INTERNAL_REF(args->cq, "bind"); call->pollent = - grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)); } - if (pollset_set_alternative != NULL) { - call->pollent = - grpc_polling_entity_create_from_pollset_set(pollset_set_alternative); + if (args->pollset_set_alternative != NULL) { + call->pollent = grpc_polling_entity_create_from_pollset_set( + args->pollset_set_alternative); } if (!grpc_polling_entity_is_empty(&call->pollent)) { grpc_call_stack_set_pollset_or_pollset_set( &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } - if (parent_call != NULL) { - GRPC_CALL_INTERNAL_REF(parent_call, "child"); + gpr_timespec send_deadline = args->send_deadline; + if (args->parent_call != NULL) { + GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); GPR_ASSERT(call->is_client); - GPR_ASSERT(!parent_call->is_client); + GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&parent_call->mu); + gpr_mu_lock(&args->parent_call->mu); - if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { + if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( gpr_convert_clock_type(send_deadline, - parent_call->send_deadline.clock_type), - parent_call->send_deadline); + args->parent_call->send_deadline.clock_type), + args->parent_call->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ /* TODO(ctiller): This should change to use the appropriate census start_op * call. */ - if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { - GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); - grpc_call_context_set(call, GRPC_CONTEXT_TRACING, - parent_call->context[GRPC_CONTEXT_TRACING].value, - NULL); + if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { + GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); + grpc_call_context_set( + call, GRPC_CONTEXT_TRACING, + args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); } else { - GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); + GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); } - if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { + if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; } - if (parent_call->first_child == NULL) { - parent_call->first_child = call; + if (args->parent_call->first_child == NULL) { + args->parent_call->first_child = call; call->sibling_next = call->sibling_prev = call; } else { - call->sibling_next = parent_call->first_child; - call->sibling_prev = parent_call->first_child->sibling_prev; + call->sibling_next = args->parent_call->first_child; + call->sibling_prev = args->parent_call->first_child->sibling_prev; call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call; } - gpr_mu_unlock(&parent_call->mu); + gpr_mu_unlock(&args->parent_call->mu); } if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) { @@ -344,7 +345,7 @@ grpc_call *grpc_call_create( } grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); - return call; + return error; } void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, @@ -1089,9 +1090,14 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; + grpc_error *error = bctl->error; + if (bctl->recv_final_op) { + GRPC_ERROR_UNREF(error); + error = GRPC_ERROR_NONE; + } if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ - grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL); + grpc_closure_run(exec_ctx, bctl->notify_tag, error); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -1100,7 +1106,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ - grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->error, + grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error, finish_batch_completion, bctl, &bctl->cq_completion); } } @@ -1257,6 +1263,14 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } } +static void add_batch_error(batch_control *bctl, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (bctl->error == GRPC_ERROR_NONE) { + bctl->error = GRPC_ERROR_CREATE("Call batch operation failed"); + } + bctl->error = grpc_error_add_child(bctl->error, error); +} + static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; @@ -1264,9 +1278,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); - if (error != GRPC_ERROR_NONE) { - bctl->error = GRPC_ERROR_REF(error); - } else { + add_batch_error(bctl, GRPC_ERROR_REF(error)); + if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_initial_filter, call); @@ -1359,8 +1372,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } - GRPC_ERROR_UNREF(bctl->error); - bctl->error = GRPC_ERROR_REF(error); + add_batch_error(bctl, GRPC_ERROR_REF(error)); gpr_mu_unlock(&call->mu); if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); @@ -1396,6 +1408,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); + stream_op->covered_by_poller = true; if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 3a78fe3aa3..18af41b7fb 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -49,15 +49,29 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, grpc_call *call, int success, void *user_data); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, - uint32_t propagation_mask, - grpc_completion_queue *cq, - /* if not NULL, it'll be used in lieu of \a cq */ - grpc_pollset_set *pollset_set_alternative, - const void *server_transport_data, - grpc_mdelem **add_initial_metadata, - size_t add_initial_metadata_count, - gpr_timespec send_deadline); +typedef struct grpc_call_create_args { + grpc_channel *channel; + + grpc_call *parent_call; + uint32_t propagation_mask; + + grpc_completion_queue *cq; + /* if not NULL, it'll be used in lieu of cq */ + grpc_pollset_set *pollset_set_alternative; + + const void *server_transport_data; + + grpc_mdelem **add_initial_metadata; + size_t add_initial_metadata_count; + + gpr_timespec send_deadline; +} grpc_call_create_args; + +/* Create a new call based on \a args. + Regardless of success or failure, always returns a valid new call into *call + */ +grpc_error *grpc_call_create(const grpc_call_create_args *args, + grpc_call **call); void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq); diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 52e78567bd..aa8c052b41 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -208,9 +208,21 @@ static grpc_call *grpc_channel_create_call_internal( send_metadata[num_metadata++] = GRPC_MDELEM_REF(channel->default_authority); } - return grpc_call_create(channel, parent_call, propagation_mask, cq, - pollset_set_alternative, NULL, send_metadata, - num_metadata, deadline); + grpc_call_create_args args; + memset(&args, 0, sizeof(args)); + args.channel = channel; + args.parent_call = parent_call; + args.propagation_mask = propagation_mask; + args.cq = cq; + args.pollset_set_alternative = pollset_set_alternative; + args.server_transport_data = NULL; + args.add_initial_metadata = send_metadata; + args.add_initial_metadata_count = num_metadata; + args.send_deadline = deadline; + + grpc_call *call; + GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call)); + return call; } grpc_call *grpc_channel_create_call(grpc_channel *channel, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5978884db8..4e0feb56ac 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> #include "src/core/lib/iomgr/pollset.h" @@ -50,6 +51,9 @@ #include "src/core/lib/surface/event_string.h" int grpc_trace_operation_failures; +#ifndef NDEBUG +int grpc_trace_pending_tags; +#endif typedef struct { grpc_pollset_worker **worker; @@ -67,6 +71,9 @@ struct grpc_completion_queue { gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; + /** counter of how many things have ever been queued on this completion queue + useful for avoiding locks to check the queue */ + gpr_atm things_queued_ever; /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; @@ -121,15 +128,6 @@ void grpc_cq_global_shutdown(void) { } } -struct grpc_cq_alarm { - grpc_timer alarm; - grpc_cq_completion completion; - /** completion queue where events about this alarm will be posted */ - grpc_completion_queue *cq; - /** user supplied tag */ - void *tag; -}; - grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *cc; GPR_ASSERT(!reserved); @@ -166,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->is_server_cq = 0; cc->is_non_listening_server_cq = 0; cc->num_pluckers = 0; + gpr_atm_no_barrier_store(&cc->things_queued_ever, 0); #ifndef NDEBUG cc->outstanding_tag_count = 0; #endif @@ -276,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(found); #endif shutdown = gpr_unref(&cc->pending_events); + gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); if (!shutdown) { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -313,13 +313,66 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(error); } +typedef struct { + gpr_atm last_seen_things_queued_ever; + grpc_completion_queue *cq; + gpr_timespec deadline; + grpc_cq_completion *stolen_completion; + void *tag; /* for pluck */ + bool first_loop; +} cq_is_finished_arg; + +static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { + cq_is_finished_arg *a = arg; + grpc_completion_queue *cq = a->cq; + GPR_ASSERT(a->stolen_completion == NULL); + gpr_atm current_last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cq->things_queued_ever); + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { + gpr_mu_lock(cq->mu); + a->last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cq->things_queued_ever); + if (cq->completed_tail != &cq->completed_head) { + a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next; + cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1; + if (a->stolen_completion == cq->completed_tail) { + cq->completed_tail = &cq->completed_head; + } + gpr_mu_unlock(cq->mu); + return true; + } + gpr_mu_unlock(cq->mu); + } + return !a->first_loop && + gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; +} + +#ifndef NDEBUG +static void dump_pending_tags(grpc_completion_queue *cc) { + if (!grpc_trace_pending_tags) return; + + gpr_strvec v; + gpr_strvec_init(&v); + gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); + for (size_t i = 0; i < cc->outstanding_tag_count; i++) { + char *s; + gpr_asprintf(&s, " %p", cc->outstanding_tags[i]); + gpr_strvec_add(&v, s); + } + char *out = gpr_strvec_flatten(&v, NULL); + gpr_strvec_destroy(&v); + gpr_log(GPR_DEBUG, "%s", out); + gpr_free(out); +} +#else +static void dump_pending_tags(grpc_completion_queue *cc) {} +#endif + grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_pollset_worker *worker = NULL; - int first_loop = 1; gpr_timespec now; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -333,11 +386,33 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, reserved)); GPR_ASSERT(!reserved); + dump_pending_tags(cc); + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); gpr_mu_lock(cc->mu); + cq_is_finished_arg is_finished_arg = { + .last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cc->things_queued_ever), + .cq = cc, + .deadline = deadline, + .stolen_completion = NULL, + .tag = NULL, + .first_loop = true}; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK( + cq_is_next_finished, &is_finished_arg); for (;;) { + if (is_finished_arg.stolen_completion != NULL) { + gpr_mu_unlock(cc->mu); + grpc_cq_completion *c = is_finished_arg.stolen_completion; + is_finished_arg.stolen_completion = NULL; + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(&exec_ctx, c->done_arg, c); + break; + } if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; cc->completed_head.next = c->next & ~(uintptr_t)1; @@ -358,13 +433,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); break; } - first_loop = 0; /* Check alarms - these are a global resource so we just ping each time through on every pollset. May update deadline to ensure timely wakeups. @@ -387,13 +462,16 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); break; } } + is_finished_arg.first_loop = false; } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -424,6 +502,37 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, GPR_UNREACHABLE_CODE(return ); } +static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { + cq_is_finished_arg *a = arg; + grpc_completion_queue *cq = a->cq; + GPR_ASSERT(a->stolen_completion == NULL); + gpr_atm current_last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cq->things_queued_ever); + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { + gpr_mu_lock(cq->mu); + a->last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cq->things_queued_ever); + grpc_cq_completion *c; + grpc_cq_completion *prev = &cq->completed_head; + while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != + &cq->completed_head) { + if (c->tag == a->tag) { + prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); + if (c == cq->completed_tail) { + cq->completed_tail = prev; + } + gpr_mu_unlock(cq->mu); + a->stolen_completion = c; + return true; + } + prev = c; + } + gpr_mu_unlock(cq->mu); + } + return !a->first_loop && + gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; +} + grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline, void *reserved) { grpc_event ret; @@ -431,8 +540,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - int first_loop = 1; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -448,11 +555,33 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, } GPR_ASSERT(!reserved); + dump_pending_tags(cc); + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); gpr_mu_lock(cc->mu); + cq_is_finished_arg is_finished_arg = { + .last_seen_things_queued_ever = + gpr_atm_no_barrier_load(&cc->things_queued_ever), + .cq = cc, + .deadline = deadline, + .stolen_completion = NULL, + .tag = tag, + .first_loop = true}; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK( + cq_is_pluck_finished, &is_finished_arg); for (;;) { + if (is_finished_arg.stolen_completion != NULL) { + gpr_mu_unlock(cc->mu); + c = is_finished_arg.stolen_completion; + is_finished_arg.stolen_completion = NULL; + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(&exec_ctx, c->done_arg, c); + break; + } prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != &cc->completed_head) { @@ -485,17 +614,18 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); break; } now = gpr_now(GPR_CLOCK_MONOTONIC); - if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { + if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); break; } - first_loop = 0; /* Check alarms - these are a global resource so we just ping each time through on every pollset. May update deadline to ensure timely wakeups. @@ -518,15 +648,18 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); break; } } + is_finished_arg.first_loop = false; del_plucker(cc, tag, &worker); } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_pluck", 0); diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 3049284f68..e9d840df77 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -44,6 +44,9 @@ extern int grpc_cq_pluck_trace; extern int grpc_cq_event_timeout_trace; extern int grpc_trace_operation_failures; +#ifndef NDEBUG +extern int grpc_trace_pending_tags; +#endif typedef struct grpc_cq_completion { /** user supplied tag */ diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index edda0c85fa..ac111253ef 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -173,6 +173,9 @@ void grpc_init(void) { // Default timeout trace to 1 grpc_cq_event_timeout_trace = 1; grpc_register_tracer("op_failure", &grpc_trace_operation_failures); +#ifndef NDEBUG + grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); +#endif grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 56fb80e92e..9a9fcddb6e 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -280,6 +280,7 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_channel_element *elem; op->send_goaway = send_goaway; + op->set_accept_stream = true; sc->slice = gpr_slice_from_copied_string("Server shutdown"); op->goaway_message = &sc->slice; op->goaway_status = GRPC_STATUS_OK; @@ -439,6 +440,13 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; + if (error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(error); + gpr_log(GPR_INFO, "Disconnected client: %s", msg); + grpc_error_free_string(msg); + } + GRPC_ERROR_UNREF(error); + grpc_transport_op *op = grpc_make_transport_op(&chand->finish_destroy_channel_closure); op->set_accept_stream = true; @@ -446,13 +454,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_channel_stack_element( grpc_channel_get_channel_stack(chand->channel), 0), op); - - if (error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(error); - gpr_log(GPR_INFO, "Disconnected client: %s", msg); - grpc_error_free_string(msg); - } - GRPC_ERROR_UNREF(error); } static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { @@ -773,8 +774,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } - grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error, - NULL); + grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error); } static void server_mutate_op(grpc_call_element *elem, @@ -829,11 +829,20 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, const void *transport_server_data) { channel_data *chand = cd; /* create a call */ - grpc_call *call = grpc_call_create(chand->channel, NULL, 0, NULL, NULL, - transport_server_data, NULL, 0, - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_call_create_args args; + memset(&args, 0, sizeof(args)); + args.channel = chand->channel; + args.server_transport_data = transport_server_data; + args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + grpc_call *call; + grpc_error *error = grpc_call_create(&args, &call); grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + if (error != GRPC_ERROR_NONE) { + got_initial_metadata(exec_ctx, elem, error); + GRPC_ERROR_UNREF(error); + return; + } call_data *calld = elem->call_data; grpc_op op; memset(&op, 0, sizeof(op)); |