aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-12-12 06:36:43 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-12-12 06:36:43 -0800
commitb995e8bcdabd5e40bf5384a64ee57e83cc9691ff (patch)
tree4c1441e5648faad865e7480d36b063882e9fd77b /src/core/lib
parent298d481f1e8348cb7713d53a26fc2c41eb9d8f7c (diff)
parentb62bffbea5eef106bfbe644e8af161889c927401 (diff)
Merge github.com:grpc/grpc into slice_with_exec_ctx
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/handshaker.c27
-rw-r--r--src/core/lib/channel/handshaker.h13
-rw-r--r--src/core/lib/http/httpcli_security_connector.c17
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c585
-rw-r--r--src/core/lib/iomgr/tcp_posix.c5
-rw-r--r--src/core/lib/iomgr/tcp_server.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c16
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c9
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c8
-rw-r--r--src/core/lib/security/transport/security_connector.c59
-rw-r--r--src/core/lib/security/transport/security_connector.h18
-rw-r--r--src/core/lib/security/transport/security_handshaker.c24
-rw-r--r--src/core/lib/security/transport/security_handshaker.h13
-rw-r--r--src/core/lib/support/string.h2
-rw-r--r--src/core/lib/support/tmpfile.h2
15 files changed, 399 insertions, 402 deletions
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 90626dc2d1..23edc826ca 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -49,21 +49,21 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
handshaker->vtable = vtable;
}
-static void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {
+void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
handshaker->vtable->destroy(exec_ctx, handshaker);
}
-static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker) {
+void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
handshaker->vtable->shutdown(exec_ctx, handshaker);
}
-static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
- grpc_handshaker* handshaker,
- grpc_tcp_server_acceptor* acceptor,
- grpc_closure* on_handshake_done,
- grpc_handshaker_args* args) {
+void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor,
on_handshake_done, args);
}
@@ -157,10 +157,11 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr,
grpc_error* error) {
GPR_ASSERT(mgr->index <= mgr->count);
- // If we got an error or we've been shut down or we've finished the last
- // handshaker, invoke the on_handshake_done callback. Otherwise, call the
- // next handshaker.
- if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->index == mgr->count) {
+ // If we got an error or we've been shut down or we're exiting early or
+ // we've finished the last handshaker, invoke the on_handshake_done
+ // callback. Otherwise, call the next handshaker.
+ if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early ||
+ mgr->index == mgr->count) {
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index ebbc1ff7f3..450b7adaee 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -72,6 +72,9 @@ typedef struct {
grpc_endpoint* endpoint;
grpc_channel_args* args;
grpc_slice_buffer* read_buffer;
+ // A handshaker may set this to true before invoking on_handshake_done
+ // to indicate that subsequent handshakers should be skipped.
+ bool exit_early;
// User data passed through the handshake manager. Not used by
// individual handshakers.
void* user_data;
@@ -105,6 +108,16 @@ struct grpc_handshaker {
void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker);
+void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker);
+void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker);
+void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_closure* on_handshake_done,
+ grpc_handshaker_args* args);
+
///
/// grpc_handshake_manager
///
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index 414383f8ca..7dd50b7dab 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -62,9 +62,9 @@ static void httpcli_ssl_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(sc);
}
-static void httpcli_ssl_create_handshakers(
- grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
- grpc_handshake_manager *handshake_mgr) {
+static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc;
tsi_handshaker *handshaker = NULL;
@@ -76,8 +76,9 @@ static void httpcli_ssl_create_handshakers(
tsi_result_to_string(result));
}
}
- grpc_security_create_handshakers(exec_ctx, handshaker, &sc->base,
- handshake_mgr);
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base));
}
static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
@@ -135,7 +136,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
- c->base.create_handshakers = httpcli_ssl_create_handshakers;
+ c->base.add_handshakers = httpcli_ssl_add_handshakers;
*sc = &c->base;
return GRPC_SECURITY_OK;
}
@@ -188,8 +189,8 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(httpcli_ssl_channel_security_connector_create(
exec_ctx, pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
- grpc_channel_security_connector_create_handshakers(exec_ctx, sc,
- c->handshake_mgr);
+ grpc_channel_security_connector_add_handshakers(exec_ctx, sc,
+ c->handshake_mgr);
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline,
NULL /* acceptor */, on_handshake_done, c /* user_data */);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 843ca42422..59b1fe896f 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -68,6 +68,9 @@ static int grpc_polling_trace = 0; /* Disabled by default */
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
}
+/* Uncomment the following enable extra checks on poll_object operations */
+/* #define PO_DEBUG */
+
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
@@ -94,10 +97,42 @@ void grpc_use_signal(int signum) {
struct polling_island;
+typedef enum {
+ POLL_OBJ_FD,
+ POLL_OBJ_POLLSET,
+ POLL_OBJ_POLLSET_SET
+} poll_obj_type;
+
+typedef struct poll_obj {
+#ifdef PO_DEBUG
+ poll_obj_type obj_type;
+#endif
+ gpr_mu mu;
+ struct polling_island *pi;
+} poll_obj;
+
+const char *poll_obj_string(poll_obj_type po_type) {
+ switch (po_type) {
+ case POLL_OBJ_FD:
+ return "fd";
+ case POLL_OBJ_POLLSET:
+ return "pollset";
+ case POLL_OBJ_POLLSET_SET:
+ return "pollset_set";
+ }
+
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
/*******************************************************************************
* Fd Declarations
*/
+
+#define FD_FROM_PO(po) ((grpc_fd *)(po))
+
struct grpc_fd {
+ poll_obj po;
+
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -105,8 +140,6 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- gpr_mu mu;
-
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown;
@@ -119,9 +152,6 @@ struct grpc_fd {
grpc_closure *read_closure;
grpc_closure *write_closure;
- /* The polling island to which this fd belongs to (protected by mu) */
- struct polling_island *polling_island;
-
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
@@ -224,41 +254,21 @@ struct grpc_pollset_worker {
};
struct grpc_pollset {
- gpr_mu mu;
+ poll_obj po;
+
grpc_pollset_worker root_worker;
bool kicked_without_pollers;
bool shutting_down; /* Is the pollset shutting down ? */
bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
grpc_closure *shutdown_done; /* Called after after shutdown is complete */
-
- /* The polling island to which this pollset belongs to */
- struct polling_island *polling_island;
};
/*******************************************************************************
* Pollset-set Declarations
*/
-/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
- * directly points to a polling_island (and adding an fd/pollset/pollset_set to
- * the current pollset_set would result in polling island merges. This would
- * remove the need to maintain fd_count here. This will also significantly
- * simplify the grpc_fd structure since we would no longer need to explicitly
- * maintain the orphaned state */
struct grpc_pollset_set {
- gpr_mu mu;
-
- size_t pollset_count;
- size_t pollset_capacity;
- grpc_pollset **pollsets;
-
- size_t pollset_set_count;
- size_t pollset_set_capacity;
- struct grpc_pollset_set **pollset_sets;
-
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
+ poll_obj po;
};
/*******************************************************************************
@@ -914,7 +924,7 @@ static void fd_global_shutdown(void) {
while (fd_freelist != NULL) {
grpc_fd *fd = fd_freelist;
fd_freelist = fd_freelist->freelist_next;
- gpr_mu_destroy(&fd->mu);
+ gpr_mu_destroy(&fd->po.mu);
gpr_free(fd);
}
gpr_mu_destroy(&fd_freelist_mu);
@@ -932,13 +942,17 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&new_fd->mu);
+ gpr_mu_init(&new_fd->po.mu);
}
- /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
- newly created fd (or an fd we got from the freelist), no one else would be
- holding a lock to it anyway. */
- gpr_mu_lock(&new_fd->mu);
+ /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
+ * is a newly created fd (or an fd we got from the freelist), no one else
+ * would be holding a lock to it anyway. */
+ gpr_mu_lock(&new_fd->po.mu);
+ new_fd->po.pi = NULL;
+#ifdef PO_DEBUG
+ new_fd->po.obj_type = POLL_OBJ_FD;
+#endif
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@@ -946,12 +960,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd->orphaned = false;
new_fd->read_closure = CLOSURE_NOT_READY;
new_fd->write_closure = CLOSURE_NOT_READY;
- new_fd->polling_island = NULL;
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
new_fd->read_notifier_pollset = NULL;
- gpr_mu_unlock(&new_fd->mu);
+ gpr_mu_unlock(&new_fd->po.mu);
char *fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
@@ -963,17 +976,13 @@ static grpc_fd *fd_create(int fd, const char *name) {
return new_fd;
}
-static bool fd_is_orphaned(grpc_fd *fd) {
- return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
-}
-
static int fd_wrapped_fd(grpc_fd *fd) {
int ret_fd = -1;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
if (!fd->orphaned) {
ret_fd = fd->fd;
}
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return ret_fd;
}
@@ -985,7 +994,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
@@ -1005,25 +1014,25 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* Remove the fd from the polling island:
- Get a lock on the latest polling island (i.e the last island in the
- linked list pointed by fd->polling_island). This is the island that
+ linked list pointed by fd->po.pi). This is the island that
would actually contain the fd
- Remove the fd from the latest polling island
- Unlock the latest polling island
- - Set fd->polling_island to NULL (but remove the ref on the polling island
+ - Set fd->po.pi to NULL (but remove the ref on the polling island
before doing this.) */
- if (fd->polling_island != NULL) {
- polling_island *pi_latest = polling_island_lock(fd->polling_island);
+ if (fd->po.pi != NULL) {
+ polling_island *pi_latest = polling_island_lock(fd->po.pi);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
- unref_pi = fd->polling_island;
- fd->polling_island = NULL;
+ unref_pi = fd->po.pi;
+ fd->po.pi = NULL;
}
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
NULL);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
if (unref_pi != NULL) {
/* Unref stale polling island here, outside the fd lock above.
@@ -1088,23 +1097,23 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
grpc_pollset *notifier = NULL;
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return notifier;
}
static bool fd_is_shutdown(grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
const bool r = fd->shutdown;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
return r;
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */
if (!fd->shutdown) {
fd->shutdown = true;
@@ -1115,28 +1124,28 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
}
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
- grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
- (grpc_workqueue *)fd->polling_island, "fd_get_workqueue");
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
+ grpc_workqueue *workqueue =
+ GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
+ gpr_mu_unlock(&fd->po.mu);
return workqueue;
}
@@ -1276,8 +1285,12 @@ static grpc_error *kick_poller(void) {
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- gpr_mu_init(&pollset->mu);
- *mu = &pollset->mu;
+ gpr_mu_init(&pollset->po.mu);
+ *mu = &pollset->po.mu;
+ pollset->po.pi = NULL;
+#ifdef PO_DEBUG
+ pollset->po.obj_type = POLL_OBJ_POLLSET;
+#endif
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->kicked_without_pollers = false;
@@ -1285,8 +1298,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = false;
pollset->finish_shutdown_called = false;
pollset->shutdown_done = NULL;
-
- pollset->polling_island = NULL;
}
/* Convert a timespec to milliseconds:
@@ -1316,26 +1327,26 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- /* Need the fd->mu since we might be racing with fd_notify_on_read */
- gpr_mu_lock(&fd->mu);
+ /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
+ gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
fd->read_notifier_pollset = notifier;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* Need the fd->mu since we might be racing with fd_notify_on_write */
- gpr_mu_lock(&fd->mu);
+ /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
+ gpr_mu_lock(&fd->po.mu);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&fd->po.mu);
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
grpc_pollset *ps, char *reason) {
- if (ps->polling_island != NULL) {
- PI_UNREF(exec_ctx, ps->polling_island, reason);
+ if (ps->po.pi != NULL) {
+ PI_UNREF(exec_ctx, ps->po.pi, reason);
}
- ps->polling_island = NULL;
+ ps->po.pi = NULL;
}
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
@@ -1345,12 +1356,12 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
- /* Release the ref and set pollset->polling_island to NULL */
+ /* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
-/* pollset->mu lock must be held by the caller before calling this */
+/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_TIMER_BEGIN("pollset_shutdown", 0);
@@ -1375,7 +1386,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* here */
static void pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
- gpr_mu_destroy(&pollset->mu);
+ gpr_mu_destroy(&pollset->po.mu);
}
static void pollset_reset(grpc_pollset *pollset) {
@@ -1385,7 +1396,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
- GPR_ASSERT(pollset->polling_island == NULL);
+ GPR_ASSERT(pollset->po.pi == NULL);
}
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
@@ -1425,7 +1436,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
- latest polling island pointed by pollset->polling_island.
+ latest polling island pointed by pollset->po.pi
Since epoll_fd is immutable, we can read it without obtaining the polling
island lock. There is however a possibility that the polling island (from
@@ -1434,36 +1445,36 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
right-away from epoll_wait() and pick up the latest polling_island the next
this function (i.e pollset_work_and_unlock()) is called */
- if (pollset->polling_island == NULL) {
- pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
- if (pollset->polling_island == NULL) {
+ if (pollset->po.pi == NULL) {
+ pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
+ if (pollset->po.pi == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
}
- PI_ADD_REF(pollset->polling_island, "ps");
+ PI_ADD_REF(pollset->po.pi, "ps");
GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
- (void *)pollset, (void *)pollset->polling_island);
+ (void *)pollset, (void *)pollset->po.pi);
}
- pi = polling_island_maybe_get_latest(pollset->polling_island);
+ pi = polling_island_maybe_get_latest(pollset->po.pi);
epoll_fd = pi->epoll_fd;
- /* Update the pollset->polling_island since the island being pointed by
- pollset->polling_island maybe older than the one pointed by pi) */
- if (pollset->polling_island != pi) {
+ /* Update the pollset->po.pi since the island being pointed by
+ pollset->po.pi maybe older than the one pointed by pi) */
+ if (pollset->po.pi != pi) {
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
- PI_UNREF(exec_ctx, pollset->polling_island, "ps");
- pollset->polling_island = pi;
+ PI_UNREF(exec_ctx, pollset->po.pi, "ps");
+ pollset->po.pi = pi;
}
/* Add an extra ref so that the island does not get destroyed (which means
the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
epoll_fd */
PI_ADD_REF(pi, "ps_work");
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&pollset->po.mu);
/* If we get some workqueue work to do, it might end up completing an item on
the completion queue, so there's no need to poll... so we skip that and
@@ -1536,17 +1547,17 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(pi != NULL);
/* Before leaving, release the extra ref we added to the polling island. It
- is important to use "pi" here (i.e our old copy of pollset->polling_island
+ is important to use "pi" here (i.e our old copy of pollset->po.pi
that we got before releasing the polling island lock). This is because
- pollset->polling_island pointer might get udpated in other parts of the
+ pollset->po.pi pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
-/* pollset->mu lock must be held by the caller before calling this.
- The function pollset_work() may temporarily release the lock (pollset->mu)
+/* pollset->po.mu lock must be held by the caller before calling this.
+ The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -1616,7 +1627,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
&g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&pollset->po.mu);
/* Note: There is no need to reset worker.is_kicked to 0 since we are no
longer going to use this worker */
@@ -1636,9 +1647,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
finish_shutdown_locked(exec_ctx, pollset);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&pollset->po.mu);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&pollset->po.mu);
}
*worker_hdl = NULL;
@@ -1652,130 +1663,160 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return error;
}
-static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd) {
- GPR_TIMER_BEGIN("pollset_add_fd", 0);
-
- grpc_error *error = GRPC_ERROR_NONE;
+static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
+ poll_obj_type bag_type, poll_obj *item,
+ poll_obj_type item_type) {
+ GPR_TIMER_BEGIN("add_poll_object", 0);
- gpr_mu_lock(&pollset->mu);
- gpr_mu_lock(&fd->mu);
+#ifdef PO_DEBUG
+ GPR_ASSERT(item->obj_type == item_type);
+ GPR_ASSERT(bag->obj_type == bag_type);
+#endif
+ grpc_error *error = GRPC_ERROR_NONE;
polling_island *pi_new = NULL;
+ gpr_mu_lock(&bag->mu);
+ gpr_mu_lock(&item->mu);
+
retry:
- /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
- * equal, do nothing.
- * 2) If fd->polling_island and pollset->polling_island are both NULL, create
- * a new polling island (with a refcount of 2) and make the polling_island
- * fields in both fd and pollset to point to the new island
- * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
- * the NULL polling_island field to point to the non-NULL polling_island
- * field (ensure that the refcount on the polling island is incremented by
- * 1 to account for the newly added reference)
- * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
- * and different, merge both the polling islands and update the
- * polling_island fields in both fd and pollset to point to the merged
- * polling island.
+ /*
+ * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
+ * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
+ * a refcount of 2) and point item->pi and bag->pi to the new island
+ * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
+ * the other's non-NULL pi
+ * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
+ * polling islands and update item->pi and bag->pi to point to the new
+ * island
*/
- if (fd->orphaned) {
- gpr_mu_unlock(&fd->mu);
- gpr_mu_unlock(&pollset->mu);
- /* early out */
+ /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
+ * orphaned */
+ if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
+ gpr_mu_unlock(&item->mu);
+ gpr_mu_unlock(&bag->mu);
return;
}
- if (fd->polling_island == pollset->polling_island) {
- pi_new = fd->polling_island;
+ if (item->pi == bag->pi) {
+ pi_new = item->pi;
if (pi_new == NULL) {
- /* Unlock before creating a new polling island: the polling island will
- create a workqueue which creates a file descriptor, and holding an fd
- lock here can eventually cause a loop to appear to TSAN (making it
- unhappy). We don't think it's a real loop (there's an epoch point where
- that loop possibility disappears), but the advantages of keeping TSAN
- happy outweigh any performance advantage we might have by keeping the
- lock held. */
- gpr_mu_unlock(&fd->mu);
- pi_new = polling_island_create(exec_ctx, fd, &error);
- gpr_mu_lock(&fd->mu);
- /* Need to reverify any assumptions made between the initial lock and
- getting to this branch: if they've changed, we need to throw away our
- work and figure things out again. */
- if (fd->polling_island != NULL) {
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Raced creating new polling island. pi_new: %p "
- "(fd: %d, pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
-
- /* No need to lock 'pi_new' here since this is a new polling island and
- * no one has a reference to it yet */
- polling_island_remove_all_fds_locked(pi_new, true, &error);
-
- /* Ref and unref so that the polling island gets deleted during unref */
- PI_ADD_REF(pi_new, "dance_of_destruction");
- PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
- goto retry;
+ /* GPR_ASSERT(item->pi == bag->pi == NULL) */
+
+ /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
+ * we need to do some extra work to make TSAN happy */
+ if (item_type == POLL_OBJ_FD) {
+ /* Unlock before creating a new polling island: the polling island will
+ create a workqueue which creates a file descriptor, and holding an fd
+ lock here can eventually cause a loop to appear to TSAN (making it
+ unhappy). We don't think it's a real loop (there's an epoch point
+ where that loop possibility disappears), but the advantages of
+ keeping TSAN happy outweigh any performance advantage we might have
+ by keeping the lock held. */
+ gpr_mu_unlock(&item->mu);
+ pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
+ gpr_mu_lock(&item->mu);
+
+ /* Need to reverify any assumptions made between the initial lock and
+ getting to this branch: if they've changed, we need to throw away our
+ work and figure things out again. */
+ if (item->pi != NULL) {
+ GRPC_POLLING_TRACE(
+ "add_poll_object: Raced creating new polling island. pi_new: %p "
+ "(fd: %d, %s: %p)",
+ (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
+ (void *)bag);
+ /* No need to lock 'pi_new' here since this is a new polling island
+ * and no one has a reference to it yet */
+ polling_island_remove_all_fds_locked(pi_new, true, &error);
+
+ /* Ref and unref so that the polling island gets deleted during unref
+ */
+ PI_ADD_REF(pi_new, "dance_of_destruction");
+ PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
+ goto retry;
+ }
} else {
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
- "pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
+ pi_new = polling_island_create(exec_ctx, NULL, &error);
}
+
+ GRPC_POLLING_TRACE(
+ "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
+ "%s: %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
+ } else {
+ GRPC_POLLING_TRACE(
+ "add_poll_object: Same polling island. pi: %p (%s, %s)",
+ (void *)pi_new, poll_obj_string(item_type),
+ poll_obj_string(bag_type));
+ }
+ } else if (item->pi == NULL) {
+ /* GPR_ASSERT(bag->pi != NULL) */
+ /* Make pi_new point to latest pi*/
+ pi_new = polling_island_lock(bag->pi);
+
+ if (item_type == POLL_OBJ_FD) {
+ grpc_fd *fd = FD_FROM_PO(item);
+ polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
}
- } else if (fd->polling_island == NULL) {
- pi_new = polling_island_lock(pollset->polling_island);
- polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
- gpr_mu_unlock(&pi_new->mu);
+ gpr_mu_unlock(&pi_new->mu);
GRPC_POLLING_TRACE(
- "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
- "pollset->pi: %p)",
- (void *)pi_new, fd->fd, (void *)pollset,
- (void *)pollset->polling_island);
- } else if (pollset->polling_island == NULL) {
- pi_new = polling_island_lock(fd->polling_island);
+ "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
+ "bag(%s): %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
+ } else if (bag->pi == NULL) {
+ /* GPR_ASSERT(item->pi != NULL) */
+ /* Make pi_new to point to latest pi */
+ pi_new = polling_island_lock(item->pi);
gpr_mu_unlock(&pi_new->mu);
-
GRPC_POLLING_TRACE(
- "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
- "%p, fd->pi: %p",
- (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
+ "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
+ "bag(%s): %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
} else {
- pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
- &error);
+ pi_new = polling_island_merge(item->pi, bag->pi, &error);
GRPC_POLLING_TRACE(
- "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
- "%p, fd->pi: %p, pollset->pi: %p)",
- (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
- (void *)pollset->polling_island);
+ "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
+ "bag(%s): %p)",
+ (void *)pi_new, poll_obj_string(item_type), (void *)item,
+ poll_obj_string(bag_type), (void *)bag);
}
- /* At this point, pi_new is the polling island that both fd->polling_island
- and pollset->polling_island must be pointing to */
+ /* At this point, pi_new is the polling island that both item->pi and bag->pi
+ MUST be pointing to */
- if (fd->polling_island != pi_new) {
- PI_ADD_REF(pi_new, "fd");
- if (fd->polling_island != NULL) {
- PI_UNREF(exec_ctx, fd->polling_island, "fd");
+ if (item->pi != pi_new) {
+ PI_ADD_REF(pi_new, poll_obj_string(item_type));
+ if (item->pi != NULL) {
+ PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
}
- fd->polling_island = pi_new;
+ item->pi = pi_new;
}
- if (pollset->polling_island != pi_new) {
- PI_ADD_REF(pi_new, "ps");
- if (pollset->polling_island != NULL) {
- PI_UNREF(exec_ctx, pollset->polling_island, "ps");
+ if (bag->pi != pi_new) {
+ PI_ADD_REF(pi_new, poll_obj_string(bag_type));
+ if (bag->pi != NULL) {
+ PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
}
- pollset->polling_island = pi_new;
+ bag->pi = pi_new;
}
- gpr_mu_unlock(&fd->mu);
- gpr_mu_unlock(&pollset->mu);
+ gpr_mu_unlock(&item->mu);
+ gpr_mu_unlock(&bag->mu);
- GRPC_LOG_IF_ERROR("pollset_add_fd", error);
+ GRPC_LOG_IF_ERROR("add_poll_object", error);
+ GPR_TIMER_END("add_poll_object", 0);
+}
- GPR_TIMER_END("pollset_add_fd", 0);
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
+ POLL_OBJ_FD);
}
/*******************************************************************************
@@ -1783,142 +1824,60 @@ retry:
*/
static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
- memset(pollset_set, 0, sizeof(*pollset_set));
- gpr_mu_init(&pollset_set->mu);
- return pollset_set;
+ grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
+ gpr_mu_init(&pss->po.mu);
+ pss->po.pi = NULL;
+#ifdef PO_DEBUG
+ pss->po.obj_type = POLL_OBJ_POLLSET_SET;
+#endif
+ return pss;
}
-static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
- size_t i;
- gpr_mu_destroy(&pollset_set->mu);
- for (i = 0; i < pollset_set->fd_count; i++) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+static void pollset_set_destroy(grpc_pollset_set *pss) {
+ gpr_mu_destroy(&pss->po.mu);
+
+ if (pss->po.pi != NULL) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy");
+ grpc_exec_ctx_finish(&exec_ctx);
}
- gpr_free(pollset_set->pollsets);
- gpr_free(pollset_set->pollset_sets);
- gpr_free(pollset_set->fds);
- gpr_free(pollset_set);
+
+ gpr_free(pss);
}
-static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- if (pollset_set->fd_count == pollset_set->fd_capacity) {
- pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
- pollset_set->fds = gpr_realloc(
- pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
- }
- GRPC_FD_REF(fd, "pollset_set");
- pollset_set->fds[pollset_set->fd_count++] = fd;
- for (i = 0; i < pollset_set->pollset_count; i++) {
- pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
- }
- for (i = 0; i < pollset_set->pollset_set_count; i++) {
- pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
- }
- gpr_mu_unlock(&pollset_set->mu);
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+ grpc_fd *fd) {
+ add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
+ POLL_OBJ_FD);
}
-static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- for (i = 0; i < pollset_set->fd_count; i++) {
- if (pollset_set->fds[i] == fd) {
- pollset_set->fd_count--;
- GPR_SWAP(grpc_fd *, pollset_set->fds[i],
- pollset_set->fds[pollset_set->fd_count]);
- GRPC_FD_UNREF(fd, "pollset_set");
- break;
- }
- }
- for (i = 0; i < pollset_set->pollset_set_count; i++) {
- pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
- }
- gpr_mu_unlock(&pollset_set->mu);
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
+ grpc_fd *fd) {
+ /* Nothing to do */
}
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
- size_t i, j;
- gpr_mu_lock(&pollset_set->mu);
- if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
- pollset_set->pollset_capacity =
- GPR_MAX(8, 2 * pollset_set->pollset_capacity);
- pollset_set->pollsets =
- gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
- sizeof(*pollset_set->pollsets));
- }
- pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
- for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
- if (fd_is_orphaned(pollset_set->fds[i])) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
- } else {
- pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
- pollset_set->fds[j++] = pollset_set->fds[i];
- }
- }
- pollset_set->fd_count = j;
- gpr_mu_unlock(&pollset_set->mu);
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
+ POLL_OBJ_POLLSET);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- for (i = 0; i < pollset_set->pollset_count; i++) {
- if (pollset_set->pollsets[i] == pollset) {
- pollset_set->pollset_count--;
- GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
- pollset_set->pollsets[pollset_set->pollset_count]);
- break;
- }
- }
- gpr_mu_unlock(&pollset_set->mu);
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ /* Nothing to do */
}
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
- size_t i, j;
- gpr_mu_lock(&bag->mu);
- if (bag->pollset_set_count == bag->pollset_set_capacity) {
- bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
- bag->pollset_sets =
- gpr_realloc(bag->pollset_sets,
- bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
- }
- bag->pollset_sets[bag->pollset_set_count++] = item;
- for (i = 0, j = 0; i < bag->fd_count; i++) {
- if (fd_is_orphaned(bag->fds[i])) {
- GRPC_FD_UNREF(bag->fds[i], "pollset_set");
- } else {
- pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
- bag->fds[j++] = bag->fds[i];
- }
- }
- bag->fd_count = j;
- gpr_mu_unlock(&bag->mu);
+ add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
+ POLL_OBJ_POLLSET_SET);
}
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *bag,
grpc_pollset_set *item) {
- size_t i;
- gpr_mu_lock(&bag->mu);
- for (i = 0; i < bag->pollset_set_count; i++) {
- if (bag->pollset_sets[i] == item) {
- bag->pollset_set_count--;
- GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
- bag->pollset_sets[bag->pollset_set_count]);
- break;
- }
- }
- gpr_mu_unlock(&bag->mu);
+ /* Nothing to do */
}
/* Test helper functions
@@ -1926,9 +1885,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi;
- gpr_mu_lock(&fd->mu);
- pi = fd->polling_island;
- gpr_mu_unlock(&fd->mu);
+ gpr_mu_lock(&fd->po.mu);
+ pi = fd->po.pi;
+ gpr_mu_unlock(&fd->po.mu);
return pi;
}
@@ -1936,9 +1895,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) {
void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
polling_island *pi;
- gpr_mu_lock(&ps->mu);
- pi = ps->polling_island;
- gpr_mu_unlock(&ps->mu);
+ gpr_mu_lock(&ps->po.mu);
+ pi = ps->po.pi;
+ gpr_mu_unlock(&ps->po.mu);
return pi;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 25f9b84e11..2b46544f82 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -377,6 +377,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
return false;
+ } else if (errno == EPIPE) {
+ *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ return true;
} else {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
return true;
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 6eba8c4057..437a94beff 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -52,7 +52,8 @@ typedef struct grpc_tcp_server_acceptor {
unsigned fd_index;
} grpc_tcp_server_acceptor;
-/* Called for newly connected TCP connections. */
+/* Called for newly connected TCP connections.
+ Takes ownership of acceptor. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
grpc_pollset *accepting_pollset,
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index c8401d442e..1e63142e0e 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -381,16 +381,12 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
- sp->fd_index};
- grpc_pollset *read_notifier_pollset = NULL;
- grpc_fd *fdobj;
if (err != GRPC_ERROR_NONE) {
goto error;
}
- read_notifier_pollset =
+ grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
@@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- fdobj = grpc_fd_create(fd, name);
+ grpc_fd *fdobj = grpc_fd_create(fd, name);
if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
@@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = sp->fd_index;
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- read_notifier_pollset, &acceptor);
+ read_notifier_pollset, acceptor);
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 050bb9e141..4ccc80c9f0 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -188,7 +188,6 @@ static void accepted_connection_close_cb(uv_handle_t *handle) {
static void on_connect(uv_stream_t *server, int status) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
uv_tcp_t *client;
grpc_endpoint *ep = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -201,6 +200,7 @@ static void on_connect(uv_stream_t *server, int status) {
uv_strerror(status));
return;
}
+
client = gpr_malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client);
// UV documentation says this is guaranteed to succeed
@@ -220,8 +220,13 @@ static void on_connect(uv_stream_t *server, int status) {
gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
}
ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- &acceptor);
+ acceptor);
grpc_exec_ctx_finish(&exec_ctx);
}
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index f15fa6c0b2..053bb35f6f 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -323,7 +323,6 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@@ -396,8 +395,13 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
if (ep) {
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- &acceptor);
+ acceptor);
}
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index d9c2361ae0..c0fdb6e0d6 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -43,6 +43,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/alpn/alpn.h"
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
@@ -111,19 +112,19 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
return NULL;
}
-void grpc_channel_security_connector_create_handshakers(
+void grpc_channel_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
grpc_handshake_manager *handshake_mgr) {
if (connector != NULL) {
- connector->create_handshakers(exec_ctx, connector, handshake_mgr);
+ connector->add_handshakers(exec_ctx, connector, handshake_mgr);
}
}
-void grpc_server_security_connector_create_handshakers(
+void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector,
grpc_handshake_manager *handshake_mgr) {
if (connector != NULL) {
- connector->create_handshakers(exec_ctx, connector, handshake_mgr);
+ connector->add_handshakers(exec_ctx, connector, handshake_mgr);
}
}
@@ -291,20 +292,24 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
cb(exec_ctx, user_data, GRPC_SECURITY_OK);
}
-static void fake_channel_create_handshakers(
+static void fake_channel_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr) {
- grpc_security_create_handshakers(
- exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base,
- handshake_mgr);
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
+ &sc->base));
}
-static void fake_server_create_handshakers(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_handshake_manager *handshake_mgr) {
- grpc_security_create_handshakers(
- exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base,
- handshake_mgr);
+static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
+ grpc_handshake_manager_add(
+ handshake_mgr,
+ grpc_security_handshaker_create(
+ exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
+ &sc->base));
}
static grpc_security_connector_vtable fake_channel_vtable = {
@@ -322,7 +327,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
c->base.vtable = &fake_channel_vtable;
c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds);
c->check_call_host = fake_channel_check_call_host;
- c->create_handshakers = fake_channel_create_handshakers;
+ c->add_handshakers = fake_channel_add_handshakers;
return c;
}
@@ -334,7 +339,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create(
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
- c->create_handshakers = fake_server_create_handshakers;
+ c->add_handshakers = fake_server_add_handshakers;
return c;
}
@@ -390,9 +395,9 @@ static grpc_security_status ssl_create_handshaker(
return GRPC_SECURITY_OK;
}
-static void ssl_channel_create_handshakers(
- grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
- grpc_handshake_manager *handshake_mgr) {
+static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc;
// Instantiate TSI handshaker.
@@ -403,12 +408,13 @@ static void ssl_channel_create_handshakers(
: c->target_name,
&tsi_hs);
// Create handshakers.
- grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr);
+ grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
+ exec_ctx, tsi_hs, &sc->base));
}
-static void ssl_server_create_handshakers(
- grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
- grpc_handshake_manager *handshake_mgr) {
+static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
// Instantiate TSI handshaker.
@@ -416,7 +422,8 @@ static void ssl_server_create_handshakers(
ssl_create_handshaker(c->handshaker_factory, false /* is_client */,
NULL /* peer_name */, &tsi_hs);
// Create handshakers.
- grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr);
+ grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
+ exec_ctx, tsi_hs, &sc->base));
}
static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
@@ -716,7 +723,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
- c->base.create_handshakers = ssl_channel_create_handshakers;
+ c->base.add_handshakers = ssl_channel_add_handshakers;
gpr_split_host_port(target_name, &c->target_name, &port);
gpr_free(port);
if (overridden_target_name != NULL) {
@@ -792,7 +799,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
*sc = NULL;
goto error;
}
- c->base.create_handshakers = ssl_server_create_handshakers;
+ c->base.add_handshakers = ssl_server_add_handshakers;
*sc = &c->base;
gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index df77434a65..eba4e6d1d7 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -101,7 +101,7 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx,
#endif
/* Check the peer. Callee takes ownership of the peer object.
- Sets *auth_context and invokes on_peer_checked when done. */
+ When done, sets *auth_context and invokes on_peer_checked. */
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
tsi_peer peer,
@@ -136,9 +136,9 @@ struct grpc_channel_security_connector {
grpc_channel_security_connector *sc, const char *host,
grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
- void (*create_handshakers)(grpc_exec_ctx *exec_ctx,
- grpc_channel_security_connector *sc,
- grpc_handshake_manager *handshake_mgr);
+ void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr);
};
/* Checks that the host that will be set for a call is acceptable. */
@@ -148,7 +148,7 @@ void grpc_channel_security_connector_check_call_host(
grpc_security_call_host_check_cb cb, void *user_data);
/* Registers handshakers with \a handshake_mgr. */
-void grpc_channel_security_connector_create_handshakers(
+void grpc_channel_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
grpc_handshake_manager *handshake_mgr);
@@ -161,12 +161,12 @@ typedef struct grpc_server_security_connector grpc_server_security_connector;
struct grpc_server_security_connector {
grpc_security_connector base;
- void (*create_handshakers)(grpc_exec_ctx *exec_ctx,
- grpc_server_security_connector *sc,
- grpc_handshake_manager *handshake_mgr);
+ void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_handshake_manager *handshake_mgr);
};
-void grpc_server_security_connector_create_handshakers(
+void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr);
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index e96c480044..4f9f97ed71 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -132,7 +132,14 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
grpc_endpoint_shutdown(exec_ctx, h->args->endpoint);
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
+<<<<<<< HEAD
cleanup_args_for_failure_locked(exec_ctx, h);
+=======
+ cleanup_args_for_failure_locked(h);
+ // Set shutdown to true so that subsequent calls to
+ // security_handshaker_shutdown() do nothing.
+ h->shutdown = true;
+>>>>>>> b62bffbea5eef106bfbe644e8af161889c927401
}
// Invoke callback.
grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL);
@@ -436,17 +443,14 @@ static grpc_handshaker *fail_handshaker_create() {
// exported functions
//
-void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx,
- tsi_handshaker *handshaker,
- grpc_security_connector *connector,
- grpc_handshake_manager *handshake_mgr) {
- // If no TSI handshaker was created, add a handshaker that always fails.
- // Otherwise, add a real security handshaker.
+grpc_handshaker *grpc_security_handshaker_create(
+ grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
+ grpc_security_connector *connector) {
+ // If no TSI handshaker was created, return a handshaker that always fails.
+ // Otherwise, return a real security handshaker.
if (handshaker == NULL) {
- grpc_handshake_manager_add(handshake_mgr, fail_handshaker_create());
+ return fail_handshaker_create();
} else {
- grpc_handshake_manager_add(
- handshake_mgr,
- security_handshaker_create(exec_ctx, handshaker, connector));
+ return security_handshaker_create(exec_ctx, handshaker, connector);
}
}
diff --git a/src/core/lib/security/transport/security_handshaker.h b/src/core/lib/security/transport/security_handshaker.h
index f71f43a359..5ddbf4b451 100644
--- a/src/core/lib/security/transport/security_handshaker.h
+++ b/src/core/lib/security/transport/security_handshaker.h
@@ -34,14 +34,13 @@
#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
-#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/security/transport/security_connector.h"
-/// Creates any necessary security handshakers and adds them to
-/// \a handshake_mgr.
-void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx,
- tsi_handshaker *handshaker,
- grpc_security_connector *connector,
- grpc_handshake_manager *handshake_mgr);
+/// Creates a security handshaker using \a handshaker.
+grpc_handshaker *grpc_security_handshaker_create(
+ grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
+ grpc_security_connector *connector);
#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */
diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h
index 43ab4dc1be..e81af68cfa 100644
--- a/src/core/lib/support/string.h
+++ b/src/core/lib/support/string.h
@@ -36,8 +36,6 @@
#include <stddef.h>
-#include <grpc/slice.h>
-#include <grpc/slice_buffer.h>
#include <grpc/support/port_platform.h>
#ifdef __cplusplus
diff --git a/src/core/lib/support/tmpfile.h b/src/core/lib/support/tmpfile.h
index 8952e5ec3d..f613cf9bc8 100644
--- a/src/core/lib/support/tmpfile.h
+++ b/src/core/lib/support/tmpfile.h
@@ -36,8 +36,6 @@
#include <stdio.h>
-#include <grpc/slice.h>
-
#ifdef __cplusplus
extern "C" {
#endif