aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/endpoint_pair_windows.c9
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c30
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c37
-rw-r--r--src/core/lib/iomgr/tcp_windows.c35
-rw-r--r--src/core/lib/iomgr/tcp_windows.h4
5 files changed, 95 insertions, 20 deletions
diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c
index 5c78c95492..93f71b745c 100644
--- a/src/core/lib/iomgr/endpoint_pair_windows.c
+++ b/src/core/lib/iomgr/endpoint_pair_windows.c
@@ -82,15 +82,16 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock;
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+ const char *name, grpc_resource_quota *resource_quota,
+ size_t read_slice_size) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
- "endpoint:server");
+ resource_quota, "endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
- "endpoint:client");
+ resource_quota, "endpoint:client");
return p;
}
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index fdd8c1a1f8..30f7c66f15 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -43,6 +43,7 @@
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -61,13 +62,16 @@ typedef struct {
int refs;
grpc_closure on_connect;
grpc_endpoint **endpoint;
+ grpc_resource_quota *resource_quota;
} async_connect;
-static void async_connect_unlock_and_cleanup(async_connect *ac,
+static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
+ async_connect *ac,
grpc_winsocket *socket) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
+ grpc_resource_quota_internal_unref(exec_ctx, ac->resource_quota);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@@ -83,7 +87,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (socket != NULL) {
grpc_winsocket_shutdown(socket);
}
- async_connect_unlock_and_cleanup(ac, socket);
+ async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
@@ -113,12 +117,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (!wsa_success) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx");
} else {
- *ep = grpc_tcp_create(socket, ac->addr_name);
+ *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name);
socket = NULL;
}
}
- async_connect_unlock_and_cleanup(ac, socket);
+ async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
@@ -129,6 +133,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_endpoint **endpoint,
grpc_pollset_set *interested_parties,
+ const grpc_channel_args *channel_args,
const grpc_resolved_address *addr,
gpr_timespec deadline) {
SOCKET sock = INVALID_SOCKET;
@@ -144,6 +149,17 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_winsocket_callback_info *info;
grpc_error *error = GRPC_ERROR_NONE;
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_internal_ref(
+ channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+
*endpoint = NULL;
/* Use dualstack sockets where available. */
@@ -177,8 +193,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
grpc_sockaddr_make_wildcard6(0, &local_address);
- status =
- bind(sock, (struct sockaddr *)&local_address.addr, local_address.len);
+ status = bind(sock, (struct sockaddr *)&local_address.addr,
+ (int)local_address.len);
if (status != 0) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
@@ -206,6 +222,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
+ ac->resource_quota = resource_quota;
grpc_closure_init(&ac->on_connect, on_connect, ac);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
@@ -225,6 +242,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
+ grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index ad6769a6ba..ae54c70d2d 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -100,14 +100,32 @@ struct grpc_tcp_server {
/* shutdown callback */
grpc_closure *shutdown_complete;
+
+ grpc_resource_quota *resource_quota;
};
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+ grpc_closure *shutdown_complete,
const grpc_channel_args *args,
grpc_tcp_server **server) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ s->resource_quota = grpc_resource_quota_create(NULL);
+ for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
+ if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
+ if (args->args[i].type == GRPC_ARG_POINTER) {
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ s->resource_quota =
+ grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ } else {
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ gpr_free(s);
+ return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
+ " must be a pointer to a buffer pool");
+ }
+ }
+ }
gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
@@ -137,6 +155,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
}
@@ -207,12 +226,13 @@ static grpc_error *prepare_socket(SOCKET sock,
goto failure;
}
- sockname_temp.len = sizeof(struct sockaddr_storage);
+ int sockname_temp_len = sizeof(struct sockaddr_storage);
if (getsockname(sock, (struct sockaddr *)sockname_temp.addr,
- &sockname_temp.len) == SOCKET_ERROR) {
+ &sockname_temp_len) == SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
goto failure;
}
+ sockname_temp.len = sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
return GRPC_ERROR_NONE;
@@ -357,8 +377,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
gpr_free(utf8_message);
}
+ int peer_name_len = (int)peer_name.len;
err =
- getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name.len);
+ getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name_len);
+ peer_name.len = peer_name_len;
if (!err) {
peer_name_string = grpc_sockaddr_to_uri(&peer_name);
} else {
@@ -368,7 +390,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
}
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
- peer_name_string);
+ sp->server->resource_quota, peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
} else {
@@ -466,10 +488,11 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
- sockname_temp.len = sizeof(struct sockaddr_storage);
+ int sockname_temp_len = sizeof(struct sockaddr_storage);
if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)sockname_temp.addr,
- &sockname_temp.len)) {
+ &sockname_temp_len)) {
+ sockname_temp.len = sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
if (*port > 0) {
allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index a5f508a2c3..46f0491d10 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -109,14 +109,29 @@ typedef struct grpc_tcp {
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
+ grpc_resource_user resource_user;
+
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
+ gpr_atm resource_user_shutdown_count;
+
char *peer_string;
} grpc_tcp;
+static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ grpc_error *error);
+
+static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
+ grpc_tcp *tcp) {
+ if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) {
+ grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
+ grpc_closure_create(win_unref_closure, tcp));
+ }
+}
+
static void tcp_free(grpc_tcp *tcp) {
grpc_winsocket_destroy(tcp->socket);
gpr_mu_destroy(&tcp->mu);
@@ -155,6 +170,11 @@ static void tcp_unref(grpc_tcp *tcp) {
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
+static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ TCP_UNREF(arg, "resource_user");
+}
+
/* Asynchronous callback from the IOCP, or the background thread. */
static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_tcp *tcp = tcpp;
@@ -376,12 +396,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
+ win_maybe_shutdown_resource_user(exec_ctx, tcp);
gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
+ win_maybe_shutdown_resource_user(exec_ctx, tcp);
TCP_UNREF(tcp, "destroy");
}
@@ -392,6 +414,11 @@ static char *win_get_peer(grpc_endpoint *ep) {
static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
+static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return &tcp->resource_user;
+}
+
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_get_workqueue,
@@ -399,18 +426,22 @@ static grpc_endpoint_vtable vtable = {win_read,
win_add_to_pollset_set,
win_shutdown,
win_destroy,
+ win_get_resource_user,
win_get_peer};
-grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
+grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
+ grpc_resource_quota *resource_quota,
+ char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
- gpr_ref_init(&tcp->refcount, 1);
+ gpr_ref_init(&tcp->refcount, 2);
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
+ grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 86d777235e..4402de1c38 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -50,7 +50,9 @@
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
-grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
+grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
+ grpc_resource_quota *resource_quota,
+ char *peer_string);
grpc_error *grpc_tcp_prepare_socket(SOCKET sock);