aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_server_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_posix.c')
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c42
1 files changed, 22 insertions, 20 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 7e2fb0f1f9..20efb678b2 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -167,18 +167,18 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -208,7 +208,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -219,7 +219,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
@@ -254,8 +254,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
}
@@ -381,16 +381,12 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg;
- grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
- sp->fd_index};
- grpc_pollset *read_notifier_pollset = NULL;
- grpc_fd *fdobj;
if (err != GRPC_ERROR_NONE) {
goto error;
}
- read_notifier_pollset =
+ grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
@@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- fdobj = grpc_fd_create(fd, name);
+ grpc_fd *fdobj = grpc_fd_create(fd, name);
if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
@@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+ // Create acceptor.
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = sp->fd_index;
+
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- read_notifier_pollset, &acceptor);
+ read_notifier_pollset, acceptor);
gpr_free(name);
gpr_free(addr_str);
@@ -721,8 +723,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -731,8 +733,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -758,7 +760,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}