author    | Sree Kuchibhotla <sreek@google.com> | 2017-07-25 14:08:33 -0700
committer | Sree Kuchibhotla <sreek@google.com> | 2017-07-25 14:08:33 -0700
commit    | 59beeff53158d0f27337e7305c730f0835de2f58 (patch)
tree      | fa27404847048c476f094681b3c7d102f3f828b8 /src/core/lib/surface
parent    | b633a86e1a39c9d3bb74a226a6174b88683ca372 (diff)
parent    | ad5a9c2a0db1926eaec110a7fe573875840c6ce3 (diff)
Merge branch 'master' into fix_alarm
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/alarm.c            |   3
-rw-r--r-- | src/core/lib/surface/api_trace.c        |   2
-rw-r--r-- | src/core/lib/surface/call.c             |  10
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 648
-rw-r--r-- | src/core/lib/surface/completion_queue.h |   3
-rw-r--r-- | src/core/lib/surface/init.c             |  44
-rw-r--r-- | src/core/lib/surface/init_secure.c      |  10
-rw-r--r-- | src/core/lib/surface/server.c           | 120
8 files changed, 486 insertions, 354 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 343e48c101..300648627c 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -23,7 +23,8 @@
 #include "src/core/lib/surface/completion_queue.h"

 #ifndef NDEBUG
-grpc_tracer_flag grpc_trace_alarm_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_alarm_refcount =
+    GRPC_TRACER_INITIALIZER(false, "alarm_refcount");
 #endif

 struct grpc_alarm {
diff --git a/src/core/lib/surface/api_trace.c b/src/core/lib/surface/api_trace.c
index f88ffd57aa..56973303da 100644
--- a/src/core/lib/surface/api_trace.c
+++ b/src/core/lib/surface/api_trace.c
@@ -19,4 +19,4 @@
 #include "src/core/lib/surface/api_trace.h"
 #include "src/core/lib/debug/trace.h"

-grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false, "api");
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c769866ceb..2365d27307 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -229,8 +229,10 @@ struct grpc_call {
   void *saved_receiving_stream_ready_bctlp;
 };

-grpc_tracer_flag grpc_call_error_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_compression_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_call_error_trace =
+    GRPC_TRACER_INITIALIZER(false, "call_error");
+grpc_tracer_flag grpc_compression_trace =
+    GRPC_TRACER_INITIALIZER(false, "compression");

 #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
 #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
@@ -1504,7 +1506,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
           goto done_with_error;
         }
         /* TODO(ctiller): just make these the same variable? */
-        call->metadata_batch[0][0].deadline = call->send_deadline;
+        if (call->is_client) {
+          call->metadata_batch[0][0].deadline = call->send_deadline;
+        }
         stream_op_payload->send_initial_metadata.send_initial_metadata =
             &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
         stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b04aee6c73..978d7b4171 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -35,10 +35,13 @@
 #include "src/core/lib/surface/call.h"
 #include "src/core/lib/surface/event_string.h"

-grpc_tracer_flag grpc_trace_operation_failures = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_operation_failures =
+    GRPC_TRACER_INITIALIZER(false, "op_failure");
 #ifndef NDEBUG
-grpc_tracer_flag grpc_trace_pending_tags = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_pending_tags =
+    GRPC_TRACER_INITIALIZER(false, "pending_tags");
+grpc_tracer_flag grpc_trace_cq_refcount =
+    GRPC_TRACER_INITIALIZER(false, "cq_refcount");
 #endif

 typedef struct {
@@ -189,16 +192,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {

 typedef struct cq_vtable {
   grpc_cq_completion_type cq_completion_type;
-  size_t (*size)();
-  void (*begin_op)(grpc_completion_queue *cc, void *tag);
-  void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag,
+  size_t data_size;
+  void (*init)(void *data);
+  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
+  void (*destroy)(void *data);
+  void (*begin_op)(grpc_completion_queue *cq, void *tag);
+  void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
                  grpc_error *error,
                  void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
                               grpc_cq_completion *storage),
                  void *done_arg, grpc_cq_completion *storage);
-  grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline,
+  grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
                      void *reserved);
-  grpc_event (*pluck)(grpc_completion_queue *cc, void *tag,
+  grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
                       gpr_timespec deadline, void *reserved);
 } cq_vtable;

@@ -218,25 +224,28 @@ typedef struct grpc_cq_event_queue {
   gpr_atm num_queue_items;
 } grpc_cq_event_queue;

-/* TODO: sreek Refactor this based on the completion_type. Put completion-type
- * specific data in a different structure (and co-allocate memory for it along
- * with completion queue + pollset )*/
-typedef struct cq_data {
-  gpr_mu *mu;
+typedef struct cq_next_data {
+  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
+  grpc_cq_event_queue queue;

+  /** Counter of how many things have ever been queued on this completion queue
+      useful for avoiding locks to check the queue */
+  gpr_atm things_queued_ever;
+
+  /* Number of outstanding events (+1 if not shut down) */
+  gpr_atm pending_events;
+
+  int shutdown_called;
+} cq_next_data;
+
+typedef struct cq_pluck_data {
   /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
   grpc_cq_completion completed_head;
   grpc_cq_completion *completed_tail;

-  /** Completed events for completion-queues of type GRPC_CQ_NEXT */
-  grpc_cq_event_queue queue;
-
   /** Number of pending events (+1 if we're not shutdown) */
   gpr_refcount pending_events;

-  /** Once owning_refs drops to zero, we will destroy the cq */
-  gpr_refcount owning_refs;
-
   /** Counter of how many things have ever been queued on this completion queue
       useful for avoiding locks to check the queue */
   gpr_atm things_queued_ever;
@@ -245,37 +254,45 @@ typedef struct cq_data {
   gpr_atm shutdown;
   int shutdown_called;
-  int is_server_cq;
-
   int num_pluckers;
-  int num_polls;
   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
-  grpc_closure pollset_shutdown_done;
+} cq_pluck_data;
+
+/* Completion queue structure */
+struct grpc_completion_queue {
+  /** Once owning_refs drops to zero, we will destroy the cq */
+  gpr_refcount owning_refs;
+
+  gpr_mu *mu;
+
+  const cq_vtable *vtable;
+  const cq_poller_vtable *poller_vtable;

 #ifndef NDEBUG
   void **outstanding_tags;
   size_t outstanding_tag_count;
   size_t outstanding_tag_capacity;
 #endif
-} cq_data;

-/* Completion queue structure */
-struct grpc_completion_queue {
-  cq_data data;
-  const cq_vtable *vtable;
-  const cq_poller_vtable *poller_vtable;
+  grpc_closure pollset_shutdown_done;
+  int num_polls;
 };

 /* Forward declarations */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc);
-
-static size_t cq_size(grpc_completion_queue *cc);
-
-static void cq_begin_op(grpc_completion_queue *cc, void *tag);
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+                                    grpc_completion_queue *cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                                     grpc_completion_queue *cq);
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+                             grpc_completion_queue *cq);
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                              grpc_completion_queue *cq);
+
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);

 static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc, void *tag,
+                               grpc_completion_queue *cq, void *tag,
                                grpc_error *error,
                                void (*done)(grpc_exec_ctx *exec_ctx,
                                             void *done_arg,
@@ -283,42 +300,56 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
                                void *done_arg, grpc_cq_completion *storage);

 static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
-                                grpc_completion_queue *cc, void *tag,
+                                grpc_completion_queue *cq, void *tag,
                                 grpc_error *error,
                                 void (*done)(grpc_exec_ctx *exec_ctx,
                                              void *done_arg,
                                              grpc_cq_completion *storage),
                                 void *done_arg, grpc_cq_completion *storage);

-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
                           void *reserved);

-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
                            gpr_timespec deadline, void *reserved);

+static void cq_init_next(void *data);
+static void cq_init_pluck(void *data);
+static void cq_destroy_next(void *data);
+static void cq_destroy_pluck(void *data);
+
 /* Completion queue vtables based on the completion-type */
 static const cq_vtable g_cq_vtable[] = {
     /* GRPC_CQ_NEXT */
-    {.cq_completion_type = GRPC_CQ_NEXT,
-     .size = cq_size,
-     .begin_op = cq_begin_op,
+    {.data_size = sizeof(cq_next_data),
+     .cq_completion_type = GRPC_CQ_NEXT,
+     .init = cq_init_next,
+     .shutdown = cq_shutdown_next,
+     .destroy = cq_destroy_next,
+     .begin_op = cq_begin_op_for_next,
      .end_op = cq_end_op_for_next,
      .next = cq_next,
      .pluck = NULL},
     /* GRPC_CQ_PLUCK */
-    {.cq_completion_type = GRPC_CQ_PLUCK,
-     .size = cq_size,
-     .begin_op = cq_begin_op,
+    {.data_size = sizeof(cq_pluck_data),
+     .cq_completion_type = GRPC_CQ_PLUCK,
+     .init = cq_init_pluck,
+     .shutdown = cq_shutdown_pluck,
+     .destroy = cq_destroy_pluck,
+     .begin_op = cq_begin_op_for_pluck,
      .end_op = cq_end_op_for_pluck,
      .next = NULL,
      .pluck = cq_pluck},
 };

-#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
-#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1)
+#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define POLLSET_FROM_CQ(cq) \
+  ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))

-grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true);
-grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
+grpc_tracer_flag grpc_cq_pluck_trace =
+    GRPC_TRACER_INITIALIZER(true, "queue_pluck");
+grpc_tracer_flag grpc_cq_event_timeout_trace =
+    GRPC_TRACER_INITIALIZER(true, "queue_timeout");

 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
   if (GRPC_TRACER_ON(grpc_api_trace) &&              \
@@ -329,7 +360,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
     gpr_free(_ev);                                   \
   }

-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
+static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
                                      grpc_error *error);

 static void cq_event_queue_init(grpc_cq_event_queue *q) {
@@ -342,9 +373,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
   gpr_mpscq_destroy(&q->queue);
 }

-static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
+static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
   gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
-  gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1);
+  return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
 }

 static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
@@ -367,16 +398,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
   return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
 }

-static size_t cq_size(grpc_completion_queue *cc) {
-  /* Size of the completion queue and the size of the pollset whose memory is
-     allocated right after that of completion queue */
-  return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
-}
-
 grpc_completion_queue *grpc_completion_queue_create_internal(
     grpc_cq_completion_type completion_type,
     grpc_cq_polling_type polling_type) {
-  grpc_completion_queue *cc;
+  grpc_completion_queue *cq;

   GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);

@@ -389,158 +414,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
   const cq_poller_vtable *poller_vtable =
       &g_poller_vtable_by_poller_type[polling_type];

-  cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size());
-  cq_data *cqd = &cc->data;
+  cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
+                  poller_vtable->size());

-  cc->vtable = vtable;
-  cc->poller_vtable = poller_vtable;
+  cq->vtable = vtable;
+  cq->poller_vtable = poller_vtable;

-  poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu);
+  /* One for destroy(), one for pollset_shutdown */
+  gpr_ref_init(&cq->owning_refs, 2);

-#ifndef NDEBUG
-  cqd->outstanding_tags = NULL;
-  cqd->outstanding_tag_capacity = 0;
-#endif
+  poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
+  vtable->init(DATA_FROM_CQ(cq));
+
+  GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
+                    grpc_schedule_on_exec_ctx);
+
+  GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+
+  return cq;
+}
+
+static void cq_init_next(void *ptr) {
+  cq_next_data *cqd = ptr;
+  /* Initial ref is dropped by grpc_completion_queue_shutdown */
+  gpr_atm_no_barrier_store(&cqd->pending_events, 1);
+  cqd->shutdown_called = false;
+  gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
+  cq_event_queue_init(&cqd->queue);
+}
+
+static void cq_destroy_next(void *ptr) {
+  cq_next_data *cqd = ptr;
+  GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
+  cq_event_queue_destroy(&cqd->queue);
+}
+
+static void cq_init_pluck(void *ptr) {
+  cq_pluck_data *cqd = ptr;
   /* Initial ref is dropped by grpc_completion_queue_shutdown */
   gpr_ref_init(&cqd->pending_events, 1);
-  /* One for destroy(), one for pollset_shutdown */
-  gpr_ref_init(&cqd->owning_refs, 2);
   cqd->completed_tail = &cqd->completed_head;
   cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
   gpr_atm_no_barrier_store(&cqd->shutdown, 0);
   cqd->shutdown_called = 0;
-  cqd->is_server_cq = 0;
   cqd->num_pluckers = 0;
-  cqd->num_polls = 0;
   gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
-#ifndef NDEBUG
-  cqd->outstanding_tag_count = 0;
-#endif
-  cq_event_queue_init(&cqd->queue);
-  GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc,
-                    grpc_schedule_on_exec_ctx);
-
-  GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
+}

-  return cc;
+static void cq_destroy_pluck(void *ptr) {
+  cq_pluck_data *cqd = ptr;
+  GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
 }

-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
-  return cc->vtable->cq_completion_type;
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+  return cq->vtable->cq_completion_type;
 }

-int grpc_get_cq_poll_num(grpc_completion_queue *cc) {
+int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
   int cur_num_polls;
-  gpr_mu_lock(cc->data.mu);
-  cur_num_polls = cc->data.num_polls;
-  gpr_mu_unlock(cc->data.mu);
+  gpr_mu_lock(cq->mu);
+  cur_num_polls = cq->num_polls;
+  gpr_mu_unlock(cq->mu);
   return cur_num_polls;
 }

 #ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
                           const char *file, int line) {
-  cq_data *cqd = &cc->data;
   if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
-    gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+    gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "CQ:%p   ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1,
+            "CQ:%p   ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
             reason);
   }
 #else
-void grpc_cq_internal_ref(grpc_completion_queue *cc) {
-  cq_data *cqd = &cc->data;
+void grpc_cq_internal_ref(grpc_completion_queue *cq) {
 #endif
-  gpr_ref(&cqd->owning_refs);
+  gpr_ref(&cq->owning_refs);
 }

 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
-  grpc_completion_queue *cc = arg;
-  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy");
+  grpc_completion_queue *cq = arg;
+  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
 }

 #ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
                             const char *reason, const char *file, int line) {
-  cq_data *cqd = &cc->data;
   if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
-    gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count);
+    gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-            "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1,
+            "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
             reason);
   }
 #else
 void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
-                            grpc_completion_queue *cc) {
-  cq_data *cqd = &cc->data;
+                            grpc_completion_queue *cq) {
 #endif
-  if (gpr_unref(&cqd->owning_refs)) {
-    GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
-    cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc));
-    cq_event_queue_destroy(&cqd->queue);
+  if (gpr_unref(&cq->owning_refs)) {
+    cq->vtable->destroy(DATA_FROM_CQ(cq));
+    cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq));
 #ifndef NDEBUG
-    gpr_free(cqd->outstanding_tags);
+    gpr_free(cq->outstanding_tags);
 #endif
-    gpr_free(cc);
+    gpr_free(cq);
   }
 }

-static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
-  cq_data *cqd = &cc->data;
-#ifndef NDEBUG
-  gpr_mu_lock(cqd->mu);
+static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+  GPR_ASSERT(!cqd->shutdown_called);
+  gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
+}
+
+static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
   GPR_ASSERT(!cqd->shutdown_called);
-  if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) {
-    cqd->outstanding_tag_capacity =
-        GPR_MAX(4, 2 * cqd->outstanding_tag_capacity);
-    cqd->outstanding_tags =
-        gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) *
-                                               cqd->outstanding_tag_capacity);
-  }
-  cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag;
-  gpr_mu_unlock(cqd->mu);
-#endif
   gpr_ref(&cqd->pending_events);
 }

-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
-  cc->vtable->begin_op(cc, tag);
+void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+  gpr_mu_lock(cq->mu);
+  if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+    cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+    cq->outstanding_tags =
+        gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+                                              cq->outstanding_tag_capacity);
+  }
+  cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+  gpr_mu_unlock(cq->mu);
+#endif
+  cq->vtable->begin_op(cq, tag);
 }

 #ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
-  cq_data *cqd = &cc->data;
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
   int found = 0;
   if (lock_cq) {
-    gpr_mu_lock(cqd->mu);
+    gpr_mu_lock(cq->mu);
   }

-  for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) {
-    if (cqd->outstanding_tags[i] == tag) {
-      cqd->outstanding_tag_count--;
-      GPR_SWAP(void *, cqd->outstanding_tags[i],
-               cqd->outstanding_tags[cqd->outstanding_tag_count]);
+  for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
+    if (cq->outstanding_tags[i] == tag) {
+      cq->outstanding_tag_count--;
+      GPR_SWAP(void *, cq->outstanding_tags[i],
+               cq->outstanding_tags[cq->outstanding_tag_count]);
       found = 1;
       break;
     }
   }

   if (lock_cq) {
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
   }

   GPR_ASSERT(found);
 }
 #else
-static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
 #endif

-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
  * type of GRPC_CQ_NEXT) */
 static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc, void *tag,
+                               grpc_completion_queue *cq, void *tag,
                                grpc_error *error,
                                void (*done)(grpc_exec_ctx *exec_ctx,
                                             void *done_arg,
@@ -553,16 +593,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
                    error != GRPC_ERROR_NONE)) {
     const char *errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+        "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
         "done=%p, done_arg=%p, storage=%p)",
-        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+        7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
     if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
         error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
     }
   }

-  cq_data *cqd = &cc->data;
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
   int is_success = (error == GRPC_ERROR_NONE);

   storage->tag = tag;
@@ -570,28 +610,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
   storage->done_arg = done_arg;
   storage->next = (uintptr_t)(is_success);

-  cq_check_tag(cc, tag, true); /* Used in debug builds only */
+  cq_check_tag(cq, tag, true); /* Used in debug builds only */

   /* Add the completion to the queue */
-  cq_event_queue_push(&cqd->queue, storage);
+  bool is_first = cq_event_queue_push(&cqd->queue, storage);
   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
-
-  gpr_mu_lock(cqd->mu);
-
-  int shutdown = gpr_unref(&cqd->pending_events);
-  if (!shutdown) {
-    grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL);
-    gpr_mu_unlock(cqd->mu);
-
-    if (kick_error != GRPC_ERROR_NONE) {
-      const char *msg = grpc_error_string(kick_error);
-      gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-
-      GRPC_ERROR_UNREF(kick_error);
+  bool will_definitely_shutdown =
+      gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
+
+  if (!will_definitely_shutdown) {
+    /* Only kick if this is the first item queued */
+    if (is_first) {
+      gpr_mu_lock(cq->mu);
+      grpc_error *kick_error =
+          cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+      gpr_mu_unlock(cq->mu);
+
+      if (kick_error != GRPC_ERROR_NONE) {
+        const char *msg = grpc_error_string(kick_error);
+        gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+        GRPC_ERROR_UNREF(kick_error);
+      }
+    }
+    if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+      gpr_mu_lock(cq->mu);
+      cq_finish_shutdown_next(exec_ctx, cq);
+      gpr_mu_unlock(cq->mu);
+      GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
     }
   } else {
-    cq_finish_shutdown(exec_ctx, cc);
-    gpr_mu_unlock(cqd->mu);
+    GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+    gpr_atm_rel_store(&cqd->pending_events, 0);
+    gpr_mu_lock(cq->mu);
+    cq_finish_shutdown_next(exec_ctx, cq);
+    gpr_mu_unlock(cq->mu);
+    GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
   }

   GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -599,16 +653,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
   GRPC_ERROR_UNREF(error);
 }

-/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
+/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
+ * completion
  * type of GRPC_CQ_PLUCK) */
 static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
-                                grpc_completion_queue *cc, void *tag,
+                                grpc_completion_queue *cq, void *tag,
                                 grpc_error *error,
                                 void (*done)(grpc_exec_ctx *exec_ctx,
                                              void *done_arg,
                                              grpc_cq_completion *storage),
                                 void *done_arg, grpc_cq_completion *storage) {
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
   int is_success = (error == GRPC_ERROR_NONE);

   GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -618,9 +673,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
                    error != GRPC_ERROR_NONE)) {
     const char *errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
-        "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
+        "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
         "done=%p, done_arg=%p, storage=%p)",
-        7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+        7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
     if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
         error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -632,8 +687,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
   storage->done_arg = done_arg;
   storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));

-  gpr_mu_lock(cqd->mu);
-  cq_check_tag(cc, tag, false); /* Used in debug builds only */
+  gpr_mu_lock(cq->mu);
+  cq_check_tag(cq, tag, false); /* Used in debug builds only */

   /* Add to the list of completions */
   gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
@@ -652,9 +707,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
     }

     grpc_error *kick_error =
-        cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker);
+        cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);

-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);

     if (kick_error != GRPC_ERROR_NONE) {
       const char *msg = grpc_error_string(kick_error);
@@ -663,8 +718,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
       GRPC_ERROR_UNREF(kick_error);
     }
   } else {
-    cq_finish_shutdown(exec_ctx, cc);
-    gpr_mu_unlock(cqd->mu);
+    cq_finish_shutdown_pluck(exec_ctx, cq);
+    gpr_mu_unlock(cq->mu);
   }

   GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -672,12 +727,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
   GRPC_ERROR_UNREF(error);
 }

-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
+void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
                     void *tag, grpc_error *error,
                     void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
                                  grpc_cq_completion *storage),
                     void *done_arg, grpc_cq_completion *storage) {
-  cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage);
+  cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
 }

 typedef struct {
@@ -692,7 +747,7 @@ typedef struct {
 static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
   cq_is_finished_arg *a = arg;
   grpc_completion_queue *cq = a->cq;
-  cq_data *cqd = &cq->data;
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
   GPR_ASSERT(a->stolen_completion == NULL);

   gpr_atm current_last_seen_things_queued_ever =
@@ -703,7 +758,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
         gpr_atm_no_barrier_load(&cqd->things_queued_ever);

     /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
-     * might return NULL in some cases even if the queue is not empty; but that
+     * might return NULL in some cases even if the queue is not empty; but
+     * that
      * is ok and doesn't affect correctness. Might effect the tail latencies a
      * bit) */
     a->stolen_completion = cq_event_queue_pop(&cqd->queue);
@@ -716,58 +772,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
 }

 #ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cc) {
+static void dump_pending_tags(grpc_completion_queue *cq) {
   if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;

-  cq_data *cqd = &cc->data;
-
   gpr_strvec v;
   gpr_strvec_init(&v);
   gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
-  gpr_mu_lock(cqd->mu);
-  for (size_t i = 0; i < cqd->outstanding_tag_count; i++) {
+  gpr_mu_lock(cq->mu);
+  for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
     char *s;
-    gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]);
+    gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
     gpr_strvec_add(&v, s);
   }
-  gpr_mu_unlock(cqd->mu);
+  gpr_mu_unlock(cq->mu);
   char *out = gpr_strvec_flatten(&v, NULL);
   gpr_strvec_destroy(&v);
   gpr_log(GPR_DEBUG, "%s", out);
   gpr_free(out);
 }
 #else
-static void dump_pending_tags(grpc_completion_queue *cc) {}
+static void dump_pending_tags(grpc_completion_queue *cq) {}
 #endif

-static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
+static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
                           void *reserved) {
   grpc_event ret;
   gpr_timespec now;
-  cq_data *cqd = &cc->data;
+  cq_next_data *cqd = DATA_FROM_CQ(cq);

   GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);

   GRPC_API_TRACE(
       "grpc_completion_queue_next("
-      "cc=%p, "
+      "cq=%p, "
       "deadline=gpr_timespec { tv_sec: %" PRId64
       ", tv_nsec: %d, clock_type: %d }, "
       "reserved=%p)",
-      5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+      5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
          reserved));
   GPR_ASSERT(!reserved);

-  dump_pending_tags(cc);
+  dump_pending_tags(cq);

   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);

-  GRPC_CQ_INTERNAL_REF(cc, "next");
+  GRPC_CQ_INTERNAL_REF(cq, "next");

   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
           gpr_atm_no_barrier_load(&cqd->things_queued_ever),
-      .cq = cc,
+      .cq = cq,
       .deadline = deadline,
       .stolen_completion = NULL,
       .tag = NULL,
@@ -800,21 +854,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
       /* If c == NULL it means either the queue is empty OR in an transient
          inconsistent state. If it is the latter, we shold do a 0-timeout poll
          so that the thread comes back quickly from poll to make a second
-         attempt at popping. Not doing this can potentially deadlock this thread
+         attempt at popping. Not doing this can potentially deadlock this
+         thread
          forever (if the deadline is infinity) */
       if (cq_event_queue_num_items(&cqd->queue) > 0) {
         iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
       }
     }

-    if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
+    if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
       /* Before returning, check if the queue has any items left over (since
          gpr_mpscq_pop() can sometimes return NULL even if the queue is not
          empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
       if (cq_event_queue_num_items(&cqd->queue) > 0) {
         /* Go to the beginning of the loop. No point doing a poll because
-           (cc->shutdown == true) is only possible when there is no pending work
-           (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion
+           (cq->shutdown == true) is only possible when there is no pending
+           work
+           (i.e cq->pending_events == 0) and any outstanding
+           grpc_cq_completion
            events are already queued on this cq */
         continue;
       }
@@ -828,16 +885,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
     if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
       break;
     }

     /* The main polling work happens in grpc_pollset_work */
-    gpr_mu_lock(cqd->mu);
-    cqd->num_polls++;
-    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+    gpr_mu_lock(cq->mu);
+    cq->num_polls++;
+    grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
                                               NULL, now, iteration_deadline);
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);

     if (err != GRPC_ERROR_NONE) {
       const char *msg = grpc_error_string(err);
@@ -846,30 +903,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
       GRPC_ERROR_UNREF(err);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
       break;
     }
     is_finished_arg.first_loop = false;
   }

-  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next");
+  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(is_finished_arg.stolen_completion == NULL);

+  if (cq_event_queue_num_items(&cqd->queue) > 0 &&
+      gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+    gpr_mu_lock(cq->mu);
+    cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+    gpr_mu_unlock(cq->mu);
+  }
+
   GPR_TIMER_END("grpc_completion_queue_next", 0);

   return ret;
 }

-grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
+/* Finishes the completion queue shutdown. This means that there are no more
+   completion events / tags expected from the completion queue
+   - Must be called under completion queue lock
+   - Must be called only once in completion queue's lifetime
+   - grpc_completion_queue_shutdown() MUST have been called before calling
+   this function */
+static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
+                                    grpc_completion_queue *cq) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+  GPR_ASSERT(cqd->shutdown_called);
+  GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
+
+  cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+                              &cq->pollset_shutdown_done);
+}
+
+static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
+                             grpc_completion_queue *cq) {
+  cq_next_data *cqd = DATA_FROM_CQ(cq);
+
+  GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+  gpr_mu_lock(cq->mu);
+  if (cqd->shutdown_called) {
+    gpr_mu_unlock(cq->mu);
+    GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+    GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+    return;
+  }
+  cqd->shutdown_called = 1;
+  if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+    cq_finish_shutdown_next(exec_ctx, cq);
+  }
+  gpr_mu_unlock(cq->mu);
+  GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+}
+
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
                                       gpr_timespec deadline, void *reserved) {
-  return cc->vtable->next(cc, deadline, reserved);
+  return cq->vtable->next(cq, deadline, reserved);
 }

-static int add_plucker(grpc_completion_queue *cc, void *tag,
+static int add_plucker(grpc_completion_queue *cq, void *tag,
                        grpc_pollset_worker **worker) {
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
   if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
     return 0;
   }
@@ -879,9 +980,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
   return 1;
 }

-static void del_plucker(grpc_completion_queue *cc, void *tag,
+static void del_plucker(grpc_completion_queue *cq, void *tag,
                         grpc_pollset_worker **worker) {
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);
   for (int i = 0; i < cqd->num_pluckers; i++) {
     if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
       cqd->num_pluckers--;
@@ -895,13 +996,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,

 static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
   cq_is_finished_arg *a = arg;
   grpc_completion_queue *cq = a->cq;
-  cq_data *cqd = &cq->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);

   GPR_ASSERT(a->stolen_completion == NULL);
   gpr_atm current_last_seen_things_queued_ever =
       gpr_atm_no_barrier_load(&cqd->things_queued_ever);
   if (current_last_seen_things_queued_ever !=
       a->last_seen_things_queued_ever) {
-    gpr_mu_lock(cqd->mu);
+    gpr_mu_lock(cq->mu);
     a->last_seen_things_queued_ever =
         gpr_atm_no_barrier_load(&cqd->things_queued_ever);
     grpc_cq_completion *c;
@@ -913,51 +1014,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
         if (c == cqd->completed_tail) {
           cqd->completed_tail = prev;
         }
-        gpr_mu_unlock(cqd->mu);
+        gpr_mu_unlock(cq->mu);
         a->stolen_completion = c;
         return true;
       }
       prev = c;
     }
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
   }
   return !a->first_loop &&
          gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
 }

-static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
+static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
                            gpr_timespec deadline, void *reserved) {
   grpc_event ret;
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker *worker = NULL;
   gpr_timespec now;
-  cq_data *cqd = &cc->data;
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);

   GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);

   if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) {
     GRPC_API_TRACE(
         "grpc_completion_queue_pluck("
-        "cc=%p, tag=%p, "
+        "cq=%p, tag=%p, "
         "deadline=gpr_timespec { tv_sec: %" PRId64
         ", tv_nsec: %d, clock_type: %d }, "
         "reserved=%p)",
-        6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+        6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
            (int)deadline.clock_type, reserved));
   }
   GPR_ASSERT(!reserved);

-  dump_pending_tags(cc);
+  dump_pending_tags(cq);

   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);

-  GRPC_CQ_INTERNAL_REF(cc, "pluck");
-  gpr_mu_lock(cqd->mu);
+  GRPC_CQ_INTERNAL_REF(cq, "pluck");
+  gpr_mu_lock(cq->mu);
   cq_is_finished_arg is_finished_arg = {
       .last_seen_things_queued_ever =
           gpr_atm_no_barrier_load(&cqd->things_queued_ever),
-      .cq = cc,
+      .cq = cq,
       .deadline = deadline,
       .stolen_completion = NULL,
       .tag = tag,
@@ -966,7 +1067,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
       GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg);
   for (;;) {
     if (is_finished_arg.stolen_completion != NULL) {
-      gpr_mu_unlock(cqd->mu);
+      gpr_mu_unlock(cq->mu);
       c = is_finished_arg.stolen_completion;
       is_finished_arg.stolen_completion = NULL;
       ret.type = GRPC_OP_COMPLETE;
@@ -983,7 +1084,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
         if (c == cqd->completed_tail) {
           cqd->completed_tail = prev;
         }
-        gpr_mu_unlock(cqd->mu);
+        gpr_mu_unlock(cq->mu);
         ret.type = GRPC_OP_COMPLETE;
         ret.success = c->next & 1u;
         ret.tag = c->tag;
@@ -993,54 +1094,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
       prev = c;
     }
     if (gpr_atm_no_barrier_load(&cqd->shutdown)) {
-      gpr_mu_unlock(cqd->mu);
+      gpr_mu_unlock(cq->mu);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!add_plucker(cc, tag, &worker)) {
+    if (!add_plucker(cq, tag, &worker)) {
       gpr_log(GPR_DEBUG,
               "Too many outstanding grpc_completion_queue_pluck calls: maximum "
               "is %d",
               GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
-      gpr_mu_unlock(cqd->mu);
+      gpr_mu_unlock(cq->mu);
       memset(&ret, 0, sizeof(ret));
       /* TODO(ctiller): should we use a different result here */
       ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
       break;
     }
     now = gpr_now(GPR_CLOCK_MONOTONIC);
     if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) {
-      del_plucker(cc, tag, &worker);
-      gpr_mu_unlock(cqd->mu);
+      del_plucker(cq, tag, &worker);
+      gpr_mu_unlock(cq->mu);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
       break;
     }
-    cqd->num_polls++;
-    grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
+    cq->num_polls++;
+    grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
                                               &worker, now, deadline);
     if (err != GRPC_ERROR_NONE) {
-      del_plucker(cc, tag, &worker);
-      gpr_mu_unlock(cqd->mu);
+      del_plucker(cq, tag, &worker);
+      gpr_mu_unlock(cq->mu);
       const char *msg = grpc_error_string(err);
       gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);

       GRPC_ERROR_UNREF(err);
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
-      dump_pending_tags(cc);
+      dump_pending_tags(cq);
       break;
     }
     is_finished_arg.first_loop = false;
-    del_plucker(cc, tag, &worker);
+    del_plucker(cq, tag, &worker);
   }
 done:
-  GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
+  GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck");
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(is_finished_arg.stolen_completion == NULL);

@@ -1049,85 +1150,66 @@ done:
   return ret;
 }

-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
                                        gpr_timespec deadline, void *reserved) {
-  return cc->vtable->pluck(cc, tag, deadline, reserved);
+  return cq->vtable->pluck(cq, tag, deadline, reserved);
 }

-/* Finishes the completion queue shutdown. This means that there are no more
-   completion events / tags expected from the completion queue
-   - Must be called under completion queue lock
-   - Must be called only once in completion queue's lifetime
-   - grpc_completion_queue_shutdown() MUST have been called before calling
-   this function */
-static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
-                               grpc_completion_queue *cc) {
-  cq_data *cqd = &cc->data;
+static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                                     grpc_completion_queue *cq) {
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);

   GPR_ASSERT(cqd->shutdown_called);
   GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
   gpr_atm_no_barrier_store(&cqd->shutdown, 1);

-  cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
-                              &cqd->pollset_shutdown_done);
+  cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq),
+                              &cq->pollset_shutdown_done);
 }

-/* Shutdown simply drops a ref that we reserved at creation time; if we drop
-   to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
-  GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
-  cq_data *cqd = &cc->data;
+static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
+                              grpc_completion_queue *cq) {
+  cq_pluck_data *cqd = DATA_FROM_CQ(cq);

-  gpr_mu_lock(cqd->mu);
+  gpr_mu_lock(cq->mu);
   if (cqd->shutdown_called) {
-    gpr_mu_unlock(cqd->mu);
+    gpr_mu_unlock(cq->mu);
     GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
     return;
   }
   cqd->shutdown_called = 1;
   if (gpr_unref(&cqd->pending_events)) {
-    cq_finish_shutdown(&exec_ctx, cc);
+    cq_finish_shutdown_pluck(exec_ctx, cq);
   }
-  gpr_mu_unlock(cqd->mu);
+  gpr_mu_unlock(cq->mu);
+}
+
+/* Shutdown simply drops a ref that we reserved at creation time; if we drop
+   to zero here, then enter shutdown mode and wake up any waiters */
+void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
+  GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
+  cq->vtable->shutdown(&exec_ctx, cq);
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 }

-void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
-  GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
+void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+  GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
   GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
-  grpc_completion_queue_shutdown(cc);
-
-  /* TODO (sreek): This should not ideally be here. Refactor it into the
-   * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
-  if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
-    GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
-  }
+  grpc_completion_queue_shutdown(cq);

   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
+  GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy");
   grpc_exec_ctx_finish(&exec_ctx);

   GPR_TIMER_END("grpc_completion_queue_destroy", 0);
 }

-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
-  return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
-}
-
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
-  return CQ_FROM_POLLSET(ps);
-}
-
-void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
-  cc->data.is_server_cq = 1;
-}
-
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
-  return cc->data.is_server_cq;
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+  return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
 }

-bool grpc_cq_can_listen(grpc_completion_queue *cc) {
-  return cc->poller_vtable->can_listen;
+bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+  return cq->poller_vtable->can_listen;
 }
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 97ea9cae20..af44482513 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
                     void *done_arg, grpc_cq_completion *storage);

 grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps);

-void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
-bool grpc_cq_is_server_cq(grpc_completion_queue *cc);
 bool grpc_cq_can_listen(grpc_completion_queue *cc);

 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 7a5f398c96..d199ac060e 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -121,30 +121,28 @@ void grpc_init(void) {
     grpc_slice_intern_init();
     grpc_mdctx_global_init();
     grpc_channel_init_init();
-    grpc_register_tracer("api", &grpc_api_trace);
-    grpc_register_tracer("channel", &grpc_trace_channel);
-    grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
-    grpc_register_tracer("channel_stack_builder",
-                         &grpc_trace_channel_stack_builder);
-    grpc_register_tracer("http1", &grpc_http1_trace);
-    grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);  // default on
-    grpc_register_tracer("combiner", &grpc_combiner_trace);
-    grpc_register_tracer("server_channel", &grpc_server_channel_trace);
-    grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace);
-    grpc_register_tracer("queue_timeout",
-                         &grpc_cq_event_timeout_trace);  // default on
-    grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
-    grpc_register_tracer("resource_quota", &grpc_resource_quota_trace);
-    grpc_register_tracer("call_error", &grpc_call_error_trace);
+    grpc_register_tracer(&grpc_api_trace);
+    grpc_register_tracer(&grpc_trace_channel);
+    grpc_register_tracer(&grpc_connectivity_state_trace);
+    grpc_register_tracer(&grpc_trace_channel_stack_builder);
+    grpc_register_tracer(&grpc_http1_trace);
+    grpc_register_tracer(&grpc_cq_pluck_trace);  // default on
+    grpc_register_tracer(&grpc_combiner_trace);
+    grpc_register_tracer(&grpc_server_channel_trace);
+    grpc_register_tracer(&grpc_bdp_estimator_trace);
+    grpc_register_tracer(&grpc_cq_event_timeout_trace);  // default on
+    grpc_register_tracer(&grpc_trace_operation_failures);
+    grpc_register_tracer(&grpc_resource_quota_trace);
+    grpc_register_tracer(&grpc_call_error_trace);
 #ifndef NDEBUG
-    grpc_register_tracer("pending_tags", &grpc_trace_pending_tags);
-    grpc_register_tracer("alarm_refcount", &grpc_trace_alarm_refcount);
-    grpc_register_tracer("queue_refcount", &grpc_trace_cq_refcount);
-    grpc_register_tracer("closure", &grpc_trace_closure);
-    grpc_register_tracer("error_refcount", &grpc_trace_error_refcount);
-    grpc_register_tracer("stream_refcount", &grpc_trace_stream_refcount);
-    grpc_register_tracer("fd_refcount", &grpc_trace_fd_refcount);
-    grpc_register_tracer("metadata", &grpc_trace_metadata);
+    grpc_register_tracer(&grpc_trace_pending_tags);
+    grpc_register_tracer(&grpc_trace_alarm_refcount);
+    grpc_register_tracer(&grpc_trace_cq_refcount);
+    grpc_register_tracer(&grpc_trace_closure);
+    grpc_register_tracer(&grpc_trace_error_refcount);
+    grpc_register_tracer(&grpc_trace_stream_refcount);
+    grpc_register_tracer(&grpc_trace_fd_refcount);
+    grpc_register_tracer(&grpc_trace_metadata);
 #endif
     grpc_security_pre_init();
     grpc_iomgr_init(&exec_ctx);
diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c
index 7dbea581d0..2366c24910 100644
--- a/src/core/lib/surface/init_secure.c
+++ b/src/core/lib/surface/init_secure.c
@@ -37,13 +37,11 @@
 #endif

 void grpc_security_pre_init(void) {
-  grpc_register_tracer("secure_endpoint", &grpc_trace_secure_endpoint);
-  grpc_register_tracer("transport_security", &tsi_tracing_enabled);
+  grpc_register_tracer(&grpc_trace_secure_endpoint);
+  grpc_register_tracer(&tsi_tracing_enabled);
 #ifndef NDEBUG
-  grpc_register_tracer("auth_context_refcount",
-                       &grpc_trace_auth_context_refcount);
-  grpc_register_tracer("security_connector_refcount",
-                       &grpc_trace_security_connector_refcount);
+  grpc_register_tracer(&grpc_trace_auth_context_refcount);
+  grpc_register_tracer(&grpc_trace_security_connector_refcount);
 #endif
 }
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 84ddf74ab9..fce7f8dca1 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -32,8 +32,7 @@
 #include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/mpscq.h"
-#include "src/core/lib/support/spinlock.h"
+#include "src/core/lib/support/stack_lockfree.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/api_trace.h"
 #include "src/core/lib/surface/call.h"
@@ -59,10 +58,10 @@ typedef struct registered_method registered_method;

 typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;

-grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_server_channel_trace =
+    GRPC_TRACER_INITIALIZER(false, "server_channel");

 typedef struct requested_call {
-  gpr_mpscq_node request_link; /* must be first */
   requested_call_type type;
   size_t cq_idx;
   void *tag;
@@ -162,7 +161,7 @@ struct request_matcher {
   grpc_server *server;
   call_data *pending_head;
   call_data *pending_tail;
-  gpr_locked_mpscq *requests_per_cq;
+  gpr_stack_lockfree **requests_per_cq;
 };

 struct registered_method {
@@ -207,6 +206,11 @@ struct grpc_server {
   registered_method *registered_methods;
   /** one request matcher for unregistered methods */
   request_matcher unregistered_request_matcher;
+  /** free list of available requested_calls_per_cq indices */
+  gpr_stack_lockfree **request_freelist_per_cq;
+  /** requested call backing data */
+  requested_call **requested_calls_per_cq;
+  int max_requested_calls_per_cq;

   gpr_atm shutdown_flag;
   uint8_t shutdown_published;
@@ -306,20 +310,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
  * request_matcher
  */

-static void request_matcher_init(request_matcher *rm, grpc_server *server) {
+static void request_matcher_init(request_matcher *rm, size_t entries,
+                                 grpc_server *server) {
   memset(rm, 0, sizeof(*rm));
   rm->server = server;
   rm->requests_per_cq =
       gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
   for (size_t i = 0; i < server->cq_count; i++) {
-    gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
+    rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
   }
 }

 static void request_matcher_destroy(request_matcher *rm) {
   for (size_t i = 0; i < rm->server->cq_count; i++) {
-    GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
-    gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
+    GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
+    gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
   }
   gpr_free(rm->requests_per_cq);
 }
@@ -349,17 +354,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
                                           grpc_server *server,
                                           request_matcher *rm,
                                           grpc_error *error) {
-  requested_call *rc;
+  int request_id;
   for (size_t i = 0; i < server->cq_count; i++) {
-    /* Here we know:
-       1. no requests are being added (since the server is shut down)
-       2. no other threads are pulling (since the shut down process is single
-          threaded)
-       So, we can ignore the queue lock and just pop, with the guarantee that a
-       NULL returned here truly means that the queue is empty */
-    while ((rc = (requested_call *)gpr_mpscq_pop(
-                &rm->requests_per_cq[i].queue)) != NULL) {
-      fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
+    while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
+           -1) {
+      fail_call(exec_ctx, server, i,
+                &server->requested_calls_per_cq[i][request_id],
+                GRPC_ERROR_REF(error));
     }
   }
   GRPC_ERROR_UNREF(error);
@@ -394,7 +395,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
   }
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
+    if (server->started) {
+      gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+      gpr_free(server->requested_calls_per_cq[i]);
+    }
   }
+  gpr_free(server->request_freelist_per_cq);
+  gpr_free(server->requested_calls_per_cq);
   gpr_free(server->cqs);
   gpr_free(server->pollsets);
   gpr_free(server->shutdown_tags);
@@ -452,7 +459,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,

 static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
                                grpc_cq_completion *c) {
-  gpr_free(req);
+  requested_call *rc = req;
+  grpc_server *server = rc->server;
+
+  if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+      rc < server->requested_calls_per_cq[rc->cq_idx] +
+               server->max_requested_calls_per_cq) {
+    GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+    gpr_stack_lockfree_push(
+        server->request_freelist_per_cq[rc->cq_idx],
+        (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
+  } else {
+    gpr_free(req);
+  }
+
+  server_unref(exec_ctx, server);
 }

 static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -482,6 +503,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
       GPR_UNREACHABLE_CODE(return );
   }

+  grpc_call_element *elem =
+      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+  channel_data *chand = elem->channel_data;
+  server_ref(chand->server);
   grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
                  done_request_event, rc, &rc->completion);
 }
@@ -509,15 +534,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,

   for (size_t i = 0; i < server->cq_count; i++) {
     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    requested_call *rc =
-        (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
-    if (rc == NULL) {
+    int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+    if (request_id == -1) {
       continue;
     } else {
       gpr_mu_lock(&calld->mu_state);
       calld->state = ACTIVATED;
       gpr_mu_unlock(&calld->mu_state);
-      publish_call(exec_ctx, server, calld, cq_idx, rc);
+      publish_call(exec_ctx, server, calld, cq_idx,
+                   &server->requested_calls_per_cq[cq_idx][request_id]);
       return; /* early out */
     }
   }
@@ -951,8 +976,6 @@ static void register_completion_queue(grpc_server *server,
     if (server->cqs[i] == cq) return;
   }

-  grpc_cq_mark_server_cq(cq);
-
   GRPC_CQ_INTERNAL_REF(cq, "server");
   n = server->cq_count++;
   server->cqs = gpr_realloc(server->cqs,
@@ -992,6 +1015,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
   server->root_channel_data.next = server->root_channel_data.prev =
       &server->root_channel_data;

+  /* TODO(ctiller): expose a channel_arg for this */
+  server->max_requested_calls_per_cq = 32768;
   server->channel_args = grpc_channel_args_copy(args);

   return server;
@@ -1064,15 +1089,29 @@ void grpc_server_start(grpc_server *server) {
   server->started = true;
   server->pollset_count = 0;
   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+  server->request_freelist_per_cq =
+      gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+  server->requested_calls_per_cq =
+      gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
   for (i = 0; i < server->cq_count; i++) {
     if (grpc_cq_can_listen(server->cqs[i])) {
       server->pollsets[server->pollset_count++] =
           grpc_cq_pollset(server->cqs[i]);
     }
+    server->request_freelist_per_cq[i] =
+        gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+    for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+      gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+    }
+    server->requested_calls_per_cq[i] =
+        gpr_malloc((size_t)server->max_requested_calls_per_cq *
+                   sizeof(*server->requested_calls_per_cq[i]));
   }
-  request_matcher_init(&server->unregistered_request_matcher, server);
+  request_matcher_init(&server->unregistered_request_matcher,
+                       (size_t)server->max_requested_calls_per_cq, server);
   for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
-    request_matcher_init(&rm->request_matcher, server);
+    request_matcher_init(&rm->request_matcher,
+                         (size_t)server->max_requested_calls_per_cq, server);
   }

   server_ref(server);
@@ -1116,9 +1155,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
   chand->channel = channel;

   size_t cq_idx;
-  grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
   for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
-    if (s->cqs[cq_idx] == accepting_cq) break;
+    if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
   }
   if (cq_idx == s->cq_count) {
     /* completion queue not found: pick a random one to publish new calls to */
@@ -1326,11 +1364,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
                                           requested_call *rc) {
   call_data *calld = NULL;
   request_matcher *rm = NULL;
+  int request_id;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     fail_call(exec_ctx, server, cq_idx, rc,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
     return GRPC_CALL_OK;
   }
+  request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
+  if (request_id == -1) {
+    /* out of request ids: just fail this one */
+    fail_call(exec_ctx, server, cq_idx, rc,
+              grpc_error_set_int(
+                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
+                  GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
+    return GRPC_CALL_OK;
+  }
   switch (rc->type) {
     case BATCH_CALL:
       rm = &server->unregistered_request_matcher;
@@ -1339,13 +1387,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
       rm = &rc->data.registered.registered_method->request_matcher;
       break;
   }
-  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
+  server->requested_calls_per_cq[cq_idx][request_id] = *rc;
+  gpr_free(rc);
+  if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
     /* this was the first queued request: we need to lock and start
        matching calls */
     gpr_mu_lock(&server->mu_call);
     while ((calld = rm->pending_head) != NULL) {
-      rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
-      if (rc == NULL) break;
+      request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
+      if (request_id == -1) break;
       rm->pending_head = calld->pending_next;
       gpr_mu_unlock(&server->mu_call);
       gpr_mu_lock(&calld->mu_state);
@@ -1361,7 +1411,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
       GPR_ASSERT(calld->state == PENDING);
       calld->state = ACTIVATED;
       gpr_mu_unlock(&calld->mu_state);
-      publish_call(exec_ctx, server, calld, cq_idx, rc);
+      publish_call(exec_ctx, server, calld, cq_idx,
+                   &server->requested_calls_per_cq[cq_idx][request_id]);
     }
     gpr_mu_lock(&server->mu_call);
   }
@@ -1468,6 +1519,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
   rc->initial_metadata->count = 0;
   GPR_ASSERT(error != GRPC_ERROR_NONE);

+  server_ref(server);
   grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
                  done_request_event, rc, &rc->completion);
 }
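The completion_queue.c changes above hinge on a single-allocation layout: the new DATA_FROM_CQ and POLLSET_FROM_CQ macros carve one gpr_zalloc block into the grpc_completion_queue header, the per-completion-type data (sized by vtable->data_size), and the pollset. Below is a minimal standalone sketch of that pointer-arithmetic trick; the names (header_t, create, TAIL_FROM_HDR) and sizes are illustrative, not gRPC's.

```c
#include <stdlib.h>
#include <string.h>

/* Sketch of the layout used by the diff:
   [ header | type-specific data (data_size bytes) | tail region ]
   One allocation, addressed by offset from the header. */
typedef struct header {
  size_t data_size; /* stands in for cq->vtable->data_size */
} header_t;

#define DATA_FROM_HDR(h) ((void *)((h) + 1))
#define TAIL_FROM_HDR(h) ((void *)((char *)DATA_FROM_HDR(h) + (h)->data_size))

static header_t *create(size_t data_size, size_t tail_size) {
  /* A single zeroed allocation (gpr_zalloc analogue) covers all three
     regions, so one free() releases everything and the regions stay
     adjacent in memory. */
  header_t *h = calloc(1, sizeof(header_t) + data_size + tail_size);
  if (h != NULL) h->data_size = data_size;
  return h;
}

int main(void) {
  header_t *h = create(32, 64);
  memset(DATA_FROM_HDR(h), 0xAB, 32); /* per-type data region */
  memset(TAIL_FROM_HDR(h), 0xCD, 64); /* pollset-like tail region */
  free(h);
  return 0;
}
```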
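On top of that layout, the refactor routes each public entry point (begin_op, end_op, next, pluck, shutdown) through g_cq_vtable, one entry per completion type, selected once at creation. A minimal sketch of that dispatch pattern follows; the enum and function names here are illustrative stand-ins.

```c
#include <stdio.h>

/* Sketch of per-completion-type vtable dispatch, as in g_cq_vtable:
   the vtable entry is picked once at creation and consulted on every call. */
typedef enum { TYPE_NEXT = 0, TYPE_PLUCK = 1 } cq_type;

typedef struct {
  cq_type type;
  void (*begin_op)(void);
} vtable;

static void begin_op_for_next(void) { puts("next: atomic counter bump"); }
static void begin_op_for_pluck(void) { puts("pluck: refcount bump"); }

static const vtable g_vtable[] = {
    {TYPE_NEXT, begin_op_for_next},
    {TYPE_PLUCK, begin_op_for_pluck},
};

int main(void) {
  const vtable *vt = &g_vtable[TYPE_PLUCK]; /* chosen at creation time */
  vt->begin_op();                           /* grpc_cq_begin_op analogue */
  return 0;
}
```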
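cq_end_op_for_next now kicks the pollset only when cq_event_queue_push reports that it made the queue non-empty: the push's fetch_add on num_queue_items returns the old count, and a zero means this producer owes the wakeup. Here is a sketch of that signal using C11 atomics (gRPC uses gpr_atm; the C11 spelling is an assumption for self-containedness).

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Sketch of the "kick only on first item" signal: one fetch_add tells the
   producer whether the queue was empty. The MPSC queue itself is elided. */
static atomic_long num_items;

static bool push_one(void) {
  /* ...push the node onto the queue here... */
  return atomic_fetch_add(&num_items, 1) == 0; /* true => was empty */
}

int main(void) {
  if (push_one()) {
    /* Only the empty -> non-empty transition pays for a pollset kick;
       later producers skip it because a consumer is already awake. */
    printf("first item: kick the poller\n");
  }
  if (!push_one()) printf("second item: no kick needed\n");
  return 0;
}
```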
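The server.c side replaces gpr_locked_mpscq with gpr_stack_lockfree stacks of integer request ids: requested_calls_per_cq preallocates the backing array, request_freelist_per_cq hands out slot indices, and done_request_event returns them. The sketch below shows the freelist-of-indices idea under stated simplifications: a mutex-guarded stack stands in for gpr_stack_lockfree, and the struct/sizes are illustrative.

```c
#include <pthread.h>
#include <stdio.h>

/* Sketch of the index-freelist pattern from server.c: requests live in a
   preallocated array and a stack hands out array slots; -1 means the pool
   of request ids is exhausted (the diff fails such calls with
   "Out of request ids"). */
#define MAX_CALLS 4

typedef struct { int tag; } requested_call;

static requested_call calls[MAX_CALLS]; /* requested_calls_per_cq analogue */
static int freelist[MAX_CALLS];
static int top = 0;
static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;

static void freelist_push(int id) {
  pthread_mutex_lock(&mu);
  freelist[top++] = id;
  pthread_mutex_unlock(&mu);
}

static int freelist_pop(void) {
  pthread_mutex_lock(&mu);
  int id = (top > 0) ? freelist[--top] : -1;
  pthread_mutex_unlock(&mu);
  return id;
}

int main(void) {
  /* Seed all ids, as grpc_server_start does for each cq. */
  for (int i = 0; i < MAX_CALLS; i++) freelist_push(i);

  int id = freelist_pop();
  if (id == -1) { puts("out of request ids: fail the call"); return 1; }
  calls[id].tag = 42;  /* queue_call_request copies *rc into the slot */
  printf("slot %d tag %d\n", id, calls[id].tag);
  freelist_push(id);   /* done_request_event recycles the slot */
  return 0;
}
```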