diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_server_posix.c')
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 42 |
1 files changed, 22 insertions, 20 deletions
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 7e2fb0f1f9..20efb678b2 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -167,18 +167,18 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { - grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); s->resource_quota = - grpc_resource_quota_internal_ref(args->args[i].value.pointer.p); + grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); } else { - grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); @@ -208,7 +208,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { GPR_ASSERT(s->shutdown); gpr_mu_unlock(&s->mu); if (s->shutdown_complete != NULL) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -219,7 +219,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(sp); } - grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); } @@ -254,8 +254,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { grpc_unlink_if_unix_domain_socket(&sp->addr); - sp->destroyed_closure.cb = destroyed_port; - sp->destroyed_closure.cb_arg = s; + grpc_closure_init(&sp->destroyed_closure, destroyed_port, s, + grpc_schedule_on_exec_ctx); grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "tcp_listener_shutdown"); } @@ -381,16 +381,12 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; - grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, - sp->fd_index}; - grpc_pollset *read_notifier_pollset = NULL; - grpc_fd *fdobj; if (err != GRPC_ERROR_NONE) { goto error; } - read_notifier_pollset = + grpc_pollset *read_notifier_pollset = sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add( &sp->server->next_pollset_to_assign, 1) % sp->server->pollset_count]; @@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); } - fdobj = grpc_fd_create(fd, name); + grpc_fd *fdobj = grpc_fd_create(fd, name); if (read_notifier_pollset == NULL) { gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); @@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); + // Create acceptor. + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = sp->fd_index; + sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, sp->server->resource_quota, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), - read_notifier_pollset, &acceptor); + read_notifier_pollset, acceptor); gpr_free(name); gpr_free(addr_str); @@ -721,8 +723,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); - sp->read_closure.cb = on_read; - sp->read_closure.cb_arg = sp; + grpc_closure_init(&sp->read_closure, on_read, sp, + grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; sp = sp->next; @@ -731,8 +733,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - sp->read_closure.cb = on_read; - sp->read_closure.cb_arg = sp; + grpc_closure_init(&sp->read_closure, on_read, sp, + grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; sp = sp->next; @@ -758,7 +760,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(exec_ctx, s); gpr_mu_lock(&s->mu); - grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL); + grpc_closure_list_sched(exec_ctx, &s->shutdown_starting); gpr_mu_unlock(&s->mu); tcp_server_destroy(exec_ctx, s); } |