aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-30 16:14:04 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-30 16:14:04 -0700
commit561300fba09145fe0b2a0731c480815bea3dacfd (patch)
treeb3bde176094deaa599b90a09bf49fde12cd33795
parent0c1cea106491cedb3ad462d9054ce1ece81c0414 (diff)
parentd1885e0ba90bcd3ed89acb0c51702fec006546c5 (diff)
Merge branch 'plucking-hell' of github.com:ctiller/grpc into plucking-hell
-rw-r--r--src/core/iomgr/pollset_windows.c83
-rw-r--r--src/core/iomgr/pollset_windows.h9
2 files changed, 82 insertions, 10 deletions
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index a9c4739c7c..22dc5891c3 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -42,6 +42,38 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ }
+ else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
@@ -50,7 +82,8 @@
void grpc_pollset_init(grpc_pollset *pollset) {
memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
- gpr_cv_init(&pollset->cv);
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->kicked_without_pollers = 0;
}
void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -58,34 +91,66 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
void *shutdown_done_arg) {
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
- gpr_cv_broadcast(&pollset->cv);
+ grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
- gpr_cv_destroy(&pollset->cv);
}
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
gpr_timespec now;
+ int added_worker = 0;
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}
+ worker->next = worker->prev = NULL;
+ gpr_cv_init(&worker->cv);
if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
- return 1 /* GPR_TRUE */;
+ goto done;
}
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1 /* GPR_TRUE */;
+ goto done;
}
- if (!pollset->shutting_down) {
- gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
+ push_front_worker(pollset, worker);
+ added_worker = 1;
+ gpr_cv_wait(&worker->cv, &pollset->mu, deadline);
+ } else {
+ pollset->kicked_without_pollers = 0;
+ }
+done:
+ gpr_cv_destroy(&worker->cv);
+ if (added_worker) {
+ remove_worker(pollset, worker);
}
return 1 /* GPR_TRUE */;
}
-void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); }
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ gpr_cv_signal(&specific_worker->cv);
+ }
+ p->kicked_without_pollers = 1;
+ } else {
+ gpr_cv_signal(&specific_worker->cv);
+ }
+ } else {
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ push_back_worker(p, specific_worker);
+ gpr_cv_signal(&specific_worker->cv);
+ } else {
+ p->kicked_without_pollers = 1;
+ }
+ }
+}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index c9b8d3f374..8b51006c74 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -42,10 +42,17 @@
nature of the IO completion ports. A Windows "pollset" is merely a mutex
and a condition variable, used to synchronize with the IOCP. */
+typedef struct grpc_pollset_worker {
+ gpr_cv cv;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+} grpc_pollset_worker;
+
typedef struct grpc_pollset {
gpr_mu mu;
- gpr_cv cv;
int shutting_down;
+ int kicked_without_pollers;
+ grpc_pollset_worker root_worker;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)