aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/http_proxy.c64
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c282
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c8
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c8
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c12
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c10
-rw-r--r--src/core/lib/iomgr/lockfree_event.c14
-rw-r--r--src/core/lib/iomgr/lockfree_event.h5
9 files changed, 323 insertions, 84 deletions
diff --git a/src/core/ext/filters/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c
index aa3f61c991..ef3512ed83 100644
--- a/src/core/ext/filters/client_channel/http_proxy.c
+++ b/src/core/ext/filters/client_channel/http_proxy.c
@@ -30,15 +30,23 @@
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/slice/b64.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
-static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
+/**
+ * Parses the 'http_proxy' env var and returns the proxy hostname to resolve or
+ * NULL on error. Also sets 'user_cred' to user credentials if present in the
+ * 'http_proxy' env var, otherwise leaves it unchanged. It is caller's
+ * responsibility to gpr_free user_cred.
+ */
+static char* get_http_proxy_server(grpc_exec_ctx* exec_ctx, char** user_cred) {
+ GPR_ASSERT(user_cred != NULL);
+ char* proxy_name = NULL;
char* uri_str = gpr_getenv("http_proxy");
if (uri_str == NULL) return NULL;
grpc_uri* uri =
grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */);
- char* proxy_name = NULL;
if (uri == NULL || uri->authority == NULL) {
gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
goto done;
@@ -47,11 +55,27 @@ static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme);
goto done;
}
- if (strchr(uri->authority, '@') != NULL) {
- gpr_log(GPR_ERROR, "userinfo not supported in proxy URI");
- goto done;
+ /* Split on '@' to separate user credentials from host */
+ char** authority_strs = NULL;
+ size_t authority_nstrs;
+ gpr_string_split(uri->authority, "@", &authority_strs, &authority_nstrs);
+ GPR_ASSERT(authority_nstrs != 0); /* should have at least 1 string */
+ if (authority_nstrs == 1) {
+ /* User cred not present in authority */
+ proxy_name = authority_strs[0];
+ } else if (authority_nstrs == 2) {
+ /* User cred found */
+ *user_cred = authority_strs[0];
+ proxy_name = authority_strs[1];
+ gpr_log(GPR_DEBUG, "userinfo found in proxy URI");
+ } else {
+ /* Bad authority */
+ for (size_t i = 0; i < authority_nstrs; i++) {
+ gpr_free(authority_strs[i]);
+ }
+ proxy_name = NULL;
}
- proxy_name = gpr_strdup(uri->authority);
+ gpr_free(authority_strs);
done:
gpr_free(uri_str);
grpc_uri_destroy(uri);
@@ -64,7 +88,8 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
const grpc_channel_args* args,
char** name_to_resolve,
grpc_channel_args** new_args) {
- *name_to_resolve = grpc_get_http_proxy_server(exec_ctx);
+ char* user_cred = NULL;
+ *name_to_resolve = get_http_proxy_server(exec_ctx, &user_cred);
if (*name_to_resolve == NULL) return false;
grpc_uri* uri =
grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */);
@@ -73,12 +98,16 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
"'http_proxy' environment variable set, but cannot "
"parse server URI '%s' -- not using proxy",
server_uri);
- if (uri != NULL) grpc_uri_destroy(uri);
+ if (uri != NULL) {
+ gpr_free(user_cred);
+ grpc_uri_destroy(uri);
+ }
return false;
}
if (strcmp(uri->scheme, "unix") == 0) {
gpr_log(GPR_INFO, "not using proxy for Unix domain socket '%s'",
server_uri);
+ gpr_free(user_cred);
grpc_uri_destroy(uri);
return false;
}
@@ -126,10 +155,25 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
}
}
}
- grpc_arg new_arg = grpc_channel_arg_string_create(
+ grpc_arg args_to_add[2];
+ args_to_add[0] = grpc_channel_arg_string_create(
GRPC_ARG_HTTP_CONNECT_SERVER,
uri->path[0] == '/' ? uri->path + 1 : uri->path);
- *new_args = grpc_channel_args_copy_and_add(args, &new_arg, 1);
+ if (user_cred != NULL) {
+ /* Use base64 encoding for user credentials as stated in RFC 7617 */
+ char* encoded_user_cred =
+ grpc_base64_encode(user_cred, strlen(user_cred), 0, 0);
+ char* header;
+ gpr_asprintf(&header, "Proxy-Authorization:Basic %s", encoded_user_cred);
+ gpr_free(encoded_user_cred);
+ args_to_add[1] =
+ grpc_channel_arg_string_create(GRPC_ARG_HTTP_CONNECT_HEADERS, header);
+ *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 2);
+ gpr_free(header);
+ } else {
+ *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 1);
+ }
+ gpr_free(user_cred);
grpc_uri_destroy(uri);
return true;
}
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 3c8b470b4f..9d46cfa22e 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -657,6 +657,10 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
"ignoring grpc_chttp2_stream with non-client generated index %d",
t->incoming_stream_id));
return init_skip_frame_parser(exec_ctx, t, 1);
+ } else if (grpc_chttp2_stream_map_size(&t->stream_map) >=
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded");
}
t->last_new_stream_id = t->incoming_stream_id;
s = t->incoming_stream =
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 66ba601adb..b89b8af15a 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
@@ -77,8 +78,21 @@ static void fd_global_shutdown(void);
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
+static const char *kick_state_string(kick_state st) {
+ switch (st) {
+ case UNKICKED:
+ return "UNKICKED";
+ case KICKED:
+ return "KICKED";
+ case DESIGNATED_POLLER:
+ return "DESIGNATED_POLLER";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
struct grpc_pollset_worker {
kick_state kick_state;
+ int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
@@ -86,6 +100,12 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work;
};
+#define SET_KICK_STATE(worker, state) \
+ do { \
+ (worker)->kick_state = (state); \
+ (worker)->kick_state_mutator = __LINE__; \
+ } while (false)
+
#define MAX_NEIGHBOURHOODS 1024
typedef struct pollset_neighbourhood {
@@ -100,10 +120,15 @@ struct grpc_pollset {
bool reassigning_neighbourhood;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
+
+ /* Set to true if the pollset is observed to have no workers available to
+ * poll */
bool seen_inactive;
- bool shutting_down; /* Is the pollset shutting down ? */
- bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
+ bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
+
+ /* Number of workers who are *about-to* attach themselves to the pollset
+ * worker list */
int begin_refs;
grpc_pollset *next;
@@ -263,29 +288,23 @@ static bool fd_is_shutdown(grpc_fd *fd) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
/*******************************************************************************
@@ -410,18 +429,28 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
- if (worker->initialized_cv) {
- worker->kick_state = KICKED;
- gpr_cv_signal(&worker->cv);
- } else {
- worker->kick_state = KICKED;
- append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
- "pollset_shutdown");
+ switch (worker->kick_state) {
+ case KICKED:
+ break;
+ case UNKICKED:
+ SET_KICK_STATE(worker, KICKED);
+ if (worker->initialized_cv) {
+ gpr_cv_signal(&worker->cv);
+ }
+ break;
+ case DESIGNATED_POLLER:
+ SET_KICK_STATE(worker, KICKED);
+ append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
+ "pollset_kick_all");
+ break;
}
worker = worker->next;
} while (worker != pollset->root_worker);
}
+ // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
+ // in the else case
+
return error;
}
@@ -437,7 +466,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
+ GPR_ASSERT(!pollset->shutting_down);
pollset->shutdown_closure = closure;
+ pollset->shutting_down = true;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -510,10 +541,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
- worker->kick_state = UNKICKED;
+ SET_KICK_STATE(worker, UNKICKED);
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
+ }
+
if (pollset->seen_inactive) {
// pollset has been observed to be inactive, we need to move back to the
// active list
@@ -529,6 +564,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
retry_lock_neighbourhood:
gpr_mu_lock(&neighbourhood->mu);
gpr_mu_lock(&pollset->mu);
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ is_reassigning);
+ }
if (pollset->seen_inactive) {
if (neighbourhood != pollset->neighbourhood) {
gpr_mu_unlock(&neighbourhood->mu);
@@ -539,8 +579,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->seen_inactive = false;
if (neighbourhood->active_root == NULL) {
neighbourhood->active_root = pollset->next = pollset->prev = pollset;
- if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
- worker->kick_state = DESIGNATED_POLLER;
+ /* TODO: sreek. Why would this worker state be other than UNKICKED
+ * here ? (since the worker isn't added to the pollset yet, there is no
+ * way it can be "found" by other threads to get kicked). */
+
+ /* If there is no designated poller, make this the designated poller */
+ if (worker->kick_state == UNKICKED &&
+ gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
+ SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
} else {
pollset->next = neighbourhood->active_root;
@@ -554,24 +600,53 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
gpr_mu_unlock(&neighbourhood->mu);
}
+
worker_insert(pollset, worker);
pollset->begin_refs--;
- if (worker->kick_state == UNKICKED) {
+ if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- while (worker->kick_state == UNKICKED &&
- pollset->shutdown_closure == NULL) {
+ while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down);
+ }
+
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
worker->kick_state == UNKICKED) {
- worker->kick_state = KICKED;
+ /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
+ received a kick */
+ SET_KICK_STATE(worker, KICKED);
}
}
*now = gpr_now(now->clock_type);
}
- return worker->kick_state == DESIGNATED_POLLER &&
- pollset->shutdown_closure == NULL;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR,
+ "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
+ "kicked_without_poller: %d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down, pollset->kicked_without_poller);
+ }
+
+ /* We release pollset lock in this function at a couple of places:
+ * 1. Briefly when assigning pollset to a neighbourhood
+ * 2. When doing gpr_cv_wait()
+ * It is possible that 'kicked_without_poller' was set to true during (1) and
+ * 'shutting_down' is set to true during (1) or (2). If either of them is
+ * true, this worker cannot do polling */
+ /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
+ * case; especially when the worker is the DESIGNATED_POLLER */
+
+ if (pollset->kicked_without_poller) {
+ pollset->kicked_without_poller = false;
+ return false;
+ }
+
+ return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
static bool check_neighbourhood_for_available_poller(
@@ -591,10 +666,18 @@ static bool check_neighbourhood_for_available_poller(
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
- inspect_worker->kick_state = DESIGNATED_POLLER;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
+ inspect_worker);
+ }
+ SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
+ }
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
@@ -607,9 +690,12 @@ static bool check_neighbourhood_for_available_poller(
break;
}
inspect_worker = inspect_worker->next;
- } while (inspect_worker != inspect->root_worker);
+ } while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
+ }
inspect->seen_inactive = true;
if (inspect == neighbourhood->active_root) {
neighbourhood->active_root =
@@ -627,15 +713,22 @@ static bool check_neighbourhood_for_available_poller(
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
+ }
if (worker_hdl != NULL) *worker_hdl = NULL;
- worker->kick_state = KICKED;
+ /* Make sure we appear kicked */
+ SET_KICK_STATE(worker, KICKED);
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
+ }
GPR_ASSERT(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
- worker->next->kick_state = DESIGNATED_POLLER;
+ SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
gpr_cv_signal(&worker->next->cv);
if (grpc_exec_ctx_has_work(exec_ctx)) {
gpr_mu_unlock(&pollset->mu);
@@ -644,9 +737,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
- gpr_mu_unlock(&pollset->mu);
size_t poller_neighbourhood_idx =
(size_t)(pollset->neighbourhood - g_neighbourhoods);
+ gpr_mu_unlock(&pollset->mu);
bool found_worker = false;
bool scan_state[MAX_NEIGHBOURHOODS];
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
@@ -682,6 +775,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. remove worker");
+ }
if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -702,16 +798,18 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutdown_closure);
+ GPR_ASSERT(!pollset->shutting_down);
GPR_ASSERT(!pollset->seen_inactive);
gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
gpr_mu_lock(&pollset->mu);
gpr_tls_set(&g_current_thread_worker, 0);
+ } else {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
gpr_tls_set(&g_current_thread_pollset, 0);
@@ -720,46 +818,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_strvec log;
+ gpr_strvec_init(&log);
+ char *tmp;
+ gpr_asprintf(
+ &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
+ specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
+ (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
+ gpr_strvec_add(&log, tmp);
+ if (pollset->root_worker != NULL) {
+ gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
+ kick_state_string(pollset->root_worker->kick_state),
+ pollset->root_worker->next,
+ kick_state_string(pollset->root_worker->next->kick_state));
+ gpr_strvec_add(&log, tmp);
+ }
+ if (specific_worker != NULL) {
+ gpr_asprintf(&tmp, " worker_kick_state=%s",
+ kick_state_string(specific_worker->kick_state));
+ gpr_strvec_add(&log, tmp);
+ }
+ tmp = gpr_strvec_flatten(&log, NULL);
+ gpr_strvec_destroy(&log);
+ gpr_log(GPR_ERROR, "%s", tmp);
+ gpr_free(tmp);
+ }
if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
grpc_pollset_worker *root_worker = pollset->root_worker;
if (root_worker == NULL) {
pollset->kicked_without_poller = true;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked_without_poller");
+ }
return GRPC_ERROR_NONE;
}
grpc_pollset_worker *next_worker = root_worker->next;
- if (root_worker == next_worker &&
- root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
- &g_active_poller)) {
- root_worker->kick_state = KICKED;
+ if (root_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (root_worker ==
+ next_worker && // only try and wake up a poller if
+ // there is no next worker
+ root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
+ &g_active_poller)) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (next_worker->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
+ }
GPR_ASSERT(next_worker->initialized_cv);
- next_worker->kick_state = KICKED;
+ SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == DESIGNATED_POLLER) {
+ if (root_worker->kick_state != DESIGNATED_POLLER) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(
+ GPR_ERROR,
+ " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
+ root_worker, root_worker->initialized_cv, next_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ if (root_worker->initialized_cv) {
+ gpr_cv_signal(&root_worker->cv);
+ }
+ return GRPC_ERROR_NONE;
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
+ root_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ }
} else {
+ GPR_ASSERT(next_worker->kick_state == KICKED);
+ SET_KICK_STATE(next_worker, KICKED);
return GRPC_ERROR_NONE;
}
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked while waking up");
+ }
return GRPC_ERROR_NONE;
}
} else if (specific_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. specific worker already kicked");
+ }
return GRPC_ERROR_NONE;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick active poller");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (specific_worker->initialized_cv) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick waiting worker");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
} else {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick non-waiting worker");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
}
}
@@ -805,6 +993,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
+ close(g_epfd);
}
static const grpc_event_engine_vtable vtable = {
@@ -841,9 +1030,6 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
- /* TODO(ctiller): temporary, until this stabilizes */
- if (!explicit_request) return NULL;
-
if (!grpc_has_wakeup_fd()) {
return NULL;
}
@@ -862,6 +1048,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL;
}
+ gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
+
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 27b4892d1d..5166dc2ac2 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -1007,12 +1007,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1223,7 +1223,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1235,7 +1235,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 49be72c03e..1058f69a83 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -560,12 +560,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -696,11 +696,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5690431759..a2fa4ad9a5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -438,12 +438,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -710,7 +710,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -722,7 +722,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
@@ -1049,8 +1049,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
/* Introduce a spurious completion.
If we do not, then it may be that the fd-specific epoll set consumed
a completion without being polled, leading to a missed edge going up. */
- grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
- grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 0d969dccce..a84c208e1a 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -933,12 +933,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1115,7 +1115,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1127,7 +1127,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
@@ -1731,7 +1731,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux(
if (!is_grpc_wakeup_signal_initialized) {
/* TODO(ctiller): when other epoll engines are ready, remove the true || to
* force this to be explitly chosen if needed */
- if (true || explicit_request) {
+ if (explicit_request) {
grpc_use_signal(SIGRTMIN + 6);
} else {
return NULL;
diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c
index c2ceecb3c5..f967b22ba9 100644
--- a/src/core/lib/iomgr/lockfree_event.c
+++ b/src/core/lib/iomgr/lockfree_event.c
@@ -79,12 +79,12 @@ bool grpc_lfev_is_shutdown(gpr_atm *state) {
}
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
- grpc_closure *closure) {
+ grpc_closure *closure, const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_notify_on: %p curr=%p closure=%p", state,
- (void *)curr, closure);
+ gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable,
+ state, (void *)curr, closure);
}
switch (curr) {
case CLOSURE_NOT_READY: {
@@ -149,7 +149,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_set_shutdown: %p curr=%p err=%s", state,
+ gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state,
(void *)curr, grpc_error_string(shutdown_err));
}
switch (curr) {
@@ -193,12 +193,14 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
GPR_UNREACHABLE_CODE(return false);
}
-void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) {
+void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
+ const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_set_ready: %p curr=%p", state, (void *)curr);
+ gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state,
+ (void *)curr);
}
switch (curr) {
diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h
index ef3844a07a..6a14a0f3b2 100644
--- a/src/core/lib/iomgr/lockfree_event.h
+++ b/src/core/lib/iomgr/lockfree_event.h
@@ -30,10 +30,11 @@ void grpc_lfev_destroy(gpr_atm *state);
bool grpc_lfev_is_shutdown(gpr_atm *state);
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
- grpc_closure *closure);
+ grpc_closure *closure, const char *variable);
/* Returns true on first successful shutdown */
bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
grpc_error *shutdown_err);
-void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state);
+void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
+ const char *variable);
#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */