diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 10 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 33 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 34 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 20 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.h | 12 |
12 files changed, 85 insertions, 74 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 2d08a77a70..38a543e36e 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -213,10 +213,9 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, const char *reason) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); - REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { - GPR_ASSERT(!fd->closed); fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 1dd03992ae..d6ca5d1f71 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -34,16 +34,18 @@ #include "src/core/iomgr/iomgr.h" #include <stdlib.h> +#include <string.h> -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/alarm_internal.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/support/string.h" + static gpr_mu g_mu; static gpr_cv g_rcv; static grpc_iomgr_closure *g_cbs_head = NULL; @@ -179,6 +181,8 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); + memset(&g_root_object, 0, sizeof(g_root_object)); + grpc_kick_poller(); gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future(GPR_CLOCK_REALTIME)); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 8f62ce2954..481bdc4ede 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -72,7 +72,7 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { to this pollset whilst adding, but that should be benign. */ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); if (watcher.fd != NULL) { - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET); ev.data.ptr = fd; err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); if (err < 0) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index dec2f5490f..f3e424e83c 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -187,6 +187,12 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, if (pollset->shutting_down) { goto done; } + if (pollset->in_flight_cbs) { + /* Give do_promote priority so we don't starve it out */ + gpr_mu_unlock(&pollset->mu); + gpr_mu_lock(&pollset->mu); + goto done; + } if (!pollset->kicked_without_pollers) { push_front_worker(pollset, worker); added_worker = 1; @@ -422,12 +428,6 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, int r; nfds_t nfds; - if (pollset->in_flight_cbs) { - /* Give do_promote priority so we don't starve it out */ - gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); - return; - } fd = pollset->data.ptr; if (fd && grpc_fd_is_orphaned(fd)) { GRPC_FD_UNREF(fd, "basicpoll"); diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c3668f6a92..07fa44ad37 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -54,6 +54,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +extern int grpc_tcp_trace; + typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; @@ -92,6 +94,10 @@ error: static void tc_on_alarm(void *acp, int success) { int done; async_connect *ac = acp; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: success=%d", ac->addr_str, + success); + } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { grpc_fd_shutdown(ac->fd); @@ -116,6 +122,11 @@ static void on_writable(void *acp, int success) { void *cb_arg = ac->cb_arg; grpc_fd *fd; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_writable: success=%d", + ac->addr_str, success); + } + gpr_mu_lock(&ac->mu); GPR_ASSERT(ac->fd); fd = ac->fd; @@ -264,6 +275,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->write_closure.cb = on_writable; ac->write_closure.cb_arg = ac; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + ac->addr_str); + } + gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 05198dbff4..6f57de0289 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -77,7 +77,6 @@ static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); /* If the alarm didn't occur, it got cancelled. */ - gpr_log(GPR_DEBUG, "on_alarm: %p", ac->socket); if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } @@ -96,8 +95,6 @@ static void on_connect(void *acp, int from_iocp) { gpr_mu_lock(&ac->mu); - gpr_log(GPR_DEBUG, "on_connect: %p", ac->socket); - if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; @@ -124,7 +121,7 @@ static void on_connect(void *acp, int from_iocp) { notification request for the connection, and one timeout alert. */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, int addr_len, + const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; @@ -179,7 +176,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; - success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); + success = ConnectEx(sock, addr, (int)addr_len, NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. But we'll still get an IOCP notification, so let's ignore it. */ diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 5165f5c5ca..9303975781 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -39,7 +39,7 @@ /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; -/* New server callback: tcp is the newly connected tcp connection */ +/* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); /* Create a server, initially not bound to any ports */ @@ -47,8 +47,8 @@ grpc_tcp_server *grpc_tcp_server_create(void); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg); + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, void *cb_arg); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index bcbd0afe6b..f7b692a76b 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -98,8 +98,9 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_tcp_server { - grpc_tcp_server_cb cb; - void *cb_arg; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; gpr_mu mu; @@ -132,8 +133,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->cb = NULL; - s->cb_arg = NULL; + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -339,6 +340,10 @@ static void on_read(void *arg, int success) { addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); + } + fdobj = grpc_fd_create(fd, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every @@ -346,8 +351,8 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb( - sp->server->cb_arg, + sp->server->on_accept_cb( + sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free(name); @@ -378,7 +383,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -484,16 +489,16 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { return (index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, size_t + pollset_count, grpc_tcp_server_cb on_accept_cb, void + *on_accept_cb_arg) { size_t i, j; - GPR_ASSERT(cb); + GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); + GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); - s->cb = cb; - s->cb_arg = cb_arg; + s->on_accept_cb = on_accept_cb; + s->on_accept_cb_arg = on_accept_cb_arg; s->pollsets = pollsets; s->pollset_count = pollset_count; for (i = 0; i < s->nports; i++) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index b513d854aa..c42e5e7527 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -71,8 +71,9 @@ typedef struct server_port { /* the overall server */ struct grpc_tcp_server { - grpc_tcp_server_cb cb; - void *cb_arg; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; gpr_mu mu; @@ -95,8 +96,8 @@ grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); s->active_ports = 0; - s->cb = NULL; - s->cb_arg = NULL; + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -154,7 +155,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, /* Prepare (bind) a recently-created socket for listening. */ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, - int addr_len) { + size_t addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; @@ -167,7 +168,7 @@ static int prepare_socket(SOCKET sock, const struct sockaddr *addr, goto error; } - if (bind(sock, addr, addr_len) == SOCKET_ERROR) { + if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) { char *addr_str; char *utf8_message = gpr_format_message(WSAGetLastError()); grpc_sockaddr_to_string(&addr_str, addr, 0); @@ -344,7 +345,7 @@ static void on_accept(void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ - if (ep) sp->server->cb(sp->server->cb_arg, ep); + if (ep) sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next @@ -353,7 +354,7 @@ static void on_accept(void *arg, int from_iocp) { } static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, int addr_len) { + const struct sockaddr *addr, size_t addr_len) { server_port *sp; int port; int status; @@ -380,7 +381,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, port = prepare_socket(sock, addr, addr_len); if (port >= 0) { gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -400,7 +401,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, } int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - int addr_len) { + size_t addr_len) { int allocated_port = -1; unsigned i; SOCKET sock; @@ -462,15 +463,16 @@ SOCKET grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { } void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, - size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { + size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, + void *on_accept_cb_arg) { size_t i; - GPR_ASSERT(cb); + GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); + GPR_ASSERT(!s->on_accept_cb); GPR_ASSERT(s->active_ports == 0); - s->cb = cb; - s->cb_arg = cb_arg; + s->on_accept_cb = on_accept_cb; + s->on_accept_cb_arg = on_accept_cb_arg; for (i = 0; i < s->nports; i++) { start_accept(s->ports + i); s->active_ports++; diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index fe3673c607..725c18e6cc 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -144,7 +144,7 @@ static int on_read(grpc_tcp *tcp, int success) { int do_abort = 0; if (success) { - if (socket->read_info.wsa_error != 0) { + if (socket->read_info.wsa_error != 0 && !tcp->shutting_down) { if (socket->read_info.wsa_error != WSAECONNRESET) { char *utf8_message = gpr_format_message(info->wsa_error); gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); @@ -153,7 +153,7 @@ static int on_read(grpc_tcp *tcp, int success) { success = 0; gpr_slice_unref(tcp->read_slice); } else { - if (info->bytes_transfered != 0) { + if (info->bytes_transfered != 0 && !tcp->shutting_down) { sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); gpr_slice_buffer_add(tcp->read_slices, sub); success = 1; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ed9eee8726..7957066598 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -94,9 +94,6 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { /* the overall server */ struct grpc_udp_server { - grpc_udp_server_cb cb; - void *cb_arg; - gpr_mu mu; gpr_cv cv; @@ -130,8 +127,6 @@ grpc_udp_server *grpc_udp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->cb = NULL; - s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; @@ -232,6 +227,11 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + } + get_local_ip = 1; rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip)); @@ -282,7 +282,7 @@ static void on_read(void *arg, int success) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg); + sp->read_cb(sp->fd); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); @@ -301,7 +301,6 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb && "must add ports before starting server"); /* append it to the list under a lock */ if (s->nports == s->port_capacity) { s->port_capacity *= 2; @@ -407,15 +406,10 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, - size_t pollset_count, - grpc_udp_server_cb new_transport_cb, void *cb_arg) { + size_t pollset_count) { size_t i, j; - GPR_ASSERT(new_transport_cb); gpr_mu_lock(&s->mu); - GPR_ASSERT(!s->cb); GPR_ASSERT(s->active_ports == 0); - s->cb = new_transport_cb; - s->cb_arg = cb_arg; s->pollsets = pollsets; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 389f84ecca..c930e81cbc 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -39,21 +39,15 @@ /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* New server callback: ep is the newly connected connection */ -typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep); - /* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(int fd, - grpc_udp_server_cb new_transport_cb, - void *cb_arg); +typedef void (*grpc_udp_server_read_cb)(int fd); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ -void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets, - size_t pollset_count, grpc_udp_server_cb cb, - void *cb_arg); +void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets, + size_t pollset_count); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); |