diff options
author | 2015-09-16 16:15:47 -0700 | |
---|---|---|
committer | 2015-09-16 16:15:47 -0700 | |
commit | 3cd6a5158d388c6ae2d07bc78951c986dc4f41ea (patch) | |
tree | 3c8abac6cf7c3e7b8be75ad57d31a617e4f46c7b /src/core/iomgr | |
parent | 8f3addcc4d8822b68f3c2367353dac38eb56c38c (diff) |
Getting stuff working
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/workqueue.h | 15 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 79 |
6 files changed, 97 insertions, 7 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 69518597d5..5bdce0bfd8 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -72,7 +72,7 @@ static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { if (fd->workqueue->wakeup_read_fd != fd) { - grpc_workqueue_unref(fd->workqueue); + GRPC_WORKQUEUE_UNREF(fd->workqueue, "fd"); } gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -167,7 +167,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) { /* if the wakeup_read_fd is NULL, then the workqueue is under construction ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */ if (workqueue->wakeup_read_fd != NULL) { - grpc_workqueue_ref(workqueue); + GRPC_WORKQUEUE_REF(workqueue, "fd"); } grpc_iomgr_register_object(&r->iomgr_object, name); return r; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 5d1fc68767..67eff3e528 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -126,8 +126,6 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); - memset(&g_root_object, 0, sizeof(g_root_object)); - grpc_alarm_list_shutdown(); grpc_iomgr_platform_shutdown(); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 02d37350f7..c6c716e4df 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -151,7 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) { gpr_mu_destroy(&s->mu); gpr_free(s->ports); - grpc_workqueue_unref(s->workqueue); + GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy"); gpr_free(s); } diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 96688054fb..30957f8dee 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -144,7 +144,7 @@ static void finish_shutdown(grpc_udp_server *s) { gpr_cv_destroy(&s->cv); gpr_free(s->ports); - grpc_workqueue_unref(s->workqueue); + GRPC_WORKQUEUE_UNREF(s->workqueue, "workqueue"); gpr_free(s); } diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 0bfa959953..a236651fbd 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -52,8 +52,23 @@ typedef struct grpc_workqueue grpc_workqueue; /** Create a work queue */ grpc_workqueue *grpc_workqueue_create(void); +void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously); + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#define GRPC_WORKQUEUE_REF(p, r) \ + grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_WORKQUEUE_UNREF(p, r) \ + grpc_workqueue_unref((p), __FILE__, __LINE__, (r)) +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason); +void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason); +#else +#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) +#define GRPC_WORKQUEUE_UNREF(p, r) grpc_workqueue_unref((p)) void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_workqueue *workqueue); +#endif /** Bind this workqueue to a pollset */ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 26626bef3b..ec3ce713b0 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -35,12 +35,15 @@ #ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/workqueue.h" #include <stdio.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> + +#include "src/core/iomgr/fd_posix.h" static void on_readable(void *arg, int success); @@ -61,15 +64,81 @@ grpc_workqueue *grpc_workqueue_create(void) { return workqueue; } +static void shutdown_thread(void *arg) { + grpc_iomgr_closure *todo = arg; + + while (todo) { + grpc_iomgr_closure *next = todo->next; + todo->cb(todo->cb_arg, todo->success); + todo = next; + } +} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +static size_t count_waiting(grpc_workqueue *workqueue) { + size_t i = 0; + grpc_iomgr_closure *c; + for (c = workqueue->head.next; c; c = c->next) { + i++; + } + return i; +} +#endif + +void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) { + grpc_iomgr_closure *todo; + gpr_thd_id thd; + + gpr_mu_lock(&workqueue->mu); +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue, + count_waiting(workqueue), + asynchronously ? "asynchronously" : "synchronously"); +#endif + todo = workqueue->head.next; + workqueue->head.next = NULL; + workqueue->tail = &workqueue->head; + gpr_mu_unlock(&workqueue->mu); + + if (todo != NULL) { + if (asynchronously) { + gpr_thd_new(&thd, shutdown_thread, todo, NULL); + } else { + while (todo) { + grpc_iomgr_closure *next = todo->next; + todo->cb(todo->cb_arg, todo->success); + todo = next; + } + } + } +} + static void workqueue_destroy(grpc_workqueue *workqueue) { + GPR_ASSERT(workqueue->tail == &workqueue->head); grpc_fd_shutdown(workqueue->wakeup_read_fd); } +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", + workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, + reason); +#else void grpc_workqueue_ref(grpc_workqueue *workqueue) { +#endif gpr_ref(&workqueue->refs); } +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", + workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, + reason); +#else void grpc_workqueue_unref(grpc_workqueue *workqueue) { +#endif if (gpr_unref(&workqueue->refs)) { workqueue_destroy(workqueue); } @@ -94,6 +163,10 @@ static void on_readable(void *arg, int success) { return; } else { gpr_mu_lock(&workqueue->mu); +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, + count_waiting(workqueue)); +#endif todo = workqueue->head.next; workqueue->head.next = NULL; workqueue->tail = &workqueue->head; @@ -119,6 +192,10 @@ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, } workqueue->tail->next = closure; workqueue->tail = closure; +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, + count_waiting(workqueue)); +#endif gpr_mu_unlock(&workqueue->mu); } |