aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-16 16:15:47 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-16 16:15:47 -0700
commit3cd6a5158d388c6ae2d07bc78951c986dc4f41ea (patch)
tree3c8abac6cf7c3e7b8be75ad57d31a617e4f46c7b /src/core/iomgr
parent8f3addcc4d8822b68f3c2367353dac38eb56c38c (diff)
Getting stuff working
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/fd_posix.c4
-rw-r--r--src/core/iomgr/iomgr.c2
-rw-r--r--src/core/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/iomgr/udp_server.c2
-rw-r--r--src/core/iomgr/workqueue.h15
-rw-r--r--src/core/iomgr/workqueue_posix.c79
6 files changed, 97 insertions, 7 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 69518597d5..5bdce0bfd8 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -72,7 +72,7 @@ static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
if (fd->workqueue->wakeup_read_fd != fd) {
- grpc_workqueue_unref(fd->workqueue);
+ GRPC_WORKQUEUE_UNREF(fd->workqueue, "fd");
}
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@@ -167,7 +167,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) {
/* if the wakeup_read_fd is NULL, then the workqueue is under construction
==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */
if (workqueue->wakeup_read_fd != NULL) {
- grpc_workqueue_ref(workqueue);
+ GRPC_WORKQUEUE_REF(workqueue, "fd");
}
grpc_iomgr_register_object(&r->iomgr_object, name);
return r;
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 5d1fc68767..67eff3e528 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -126,8 +126,6 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
- memset(&g_root_object, 0, sizeof(g_root_object));
-
grpc_alarm_list_shutdown();
grpc_iomgr_platform_shutdown();
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 02d37350f7..c6c716e4df 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -151,7 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) {
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
- grpc_workqueue_unref(s->workqueue);
+ GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy");
gpr_free(s);
}
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 96688054fb..30957f8dee 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -144,7 +144,7 @@ static void finish_shutdown(grpc_udp_server *s) {
gpr_cv_destroy(&s->cv);
gpr_free(s->ports);
- grpc_workqueue_unref(s->workqueue);
+ GRPC_WORKQUEUE_UNREF(s->workqueue, "workqueue");
gpr_free(s);
}
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index 0bfa959953..a236651fbd 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -52,8 +52,23 @@ typedef struct grpc_workqueue grpc_workqueue;
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(void);
+void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously);
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#define GRPC_WORKQUEUE_REF(p, r) \
+ grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_WORKQUEUE_UNREF(p, r) \
+ grpc_workqueue_unref((p), __FILE__, __LINE__, (r))
+void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason);
+void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason);
+#else
+#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
+#define GRPC_WORKQUEUE_UNREF(p, r) grpc_workqueue_unref((p))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_workqueue *workqueue);
+#endif
/** Bind this workqueue to a pollset */
void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index 26626bef3b..ec3ce713b0 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -35,12 +35,15 @@
#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/fd_posix.h"
#include "src/core/iomgr/workqueue.h"
#include <stdio.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+
+#include "src/core/iomgr/fd_posix.h"
static void on_readable(void *arg, int success);
@@ -61,15 +64,81 @@ grpc_workqueue *grpc_workqueue_create(void) {
return workqueue;
}
+static void shutdown_thread(void *arg) {
+ grpc_iomgr_closure *todo = arg;
+
+ while (todo) {
+ grpc_iomgr_closure *next = todo->next;
+ todo->cb(todo->cb_arg, todo->success);
+ todo = next;
+ }
+}
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static size_t count_waiting(grpc_workqueue *workqueue) {
+ size_t i = 0;
+ grpc_iomgr_closure *c;
+ for (c = workqueue->head.next; c; c = c->next) {
+ i++;
+ }
+ return i;
+}
+#endif
+
+void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) {
+ grpc_iomgr_closure *todo;
+ gpr_thd_id thd;
+
+ gpr_mu_lock(&workqueue->mu);
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+ gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue,
+ count_waiting(workqueue),
+ asynchronously ? "asynchronously" : "synchronously");
+#endif
+ todo = workqueue->head.next;
+ workqueue->head.next = NULL;
+ workqueue->tail = &workqueue->head;
+ gpr_mu_unlock(&workqueue->mu);
+
+ if (todo != NULL) {
+ if (asynchronously) {
+ gpr_thd_new(&thd, shutdown_thread, todo, NULL);
+ } else {
+ while (todo) {
+ grpc_iomgr_closure *next = todo->next;
+ todo->cb(todo->cb_arg, todo->success);
+ todo = next;
+ }
+ }
+ }
+}
+
static void workqueue_destroy(grpc_workqueue *workqueue) {
+ GPR_ASSERT(workqueue->tail == &workqueue->head);
grpc_fd_shutdown(workqueue->wakeup_read_fd);
}
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s",
+ workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
+ reason);
+#else
void grpc_workqueue_ref(grpc_workqueue *workqueue) {
+#endif
gpr_ref(&workqueue->refs);
}
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
+ workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
+ reason);
+#else
void grpc_workqueue_unref(grpc_workqueue *workqueue) {
+#endif
if (gpr_unref(&workqueue->refs)) {
workqueue_destroy(workqueue);
}
@@ -94,6 +163,10 @@ static void on_readable(void *arg, int success) {
return;
} else {
gpr_mu_lock(&workqueue->mu);
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+ gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
+ count_waiting(workqueue));
+#endif
todo = workqueue->head.next;
workqueue->head.next = NULL;
workqueue->tail = &workqueue->head;
@@ -119,6 +192,10 @@ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
}
workqueue->tail->next = closure;
workqueue->tail = closure;
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+ gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
+ count_waiting(workqueue));
+#endif
gpr_mu_unlock(&workqueue->mu);
}