diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/combiner.cc | 19 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 56 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.cc | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 53 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 47 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.cc | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_custom.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.cc | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix_common.cc | 16 |
13 files changed, 181 insertions, 49 deletions
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 9429842eb8..6789e4d12d 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -63,11 +63,12 @@ struct grpc_combiner { gpr_refcount refs; }; +static void combiner_run(grpc_closure* closure, grpc_error* error); static void combiner_exec(grpc_closure* closure, grpc_error* error); static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { - combiner_exec, combiner_exec, "combiner:immediately"}; + combiner_run, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; @@ -343,6 +344,22 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { grpc_closure_list_append(&lock->final_list, closure, error); } +static void combiner_run(grpc_closure* closure, grpc_error* error) { + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); +#ifndef NDEBUG + closure->scheduled = false; + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); +#endif + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + static void enqueue_finally(void* closure, grpc_error* error) { combiner_finally_exec(static_cast<grpc_closure*>(closure), GRPC_ERROR_REF(error)); diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 65f1c912af..4c6cff7fe2 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -63,6 +63,7 @@ // a keepalive ping timeout issue. We may want to revert https://github // .com/grpc/grpc/pull/14943 once we figure out the root cause. #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 +#define MAX_PROBE_EPOLL_FDS 32 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -75,6 +76,12 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; +typedef struct cached_fd { + intptr_t salt; + int fd; + uint64_t last_used; +} cached_fd; + /// A pollable is something that can be polled: it has an epoll set to poll on, /// and a wakeup fd for kicks /// There are three broad types: @@ -103,6 +110,11 @@ struct pollable { int event_cursor; int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; + + // Maintain a LRU-eviction cache of fds in this pollable + cached_fd fd_cache[MAX_PROBE_EPOLL_FDS]; + int fd_cache_size; + uint64_t fd_cache_counter; }; static const char* pollable_type_string(pollable_type t) { @@ -145,8 +157,11 @@ static void pollable_unref(pollable* p, int line, const char* reason); * Fd Declarations */ +static gpr_atm g_fd_salt; + struct grpc_fd { int fd; + intptr_t salt; /* refst format: bit 0 : 1=Active / 0=Orphaned bits 1-n : refcount @@ -354,6 +369,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); @@ -447,9 +463,13 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { if (epfd == -1) { return GRPC_OS_ERROR(errno, "epoll_create1"); } + GRPC_FD_TRACE("Pollable_create: created epfd: %d (type: %d)", epfd, type); *p = static_cast<pollable*>(gpr_malloc(sizeof(**p))); grpc_error* err = grpc_wakeup_fd_init(&(*p)->wakeup); if (err != GRPC_ERROR_NONE) { + GRPC_FD_TRACE( + "Pollable_create: closed epfd: %d (type: %d). wakeupfd_init error", + epfd, type); close(epfd); gpr_free(*p); *p = nullptr; @@ -460,6 +480,9 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { ev.data.ptr = (void*)(1 | (intptr_t) & (*p)->wakeup); if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) { err = GRPC_OS_ERROR(errno, "epoll_ctl"); + GRPC_FD_TRACE( + "Pollable_create: closed epfd: %d (type: %d). epoll_ctl error", epfd, + type); close(epfd); grpc_wakeup_fd_destroy(&(*p)->wakeup); gpr_free(*p); @@ -477,6 +500,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { (*p)->root_worker = nullptr; (*p)->event_cursor = 0; (*p)->event_count = 0; + (*p)->fd_cache_size = 0; + (*p)->fd_cache_counter = 0; return GRPC_ERROR_NONE; } @@ -506,6 +531,7 @@ static void pollable_unref(pollable* p, int line, const char* reason) { } #endif if (p != nullptr && gpr_unref(&p->refs)) { + GRPC_FD_TRACE("pollable_unref: Closing epfd: %d", p->epfd); close(p->epfd); grpc_wakeup_fd_destroy(&p->wakeup); gpr_free(p); @@ -516,7 +542,36 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { grpc_error* error = GRPC_ERROR_NONE; static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; + gpr_mu_lock(&p->mu); + p->fd_cache_counter++; + // Handle the case of overflow for our cache counter by + // reseting the recency-counter on all cache objects + if (p->fd_cache_counter == 0) { + for (int i = 0; i < p->fd_cache_size; i++) { + p->fd_cache[i].last_used = 0; + } + } + int lru_idx = 0; + for (int i = 0; i < p->fd_cache_size; i++) { + if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) { + GRPC_STATS_INC_POLLSET_FD_CACHE_HITS(); + p->fd_cache[i].last_used = p->fd_cache_counter; + gpr_mu_unlock(&p->mu); + return GRPC_ERROR_NONE; + } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) { + lru_idx = i; + } + } + // Add to cache + if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) { + lru_idx = p->fd_cache_size; + p->fd_cache_size++; + } + p->fd_cache[lru_idx].fd = fd->fd; + p->fd_cache[lru_idx].salt = fd->salt; + p->fd_cache[lru_idx].last_used = p->fd_cache_counter; + gpr_mu_unlock(&p->mu); if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -525,6 +580,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); ev_fd.data.ptr = fd; + GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { case EEXIST: diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 4ea63fc6e8..6bd1dc8e50 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -40,6 +40,9 @@ grpc_core::TraceFlag grpc_polling_trace(false, "polling"); /* Disabled by default */ + +/* Traces fd create/close operations */ +grpc_core::TraceFlag grpc_fd_trace(false, "fd_trace"); grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount"); grpc_core::DebugOnlyTraceFlag grpc_polling_api_trace(false, "polling_api"); @@ -192,6 +195,7 @@ void grpc_event_engine_shutdown(void) { grpc_fd* grpc_fd_create(int fd, const char* name) { GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name); + GRPC_FD_TRACE("fd_create(%d, %s)", fd, name); return g_event_engine->fd_create(fd, name); } @@ -204,11 +208,14 @@ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)", grpc_fd_wrapped_fd(fd), on_done, release_fd, already_closed, reason); + GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd)); + g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason); } void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) { GRPC_POLLING_API_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd)); + GRPC_FD_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd)); g_event_engine->fd_shutdown(fd, why); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 6a5129a74d..82cbce9a7b 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -29,8 +29,14 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +extern grpc_core::TraceFlag grpc_fd_trace; /* Disabled by default */ extern grpc_core::TraceFlag grpc_polling_trace; /* Disabled by default */ +#define GRPC_FD_TRACE(format, ...) \ + if (grpc_fd_trace.enabled()) { \ + gpr_log(GPR_INFO, "(fd-trace) " format, __VA_ARGS__); \ + } + typedef struct grpc_fd grpc_fd; typedef struct grpc_event_engine_vtable { diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 72d0ae58c1..8823dc4b51 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -26,6 +26,7 @@ #include <grpc/support/log.h> #include "src/core/lib/gpr/tls.h" +#include "src/core/lib/gprpp/fork.h" #include "src/core/lib/iomgr/closure.h" typedef gpr_atm grpc_millis; @@ -79,30 +80,41 @@ namespace grpc_core { * - Exactly one instance of ExecCtx must be created per thread. Instances must * always be called exec_ctx. * - Do not pass exec_ctx as a parameter to a function. Always access it using - * grpc_core::ExecCtx::Get() + * grpc_core::ExecCtx::Get(). */ class ExecCtx { public: /** Default Constructor */ - ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { Set(this); } + ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { + grpc_core::Fork::IncExecCtxCount(); + Set(this); + } /** Parameterised Constructor */ - ExecCtx(uintptr_t fl) : flags_(fl) { Set(this); } + ExecCtx(uintptr_t fl) : flags_(fl) { + grpc_core::Fork::IncExecCtxCount(); + Set(this); + } /** Destructor */ virtual ~ExecCtx() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; Flush(); Set(last_exec_ctx_); + grpc_core::Fork::DecExecCtxCount(); } /** Disallow copy and assignment operators */ ExecCtx(const ExecCtx&) = delete; ExecCtx& operator=(const ExecCtx&) = delete; - /** Return starting_cpu */ + /** Return starting_cpu. This is only required for stats collection and is + * hence only defined if GRPC_COLLECT_STATS is enabled. + */ +#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) unsigned starting_cpu() const { return starting_cpu_; } +#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */ struct CombinerData { /* currently active combiner: updated only via combiner.c */ @@ -128,12 +140,14 @@ class ExecCtx { /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. - * Returns true if work was performed, false otherwise. */ + * Returns true if work was performed, false otherwise. + */ bool Flush(); /** Returns true if we'd like to leave this execution context as soon as -possible: useful for deciding whether to do something more or not depending -on outside context */ + * possible: useful for deciding whether to do something more or not + * depending on outside context. + */ bool IsReadyToFinish() { if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) { if (CheckReadyToFinish()) { @@ -147,12 +161,14 @@ on outside context */ } /** Returns the stored current time relative to start if valid, - * otherwise refreshes the stored time, sets it valid and returns the new - * value */ + * otherwise refreshes the stored time, sets it valid and returns the new + * value. + */ grpc_millis Now(); /** Invalidates the stored time value. A new time value will be set on calling - * Now() */ + * Now(). + */ void InvalidateNow() { now_is_valid_ = false; } /** To be used only by shutdown code in iomgr */ @@ -162,20 +178,20 @@ on outside context */ } /** To be used only for testing. - * Sets the now value + * Sets the now value. */ void TestOnlySetNow(grpc_millis new_val) { now_ = new_val; now_is_valid_ = true; } - /** Global initialization for ExecCtx. Called by iomgr */ + /** Global initialization for ExecCtx. Called by iomgr. */ static void GlobalInit(void); - /** Global shutdown for ExecCtx. Called by iomgr */ + /** Global shutdown for ExecCtx. Called by iomgr. */ static void GlobalShutdown(void) { gpr_tls_destroy(&exec_ctx_); } - /** Gets pointer to current exec_ctx */ + /** Gets pointer to current exec_ctx. */ static ExecCtx* Get() { return reinterpret_cast<ExecCtx*>(gpr_tls_get(&exec_ctx_)); } @@ -185,19 +201,22 @@ on outside context */ } protected: - /** Check if ready to finish */ + /** Check if ready to finish. */ virtual bool CheckReadyToFinish() { return false; } - /** Disallow delete on ExecCtx */ + /** Disallow delete on ExecCtx. */ static void operator delete(void* p) { abort(); } private: - /** Set exec_ctx_ to exec_ctx */ + /** Set exec_ctx_ to exec_ctx. */ grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT; CombinerData combiner_data_ = {nullptr, nullptr}; uintptr_t flags_; + +#if defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) unsigned starting_cpu_ = gpr_cpu_current_cpu(); +#endif /* defined(GRPC_COLLECT_STATS) || !defined(NDEBUG) */ bool now_is_valid_ = false; grpc_millis now_ = 0; diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index 6c506eb5c9..b37384b8db 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -28,7 +28,7 @@ #include <grpc/support/log.h> #include "src/core/lib/gpr/env.h" -#include "src/core/lib/gpr/fork.h" +#include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" @@ -41,46 +41,59 @@ * AROUND VERY SPECIFIC USE CASES. */ +namespace { +bool skipped_handler = true; +bool registered_handlers = false; +} // namespace + void grpc_prefork() { - if (!grpc_fork_support_enabled()) { + grpc_core::ExecCtx exec_ctx; + skipped_handler = true; + if (!grpc_is_initialized()) { + return; + } + if (!grpc_core::Fork::Enabled()) { gpr_log(GPR_ERROR, "Fork support not enabled; try running with the " "environment variable GRPC_ENABLE_FORK_SUPPORT=1"); return; } - if (grpc_is_initialized()) { - grpc_core::ExecCtx exec_ctx; - grpc_timer_manager_set_threading(false); - grpc_executor_set_threading(false); - grpc_core::ExecCtx::Get()->Flush(); - if (!grpc_core::Thread::AwaitAll( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(3, GPR_TIMESPAN)))) { - gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); - } + if (!grpc_core::Fork::BlockExecCtx()) { + gpr_log(GPR_INFO, + "Other threads are currently calling into gRPC, skipping fork() " + "handlers"); + return; } + grpc_timer_manager_set_threading(false); + grpc_executor_set_threading(false); + grpc_core::ExecCtx::Get()->Flush(); + grpc_core::Fork::AwaitThreads(); + skipped_handler = false; } void grpc_postfork_parent() { - if (grpc_is_initialized()) { - grpc_timer_manager_set_threading(true); + if (!skipped_handler) { + grpc_core::Fork::AllowExecCtx(); grpc_core::ExecCtx exec_ctx; + grpc_timer_manager_set_threading(true); grpc_executor_set_threading(true); } } void grpc_postfork_child() { - if (grpc_is_initialized()) { - grpc_timer_manager_set_threading(true); + if (!skipped_handler) { + grpc_core::Fork::AllowExecCtx(); grpc_core::ExecCtx exec_ctx; + grpc_timer_manager_set_threading(true); grpc_executor_set_threading(true); } } void grpc_fork_handlers_auto_register() { - if (grpc_fork_support_enabled()) { + if (grpc_core::Fork::Enabled() & !registered_handlers) { #ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child); + registered_handlers = true; #endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK } } diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 8cf4fe9928..539bc120ce 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -386,7 +386,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) { resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = nullptr; - GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE); return true; } diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 6144d389f7..900c056575 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/slice/slice_internal.h" extern grpc_core::TraceFlag grpc_tcp_trace; @@ -233,7 +234,7 @@ finish: error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_str_slice /* takes ownership */); } else { - grpc_slice_unref(addr_str_slice); + grpc_slice_unref_internal(addr_str_slice); } if (done) { // This is safe even outside the lock, because "done", the sentinel, is diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index b3b2934014..990e8d632b 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -141,7 +141,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) { TCP_UNREF(tcp, "read"); tcp->read_slices = nullptr; tcp->read_cb = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } static void custom_read_callback(grpc_custom_socket* socket, size_t nread, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 153be05e83..b79ffe20f1 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } #define MAX_READ_IOVEC 4 @@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 524beba9ab..484d2b6077 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -187,11 +187,6 @@ static void on_read(void* arg, grpc_error* err) { goto error; } - read_notifier_pollset = - sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( - &sp->server->next_pollset_to_assign, 1)) % - sp->server->pollset_count]; - /* loop until accept4 returns EAGAIN, and then re-arm notification */ for (;;) { grpc_resolved_address addr; @@ -233,6 +228,11 @@ static void on_read(void* arg, grpc_error* err) { grpc_fd* fdobj = grpc_fd_create(fd, name); + read_notifier_pollset = + sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( + &sp->server->next_pollset_to_assign, 1)) % + sp->server->pollset_count]; + grpc_pollset_add_fd(read_notifier_pollset, fdobj); // Create acceptor. @@ -346,7 +346,8 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode, &fd); if (err != GRPC_ERROR_NONE) return err; - err = grpc_tcp_server_prepare_socket(fd, &listener->addr, true, &port); + err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr, + true, &port); if (err != GRPC_ERROR_NONE) return err; listener->server->nports++; grpc_sockaddr_to_string(&addr_str, &listener->addr, 1); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 34d68130c9..dd199097b2 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -113,7 +113,7 @@ grpc_error* grpc_tcp_server_add_all_local_addrs(grpc_tcp_server* s, int* out_port); /* Prepare a recently-created socket for listening. */ -grpc_error* grpc_tcp_server_prepare_socket(int fd, +grpc_error* grpc_tcp_server_prepare_socket(grpc_tcp_server*, int fd, const grpc_resolved_address* addr, bool so_reuseport, int* port); /* Ruturn true if the platform supports ifaddrs */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 9f4e58ca16..2d95aa66d6 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -87,7 +87,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, char* name; grpc_error* err = - grpc_tcp_server_prepare_socket(fd, addr, s->so_reuseport, &port); + grpc_tcp_server_prepare_socket(s, fd, addr, s->so_reuseport, &port); if (err == GRPC_ERROR_NONE) { GPR_ASSERT(port > 0); grpc_sockaddr_to_string(&addr_str, addr, 1); @@ -144,7 +144,7 @@ grpc_error* grpc_tcp_server_add_addr(grpc_tcp_server* s, } /* Prepare a recently-created socket for listening. */ -grpc_error* grpc_tcp_server_prepare_socket(int fd, +grpc_error* grpc_tcp_server_prepare_socket(grpc_tcp_server* s, int fd, const grpc_resolved_address* addr, bool so_reuseport, int* port) { grpc_resolved_address sockname_temp; @@ -170,6 +170,18 @@ grpc_error* grpc_tcp_server_prepare_socket(int fd, err = grpc_set_socket_no_sigpipe_if_possible(fd); if (err != GRPC_ERROR_NONE) goto error; + if (s->channel_args) { + for (size_t i = 0; i < s->channel_args->num_args; i++) { + if (0 == strcmp(s->channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) { + GPR_ASSERT(s->channel_args->args[i].type == GRPC_ARG_POINTER); + grpc_socket_mutator* mutator = static_cast<grpc_socket_mutator*>( + s->channel_args->args[i].value.pointer.p); + err = grpc_set_socket_with_mutator(fd, mutator); + if (err != GRPC_ERROR_NONE) goto error; + } + } + } + if (bind(fd, reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)), addr->len) < 0) { err = GRPC_OS_ERROR(errno, "bind"); |