aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/pollset.h15
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c9
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c12
-rw-r--r--src/core/iomgr/pollset_posix.c45
-rw-r--r--src/core/iomgr/pollset_posix.h14
-rw-r--r--src/core/iomgr/pollset_set_posix.c1
-rw-r--r--src/core/iomgr/pollset_set_posix.h1
-rw-r--r--src/core/iomgr/pollset_windows.h2
-rw-r--r--src/core/iomgr/tcp_posix.c5
-rw-r--r--src/core/iomgr/workqueue_posix.c1
-rw-r--r--src/core/iomgr/workqueue_posix.h2
-rw-r--r--src/core/security/google_default_credentials.c23
-rw-r--r--src/core/surface/completion_queue.c84
14 files changed, 118 insertions, 102 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 85eadd754b..812ff0992e 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -46,6 +46,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/pollset_posix.h"
+
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
@@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
}
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
- gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
+ gpr_mu_lock(watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
- gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
+ gpr_mu_unlock(watcher->pollset->mu);
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 0c0efad760..dfbd4a40ec 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -35,8 +35,11 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H
#include <grpc/support/port_platform.h>
+#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/iomgr/exec_ctx.h"
+
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
/* A grpc_pollset is a set of file descriptors that a higher level item is
@@ -46,15 +49,11 @@
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_posix.h"
-#endif
-
-#ifdef GPR_WIN32
-#include "src/core/iomgr/pollset_windows.h"
-#endif
+typedef struct grpc_pollset grpc_pollset;
+typedef struct grpc_pollset_worker grpc_pollset_worker;
-void grpc_pollset_init(grpc_pollset *pollset);
+size_t grpc_pollset_size(void);
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu);
/* Begin shutting down the pollset, and call closure when done.
* GRPC_POLLSET_MU(pollset) must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 4acae2bb71..e1af2b5241 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -45,6 +45,7 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/block_annotate.h"
@@ -148,7 +149,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
finally_add_fd(exec_ctx, da->pollset, da->fd);
}
- gpr_mu_lock(&da->pollset->mu);
+ gpr_mu_lock(da->pollset->mu);
da->pollset->in_flight_cbs--;
if (da->pollset->shutting_down) {
/* We don't care about this pollset anymore. */
@@ -157,7 +158,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL);
}
}
- gpr_mu_unlock(&da->pollset->mu);
+ gpr_mu_unlock(da->pollset->mu);
GRPC_FD_UNREF(da->fd, "delayed_add");
@@ -169,7 +170,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
grpc_fd *fd,
int and_unlock_pollset) {
if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
finally_add_fd(exec_ctx, pollset, fd);
} else {
delayed_add *da = gpr_malloc(sizeof(*da));
@@ -201,7 +202,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
* here.
*/
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 809f8f39da..348d339104 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -42,13 +42,15 @@
#include <stdlib.h>
#include <string.h>
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/support/block_annotate.h"
+
typedef struct {
/* all polled fds */
size_t fd_count;
@@ -78,7 +80,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
GRPC_FD_REF(fd, "multipoller");
exit:
if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
}
}
@@ -130,7 +132,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
}
h->del_count = 0;
h->fd_count = fd_count;
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
for (i = 2; i < pfd_count; i++) {
pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index ee7e9f48f4..63321638ba 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -42,16 +42,16 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/socket_utils_posix.h"
-#include "src/core/profiling/timers.h"
-#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
@@ -97,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
+size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }
+
void grpc_pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) {
@@ -186,8 +188,8 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
-void grpc_pollset_init(grpc_pollset *pollset) {
- gpr_mu_init(&pollset->mu);
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) {
+ pollset->mu = mu;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
@@ -204,7 +206,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
pollset->vtable->destroy(pollset);
- gpr_mu_destroy(&pollset->mu);
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
@@ -227,15 +228,15 @@ void grpc_pollset_reset(grpc_pollset *pollset) {
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(pollset->mu);
pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to add_fd above is
not respected, the code will deadlock (in a way that we have a chance of
debugging) */
#ifndef NDEBUG
- gpr_mu_lock(&pollset->mu);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_lock(pollset->mu);
+ gpr_mu_unlock(pollset->mu);
#endif
}
@@ -284,7 +285,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* Give do_promote priority so we don't starve it out */
if (pollset->in_flight_cbs) {
GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
locked = 0;
goto done;
}
@@ -318,7 +319,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
done:
if (!locked) {
queued_work |= grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(pollset->mu);
locked = 1;
}
/* If we're forced to re-evaluate polling (via grpc_pollset_kick with
@@ -348,19 +349,19 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_kick(pollset, NULL);
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
finish_shutdown(exec_ctx, pollset);
grpc_exec_ctx_flush(exec_ctx);
/* Continuing to access pollset here is safe -- it is the caller's
* responsibility to not destroy when it has outstanding calls to
* grpc_pollset_work.
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(pollset->mu);
}
}
*worker_hdl = NULL;
@@ -428,7 +429,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
* 4. The pollset may be shutting down.
*/
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
@@ -469,7 +470,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
}
}
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
/* Matching ref in basic_pollset_add_fd */
GRPC_FD_UNREF(fd, "basicpoll_add");
@@ -522,7 +523,7 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
exit:
if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
}
}
@@ -558,14 +559,14 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
pfd[2].fd = fd->fd;
pfd[2].revents = 0;
GRPC_FD_REF(fd, "basicpoll_begin");
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
POLLOUT, &fd_watcher);
if (pfd[2].events != 0) {
nfds++;
}
} else {
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(pollset->mu);
}
/* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index b34bb09426..58158e3d46 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -37,8 +37,10 @@
#include <poll.h>
#include <grpc/support/sync.h>
+
#include "src/core/iomgr/exec_ctx.h"
#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -53,21 +55,21 @@ typedef struct grpc_cached_wakeup_fd {
struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;
-typedef struct grpc_pollset_worker {
+struct grpc_pollset_worker {
grpc_cached_wakeup_fd *wakeup_fd;
int reevaluate_polling_on_wakeup;
int kicked_specifically;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
-} grpc_pollset_worker;
+};
-typedef struct grpc_pollset {
+struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
For example, we may choose a poll() based implementation on linux for
few fds, and an epoll() based implementation for many fds */
const grpc_pollset_vtable *vtable;
- gpr_mu mu;
+ gpr_mu *mu;
grpc_pollset_worker root_worker;
int in_flight_cbs;
int shutting_down;
@@ -81,7 +83,7 @@ typedef struct grpc_pollset {
} data;
/* Local cache of eventfds for workers */
grpc_cached_wakeup_fd *local_wakeup_cache;
-} grpc_pollset;
+};
struct grpc_pollset_vtable {
void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -93,8 +95,6 @@ struct grpc_pollset_vtable {
void (*destroy)(grpc_pollset *pollset);
};
-#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
-
/* Add an fd to a pollset */
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index 4ec92202e3..85a0cadfc7 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -41,6 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/pollset_set.h"
void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index 4820a61e4b..7ce8ec7343 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H
#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/pollset_posix.h"
typedef struct grpc_pollset_set {
gpr_mu mu;
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index 65ba80619b..dfec5821b3 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -74,6 +74,4 @@ struct grpc_pollset {
extern gpr_mu grpc_polling_mu;
-#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu)
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 048e907441..fba3563427 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -40,8 +40,8 @@
#include <errno.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/types.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
@@ -51,9 +51,10 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-#include "src/core/support/string.h"
#include "src/core/debug/trace.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/string.h"
#ifdef GPR_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index da11df67ef..c096dbfb30 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -44,6 +44,7 @@
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
index 589034fe1b..6f29f4004c 100644
--- a/src/core/iomgr/workqueue_posix.h
+++ b/src/core/iomgr/workqueue_posix.h
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
+#include "src/core/iomgr/wakeup_fd_posix.h"
+
struct grpc_fd;
struct grpc_workqueue {
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index cc9c958298..25006e16c7 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -58,7 +58,7 @@ static gpr_once g_once = GPR_ONCE_INIT;
static void init_default_credentials(void) { gpr_mu_init(&g_mu); }
typedef struct {
- grpc_pollset pollset;
+ grpc_pollset *pollset;
int is_done;
int success;
} compute_engine_detector;
@@ -80,10 +80,10 @@ static void on_compute_engine_detection_http_response(
}
}
}
- gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset));
+ gpr_mu_lock(&g_mu);
detector->is_done = 1;
- grpc_pollset_kick(&detector->pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
+ grpc_pollset_kick(detector->pollset, NULL);
+ gpr_mu_unlock(&g_mu);
}
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) {
@@ -101,7 +101,8 @@ static int is_stack_running_on_compute_engine(void) {
on compute engine. */
gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
- grpc_pollset_init(&detector.pollset);
+ detector.pollset = gpr_malloc(grpc_pollset_size());
+ grpc_pollset_init(detector.pollset, &g_mu);
detector.is_done = 0;
detector.success = 0;
@@ -112,7 +113,7 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
grpc_httpcli_get(
- &exec_ctx, &context, &detector.pollset, &request,
+ &exec_ctx, &context, detector.pollset, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
on_compute_engine_detection_http_response, &detector);
@@ -120,20 +121,22 @@ static int is_stack_running_on_compute_engine(void) {
/* Block until we get the response. This is not ideal but this should only be
called once for the lifetime of the process by the default credentials. */
- gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
+ gpr_mu_lock(&g_mu);
while (!detector.is_done) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, &detector.pollset, &worker,
+ grpc_pollset_work(&exec_ctx, detector.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
+ gpr_mu_unlock(&g_mu);
grpc_httpcli_context_destroy(&context);
grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset);
- grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure);
+ grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure);
grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(detector.pollset);
+
return detector.success;
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 376feb1bbe..d0659c7e52 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,18 +36,19 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/timer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/timer.h"
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
-#include "src/core/profiling/timers.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/atm.h>
-#include <grpc/support/log.h>
-#include <grpc/support/time.h>
typedef struct {
grpc_pollset_worker **worker;
@@ -56,6 +57,7 @@ typedef struct {
/* Completion queue structure */
struct grpc_completion_queue {
+ gpr_mu mu;
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -63,8 +65,6 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- /** the set of low level i/o things that concern this cq */
- grpc_pollset pollset;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@@ -82,6 +82,8 @@ struct grpc_completion_queue {
grpc_completion_queue *next_free;
};
+#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
+
static gpr_mu g_freelist_mu;
grpc_completion_queue *g_freelist;
@@ -94,7 +96,8 @@ void grpc_cq_global_shutdown(void) {
gpr_mu_destroy(&g_freelist_mu);
while (g_freelist) {
grpc_completion_queue *next = g_freelist->next_free;
- grpc_pollset_destroy(&g_freelist->pollset);
+ grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist));
+ gpr_mu_destroy(&g_freelist->mu);
#ifndef NDEBUG
gpr_free(g_freelist->outstanding_tags);
#endif
@@ -124,8 +127,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
if (g_freelist == NULL) {
gpr_mu_unlock(&g_freelist_mu);
- cc = gpr_malloc(sizeof(grpc_completion_queue));
- grpc_pollset_init(&cc->pollset);
+ cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
+ gpr_mu_init(&cc->mu);
+ grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
@@ -184,7 +188,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
- grpc_pollset_reset(&cc->pollset);
+ grpc_pollset_reset(POLLSET_FROM_CQ(cc));
gpr_mu_lock(&g_freelist_mu);
cc->next_free = g_freelist;
g_freelist = cc;
@@ -194,7 +198,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
#ifndef NDEBUG
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
GPR_ASSERT(!cc->shutdown_called);
if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
@@ -203,7 +207,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
cc->outstanding_tag_capacity);
}
cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
#endif
gpr_ref(&cc->pending_events);
}
@@ -231,7 +235,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
storage->next =
((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0));
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
#ifndef NDEBUG
for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
if (cc->outstanding_tags[i] == tag) {
@@ -256,8 +260,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
break;
}
}
- grpc_pollset_kick(&cc->pollset, pluck_worker);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ gpr_mu_unlock(&cc->mu);
} else {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -265,8 +269,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
+ gpr_mu_unlock(&cc->mu);
}
GPR_TIMER_END("grpc_cq_end_op", 0);
@@ -294,7 +299,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
for (;;) {
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
@@ -302,7 +307,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
if (c == cc->completed_tail) {
cc->completed_tail = &cc->completed_head;
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -310,14 +315,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
if (cc->shutdown) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
@@ -330,12 +335,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
continue;
}
- grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
+ grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
iteration_deadline);
}
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
@@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
for (;;) {
prev = &cc->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
@@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
if (c == cc->completed_tail) {
cc->completed_tail = prev;
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
prev = c;
}
if (cc->shutdown) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
@@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
"Too many outstanding grpc_completion_queue_pluck calls: maximum "
"is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
@@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
@@ -447,12 +452,12 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
continue;
}
- grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
+ grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
iteration_deadline);
del_plucker(cc, tag, &worker);
}
@@ -472,9 +477,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&cc->mu);
if (cc->shutdown_called) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
@@ -482,9 +487,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
+ grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&cc->mu);
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
@@ -498,7 +504,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return &cc->pollset;
+ return POLLSET_FROM_CQ(cc);
}
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }