aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_server_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_posix.c')
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c84
1 files changed, 55 insertions, 29 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 127f8595d1..179f47ef76 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -134,6 +134,8 @@ struct grpc_tcp_server {
/* next pollset to assign a channel to */
gpr_atm next_pollset_to_assign;
+
+ grpc_resource_quota *resource_quota;
};
static gpr_once check_init = GPR_ONCE_INIT;
@@ -150,23 +152,37 @@ static void init(void) {
#endif
}
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+ grpc_closure *shutdown_complete,
const grpc_channel_args *args,
grpc_tcp_server **server) {
gpr_once_init(&check_init, init);
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
+ s->resource_quota = grpc_resource_quota_create(NULL);
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
+ } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
+ if (args->args[i].type == GRPC_ARG_POINTER) {
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ s->resource_quota =
+ grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ } else {
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ gpr_free(s);
+ return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
+ " must be a pointer to a buffer pool");
+ }
}
}
gpr_ref_init(&s->refs, 1);
@@ -203,6 +219,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp);
}
+ grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+
gpr_free(s);
}
@@ -363,16 +381,12 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
- sp->fd_index};
- grpc_pollset *read_notifier_pollset = NULL;
- grpc_fd *fdobj;
if (err != GRPC_ERROR_NONE) {
goto error;
}
- read_notifier_pollset =
+ grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
@@ -408,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- fdobj = grpc_fd_create(fd, name);
+ grpc_fd *fdobj = grpc_fd_create(fd, name);
if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
@@ -417,10 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = sp->fd_index;
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- read_notifier_pollset, &acceptor);
+ grpc_tcp_create(fdobj, sp->server->resource_quota,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ read_notifier_pollset, acceptor);
gpr_free(name);
gpr_free(addr_str);
@@ -638,41 +659,46 @@ done:
}
}
+/* Return listener at port_index or NULL. Should only be called with s->mu
+ locked. */
+static grpc_tcp_listener *get_port_index(grpc_tcp_server *s,
+ unsigned port_index) {
+ unsigned num_ports = 0;
+ grpc_tcp_listener *sp;
+ for (sp = s->head; sp; sp = sp->next) {
+ if (!sp->is_sibling) {
+ if (++num_ports > port_index) {
+ return sp;
+ }
+ }
+ }
+ return NULL;
+}
+
unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
unsigned port_index) {
unsigned num_fds = 0;
- grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu);
- for (sp = s->head; sp && port_index != 0; sp = sp->next) {
- if (!sp->is_sibling) {
- --port_index;
- }
+ grpc_tcp_listener *sp = get_port_index(s, port_index);
+ for (; sp; sp = sp->sibling) {
+ ++num_fds;
}
- for (; sp; sp = sp->sibling, ++num_fds)
- ;
gpr_mu_unlock(&s->mu);
return num_fds;
}
int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
unsigned fd_index) {
- grpc_tcp_listener *sp;
- int fd;
gpr_mu_lock(&s->mu);
- for (sp = s->head; sp && port_index != 0; sp = sp->next) {
- if (!sp->is_sibling) {
- --port_index;
+ grpc_tcp_listener *sp = get_port_index(s, port_index);
+ for (; sp; sp = sp->sibling, --fd_index) {
+ if (fd_index == 0) {
+ gpr_mu_unlock(&s->mu);
+ return sp->fd;
}
}
- for (; sp && fd_index != 0; sp = sp->sibling, --fd_index)
- ;
- if (sp) {
- fd = sp->fd;
- } else {
- fd = -1;
- }
gpr_mu_unlock(&s->mu);
- return fd;
+ return -1;
}
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,