aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/iomgr/error.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c338
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c5
-rw-r--r--src/core/lib/iomgr/socket_utils_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c14
5 files changed, 267 insertions, 98 deletions
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index ffacdac393..2613512acb 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -137,8 +137,8 @@ typedef enum {
} grpc_error_times;
/// The following "special" errors can be propagated without allocating memory.
-/// They are always even so that other code (particularly combiner locks) can
-/// safely use the lower bit for themselves.
+/// They are always even so that other code (particularly combiner locks,
+/// polling engines) can safely use the lower bit for themselves.
#define GRPC_ERROR_NONE ((grpc_error *)NULL)
#define GRPC_ERROR_OOM ((grpc_error *)2)
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index fac3705142..11208b9ad1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -68,7 +68,7 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
}
-/* Uncomment the following enable extra checks on poll_object operations */
+/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */
static int grpc_wakeup_signal = -1;
@@ -140,24 +140,61 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- /* Indicates that the fd is shutdown and that any pending read/write closures
- should fail */
- bool shutdown;
- grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
+ /* Internally stores data of type (grpc_error *). If the FD is shutdown, this
+ contains reason for shutdown (i.e a pointer to grpc_error) ORed with
+ FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit
+ of (grpc_error *) addresses is guaranteed to be zero. Even if the
+ (grpc_error *), is of special types like GRPC_ERROR_NONE, GRPC_ERROR_OOM
+ etc, the lower bit is guaranteed to be zero.
- /* The fd is either closed or we relinquished control of it. In either cases,
- this indicates that the 'fd' on this structure is no longer valid */
+ Once an fd is shutdown, any pending or future read/write closures on the
+ fd should fail */
+ gpr_atm shutdown_error;
+
+ /* The fd is either closed or we relinquished control of it. In either
+ cases, this indicates that the 'fd' on this structure is no longer
+ valid */
bool orphaned;
- /* TODO: sreek - Move this to a lockfree implementation */
- grpc_closure *read_closure;
- grpc_closure *write_closure;
+ /* Closures to call when the fd is readable or writable respectively. These
+ fields contain one of the following values:
+ CLOSURE_READY : The fd has an I/O event of interest but there is no
+ closure yet to execute
+
+ CLOSURE_NOT_READY : The fd has no I/O event of interest
+
+ closure ptr : The closure to be executed when the fd has an I/O
+ event of interest
+
+ shutdown_error | FD_SHUTDOWN_BIT :
+ 'shutdown_error' field ORed with FD_SHUTDOWN_BIT.
+ This indicates that the fd is shutdown. Since all
+ memory allocations are word-aligned, the lower two
+ bits of the shutdown_error pointer are always 0. So
+ it is safe to OR these with FD_SHUTDOWN_BIT
+
+ Valid state transitions:
+
+ <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY
+ | | ^ | ^ | |
+ | | | | | | |
+ | +--------------4----------+ 6 +---------2---------------+ |
+ | | |
+ | v |
+ +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+
+
+ For 1, 4 : See set_ready() function
+ For 2, 3 : See notify_on() function
+ For 5,6,7: See set_shutdown() function */
+ gpr_atm read_closure;
+ gpr_atm write_closure;
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
- /* The pollset that last noticed that the fd is readable */
- grpc_pollset *read_notifier_pollset;
+ /* The pollset that last noticed that the fd is readable. The actual type
+ * stored in this is (grpc_pollset *) */
+ gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
};
@@ -180,8 +217,10 @@ static void fd_unref(grpc_fd *fd);
static void fd_global_init(void);
static void fd_global_shutdown(void);
-#define CLOSURE_NOT_READY ((grpc_closure *)0)
-#define CLOSURE_READY ((grpc_closure *)1)
+#define CLOSURE_NOT_READY ((gpr_atm)0)
+#define CLOSURE_READY ((gpr_atm)2)
+
+#define FD_SHUTDOWN_BIT 1
/*******************************************************************************
* Polling island Declarations
@@ -908,7 +947,11 @@ static void unref_by(grpc_fd *fd, int n) {
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
- if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
+
+ grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
+ /* Clear the least significant bit if it set (in case fd was shutdown) */
+ err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT);
+ GRPC_ERROR_UNREF(err);
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -972,13 +1015,14 @@ static grpc_fd *fd_create(int fd, const char *name) {
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
- new_fd->shutdown = false;
+ gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE);
new_fd->orphaned = false;
- new_fd->read_closure = CLOSURE_NOT_READY;
- new_fd->write_closure = CLOSURE_NOT_READY;
+ gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY);
+ gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY);
+ gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
+
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
- new_fd->read_notifier_pollset = NULL;
gpr_mu_unlock(&new_fd->po.mu);
@@ -1060,101 +1104,206 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_ERROR_UNREF(error);
}
-static grpc_error *fd_shutdown_error(grpc_fd *fd) {
- if (!fd->shutdown) {
- return GRPC_ERROR_NONE;
- } else {
- return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
+static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+ grpc_closure *closure) {
+ while (true) {
+ /* Fast-path: CLOSURE_NOT_READY -> <closure>.
+ The 'release' cas here matches the 'acquire' load in set_ready and
+ set_shutdown ensuring that the closure (scheduled by set_ready or
+ set_shutdown) happens-after the I/O event on the fd */
+ if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
+ return; /* Fast-path successful. Return */
+ }
+
+ /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
+ set_shutdown */
+ gpr_atm curr = gpr_atm_acq_load(state);
+ switch (curr) {
+ case CLOSURE_NOT_READY: {
+ break; /* retry */
+ }
+
+ case CLOSURE_READY: {
+ /* Change the state to CLOSURE_NOT_READY. Schedule the closure if
+ successful. If not, the state most likely transitioned to shutdown.
+ We should retry.
+
+ This can be a no-barrier cas since the state is being transitioned to
+ CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any
+ closure when transitioning out of CLOSURE_NO_READY state (i.e there
+ is no other code that needs to 'happen-after' this) */
+ if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ return; /* Slow-path successful. Return */
+ }
+
+ break; /* retry */
+ }
+
+ default: {
+ /* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
+ contains a pointer to the shutdown-error). If the fd is shutdown,
+ schedule the closure with the shutdown error */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
+ grpc_closure_sched(
+ exec_ctx, closure,
+ GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
+ return;
+ }
+
+ /* There is already a closure!. This indicates a bug in the code */
+ gpr_log(GPR_ERROR,
+ "notify_on called with a previous callback still pending");
+ abort();
+ }
+ }
}
+
+ GPR_UNREACHABLE_CODE(return );
}
-static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st, grpc_closure *closure) {
- if (fd->shutdown) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready ==> switch to a waiting state by setting the closure */
- *st = closure;
- } else if (*st == CLOSURE_READY) {
- /* already ready ==> queue the closure to run immediately */
- *st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
- } else {
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
+static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+ grpc_error *shutdown_err) {
+ /* Try the fast-path first (i.e expect the current value to be
+ CLOSURE_NOT_READY */
+ gpr_atm curr = CLOSURE_NOT_READY;
+ gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
+
+ while (true) {
+ /* The 'release' cas here matches the 'acquire' load in notify_on to ensure
+ that the closure it schedules 'happens-after' the set_shutdown is called
+ on the fd */
+ if (gpr_atm_rel_cas(state, curr, new_state)) {
+ return; /* Fast-path successful. Return */
+ }
+
+ /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
+ notify_on and set_ready */
+ curr = gpr_atm_acq_load(state);
+ switch (curr) {
+ case CLOSURE_READY: {
+ break; /* retry */
+ }
+
+ case CLOSURE_NOT_READY: {
+ break; /* retry */
+ }
+
+ default: {
+ /* 'curr' is either a closure or the fd is already shutdown */
+
+ /* If fd is already shutdown, we are done */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ return;
+ }
+
+ /* Fd is not shutdown. Schedule the closure and move the state to
+ shutdown state. The 'release' cas here matches the 'acquire' load in
+ notify_on to ensure that the closure it schedules 'happens-after'
+ the set_shutdown is called on the fd */
+ if (gpr_atm_rel_cas(state, curr, new_state)) {
+ grpc_closure_sched(
+ exec_ctx, (grpc_closure *)curr,
+ GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
+ return;
+ }
+
+ /* 'curr' was a closure but now changed to a different state. We will
+ have to retry */
+ break;
+ }
+ }
}
+
+ GPR_UNREACHABLE_CODE(return );
}
-/* returns 1 if state becomes not ready */
-static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st) {
- if (*st == CLOSURE_READY) {
- /* duplicate ready ==> ignore */
- return 0;
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready, and not waiting ==> flag ready */
- *st = CLOSURE_READY;
- return 0;
- } else {
- /* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
- *st = CLOSURE_NOT_READY;
- return 1;
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
+ /* Try an optimistic case first (i.e assume current state is
+ CLOSURE_NOT_READY).
+
+ This 'release' cas matches the 'acquire' load in notify_on ensuring that
+ any closure (scheduled by notify_on) 'happens-after' the return from
+ epoll_pwait */
+ if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
+ return; /* early out */
+ }
+
+ /* The 'acquire' load here matches the 'release' cas in notify_on and
+ set_shutdown */
+ gpr_atm curr = gpr_atm_acq_load(state);
+ switch (curr) {
+ case CLOSURE_READY: {
+ /* Already ready. We are done here */
+ break;
+ }
+
+ case CLOSURE_NOT_READY: {
+ /* The state was not CLOSURE_NOT_READY when we checked initially at the
+ beginning of this function but now it is CLOSURE_NOT_READY again.
+ This is only possible if the state transitioned out of
+ CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
+ back to CLOSURE_NOT_READY again (i.e after we entered this function,
+ the fd became "ready" and the necessary actions were already done).
+ So there is no need to make the state CLOSURE_READY now */
+ break;
+ }
+
+ default: {
+ /* 'curr' is either a closure or the fd is shutdown */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ /* The fd is shutdown. Do nothing */
+ } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) {
+ /* The cas above was no-barrier since the state is being transitioned to
+ CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any
+ closures when transitioning out of CLOSURE_NO_READY state (i.e there
+ is no other code that needs to 'happen-after' this) */
+
+ grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+ }
+ /* else the state changed again (only possible by either a racing
+ set_ready or set_shutdown functions. In both these cases, the closure
+ would have been scheduled for execution. So we are done here */
+ break;
+ }
}
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
- grpc_pollset *notifier = NULL;
-
- gpr_mu_lock(&fd->po.mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->po.mu);
-
- return notifier;
+ gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
+ return (grpc_pollset *)notifier;
}
static bool fd_is_shutdown(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- const bool r = fd->shutdown;
- gpr_mu_unlock(&fd->po.mu);
- return r;
+ grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
+ return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0);
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
- gpr_mu_lock(&fd->po.mu);
- /* Do the actual shutdown only once */
- if (!fd->shutdown) {
- fd->shutdown = true;
- fd->shutdown_error = why;
-
+ /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */
+ if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE,
+ (gpr_atm)why | FD_SHUTDOWN_BIT)) {
shutdown(fd->fd, SHUT_RDWR);
- /* Flush any pending read and write closures. Since fd->shutdown is 'true'
- at this point, the closures would be called with 'success = false' */
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
+
+ set_shutdown(exec_ctx, fd, &fd->read_closure, why);
+ set_shutdown(exec_ctx, fd, &fd->write_closure, why);
} else {
+ /* Shutdown already called */
GRPC_ERROR_UNREF(why);
}
- gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->po.mu);
- notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
- gpr_mu_unlock(&fd->po.mu);
+ notify_on(exec_ctx, fd, &fd->read_closure, closure);
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->po.mu);
- notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
- gpr_mu_unlock(&fd->po.mu);
+ notify_on(exec_ctx, fd, &fd->write_closure, closure);
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
@@ -1343,18 +1492,19 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
- gpr_mu_lock(&fd->po.mu);
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- fd->read_notifier_pollset = notifier;
- gpr_mu_unlock(&fd->po.mu);
+ set_ready(exec_ctx, fd, &fd->read_closure);
+
+ /* Note, it is possible that fd_become_readable might be called twice with
+ different 'notifier's when an fd becomes readable and it is in two epoll
+ sets (This can happen briefly during polling island merges). In such cases
+ it does not really matter which notifer is set as the read_notifier_pollset
+ (They would both point to the same polling island anyway) */
+ /* Use release store to match with acquire load in fd_get_read_notifier */
+ gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
- gpr_mu_lock(&fd->po.mu);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
- gpr_mu_unlock(&fd->po.mu);
+ set_ready(exec_ctx, fd, &fd->write_closure);
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
@@ -1737,7 +1887,7 @@ retry:
(void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
(void *)bag);
/* No need to lock 'pi_new' here since this is a new polling island
- * and no one has a reference to it yet */
+ and no one has a reference to it yet */
polling_island_remove_all_fds_locked(pi_new, true, &error);
/* Ref and unref so that the polling island gets deleted during unref
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index 9b5f3209f0..79ff910738 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -113,14 +113,15 @@ static grpc_error *try_split_host_port(const char *name,
/* parse name, splitting it into host and port parts */
grpc_error *error;
gpr_split_host_port(name, host, port);
- if (host == NULL) {
+ if (*host == NULL) {
char *msg;
gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
error = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return error;
}
- if (port == NULL) {
+ if (*port == NULL) {
+ // TODO(murgatroid99): add tests for this case
if (default_port == NULL) {
char *msg;
gpr_asprintf(&msg, "no port in name '%s'", name);
diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c
index 628ad4a45b..5bbe8fa34c 100644
--- a/src/core/lib/iomgr/socket_utils_windows.c
+++ b/src/core/lib/iomgr/socket_utils_windows.c
@@ -41,8 +41,12 @@
#include <grpc/support/log.h>
const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) {
+#ifdef GPR_WIN_INET_NTOP
+ return inet_ntop(af, src, dst, size);
+#else
/* Windows InetNtopA wants a mutable ip pointer */
return InetNtopA(af, (void *)src, dst, size);
+#endif /* GPR_WIN_INET_NTOP */
}
#endif /* GRPC_WINDOWS_SOCKETUTILS */
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 5225a5402b..3de0795187 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -46,6 +46,8 @@
#include "src/core/lib/iomgr/tcp_uv.h"
#include "src/core/lib/iomgr/timer.h"
+extern int grpc_tcp_trace;
+
typedef struct grpc_uv_tcp_connect {
uv_connect_t connect_req;
grpc_timer alarm;
@@ -70,6 +72,12 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
grpc_error *error) {
int done;
grpc_uv_tcp_connect *connect = acp;
+ if (grpc_tcp_trace) {
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
+ connect->addr_name, str);
+ grpc_error_free_string(str);
+ }
if (error == GRPC_ERROR_NONE) {
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
it was cancelled, then the handler that cancelled it also should close
@@ -145,6 +153,12 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
connect->resource_quota = resource_quota;
uv_tcp_init(uv_default_loop(), connect->tcp_handle);
connect->connect_req.data = connect;
+
+ if (grpc_tcp_trace) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
+ connect->addr_name);
+ }
+
// TODO(murgatroid99): figure out what the return value here means
uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
(const struct sockaddr *)resolved_addr->addr,