diff options
Diffstat (limited to 'src/core/lib/iomgr/combiner.cc')
-rw-r--r-- | src/core/lib/iomgr/combiner.cc | 90 |
1 files changed, 45 insertions, 45 deletions
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 53f4b7eaa7..ca9c00b935 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -43,7 +43,7 @@ grpc_tracer_flag grpc_combiner_trace = #define STATE_ELEM_COUNT_LOW_BIT 2 struct grpc_combiner { - grpc_combiner *next_combiner_on_this_exec_ctx; + grpc_combiner* next_combiner_on_this_exec_ctx; grpc_closure_scheduler scheduler; grpc_closure_scheduler finally_scheduler; gpr_mpscq queue; @@ -62,20 +62,20 @@ struct grpc_combiner { gpr_refcount refs; }; -static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); -static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, grpc_error *error); +static void combiner_exec(grpc_exec_ctx* exec_ctx, grpc_closure* closure, + grpc_error* error); +static void combiner_finally_exec(grpc_exec_ctx* exec_ctx, + grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { combiner_exec, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; -static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); +static void offload(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error); -grpc_combiner *grpc_combiner_create(void) { - grpc_combiner *lock = (grpc_combiner *)gpr_zalloc(sizeof(*lock)); +grpc_combiner* grpc_combiner_create(void) { + grpc_combiner* lock = (grpc_combiner*)gpr_zalloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); lock->scheduler.vtable = &scheduler; lock->finally_scheduler.vtable = &finally_scheduler; @@ -88,14 +88,14 @@ grpc_combiner *grpc_combiner_create(void) { return lock; } -static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void really_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); gpr_mpscq_destroy(&lock->queue); gpr_free(lock); } -static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void start_destroy(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); @@ -116,22 +116,22 @@ static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { #define GRPC_COMBINER_DEBUG_SPAM(op, delta) #endif -void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { +void grpc_combiner_unref(grpc_exec_ctx* exec_ctx, + grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); if (gpr_unref(&lock->refs)) { start_destroy(exec_ctx, lock); } } -grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { +grpc_combiner* grpc_combiner_ref(grpc_combiner* lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM(" REF", 1); gpr_ref(&lock->refs); return lock; } -static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock) { +static void push_last_on_exec_ctx(grpc_exec_ctx* exec_ctx, + grpc_combiner* lock) { lock->next_combiner_on_this_exec_ctx = NULL; if (exec_ctx->active_combiner == NULL) { exec_ctx->active_combiner = exec_ctx->last_combiner = lock; @@ -141,8 +141,8 @@ static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, } } -static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, - grpc_combiner *lock) { +static void push_first_on_exec_ctx(grpc_exec_ctx* exec_ctx, + grpc_combiner* lock) { lock->next_combiner_on_this_exec_ctx = exec_ctx->active_combiner; exec_ctx->active_combiner = lock; if (lock->next_combiner_on_this_exec_ctx == NULL) { @@ -151,14 +151,14 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx, } #define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \ - ((grpc_combiner *)(((char *)((closure)->scheduler)) - \ - offsetof(grpc_combiner, scheduler_name))) + ((grpc_combiner*)(((char*)((closure)->scheduler)) - \ + offsetof(grpc_combiner, scheduler_name))) -static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, - grpc_error *error) { +static void combiner_exec(grpc_exec_ctx* exec_ctx, grpc_closure* cl, + grpc_error* error) { GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx); GPR_TIMER_BEGIN("combiner.execute", 0); - grpc_combiner *lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(cl, scheduler); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p last=%" PRIdPTR, @@ -187,7 +187,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, GPR_TIMER_END("combiner.execute", 0); } -static void move_next(grpc_exec_ctx *exec_ctx) { +static void move_next(grpc_exec_ctx* exec_ctx) { exec_ctx->active_combiner = exec_ctx->active_combiner->next_combiner_on_this_exec_ctx; if (exec_ctx->active_combiner == NULL) { @@ -195,21 +195,21 @@ static void move_next(grpc_exec_ctx *exec_ctx) { } } -static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_combiner *lock = (grpc_combiner *)arg; +static void offload(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_combiner* lock = (grpc_combiner*)arg; push_last_on_exec_ctx(exec_ctx, lock); } -static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void queue_offload(grpc_exec_ctx* exec_ctx, grpc_combiner* lock) { GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx); move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } -bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { +bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx* exec_ctx) { GPR_TIMER_BEGIN("combiner.continue_exec_ctx", 0); - grpc_combiner *lock = exec_ctx->active_combiner; + grpc_combiner* lock = exec_ctx->active_combiner; if (lock == NULL) { GPR_TIMER_END("combiner.continue_exec_ctx", 0); return false; @@ -241,7 +241,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { // peek to see if something new has shown up, and execute that with // priority (gpr_atm_acq_load(&lock->state) >> 1) > 1) { - gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); + gpr_mpscq_node* n = gpr_mpscq_pop(&lock->queue); GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); if (n == NULL) { @@ -253,8 +253,8 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { return true; } GPR_TIMER_BEGIN("combiner.exec1", 0); - grpc_closure *cl = (grpc_closure *)n; - grpc_error *cl_err = cl->error_data.error; + grpc_closure* cl = (grpc_closure*)n; + grpc_error* cl_err = cl->error_data.error; #ifndef NDEBUG cl->scheduled = false; #endif @@ -262,7 +262,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GRPC_ERROR_UNREF(cl_err); GPR_TIMER_END("combiner.exec1", 0); } else { - grpc_closure *c = lock->final_list.head; + grpc_closure* c = lock->final_list.head; GPR_ASSERT(c != NULL); grpc_closure_list_init(&lock->final_list); int loops = 0; @@ -270,8 +270,8 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GPR_TIMER_BEGIN("combiner.exec_1final", 0); GRPC_COMBINER_TRACE( gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); - grpc_closure *next = c->next_data.next; - grpc_error *error = c->error_data.error; + grpc_closure* next = c->next_data.next; + grpc_error* error = c->error_data.error; #ifndef NDEBUG c->scheduled = false; #endif @@ -327,13 +327,13 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { return true; } -static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error); +static void enqueue_finally(grpc_exec_ctx* exec_ctx, void* closure, + grpc_error* error); -static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, - grpc_closure *closure, grpc_error *error) { +static void combiner_finally_exec(grpc_exec_ctx* exec_ctx, + grpc_closure* closure, grpc_error* error) { GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(exec_ctx); - grpc_combiner *lock = + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p", @@ -356,17 +356,17 @@ static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("combiner.execute_finally", 0); } -static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, - grpc_error *error) { - combiner_finally_exec(exec_ctx, (grpc_closure *)closure, +static void enqueue_finally(grpc_exec_ctx* exec_ctx, void* closure, + grpc_error* error) { + combiner_finally_exec(exec_ctx, (grpc_closure*)closure, GRPC_ERROR_REF(error)); } -grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner) { +grpc_closure_scheduler* grpc_combiner_scheduler(grpc_combiner* combiner) { return &combiner->scheduler; } -grpc_closure_scheduler *grpc_combiner_finally_scheduler( - grpc_combiner *combiner) { +grpc_closure_scheduler* grpc_combiner_finally_scheduler( + grpc_combiner* combiner) { return &combiner->finally_scheduler; } |