diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 40 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 9 |
2 files changed, 13 insertions, 36 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 4d2ec5eb98..4f64d31c97 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -183,7 +183,6 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; grpc_pollset_worker root_worker; - int in_flight_cbs; int shutting_down; int called_shutdown; int kicked_without_pollers; @@ -193,10 +192,6 @@ struct grpc_pollset { size_t fd_count; size_t fd_capacity; grpc_fd **fds; - /* fds that have been removed from the pollset explicitly */ - size_t del_count; - size_t del_capacity; - grpc_fd **dels; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; }; @@ -728,7 +723,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; @@ -737,14 +731,10 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->kicked_without_pollers = 0; pollset->fd_count = 0; pollset->fd_capacity = 0; - pollset->del_count = 0; - pollset->del_capacity = 0; pollset->fds = NULL; - pollset->dels = NULL; } static void pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); while (pollset->local_wakeup_cache) { @@ -754,17 +744,14 @@ static void pollset_destroy(grpc_pollset *pollset) { pollset->local_wakeup_cache = next; } gpr_free(pollset->fds); - gpr_free(pollset->dels); gpr_mu_destroy(&pollset->mu); } static void pollset_reset(grpc_pollset *pollset) { GPR_ASSERT(pollset->shutting_down); - GPR_ASSERT(pollset->in_flight_cbs == 0); GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); GPR_ASSERT(pollset->fd_count == 0); - GPR_ASSERT(pollset->del_count == 0); pollset->shutting_down = 0; pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; @@ -797,11 +784,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { for (i = 0; i < pollset->fd_count; i++) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } - for (i = 0; i < pollset->del_count; i++) { - GRPC_FD_UNREF(pollset->dels[i], "multipoller_del"); - } pollset->fd_count = 0; - pollset->del_count = 0; grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); } @@ -841,13 +824,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_MARK("pollset_work.shutting_down", 0); goto done; } - /* Give do_promote priority so we don't starve it out */ - if (pollset->in_flight_cbs) { - GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0); - gpr_mu_unlock(&pollset->mu); - locked = 0; - goto done; - } /* Start polling, and keep doing so while we're being asked to re-evaluate our pollers (this allows poll() based pollers to ensure they don't miss wakeups) */ @@ -867,7 +843,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout; int r; - size_t i, j, fd_count; + size_t i, fd_count; nfds_t pfd_count; /* TODO(ctiller): inline some elements to avoid an allocation */ grpc_fd_watcher *watchers; @@ -887,11 +863,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pfds[1].events = POLLIN; pfds[1].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - int remove = fd_is_orphaned(pollset->fds[i]); - for (j = 0; !remove && j < pollset->del_count; j++) { - if (pollset->fds[i] == pollset->dels[j]) remove = 1; - } - if (remove) { + if (fd_is_orphaned(pollset->fds[i])) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -902,10 +874,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pfd_count++; } } - for (j = 0; j < pollset->del_count; j++) { - GRPC_FD_UNREF(pollset->dels[j], "multipoller_del"); - } - pollset->del_count = 0; pollset->fd_count = fd_count; gpr_mu_unlock(&pollset->mu); @@ -997,7 +965,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down) { if (pollset_has_workers(pollset)) { pollset_kick(pollset, NULL); - } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + } else if (!pollset->called_shutdown) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(exec_ctx, pollset); @@ -1027,7 +995,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!pollset_has_workers(pollset)) { grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); } - if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + if (!pollset->called_shutdown && !pollset_has_workers(pollset)) { pollset->called_shutdown = 1; finish_shutdown(exec_ctx, pollset); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 54b76d8aa5..165e20a062 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -527,6 +527,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { if (request_id == -1) { continue; } else { + gpr_log(GPR_DEBUG, "queue lockfree, retries=%d chose=%d", i, cq_idx); + gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); @@ -537,6 +539,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } /* no cq to take the request found: queue it on the slow list */ + gpr_log(GPR_DEBUG, "queue slowpath"); gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); calld->state = PENDING; @@ -1298,12 +1301,14 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, server->requested_calls[request_id] = *rc; gpr_free(rc); if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { + gpr_log(GPR_DEBUG, "request against empty"); /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); if (request_id == -1) break; + gpr_log(GPR_DEBUG, "drain1"); rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1324,6 +1329,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&server->mu_call); } gpr_mu_unlock(&server->mu_call); + } else { + gpr_log(GPR_DEBUG, "request lockfree"); } return GRPC_CALL_OK; } @@ -1377,6 +1384,7 @@ grpc_call_error grpc_server_request_registered_call( grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; requested_call *rc = gpr_malloc(sizeof(*rc)); registered_method *rm = rmp; + gpr_log(GPR_DEBUG, "method: %s", rm->method); GRPC_API_TRACE( "grpc_server_request_registered_call(" "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, " @@ -1391,6 +1399,7 @@ grpc_call_error grpc_server_request_registered_call( break; } } + gpr_log(GPR_DEBUG, "cq_idx=%d, cq_count=%d", cq_idx, server->cq_count); if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; |