diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_windows.c')
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.c | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index a5f508a2c3..5782fe9bc3 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -109,14 +109,29 @@ typedef struct grpc_tcp { gpr_slice_buffer *write_slices; gpr_slice_buffer *read_slices; + grpc_resource_user resource_user; + /* The IO Completion Port runs from another thread. We need some mechanism to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; + gpr_atm resource_user_shutdown_count; + char *peer_string; } grpc_tcp; +static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); + +static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, + grpc_tcp *tcp) { + if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) { + grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, + grpc_closure_create(win_unref_closure, tcp)); + } +} + static void tcp_free(grpc_tcp *tcp) { grpc_winsocket_destroy(tcp->socket); gpr_mu_destroy(&tcp->mu); @@ -155,6 +170,11 @@ static void tcp_unref(grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif +static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + TCP_UNREF(arg, "resource_user"); +} + /* Asynchronous callback from the IOCP, or the background thread. */ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; @@ -376,12 +396,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { callback. See the comments in on_read and on_write. */ tcp->shutting_down = 1; grpc_winsocket_shutdown(tcp->socket); + win_maybe_shutdown_resource_user(exec_ctx, tcp); gpr_mu_unlock(&tcp->mu); } static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; + win_maybe_shutdown_resource_user(exec_ctx, tcp); TCP_UNREF(tcp, "destroy"); } @@ -392,6 +414,11 @@ static char *win_get_peer(grpc_endpoint *ep) { static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } +static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return &tcp->resource_user; +} + static grpc_endpoint_vtable vtable = {win_read, win_write, win_get_workqueue, @@ -399,18 +426,20 @@ static grpc_endpoint_vtable vtable = {win_read, win_add_to_pollset_set, win_shutdown, win_destroy, + win_get_resource_user, win_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_resource_quota *resource_quota, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->socket = socket; gpr_mu_init(&tcp->mu); - gpr_ref_init(&tcp->refcount, 1); + gpr_ref_init(&tcp->refcount, 2); grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); + grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); |