diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.h | 8 |
8 files changed, 41 insertions, 5 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 5626b08a47..a4293eb4a4 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -42,6 +42,7 @@ #include <unistd.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -179,7 +180,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( pfds[1].events = POLLIN; pfds[1].revents = 0; + GRPC_SCHEDULING_START_BLOCKING_REGION; poll_rv = grpc_poll_function(pfds, 2, timeout_ms); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (poll_rv < 0) { if (errno != EINTR) { @@ -193,6 +196,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( } if (pfds[1].revents) { do { + /* The following epoll_wait never blocks; it has a timeout of 0 */ ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); if (ep_rv < 0) { if (errno != EINTR) { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 2a18cedb33..44031b8ef6 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -44,6 +44,7 @@ #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -147,7 +148,9 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( POLLOUT, &watchers[i]); } + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfds, pfd_count, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 43ad22c16d..e80963e0ea 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -47,6 +47,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> @@ -118,11 +119,13 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { void grpc_pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); + gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); } void grpc_pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_poller); + gpr_tls_destroy(&g_current_thread_worker); grpc_wakeup_fd_global_destroy(); } @@ -468,7 +471,9 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index 99bd566e10..ed0a93fcc9 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -43,6 +43,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -103,13 +104,18 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; + if (s != 0) { /* Retry if well-known service name is recognized */ char *svc[][2] = {{"http", "80"}, {"https", "443"}}; for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) { if (strcmp(port, svc[i][0]) == 0) { + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; break; } } diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index fcd80b3912..82a5602996 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -42,6 +42,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -88,7 +89,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (s != 0) { gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); goto done; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index c98f0125f8..54ebad7dbc 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -78,6 +78,9 @@ typedef struct { size_t slice_size; gpr_refcount refcount; + /* garbage after the last read */ + gpr_slice_buffer last_read_buffer; + gpr_slice_buffer *incoming_buffer; gpr_slice_buffer *outgoing_buffer; /** slice within outgoing_buffer to write next */ @@ -106,6 +109,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_slice_buffer_destroy(&tcp->last_read_buffer); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -226,7 +230,8 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if ((size_t)read_bytes < tcp->incoming_buffer->length) { gpr_slice_buffer_trim_end( tcp->incoming_buffer, - tcp->incoming_buffer->length - (size_t)read_bytes); + tcp->incoming_buffer->length - (size_t)read_bytes, + &tcp->last_read_buffer); } else if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } @@ -259,6 +264,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; gpr_slice_buffer_reset_and_unref(incoming_buffer); + gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; @@ -457,6 +463,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; + gpr_slice_buffer_init(&tcp->last_read_buffer); return &tcp->base; } diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ae7c889d0c..9baaf1edc7 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -117,6 +117,8 @@ struct grpc_udp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + /* The parent grpc server */ + grpc_server* grpc_server; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->fd); + sp->read_cb(sp->fd, sp->server->grpc_server); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); @@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_pollset **pollsets, size_t pollset_count) { + grpc_pollset **pollsets, size_t pollset_count, + grpc_server *server) { size_t i, j; gpr_mu_lock(&s->mu); GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; + s->grpc_server = server; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 76082d7761..8e3abae864 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -36,18 +36,22 @@ #include "src/core/iomgr/endpoint.h" +/* Forward decl of grpc_server */ +typedef struct grpc_server grpc_server; + /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(int fd); +typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server *server); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, - grpc_pollset **pollsets, size_t pollset_count); + grpc_pollset **pollsets, size_t pollset_count, + grpc_server *server); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); |