diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 34 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 11 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/socket_windows.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 60 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 36 |
8 files changed, 120 insertions, 41 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 8b019e8049..aec626509a 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -52,10 +52,11 @@ static OVERLAPPED g_iocp_custom_overlap; static gpr_event g_shutdown_iocp; static gpr_event g_iocp_done; +static gpr_atm g_orphans = 0; static HANDLE g_iocp; -static int do_iocp_work() { +static void do_iocp_work() { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -71,14 +72,14 @@ static int do_iocp_work() { gpr_time_to_millis(wait_time)); if (!success && !overlapped) { /* The deadline got attained. */ - return 0; + return; } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { if (completion_key == (ULONG_PTR) &g_iocp_kick_token) { /* We were awoken from a kick. */ gpr_log(GPR_DEBUG, "do_iocp_work - got a kick"); - return 1; + return; } gpr_log(GPR_ERROR, "Unknown custom completion key."); abort(); @@ -97,8 +98,13 @@ static int do_iocp_work() { } success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, FALSE, &flags); - gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s", bytes, flags, - success ? "succeeded" : "failed"); + gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s %s", bytes, flags, + success ? "succeeded" : "failed", socket->orphan ? "orphan" : ""); + if (socket->orphan) { + grpc_winsocket_destroy(socket); + gpr_atm_full_fetch_add(&g_orphans, -1); + return; + } info->bytes_transfered = bytes; info->wsa_error = success ? 0 : WSAGetLastError(); GPR_ASSERT(overlapped == &info->overlapped); @@ -113,12 +119,10 @@ static int do_iocp_work() { } gpr_mu_unlock(&socket->state_mu); if (f) f(opaque, 1); - - return 1; } static void iocp_loop(void *p) { - while (!gpr_event_get(&g_shutdown_iocp)) { + while (gpr_atm_acq_load(&g_orphans) || !gpr_event_get(&g_shutdown_iocp)) { grpc_maybe_call_delayed_callbacks(NULL, 1); do_iocp_work(); } @@ -138,13 +142,19 @@ void grpc_iocp_init(void) { gpr_thd_new(&id, iocp_loop, NULL, NULL); } -void grpc_iocp_shutdown(void) { +void grpc_iocp_kick(void) { BOOL success; - gpr_event_set(&g_shutdown_iocp, (void *)1); + success = PostQueuedCompletionStatus(g_iocp, 0, (ULONG_PTR) &g_iocp_kick_token, &g_iocp_custom_overlap); GPR_ASSERT(success); +} + +void grpc_iocp_shutdown(void) { + BOOL success; + gpr_event_set(&g_shutdown_iocp, (void *)1); + grpc_iocp_kick(); gpr_event_wait(&g_iocp_done, gpr_inf_future); success = CloseHandle(g_iocp); GPR_ASSERT(success); @@ -166,6 +176,10 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { GPR_ASSERT(ret == g_iocp); } +void grpc_iocp_socket_orphan(grpc_winsocket *socket) { + gpr_atm_full_fetch_add(&g_orphans, 1); +} + static void socket_notify_on_iocp(grpc_winsocket *socket, void(*cb)(void *, int), void *opaque, grpc_winsocket_callback_info *info) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index 33133193a1..fa3f5eee10 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -42,6 +42,7 @@ void grpc_iocp_init(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); +void grpc_iocp_socket_orphan(grpc_winsocket *); void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success), void *opaque); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 058685b295..d0e6706fbd 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -117,7 +117,16 @@ void grpc_iomgr_shutdown(void) { gpr_mu_lock(&g_mu); } if (g_refs) { - if (gpr_cv_wait(&g_rcv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { + int timeout = 0; + gpr_timespec short_deadline = gpr_time_add(gpr_now(), + gpr_time_from_millis(100)); + while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { + if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) { + timeout = 1; + break; + } + } + if (timeout) { gpr_log(GPR_DEBUG, "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 99f38b0e03..22dad41783 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -55,7 +55,7 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) { return r; } -void shutdown_op(grpc_winsocket_callback_info *info) { +static void shutdown_op(grpc_winsocket_callback_info *info) { if (!info->cb) return; grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0); } @@ -68,8 +68,13 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) { void grpc_winsocket_orphan(grpc_winsocket *socket) { gpr_log(GPR_DEBUG, "grpc_winsocket_orphan"); + grpc_iocp_socket_orphan(socket); + socket->orphan = 1; grpc_iomgr_unref(); closesocket(socket->socket); +} + +void grpc_winsocket_destroy(grpc_winsocket *socket) { gpr_mu_destroy(&socket->state_mu); gpr_free(socket); } diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index d4776ab10f..cbae91692c 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -57,12 +57,13 @@ typedef struct grpc_winsocket_callback_info { typedef struct grpc_winsocket { SOCKET socket; - int added_to_iocp; - grpc_winsocket_callback_info write_info; grpc_winsocket_callback_info read_info; gpr_mu state_mu; + + int added_to_iocp; + int orphan; } grpc_winsocket; /* Create a wrapped windows handle. @@ -71,5 +72,6 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket); void grpc_winsocket_shutdown(grpc_winsocket *socket); void grpc_winsocket_orphan(grpc_winsocket *socket); +void grpc_winsocket_destroy(grpc_winsocket *socket); #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 68ee85c5a7..66bb3ef701 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -71,6 +71,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, up when grpc_tcp_server_destroy is called. */ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index); -void grpc_tcp_server_destroy(grpc_tcp_server *server); +void grpc_tcp_server_destroy(grpc_tcp_server *server, + void (*shutdown_done)(void *shutdown_done_arg), + void *shutdown_done_arg); -#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ +#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 90b7eb451d..895f85fc68 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -102,12 +102,18 @@ struct grpc_tcp_server { gpr_cv cv; /* active port count: how many ports are actually still listening */ - int active_ports; + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; /* all listening ports */ server_port *ports; size_t nports; size_t port_capacity; + + /* shutdown callback */ + void (*shutdown_complete)(void *); + void *shutdown_complete_arg; }; grpc_tcp_server *grpc_tcp_server_create(void) { @@ -115,6 +121,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { gpr_mu_init(&s->mu); gpr_cv_init(&s->cv); s->active_ports = 0; + s->destroyed_ports = 0; s->cb = NULL; s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); @@ -123,29 +130,64 @@ grpc_tcp_server *grpc_tcp_server_create(void) { return s; } -void grpc_tcp_server_destroy(grpc_tcp_server *s) { +static void finish_shutdown(grpc_tcp_server *s) { + s->shutdown_complete(s->shutdown_complete_arg); + + gpr_mu_destroy(&s->mu); + gpr_cv_destroy(&s->cv); + + gpr_free(s->ports); + gpr_free(s); +} + +static void destroyed_port(void *server, int success) { + grpc_tcp_server *s = server; + gpr_mu_lock(&s->mu); + s->destroyed_ports++; + if (s->destroyed_ports == s->nports) { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } else { + gpr_mu_unlock(&s->mu); + } +} + +static void dont_care_about_shutdown_completion(void *ignored) {} + +void grpc_tcp_server_destroy( + grpc_tcp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), + void *shutdown_complete_arg) { size_t i; gpr_mu_lock(&s->mu); + + s->shutdown_complete = shutdown_complete + ? shutdown_complete + : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { grpc_fd_shutdown(s->ports[i].emfd); } /* wait while that happens */ + /* TODO(ctiller): make this asynchronous also */ while (s->active_ports) { gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); } gpr_mu_unlock(&s->mu); /* delete ALL the things */ - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; - if (sp->addr.sockaddr.sa_family == AF_UNIX) { - unlink_if_unix_domain_socket(&sp->addr.un); + if (s->nports) { + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + if (sp->addr.sockaddr.sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(&sp->addr.un); + } + grpc_fd_orphan(sp->emfd, destroyed_port, s); } - grpc_fd_orphan(sp->emfd, NULL, NULL); + } else { + finish_shutdown(s); } - gpr_free(s->ports); - gpr_free(s); } /* get max listen queue size on linux */ diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 0c3ab1dc91..a43d5670a4 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -92,7 +92,9 @@ grpc_tcp_server *grpc_tcp_server_create(void) { return s; } -void grpc_tcp_server_destroy(grpc_tcp_server *s) { +void grpc_tcp_server_destroy(grpc_tcp_server *s, + void (*shutdown_done)(void *shutdown_done_arg), + void *shutdown_done_arg) { size_t i; gpr_mu_lock(&s->mu); /* shutdown all fd's */ @@ -112,11 +114,15 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { } gpr_free(s->ports); gpr_free(s); + + if (shutdown_done) { + shutdown_done(shutdown_done_arg); + } } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(SOCKET sock, - const struct sockaddr *addr, int addr_len) { +static int prepare_socket(SOCKET sock, const struct sockaddr *addr, + int addr_len) { struct sockaddr_storage sockname_temp; socklen_t sockname_len; @@ -147,15 +153,15 @@ static int prepare_socket(SOCKET sock, } sockname_len = sizeof(sockname_temp); - if (getsockname(sock, (struct sockaddr *) &sockname_temp, &sockname_len) - == SOCKET_ERROR) { + if (getsockname(sock, (struct sockaddr *)&sockname_temp, &sockname_len) == + SOCKET_ERROR) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "getsockname: %s", utf8_message); gpr_free(utf8_message); goto error; } - return grpc_sockaddr_get_port((struct sockaddr *) &sockname_temp); + return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); error: if (sock != INVALID_SOCKET) closesocket(sock); @@ -221,8 +227,7 @@ static void on_accept(void *arg, int success) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, - &transfered_bytes, FALSE, - &flags); + &transfered_bytes, FALSE, &flags); if (!wsa_success) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); @@ -257,9 +262,9 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, if (sock == INVALID_SOCKET) return -1; - status = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, - &guid, sizeof(guid), &AcceptEx, sizeof(AcceptEx), - &ioctl_num_bytes, NULL, NULL); + status = + WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), + &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL); if (status != 0) { char *utf8_message = gpr_format_message(WSAGetLastError()); @@ -307,9 +312,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, for (i = 0; i < s->nports; i++) { sockname_len = sizeof(sockname_temp); if (0 == getsockname(s->ports[i].socket->socket, - (struct sockaddr *) &sockname_temp, - &sockname_len)) { - port = grpc_sockaddr_get_port((struct sockaddr *) &sockname_temp); + (struct sockaddr *)&sockname_temp, &sockname_len)) { + port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); if (port > 0) { allocated_addr = malloc(addr_len); memcpy(allocated_addr, addr, addr_len); @@ -330,7 +334,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, if (grpc_sockaddr_is_wildcard(addr, &port)) { grpc_sockaddr_make_wildcard6(port, &wildcard); - addr = (struct sockaddr *) &wildcard; + addr = (struct sockaddr *)&wildcard; addr_len = sizeof(wildcard); } @@ -369,4 +373,4 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, gpr_mu_unlock(&s->mu); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GPR_WINSOCK_SOCKET */ |