Diffstat (limited to 'src/core/lib/surface/completion_queue.cc')
-rw-r--r--  src/core/lib/surface/completion_queue.cc | 488
1 file changed, 245 insertions(+), 243 deletions(-)
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 5009f786e6..9dabe76510 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -58,62 +58,62 @@ GPR_TLS_DECL(g_cached_event);
GPR_TLS_DECL(g_cached_cq);
typedef struct {
- grpc_pollset_worker **worker;
- void *tag;
+ grpc_pollset_worker** worker;
+ void* tag;
} plucker;
typedef struct {
bool can_get_pollset;
bool can_listen;
size_t (*size)(void);
- void (*init)(grpc_pollset *pollset, gpr_mu **mu);
- grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker);
- grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker, grpc_millis deadline);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure);
- void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset);
+ void (*init)(grpc_pollset* pollset, gpr_mu** mu);
+ grpc_error* (*kick)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker);
+ grpc_error* (*work)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_pollset_worker** worker, grpc_millis deadline);
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_closure* closure);
+ void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
gpr_cv cv;
bool kicked;
- struct non_polling_worker *next;
- struct non_polling_worker *prev;
+ struct non_polling_worker* next;
+ struct non_polling_worker* prev;
} non_polling_worker;
typedef struct {
gpr_mu mu;
- non_polling_worker *root;
- grpc_closure *shutdown;
+ non_polling_worker* root;
+ grpc_closure* shutdown;
} non_polling_poller;
static size_t non_polling_poller_size(void) {
return sizeof(non_polling_poller);
}
-static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
- non_polling_poller *npp = (non_polling_poller *)pollset;
+static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
+ non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_init(&npp->mu);
*mu = &npp->mu;
}
-static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
- non_polling_poller *npp = (non_polling_poller *)pollset;
+static void non_polling_poller_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_pollset* pollset) {
+ non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_destroy(&npp->mu);
}
-static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_pollset_worker **worker,
+static grpc_error* non_polling_poller_work(grpc_exec_ctx* exec_ctx,
+ grpc_pollset* pollset,
+ grpc_pollset_worker** worker,
grpc_millis deadline) {
- non_polling_poller *npp = (non_polling_poller *)pollset;
+ non_polling_poller* npp = (non_polling_poller*)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
gpr_cv_init(&w.cv);
- if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
+ if (worker != NULL) *worker = (grpc_pollset_worker*)&w;
if (npp->root == NULL) {
npp->root = w.next = w.prev = &w;
} else {
@@ -143,13 +143,13 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE;
}
-static grpc_error *non_polling_poller_kick(
- grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker) {
- non_polling_poller *p = (non_polling_poller *)pollset;
- if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
+static grpc_error* non_polling_poller_kick(
+ grpc_exec_ctx* exec_ctx, grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker) {
+ non_polling_poller* p = (non_polling_poller*)pollset;
+ if (specific_worker == NULL) specific_worker = (grpc_pollset_worker*)p->root;
if (specific_worker != NULL) {
- non_polling_worker *w = (non_polling_worker *)specific_worker;
+ non_polling_worker* w = (non_polling_worker*)specific_worker;
if (!w->kicked) {
w->kicked = true;
gpr_cv_signal(&w->cv);
@@ -158,16 +158,16 @@ static grpc_error *non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
-static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_closure *closure) {
- non_polling_poller *p = (non_polling_poller *)pollset;
+static void non_polling_poller_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_pollset* pollset,
+ grpc_closure* closure) {
+ non_polling_poller* p = (non_polling_poller*)pollset;
GPR_ASSERT(closure != NULL);
p->shutdown = closure;
if (p->root == NULL) {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
- non_polling_worker *w = p->root;
+ non_polling_worker* w = p->root;
do {
gpr_cv_signal(&w->cv);
w = w->next;
@@ -191,19 +191,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void *data);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
- void (*destroy)(void *data);
- bool (*begin_op)(grpc_completion_queue *cq, void *tag);
- void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
- grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
- void *reserved);
- grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved);
+ void (*init)(void* data);
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq);
+ void (*destroy)(void* data);
+ bool (*begin_op)(grpc_completion_queue* cq, void* tag);
+ void (*end_op)(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
+ grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
+ void* reserved);
+ grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved);
} cq_vtable;
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
@@ -240,7 +240,7 @@ typedef struct cq_next_data {
typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head;
- grpc_cq_completion *completed_tail;
+ grpc_cq_completion* completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
gpr_atm pending_events;
@@ -267,13 +267,13 @@ struct grpc_completion_queue {
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- gpr_mu *mu;
+ gpr_mu* mu;
- const cq_vtable *vtable;
- const cq_poller_vtable *poller_vtable;
+ const cq_vtable* vtable;
+ const cq_poller_vtable* poller_vtable;
#ifndef NDEBUG
- void **outstanding_tags;
+ void** outstanding_tags;
size_t outstanding_tag_count;
size_t outstanding_tag_capacity;
#endif
@@ -283,44 +283,44 @@ struct grpc_completion_queue {
};
/* Forward declarations */
-static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq);
-
-static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
-
-static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
-
-static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
-
-static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- void *reserved);
-
-static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved);
-
-static void cq_init_next(void *data);
-static void cq_init_pluck(void *data);
-static void cq_destroy_next(void *data);
-static void cq_destroy_pluck(void *data);
+static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq);
+
+static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
+
+static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
+
+static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
+
+static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
+ void* reserved);
+
+static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved);
+
+static void cq_init_next(void* data);
+static void cq_init_pluck(void* data);
+static void cq_destroy_next(void* data);
+static void cq_destroy_pluck(void* data);
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
@@ -333,9 +333,9 @@ static const cq_vtable g_cq_vtable[] = {
cq_pluck},
};
-#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define POLLSET_FROM_CQ(cq) \
- ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))
+ ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
grpc_tracer_flag grpc_cq_pluck_trace =
GRPC_TRACER_INITIALIZER(true, "queue_pluck");
@@ -346,39 +346,39 @@ grpc_tracer_flag grpc_cq_event_timeout_trace =
if (GRPC_TRACER_ON(grpc_api_trace) && \
(GRPC_TRACER_ON(grpc_cq_pluck_trace) || \
(event)->type != GRPC_QUEUE_TIMEOUT)) { \
- char *_ev = grpc_event_string(event); \
+ char* _ev = grpc_event_string(event); \
gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
- grpc_error *error);
+static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* cq,
+ grpc_error* error);
void grpc_cq_global_init() {
gpr_tls_init(&g_cached_event);
gpr_tls_init(&g_cached_cq);
}
-void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) {
- if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) {
+void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
+ if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
gpr_tls_set(&g_cached_event, (intptr_t)0);
gpr_tls_set(&g_cached_cq, (intptr_t)cq);
}
}
-int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq,
- void **tag, int *ok) {
- grpc_cq_completion *storage =
- (grpc_cq_completion *)gpr_tls_get(&g_cached_event);
+int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
+ void** tag, int* ok) {
+ grpc_cq_completion* storage =
+ (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
int ret = 0;
if (storage != NULL &&
- (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) {
+ (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
*tag = storage->tag;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
storage->done(&exec_ctx, storage->done_arg, storage);
*ok = (storage->next & (uintptr_t)(1)) == 1;
ret = 1;
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
@@ -394,30 +394,30 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq,
return ret;
}
-static void cq_event_queue_init(grpc_cq_event_queue *q) {
+static void cq_event_queue_init(grpc_cq_event_queue* q) {
gpr_mpscq_init(&q->queue);
q->queue_lock = GPR_SPINLOCK_INITIALIZER;
gpr_atm_no_barrier_store(&q->num_queue_items, 0);
}
-static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
+static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
gpr_mpscq_destroy(&q->queue);
}
-static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
- gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
+static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
+ gpr_mpscq_push(&q->queue, (gpr_mpscq_node*)c);
return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
}
-static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
- grpc_cq_completion *c = NULL;
+static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
+ grpc_cq_completion* c = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (gpr_spinlock_trylock(&q->queue_lock)) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx);
bool is_empty = false;
- c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
+ c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == NULL && !is_empty) {
@@ -438,14 +438,14 @@ static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
-static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
+static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
}
-grpc_completion_queue *grpc_completion_queue_create_internal(
+grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
@@ -454,17 +454,17 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
"polling_type=%d)",
2, (completion_type, polling_type));
- const cq_vtable *vtable = &g_cq_vtable[completion_type];
- const cq_poller_vtable *poller_vtable =
+ const cq_vtable* vtable = &g_cq_vtable[completion_type];
+ const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
- cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) +
- vtable->data_size +
- poller_vtable->size());
+ cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
+ vtable->data_size +
+ poller_vtable->size());
cq->vtable = vtable;
cq->poller_vtable = poller_vtable;
@@ -483,8 +483,8 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void *ptr) {
- cq_next_data *cqd = (cq_next_data *)ptr;
+static void cq_init_next(void* ptr) {
+ cq_next_data* cqd = (cq_next_data*)ptr;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
@@ -492,14 +492,14 @@ static void cq_init_next(void *ptr) {
cq_event_queue_init(&cqd->queue);
}
-static void cq_destroy_next(void *ptr) {
- cq_next_data *cqd = (cq_next_data *)ptr;
+static void cq_destroy_next(void* ptr) {
+ cq_next_data* cqd = (cq_next_data*)ptr;
GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void *ptr) {
- cq_pluck_data *cqd = (cq_pluck_data *)ptr;
+static void cq_init_pluck(void* ptr) {
+ cq_pluck_data* cqd = (cq_pluck_data*)ptr;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
@@ -510,16 +510,16 @@ static void cq_init_pluck(void *ptr) {
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
-static void cq_destroy_pluck(void *ptr) {
- cq_pluck_data *cqd = (cq_pluck_data *)ptr;
+static void cq_destroy_pluck(void* ptr) {
+ cq_pluck_data* cqd = (cq_pluck_data*)ptr;
GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type;
}
-int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
+int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
int cur_num_polls;
gpr_mu_lock(cq->mu);
cur_num_polls = cq->num_polls;
@@ -528,8 +528,8 @@ int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
}
#ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
- const char *file, int line) {
+void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
+ const char* file, int line) {
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -537,20 +537,20 @@ void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
reason);
}
#else
-void grpc_cq_internal_ref(grpc_completion_queue *cq) {
+void grpc_cq_internal_ref(grpc_completion_queue* cq) {
#endif
gpr_ref(&cq->owning_refs);
}
-static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_completion_queue *cq = (grpc_completion_queue *)arg;
+static void on_pollset_shutdown_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_completion_queue* cq = (grpc_completion_queue*)arg;
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
- const char *reason, const char *file, int line) {
+void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
+ const char* reason, const char* file, int line) {
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -558,8 +558,8 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
reason);
}
#else
-void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
+void grpc_cq_internal_unref(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
#endif
if (gpr_unref(&cq->owning_refs)) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
@@ -572,7 +572,7 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
#ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
+static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
gpr_mu_lock(cq->mu);
@@ -581,7 +581,7 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
if (cq->outstanding_tags[i] == tag) {
cq->outstanding_tag_count--;
- GPR_SWAP(void *, cq->outstanding_tags[i],
+ GPR_SWAP(void*, cq->outstanding_tags[i],
cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
@@ -595,12 +595,12 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
GPR_ASSERT(found);
}
#else
-static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
#endif
/* Atomically increments a counter only if the counter is not zero. Returns
* true if the increment was successful; false if the counter is zero */
-static bool atm_inc_if_nonzero(gpr_atm *counter) {
+static bool atm_inc_if_nonzero(gpr_atm* counter) {
while (true) {
gpr_atm count = gpr_atm_acq_load(counter);
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
@@ -616,22 +616,22 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) {
return true;
}
-static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
-static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
-bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags = (void **)gpr_realloc(
+ cq->outstanding_tags = (void**)gpr_realloc(
cq->outstanding_tags,
sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity);
}
@@ -644,19 +644,19 @@ bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
-static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+static void cq_end_op_for_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
if (GRPC_TRACER_ON(grpc_api_trace) ||
(GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char *errmsg = grpc_error_string(error);
+ const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
"cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
@@ -666,7 +666,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -676,8 +676,8 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
cq_check_tag(cq, tag, true); /* Used in debug builds only */
- if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq &&
- (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) {
+ if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
+ (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
gpr_tls_set(&g_cached_event, (intptr_t)storage);
} else {
/* Add the completion to the queue */
@@ -695,12 +695,12 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Only kick if this is the first item queued */
if (is_first) {
gpr_mu_lock(cq->mu);
- grpc_error *kick_error =
+ grpc_error* kick_error =
cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
+ const char* msg = grpc_error_string(kick_error);
gpr_log(GPR_ERROR, "Kick failed: %s", msg);
GRPC_ERROR_UNREF(kick_error);
}
@@ -730,14 +730,14 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_PLUCK) */
-static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx,
- void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void cq_end_op_for_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx,
+ void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -745,7 +745,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
if (GRPC_TRACER_ON(grpc_api_trace) ||
(GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char *errmsg = grpc_error_string(error);
+ const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
"cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
@@ -774,7 +774,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
cq_finish_shutdown_pluck(exec_ctx, cq);
gpr_mu_unlock(cq->mu);
} else {
- grpc_pollset_worker *pluck_worker = NULL;
+ grpc_pollset_worker* pluck_worker = NULL;
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag) {
pluck_worker = *cqd->pluckers[i].worker;
@@ -782,13 +782,13 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
}
- grpc_error *kick_error =
+ grpc_error* kick_error =
cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
+ const char* msg = grpc_error_string(kick_error);
gpr_log(GPR_ERROR, "Kick failed: %s", msg);
GRPC_ERROR_UNREF(kick_error);
@@ -800,27 +800,27 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq,
- void *tag, grpc_error *error,
- void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+void grpc_cq_end_op(grpc_exec_ctx* exec_ctx, grpc_completion_queue* cq,
+ void* tag, grpc_error* error,
+ void (*done)(grpc_exec_ctx* exec_ctx, void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage) {
cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage);
}
typedef struct {
gpr_atm last_seen_things_queued_ever;
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
grpc_millis deadline;
- grpc_cq_completion *stolen_completion;
- void *tag; /* for pluck */
+ grpc_cq_completion* stolen_completion;
+ void* tag; /* for pluck */
bool first_loop;
} cq_is_finished_arg;
-static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
- cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
- grpc_completion_queue *cq = a->cq;
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static bool cq_is_next_finished(grpc_exec_ctx* exec_ctx, void* arg) {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
+ grpc_completion_queue* cq = a->cq;
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -844,7 +844,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
#ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cq) {
+static void dump_pending_tags(grpc_completion_queue* cq) {
if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
gpr_strvec v;
@@ -852,24 +852,24 @@ static void dump_pending_tags(grpc_completion_queue *cq) {
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
gpr_mu_lock(cq->mu);
for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
- char *s;
+ char* s;
gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
gpr_mu_unlock(cq->mu);
- char *out = gpr_strvec_flatten(&v, NULL);
+ char* out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
gpr_free(out);
}
#else
-static void dump_pending_tags(grpc_completion_queue *cq) {}
+static void dump_pending_tags(grpc_completion_queue* cq) {}
#endif
-static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- void *reserved) {
+static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
+ void* reserved) {
grpc_event ret;
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -879,8 +879,9 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
- reserved));
+ 5,
+ (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ reserved));
GPR_ASSERT(!reserved);
dump_pending_tags(cq);
@@ -901,7 +902,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
grpc_millis iteration_deadline = deadline_millis;
if (is_finished_arg.stolen_completion != NULL) {
- grpc_cq_completion *c = is_finished_arg.stolen_completion;
+ grpc_cq_completion* c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
@@ -910,7 +911,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
break;
}
- grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue);
+ grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
if (c != NULL) {
ret.type = GRPC_OP_COMPLETE;
@@ -957,12 +958,12 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
+ grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
NULL, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(err);
+ const char* msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
GRPC_ERROR_UNREF(err);
@@ -997,9 +998,9 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- Must be called only once in completion queue's lifetime
- grpc_completion_queue_shutdown() MUST have been called before calling
this function */
-static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static void cq_finish_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
@@ -1008,9 +1009,9 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
&cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static void cq_shutdown_next(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_next() below, that would call pollset shutdown.
@@ -1036,14 +1037,14 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
}
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
- gpr_timespec deadline, void *reserved) {
+grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
+ gpr_timespec deadline, void* reserved) {
return cq->vtable->next(cq, deadline, reserved);
}
-static int add_plucker(grpc_completion_queue *cq, void *tag,
- grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static int add_plucker(grpc_completion_queue* cq, void* tag,
+ grpc_pollset_worker** worker) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -1053,9 +1054,9 @@ static int add_plucker(grpc_completion_queue *cq, void *tag,
return 1;
}
-static void del_plucker(grpc_completion_queue *cq, void *tag,
- grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void del_plucker(grpc_completion_queue* cq, void* tag,
+ grpc_pollset_worker** worker) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -1066,10 +1067,10 @@ static void del_plucker(grpc_completion_queue *cq, void *tag,
GPR_UNREACHABLE_CODE(return );
}
-static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
- cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
- grpc_completion_queue *cq = a->cq;
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static bool cq_is_pluck_finished(grpc_exec_ctx* exec_ctx, void* arg) {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
+ grpc_completion_queue* cq = a->cq;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -1078,9 +1079,9 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- grpc_cq_completion *c;
- grpc_cq_completion *prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev = &cqd->completed_head;
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
&cqd->completed_head) {
if (c->tag == a->tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
@@ -1098,13 +1099,13 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
return !a->first_loop && a->deadline < grpc_exec_ctx_now(exec_ctx);
}
-static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved) {
+static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved) {
grpc_event ret;
- grpc_cq_completion *c;
- grpc_cq_completion *prev;
- grpc_pollset_worker *worker = NULL;
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev;
+ grpc_pollset_worker* worker = NULL;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1115,8 +1116,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ 6,
+ (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ reserved));
}
GPR_ASSERT(!reserved);
@@ -1146,7 +1148,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
break;
}
prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
&cqd->completed_head) {
if (c->tag == tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
@@ -1190,12 +1192,12 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
break;
}
cq->num_polls++;
- grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
+ grpc_error* err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq),
&worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
- const char *msg = grpc_error_string(err);
+ const char* msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
GRPC_ERROR_UNREF(err);
@@ -1218,14 +1220,14 @@ done:
return ret;
}
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved) {
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved) {
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void cq_finish_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
@@ -1237,9 +1239,9 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
* merging them is a bit tricky and probably not worth it */
-static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
- grpc_completion_queue *cq) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void cq_shutdown_pluck(grpc_exec_ctx* exec_ctx,
+ grpc_completion_queue* cq) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
@@ -1264,7 +1266,7 @@ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
@@ -1273,7 +1275,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
-void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
grpc_completion_queue_shutdown(cq);
@@ -1284,10 +1286,10 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
}
-bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+bool grpc_cq_can_listen(grpc_completion_queue* cq) {
return cq->poller_vtable->can_listen;
}
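
Note (not part of the patch): the hunk around atm_inc_if_nonzero() above implements an "increment only if nonzero" guard on cqd->pending_events using a CAS loop over gpr_atm. A minimal standalone sketch of the same pattern, written with std::atomic purely for illustration (names and memory orders here are assumptions, not gRPC code):

// inc_if_nonzero.cc -- standalone sketch of the CAS pattern used by
// atm_inc_if_nonzero(); compile with: g++ -std=c++11 inc_if_nonzero.cc
#include <atomic>
#include <cassert>

static bool inc_if_nonzero(std::atomic<long>& counter) {
  long count = counter.load(std::memory_order_acquire);
  while (count != 0) {
    // compare_exchange_weak reloads `count` on failure, so the loop retries
    // with the freshly observed value until it either succeeds or sees zero.
    if (counter.compare_exchange_weak(count, count + 1,
                                      std::memory_order_acq_rel,
                                      std::memory_order_acquire)) {
      return true;  // increment succeeded; the counter was nonzero throughout
    }
  }
  return false;  // counter was (or became) zero; never resurrect it
}

int main() {
  std::atomic<long> pending{1};
  assert(inc_if_nonzero(pending));   // 1 -> 2
  pending.store(0);
  assert(!inc_if_nonzero(pending));  // stays 0, mirroring a shut-down cq
  return 0;
}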