aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-20 22:23:37 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-20 22:23:37 -0700
commitae09d9dca9ac0f6d6c6e877e2935ad8cfba9da05 (patch)
tree1c58539b2c9e975507a9f06622ad5956515fe1c5 /src/core
parentd88e15cee750cd647a900098d82f87cc25aa8dbe (diff)
Fixes and code simplification
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c40
-rw-r--r--src/core/lib/surface/server.c9
2 files changed, 13 insertions, 36 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 4d2ec5eb98..4f64d31c97 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -183,7 +183,6 @@ struct grpc_pollset_worker {
struct grpc_pollset {
gpr_mu mu;
grpc_pollset_worker root_worker;
- int in_flight_cbs;
int shutting_down;
int called_shutdown;
int kicked_without_pollers;
@@ -193,10 +192,6 @@ struct grpc_pollset {
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
- /* fds that have been removed from the pollset explicitly */
- size_t del_count;
- size_t del_capacity;
- grpc_fd **dels;
/* Local cache of eventfds for workers */
grpc_cached_wakeup_fd *local_wakeup_cache;
};
@@ -728,7 +723,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
*mu = &pollset->mu;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
- pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
@@ -737,14 +731,10 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->kicked_without_pollers = 0;
pollset->fd_count = 0;
pollset->fd_capacity = 0;
- pollset->del_count = 0;
- pollset->del_capacity = 0;
pollset->fds = NULL;
- pollset->dels = NULL;
}
static void pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->in_flight_cbs == 0);
GPR_ASSERT(!pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
while (pollset->local_wakeup_cache) {
@@ -754,17 +744,14 @@ static void pollset_destroy(grpc_pollset *pollset) {
pollset->local_wakeup_cache = next;
}
gpr_free(pollset->fds);
- gpr_free(pollset->dels);
gpr_mu_destroy(&pollset->mu);
}
static void pollset_reset(grpc_pollset *pollset) {
GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(pollset->in_flight_cbs == 0);
GPR_ASSERT(!pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
GPR_ASSERT(pollset->fd_count == 0);
- GPR_ASSERT(pollset->del_count == 0);
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
pollset->kicked_without_pollers = 0;
@@ -797,11 +784,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
for (i = 0; i < pollset->fd_count; i++) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
- for (i = 0; i < pollset->del_count; i++) {
- GRPC_FD_UNREF(pollset->dels[i], "multipoller_del");
- }
pollset->fd_count = 0;
- pollset->del_count = 0;
grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
@@ -841,13 +824,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.shutting_down", 0);
goto done;
}
- /* Give do_promote priority so we don't starve it out */
- if (pollset->in_flight_cbs) {
- GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
- gpr_mu_unlock(&pollset->mu);
- locked = 0;
- goto done;
- }
/* Start polling, and keep doing so while we're being asked to
re-evaluate our pollers (this allows poll() based pollers to
ensure they don't miss wakeups) */
@@ -867,7 +843,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int timeout;
int r;
- size_t i, j, fd_count;
+ size_t i, fd_count;
nfds_t pfd_count;
/* TODO(ctiller): inline some elements to avoid an allocation */
grpc_fd_watcher *watchers;
@@ -887,11 +863,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pfds[1].events = POLLIN;
pfds[1].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
- int remove = fd_is_orphaned(pollset->fds[i]);
- for (j = 0; !remove && j < pollset->del_count; j++) {
- if (pollset->fds[i] == pollset->dels[j]) remove = 1;
- }
- if (remove) {
+ if (fd_is_orphaned(pollset->fds[i])) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@@ -902,10 +874,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pfd_count++;
}
}
- for (j = 0; j < pollset->del_count; j++) {
- GRPC_FD_UNREF(pollset->dels[j], "multipoller_del");
- }
- pollset->del_count = 0;
pollset->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
@@ -997,7 +965,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->shutting_down) {
if (pollset_has_workers(pollset)) {
pollset_kick(pollset, NULL);
- } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ } else if (!pollset->called_shutdown) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
finish_shutdown(exec_ctx, pollset);
@@ -1027,7 +995,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!pollset_has_workers(pollset)) {
grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
}
- if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ if (!pollset->called_shutdown &&
!pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
finish_shutdown(exec_ctx, pollset);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 54b76d8aa5..165e20a062 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -527,6 +527,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
if (request_id == -1) {
continue;
} else {
+ gpr_log(GPR_DEBUG, "queue lockfree, retries=%d chose=%d", i, cq_idx);
+
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
@@ -537,6 +539,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
}
/* no cq to take the request found: queue it on the slow list */
+ gpr_log(GPR_DEBUG, "queue slowpath");
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
@@ -1298,12 +1301,14 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
server->requested_calls[request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
+ gpr_log(GPR_DEBUG, "request against empty");
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
if (request_id == -1) break;
+ gpr_log(GPR_DEBUG, "drain1");
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
@@ -1324,6 +1329,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&server->mu_call);
}
gpr_mu_unlock(&server->mu_call);
+ } else {
+ gpr_log(GPR_DEBUG, "request lockfree");
}
return GRPC_CALL_OK;
}
@@ -1377,6 +1384,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *rm = rmp;
+ gpr_log(GPR_DEBUG, "method: %s", rm->method);
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@@ -1391,6 +1399,7 @@ grpc_call_error grpc_server_request_registered_call(
break;
}
}
+ gpr_log(GPR_DEBUG, "cq_idx=%d, cq_count=%d", cq_idx, server->cq_count);
if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;