diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/exec_ctx.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 32 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 22 | ||||
-rw-r--r-- | src/core/iomgr/wakeup_fd_eventfd.c | 6 |
5 files changed, 51 insertions, 24 deletions
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index f2914d376e..410b34c521 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -35,18 +35,24 @@ #include <grpc/support/log.h> +#include "src/core/profiling/timers.h" + int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { int did_something = 0; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { grpc_closure *next = c->next; - did_something = 1; + did_something++; + GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); c->cb(exec_ctx, c->cb_arg, c->success); + GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); c = next; } } + GPR_TIMER_END("grpc_exec_ctx_flush", 0); return did_something; } diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index ba9ba73f9d..2aafd21dfb 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -41,10 +41,11 @@ #include <sys/epoll.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" +#include "src/core/profiling/timers.h" typedef struct wakeup_fd_hdl { grpc_wakeup_fd wakeup_fd; @@ -182,9 +183,11 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid even going into the blocking annotation if possible */ + GPR_TIMER_BEGIN("poll", 0); GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); GRPC_SCHEDULING_END_BLOCKING_REGION; + GPR_TIMER_END("poll", 0); if (poll_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index d4362bb53a..c4e1987491 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -101,9 +101,12 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags) { + GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0); + /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0); GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; @@ -111,44 +114,50 @@ void grpc_pollset_kick_ext(grpc_pollset *p, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); } p->kicked_without_pollers = 1; - return; + GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); } else if (gpr_tls_get(&g_current_thread_worker) != (gpr_intptr)specific_worker) { + GPR_TIMER_MARK("different_thread_worker", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); - return; } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); - return; } } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + GPR_TIMER_MARK("kick_anonymous", 0); specific_worker = pop_front_worker(p); if (specific_worker != NULL) { if (gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + GPR_TIMER_MARK("kick_anonymous_not_self", 0); push_back_worker(p, specific_worker); specific_worker = pop_front_worker(p); if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { push_back_worker(p, specific_worker); - return; + specific_worker = NULL; } } - push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); - return; + if (specific_worker != NULL) { + GPR_TIMER_MARK("finally_kick", 0); + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } } else { + GPR_TIMER_MARK("kicked_no_pollers", 0); p->kicked_without_pollers = 1; - return; } } + + GPR_TIMER_END("grpc_pollset_kick_ext", 0); } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { @@ -229,6 +238,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int locked = 1; int queued_work = 0; int keep_polling = 0; + GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; worker->reevaluate_polling_on_wakeup = 0; @@ -273,8 +283,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); + GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline, now); + GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); gpr_tls_set(&g_current_thread_worker, 0); @@ -329,6 +341,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } + GPR_TIMER_END("grpc_pollset_work", 0); } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -563,10 +576,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, even going into the blocking annotation if possible */ /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GPR_TIMER_BEGIN("poll", 0); GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; - GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); + GPR_TIMER_END("poll", 0); if (r < 0) { gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 4a57037a72..915553d509 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -180,7 +180,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_ASSERT(!tcp->finished_edge); GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC); GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); - GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); + GPR_TIMER_BEGIN("tcp_continue_read", 0); while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { gpr_slice_buffer_add_indexed(tcp->incoming_buffer, @@ -199,11 +199,11 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0); + GPR_TIMER_BEGIN("recvmsg", 1); do { read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); + GPR_TIMER_END("recvmsg", 0); if (read_bytes < 0) { /* NB: After calling call_read_cb a parallel call of the read handler may @@ -240,7 +240,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { TCP_UNREF(exec_ctx, tcp, "read"); } - GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); + GPR_TIMER_END("tcp_continue_read", 0); } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -316,12 +316,12 @@ static flush_result tcp_flush(grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0); + GPR_TIMER_BEGIN("sendmsg", 1); do { /* TODO(klempner): Cork if this is a partial write */ sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); } while (sent_length < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0); + GPR_TIMER_END("sendmsg", 0); if (sent_length < 0) { if (errno == EAGAIN) { @@ -370,17 +370,17 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, return; } - GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); status = tcp_flush(tcp); if (status == FLUSH_PENDING) { grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; + GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE); + GPR_TIMER_END("tcp_handle_write.cb", 0); TCP_UNREF(exec_ctx, tcp, "write"); } - GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -399,11 +399,11 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } } - GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); + GPR_TIMER_BEGIN("tcp_write", 0); GPR_ASSERT(tcp->write_cb == NULL); if (buf->length == 0) { - GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + GPR_TIMER_END("tcp_write", 0); grpc_exec_ctx_enqueue(exec_ctx, cb, 1); return; } @@ -420,7 +420,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE); } - GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + GPR_TIMER_END("tcp_write", 0); } static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c index 48eb1afb3d..f67379e4fc 100644 --- a/src/core/iomgr/wakeup_fd_eventfd.c +++ b/src/core/iomgr/wakeup_fd_eventfd.c @@ -39,9 +39,11 @@ #include <sys/eventfd.h> #include <unistd.h> -#include "src/core/iomgr/wakeup_fd_posix.h" #include <grpc/support/log.h> +#include "src/core/iomgr/wakeup_fd_posix.h" +#include "src/core/profiling/timers.h" + static void eventfd_create(grpc_wakeup_fd* fd_info) { int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); /* TODO(klempner): Handle failure more gracefully */ @@ -60,9 +62,11 @@ static void eventfd_consume(grpc_wakeup_fd* fd_info) { static void eventfd_wakeup(grpc_wakeup_fd* fd_info) { int err; + GPR_TIMER_BEGIN("eventfd_wakeup", 0); do { err = eventfd_write(fd_info->read_fd, 1); } while (err < 0 && errno == EINTR); + GPR_TIMER_END("eventfd_wakeup", 0); } static void eventfd_destroy(grpc_wakeup_fd* fd_info) { |