diff options
Diffstat (limited to 'src/core/iomgr/tcp_server_windows.c')
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 49 |
1 files changed, 22 insertions, 27 deletions
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index be2c0055db..9881a41152 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -67,6 +67,8 @@ typedef struct server_port { /* The cached AcceptEx for that port. */ LPFN_ACCEPTEX AcceptEx; int shutting_down; + /* closure for socket notification of accept being ready */ + grpc_closure on_accept; } server_port; /* the overall server */ @@ -86,8 +88,7 @@ struct grpc_tcp_server { size_t port_capacity; /* shutdown callback */ - void (*shutdown_complete)(void *); - void *shutdown_complete_arg; + grpc_closure *shutdown_complete; }; /* Public function. Allocates the proper data structures to hold a @@ -107,10 +108,10 @@ grpc_tcp_server *grpc_tcp_server_create(void) { static void dont_care_about_shutdown_completion(void *arg) {} -static void finish_shutdown(grpc_tcp_server *s) { +static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { size_t i; - s->shutdown_complete(s->shutdown_complete_arg); + grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1); /* Now that the accepts have been aborted, we can destroy the sockets. The IOCP won't get notified on these, so we can flag them as already @@ -124,17 +125,13 @@ static void finish_shutdown(grpc_tcp_server *s) { } /* Public function. Stops and destroys a grpc_tcp_server. */ -void grpc_tcp_server_destroy(grpc_tcp_server *s, - void (*shutdown_complete)(void *shutdown_done_arg), - void *shutdown_complete_arg) { +void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, + grpc_closure *shutdown_complete) { size_t i; int immediately_done = 0; gpr_mu_lock(&s->mu); - s->shutdown_complete = shutdown_complete - ? shutdown_complete - : dont_care_about_shutdown_completion; - s->shutdown_complete_arg = shutdown_complete_arg; + s->shutdown_complete = shutdown_complete; /* First, shutdown all fd's. This will queue abortion calls for all of the pending accepts due to the normal operation mechanism. */ @@ -149,7 +146,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, gpr_mu_unlock(&s->mu); if (immediately_done) { - finish_shutdown(s); + finish_shutdown(exec_ctx, s); } } @@ -201,7 +198,7 @@ error: return -1; } -static void decrement_active_ports_and_notify(server_port *sp) { +static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, server_port *sp) { int notify = 0; sp->shutting_down = 0; gpr_mu_lock(&sp->server->mu); @@ -212,16 +209,13 @@ static void decrement_active_ports_and_notify(server_port *sp) { } gpr_mu_unlock(&sp->server->mu); if (notify) { - finish_shutdown(sp->server); + finish_shutdown(exec_ctx, sp->server); } } -/* start_accept will reference that for the IOCP notification request. */ -static void on_accept(void *arg, int from_iocp); - /* In order to do an async accept, we need to create a socket first which will be the one assigned to the new incoming connection. */ -static void start_accept(server_port *port) { +static void start_accept(grpc_exec_ctx *exec_ctx, server_port *port) { SOCKET sock = INVALID_SOCKET; char *message; char *utf8_message; @@ -260,7 +254,7 @@ static void start_accept(server_port *port) { /* We're ready to do the accept. Calling grpc_socket_notify_on_read may immediately process an accept that happened in the meantime. */ port->new_socket = sock; - grpc_socket_notify_on_read(port->socket, on_accept, port); + grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept); return; failure: @@ -270,7 +264,7 @@ failure: change is not seen by on_accept and we proceed to trying new accept, but we fail there because the listening port has been closed in the meantime. */ - decrement_active_ports_and_notify(port); + decrement_active_ports_and_notify(exec_ctx, port); return; } utf8_message = gpr_format_message(WSAGetLastError()); @@ -280,7 +274,7 @@ failure: } /* Event manager callback when reads are ready. */ -static void on_accept(void *arg, int from_iocp) { +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { server_port *sp = arg; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; @@ -310,7 +304,7 @@ static void on_accept(void *arg, int from_iocp) { if (sp->shutting_down) { /* During the shutdown case, we ARE expecting an error. So that's well, and we can wake up the shutdown thread. */ - decrement_active_ports_and_notify(sp); + decrement_active_ports_and_notify(exec_ctx, sp); return; } else { char *utf8_message = gpr_format_message(WSAGetLastError()); @@ -346,12 +340,12 @@ static void on_accept(void *arg, int from_iocp) { /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ - if (ep) sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep); + if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep); /* As we were notified from the IOCP of one and exactly one accept, the former socked we created has now either been destroy or assigned to the new connection. We need to create a new one for the next connection. */ - start_accept(sp); + start_accept(exec_ctx, sp); } static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, @@ -402,7 +396,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock, } int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - int addr_len) { + size_t addr_len) { int allocated_port = -1; unsigned i; SOCKET sock; @@ -464,7 +458,8 @@ grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) { return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, +void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, + grpc_pollset **pollset, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *on_accept_cb_arg) { @@ -476,7 +471,7 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, s->on_accept_cb = on_accept_cb; s->on_accept_cb_arg = on_accept_cb_arg; for (i = 0; i < s->nports; i++) { - start_accept(s->ports + i); + start_accept(exec_ctx, s->ports + i); s->active_ports++; } gpr_mu_unlock(&s->mu); |