diff options
author | Vijay Pai <vpai@google.com> | 2016-02-25 14:43:57 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2016-02-25 14:43:57 -0800 |
commit | c05865259bb662a4a777b5ba16b3ab6ed9964c51 (patch) | |
tree | d5785b4c1a8195df5b1e89a00039c2ca1d1f265f /src/core | |
parent | 84a74f89668fbf2b6aebbde7ac0e1675cf7f064c (diff) | |
parent | 9114a142c9211e0f3401166c0b753349573caf14 (diff) |
Merge pull request #5307 from ctiller/hide-the-worker
Change pollset_work signature to allow moving grpc_pollset_worker to implementation
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/pollset.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 38 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 36 | ||||
-rw-r--r-- | src/core/security/google_default_credentials.c | 2 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 12 |
5 files changed, 50 insertions, 42 deletions
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c6b0214dea..6585326f81 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -83,7 +83,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset); pollset lock */ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker, gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 1063727248..ee7e9f48f4 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -246,8 +246,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -255,16 +258,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int keep_polling = 0; GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ - worker->next = worker->prev = NULL; - worker->reevaluate_polling_on_wakeup = 0; + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; if (pollset->local_wakeup_cache != NULL) { - worker->wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd->next; + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { - worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd)); - grpc_wakeup_fd_init(&worker->wakeup_fd->fd); + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); } - worker->kicked_specifically = 0; + worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && @@ -293,13 +296,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, keep_polling = 0; if (!pollset->kicked_without_pollers) { if (!added_worker) { - push_front_worker(pollset, worker); + push_front_worker(pollset, &worker); added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)worker); + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; @@ -321,10 +324,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via grpc_pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker->reevaluate_polling_on_wakeup) { - worker->reevaluate_polling_on_wakeup = 0; + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; - if (queued_work || worker->kicked_specifically) { + if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -333,12 +336,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } if (added_worker) { - remove_worker(pollset, worker); + remove_worker(pollset, &worker); gpr_tls_set(&g_current_thread_worker, 0); } /* release wakeup fd to the local pool */ - worker->wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd; + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; /* check shutdown conditions */ if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { @@ -360,6 +363,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } + *worker_hdl = NULL; GPR_TIMER_END("grpc_pollset_work", 0); } diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 35a956b27f..bbce23b46a 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -125,22 +125,25 @@ void grpc_pollset_reset(grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + int added_worker = 0; - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; - worker->kicked = 0; - worker->pollset = pollset; - gpr_cv_init(&worker->cv); + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; + worker.kicked = 0; + worker.pollset = pollset; + gpr_cv_init(&worker.cv); if (!pollset->kicked_without_pollers && !pollset->shutting_down) { if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; /* become poller */ pollset->is_iocp_worker = 1; - g_active_poller = worker; + g_active_poller = &worker; gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); grpc_exec_ctx_flush(exec_ctx); @@ -167,12 +170,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, - worker); + &worker); push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, - worker); + &worker); added_worker = 1; - while (!worker->kicked) { - if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + while (!worker.kicked) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { break; } } @@ -186,10 +189,11 @@ done: gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } - gpr_cv_destroy(&worker->cv); + gpr_cv_destroy(&worker.cv); + *worker_hdl = NULL; } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index bd4b0e462a..458d0d3ac3 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -122,7 +122,7 @@ static int is_stack_running_on_compute_engine(void) { called once for the lifetime of the process by the default credentials. */ gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); while (!detector.is_done) { - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 0a80680f02..f9cb852722 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -50,7 +50,7 @@ #include <grpc/support/time.h> typedef struct { - grpc_pollset_worker *worker; + grpc_pollset_worker **worker; void *tag; } plucker; @@ -252,7 +252,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, pluck_worker = NULL; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag) { - pluck_worker = cc->pluckers[i].worker; + pluck_worker = *cc->pluckers[i].worker; break; } } @@ -275,7 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; int first_loop = 1; gpr_timespec now; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -348,7 +348,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } static int add_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -359,7 +359,7 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, } static void del_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { int i; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { @@ -376,7 +376,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec now; int first_loop = 1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |