diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/gpr/cpu_posix.cc | 2 | ||||
-rw-r--r-- | src/core/lib/gprpp/manual_constructor.h | 2 | ||||
-rw-r--r-- | src/core/lib/gprpp/memory.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 476 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 62 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.cc | 10 |
7 files changed, 312 insertions, 250 deletions
diff --git a/src/core/lib/gpr/cpu_posix.cc b/src/core/lib/gpr/cpu_posix.cc index 7a77f7ab64..915fd4976c 100644 --- a/src/core/lib/gpr/cpu_posix.cc +++ b/src/core/lib/gpr/cpu_posix.cc @@ -37,7 +37,7 @@ static long ncpus = 0; static pthread_key_t thread_id_key; static void init_ncpus() { - ncpus = sysconf(_SC_NPROCESSORS_ONLN); + ncpus = sysconf(_SC_NPROCESSORS_CONF); if (ncpus < 1 || ncpus > INT32_MAX) { gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1"); ncpus = 1; diff --git a/src/core/lib/gprpp/manual_constructor.h b/src/core/lib/gprpp/manual_constructor.h index a177048605..7f827ca8b7 100644 --- a/src/core/lib/gprpp/manual_constructor.h +++ b/src/core/lib/gprpp/manual_constructor.h @@ -156,7 +156,7 @@ class PolymorphicManualConstructor { static_assert( manual_ctor_impl::is_one_of<DerivedType, DerivedTypes...>::value, "DerivedType must be one of the predeclared DerivedTypes"); - GPR_ASSERT(reinterpret_cast<BaseType*>(static_cast<DerivedType*>(p)) == p); + GPR_ASSERT(static_cast<BaseType*>(p) == p); } typename std::aligned_storage< diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h index f84e20eeea..ba2f546675 100644 --- a/src/core/lib/gprpp/memory.h +++ b/src/core/lib/gprpp/memory.h @@ -30,12 +30,12 @@ namespace grpc_core { // The alignment of memory returned by gpr_malloc(). -constexpr size_t kAllignmentForDefaultAllocationInBytes = 8; +constexpr size_t kAlignmentForDefaultAllocationInBytes = 8; // Alternative to new, since we cannot use it (for fear of libstdc++) template <typename T, typename... Args> inline T* New(Args&&... args) { - void* p = alignof(T) > kAllignmentForDefaultAllocationInBytes + void* p = alignof(T) > kAlignmentForDefaultAllocationInBytes ? gpr_malloc_aligned(sizeof(T), alignof(T)) : gpr_malloc(sizeof(T)); return new (p) T(std::forward<Args>(args)...); @@ -45,7 +45,7 @@ inline T* New(Args&&... args) { template <typename T> inline void Delete(T* p) { p->~T(); - if (alignof(T) > kAllignmentForDefaultAllocationInBytes) { + if (alignof(T) > kAlignmentForDefaultAllocationInBytes) { gpr_free_aligned(p); } else { gpr_free(p); diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 6120f9f44b..436537bc0b 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -876,7 +876,6 @@ static grpc_error* pollset_work(grpc_pollset* pollset, grpc_pollset_worker** worker_hdl, grpc_millis deadline) { GPR_TIMER_SCOPE("pollset_work", 0); - grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error* error = GRPC_ERROR_NONE; @@ -927,7 +926,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset, gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); while (keep_polling) { keep_polling = 0; - if (!pollset->kicked_without_pollers) { + if (!pollset->kicked_without_pollers || + deadline <= grpc_core::ExecCtx::Get()->Now()) { if (!added_worker) { push_front_worker(pollset, &worker); added_worker = 1; diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index ec65497d79..e739a5df93 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -52,6 +52,8 @@ #include <grpc/support/time.h> #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" @@ -62,41 +64,102 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -/* one listening port */ -typedef struct grpc_udp_listener grpc_udp_listener; -struct grpc_udp_listener { - int fd; - grpc_fd* emfd; - grpc_udp_server* server; - grpc_resolved_address addr; - grpc_closure read_closure; - grpc_closure write_closure; +/* A listener which implements basic features of Listening on a port for + * I/O events*/ +class GrpcUdpListener { + public: + GrpcUdpListener(grpc_udp_server* server, int fd, + const grpc_resolved_address* addr); + ~GrpcUdpListener(); + + /* Called when grpc server starts to listening on the grpc_fd. */ + void StartListening(grpc_pollset** pollsets, size_t pollset_count, + GrpcUdpHandlerFactory* handler_factory); + + /* Called when data is available to read from the socket. + * Return true if there is more data to read from fd. */ + void OnRead(grpc_error* error, void* do_read_arg); + + /* Called when the socket is writeable. The given closure should be scheduled + * when the socket becomes blocked next time. */ + void OnCanWrite(grpc_error* error, void* do_write_arg); + + /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ + void OnFdAboutToOrphan(); + + /* Called to orphan fd of this listener.*/ + void OrphanFd(); + + /* Called when this listener is going to be destroyed. */ + void OnDestroy(); + + int fd() const { return fd_; } + + protected: + grpc_fd* emfd() const { return emfd_; } + + gpr_mu* mutex() { return &mutex_; } + + private: + /* event manager callback when reads are ready */ + static void on_read(void* arg, grpc_error* error); + static void on_write(void* arg, grpc_error* error); + + static void do_read(void* arg, grpc_error* error); + static void do_write(void* arg, grpc_error* error); + // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback + // interface. + static void fd_notify_on_write_wrapper(void* arg, grpc_error* error); + + static void shutdown_fd(void* args, grpc_error* error); + + int fd_; + grpc_fd* emfd_; + grpc_udp_server* server_; + grpc_resolved_address addr_; + grpc_closure read_closure_; + grpc_closure write_closure_; // To be called when corresponding QuicGrpcServer closes all active // connections. - grpc_closure orphan_fd_closure; - grpc_closure destroyed_closure; - grpc_udp_server_read_cb read_cb; - grpc_udp_server_write_cb write_cb; - grpc_udp_server_orphan_cb orphan_cb; - grpc_udp_server_start_cb start_cb; + grpc_closure orphan_fd_closure_; + grpc_closure destroyed_closure_; // To be scheduled on another thread to actually read/write. - grpc_closure do_read_closure; - grpc_closure do_write_closure; - grpc_closure notify_on_write_closure; + grpc_closure do_read_closure_; + grpc_closure do_write_closure_; + grpc_closure notify_on_write_closure_; // True if orphan_cb is trigered. - bool orphan_notified; + bool orphan_notified_; // True if grpc_fd_notify_on_write() is called after on_write() call. - bool notify_on_write_armed; + bool notify_on_write_armed_; // True if fd has been shutdown. - bool already_shutdown; - - struct grpc_udp_listener* next; + bool already_shutdown_; + // Object actually handles I/O events. Assigned in StartListening(). + GrpcUdpHandler* udp_handler_ = nullptr; + // To be notified on destruction. + GrpcUdpHandlerFactory* handler_factory_ = nullptr; + // Required to access above fields. + gpr_mu mutex_; }; -struct shutdown_fd_args { - grpc_udp_listener* sp; - gpr_mu* server_mu; -}; +GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, + const grpc_resolved_address* addr) + : fd_(fd), + server_(server), + orphan_notified_(false), + already_shutdown_(false) { + char* addr_str; + char* name; + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "udp-server-listener:%s", addr_str); + gpr_free(addr_str); + emfd_ = grpc_fd_create(fd, name); + memcpy(&addr_, addr, sizeof(grpc_resolved_address)); + GPR_ASSERT(emfd_); + gpr_free(name); + gpr_mu_init(&mutex_); +} + +GrpcUdpListener::~GrpcUdpListener() { gpr_mu_destroy(&mutex_); } /* the overall server */ struct grpc_udp_server { @@ -113,10 +176,11 @@ struct grpc_udp_server { /* is this server shutting down? (boolean) */ int shutdown; - /* linked list of server ports */ - grpc_udp_listener* head; - grpc_udp_listener* tail; - unsigned nports; + /* An array of listeners */ + grpc_core::InlinedVector<GrpcUdpListener, 16> listeners; + + /* factory for use to create udp listeners */ + GrpcUdpHandlerFactory* handler_factory; /* shutdown callback */ grpc_closure* shutdown_complete; @@ -141,8 +205,7 @@ static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { } grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { - grpc_udp_server* s = - static_cast<grpc_udp_server*>(gpr_malloc(sizeof(grpc_udp_server))); + grpc_udp_server* s = grpc_core::New<grpc_udp_server>(); gpr_mu_init(&s->mu); s->socket_factory = get_socket_factory(args); if (s->socket_factory) { @@ -151,33 +214,27 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->head = nullptr; - s->tail = nullptr; - s->nports = 0; - return s; } -static void shutdown_fd(void* args, grpc_error* error) { - struct shutdown_fd_args* shutdown_args = - static_cast<struct shutdown_fd_args*>(args); - grpc_udp_listener* sp = shutdown_args->sp; - gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); - gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error)); - sp->already_shutdown = true; - if (!sp->notify_on_write_armed) { +// static +void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) { + if (args == nullptr) { + // No-op if shutdown args are null. + return; + } + auto sp = static_cast<GrpcUdpListener*>(args); + gpr_mu_lock(sp->mutex()); + gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_); + grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error)); + sp->already_shutdown_ = true; + if (!sp->notify_on_write_armed_) { // Re-arm write notification to notify listener with error. This is // necessary to decrement active_ports. - sp->notify_on_write_armed = true; - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + sp->notify_on_write_armed_ = true; + grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); } - gpr_mu_unlock(shutdown_args->server_mu); - gpr_free(shutdown_args); -} - -static void dummy_cb(void* arg, grpc_error* error) { - // No-op. + gpr_mu_unlock(sp->mutex()); } static void finish_shutdown(grpc_udp_server* s) { @@ -188,24 +245,22 @@ static void finish_shutdown(grpc_udp_server* s) { gpr_mu_destroy(&s->mu); gpr_log(GPR_DEBUG, "Destroy all listeners."); - while (s->head) { - grpc_udp_listener* sp = s->head; - s->head = sp->next; - gpr_free(sp); + for (size_t i = 0; i < s->listeners.size(); ++i) { + s->listeners[i].OnDestroy(); } if (s->socket_factory) { grpc_socket_factory_unref(s->socket_factory); } - gpr_free(s); + grpc_core::Delete(s); } static void destroyed_port(void* server, grpc_error* error) { grpc_udp_server* s = static_cast<grpc_udp_server*>(server); gpr_mu_lock(&s->mu); s->destroyed_ports++; - if (s->destroyed_ports == s->nports) { + if (s->destroyed_ports == s->listeners.size()) { gpr_mu_unlock(&s->mu); finish_shutdown(s); } else { @@ -222,35 +277,30 @@ static void deactivated_all_ports(grpc_udp_server* s) { GPR_ASSERT(s->shutdown); - if (s->head) { - grpc_udp_listener* sp; - for (sp = s->head; sp; sp = sp->next) { - grpc_unlink_if_unix_domain_socket(&sp->addr); - - GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, - grpc_schedule_on_exec_ctx); - if (!sp->orphan_notified) { - /* Call the orphan_cb to signal that the FD is about to be closed and - * should no longer be used. Because at this point, all listening ports - * have been shutdown already, no need to shutdown again.*/ - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, - grpc_schedule_on_exec_ctx); - GPR_ASSERT(sp->orphan_cb); - gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); - sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); - } - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, - false /* already_closed */, "udp_listener_shutdown"); - } - gpr_mu_unlock(&s->mu); - } else { + if (s->listeners.size() == 0) { gpr_mu_unlock(&s->mu); finish_shutdown(s); + return; } + for (size_t i = 0; i < s->listeners.size(); ++i) { + s->listeners[i].OrphanFd(); + } + gpr_mu_unlock(&s->mu); +} + +void GrpcUdpListener::OrphanFd() { + gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_); + grpc_unlink_if_unix_domain_socket(&addr_); + + GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, + grpc_schedule_on_exec_ctx); + /* Because at this point, all listening sockets have been shutdown already, no + * need to call OnFdAboutToOrphan() to notify the handler again. */ + grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, + false /* already_closed */, "udp_listener_shutdown"); } void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { - grpc_udp_listener* sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -261,16 +311,9 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { gpr_log(GPR_DEBUG, "start to destroy udp_server"); /* shutdown all fd's */ if (s->active_ports) { - for (sp = s->head; sp; sp = sp->next) { - GPR_ASSERT(sp->orphan_cb); - struct shutdown_fd_args* args = - static_cast<struct shutdown_fd_args*>(gpr_malloc(sizeof(*args))); - args->sp = sp; - args->server_mu = &s->mu; - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, - grpc_schedule_on_exec_ctx); - sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); - sp->orphan_notified = true; + for (size_t i = 0; i < s->listeners.size(); ++i) { + GrpcUdpListener* sp = &s->listeners[i]; + sp->OnFdAboutToOrphan(); } gpr_mu_unlock(&s->mu); } else { @@ -279,6 +322,24 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { } } +void GrpcUdpListener::OnFdAboutToOrphan() { + gpr_mu_lock(&mutex_); + grpc_unlink_if_unix_domain_socket(&addr_); + + GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_, + grpc_schedule_on_exec_ctx); + if (!orphan_notified_ && udp_handler_ != nullptr) { + /* Singals udp_handler that the FD is about to be closed and + * should no longer be used. */ + GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this, + grpc_schedule_on_exec_ctx); + gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_); + udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data); + orphan_notified_ = true; + } + gpr_mu_unlock(&mutex_); +} + static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, const grpc_resolved_address* addr) { return (socket_factory != nullptr) @@ -364,163 +425,140 @@ error: return -1; } -static void do_read(void* arg, grpc_error* error) { - grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); +// static +void GrpcUdpListener::do_read(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); + GPR_ASSERT(error == GRPC_ERROR_NONE); /* TODO: the reason we hold server->mu here is merely to prevent fd * shutdown while we are reading. However, it blocks do_write(). Switch to * read lock if available. */ - gpr_mu_lock(&sp->server->mu); + gpr_mu_lock(sp->mutex()); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { + if (!sp->already_shutdown_ && sp->udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a * notification with shutdown error. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_); } - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(sp->mutex()); } -/* event manager callback when reads are ready */ -static void on_read(void* arg, grpc_error* error) { - grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); +// static +void GrpcUdpListener::on_read(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); + sp->OnRead(error, arg); +} - gpr_mu_lock(&sp->server->mu); +void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports && sp->server->shutdown) { - gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + gpr_mu_lock(&server_->mu); + if (0 == --server_->active_ports && server_->shutdown) { + gpr_mu_unlock(&server_->mu); + deactivated_all_ports(server_); } else { - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(&server_->mu); } return; } + /* Read once. If there is more data to read, off load the work to another * thread to finish. */ - GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd)) { + if (udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, + GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can * get another chance to read. Or fd already shutdown, re-arm to get a * notification with shutdown error. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(emfd_, &read_closure_); } - gpr_mu_unlock(&sp->server->mu); } +// static // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. -void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { - grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&sp->server->mu); - if (!sp->notify_on_write_armed) { - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); - sp->notify_on_write_armed = true; - } - gpr_mu_unlock(&sp->server->mu); +void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); + gpr_mu_lock(sp->mutex()); + if (!sp->notify_on_write_armed_) { + grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); + sp->notify_on_write_armed_ = true; + } + gpr_mu_unlock(sp->mutex()); } -static void do_write(void* arg, grpc_error* error) { - grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&sp->server->mu); - if (sp->already_shutdown) { +// static +void GrpcUdpListener::do_write(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); + gpr_mu_lock(sp->mutex()); + if (sp->already_shutdown_) { // If fd has been shutdown, don't write any more and re-arm notification. - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_); } else { - sp->notify_on_write_armed = false; + sp->notify_on_write_armed_ = false; /* Tell the registered callback that the socket is writeable. */ - GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); - GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, + GPR_ASSERT(error == GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper, arg, grpc_schedule_on_exec_ctx); - sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure); + sp->udp_handler_->OnCanWrite(sp->server_->user_data, + &sp->notify_on_write_closure_); } - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(sp->mutex()); } -static void on_write(void* arg, grpc_error* error) { - grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg); +// static +void GrpcUdpListener::on_write(void* arg, grpc_error* error) { + GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg); + sp->OnCanWrite(error, arg); +} - gpr_mu_lock(&sp->server->mu); +void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { if (error != GRPC_ERROR_NONE) { - if (0 == --sp->server->active_ports && sp->server->shutdown) { - gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + gpr_mu_lock(&server_->mu); + if (0 == --server_->active_ports && server_->shutdown) { + gpr_mu_unlock(&server_->mu); + deactivated_all_ports(server_); } else { - gpr_mu_unlock(&sp->server->mu); + gpr_mu_unlock(&server_->mu); } return; } /* Schedule actual write in another thread. */ - GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, + GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE); - gpr_mu_unlock(&sp->server->mu); + GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); } static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, - int rcv_buf_size, int snd_buf_size, - grpc_udp_server_start_cb start_cb, - grpc_udp_server_read_cb read_cb, - grpc_udp_server_write_cb write_cb, - grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener* sp; - int port; - char* addr_str; - char* name; + int rcv_buf_size, int snd_buf_size) { + gpr_log(GPR_DEBUG, "add socket %d to server", fd); - port = + int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size); if (port >= 0) { - grpc_sockaddr_to_string(&addr_str, addr, 1); - gpr_asprintf(&name, "udp-server-listener:%s", addr_str); - gpr_free(addr_str); gpr_mu_lock(&s->mu); - s->nports++; - sp = static_cast<grpc_udp_listener*>(gpr_malloc(sizeof(grpc_udp_listener))); - sp->next = nullptr; - if (s->head == nullptr) { - s->head = sp; - } else { - s->tail->next = sp; - } - s->tail = sp; - sp->server = s; - sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); - memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); - sp->read_cb = read_cb; - sp->write_cb = write_cb; - sp->orphan_cb = orphan_cb; - sp->start_cb = start_cb; - sp->orphan_notified = false; - sp->already_shutdown = false; - GPR_ASSERT(sp->emfd); + s->listeners.emplace_back(s, fd, addr); + gpr_log(GPR_DEBUG, + "add socket %d to server for port %d, %zu listener(s) in total", fd, + port, s->listeners.size()); gpr_mu_unlock(&s->mu); - gpr_free(name); } - return port; } int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, int rcv_buf_size, int snd_buf_size, - grpc_udp_server_start_cb start_cb, - grpc_udp_server_read_cb read_cb, - grpc_udp_server_write_cb write_cb, - grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener* sp; + GrpcUdpHandlerFactory* handler_factory) { int allocated_port1 = -1; int allocated_port2 = -1; int fd; @@ -536,10 +574,10 @@ int grpc_udp_server_add_port(grpc_udp_server* s, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (sp = s->head; sp; sp = sp->next) { + for (size_t i = 0; i < s->listeners.size(); ++i) { sockname_temp.len = sizeof(struct sockaddr_storage); if (0 == - getsockname(sp->fd, + getsockname(s->listeners[i].fd(), reinterpret_cast<struct sockaddr*>(sockname_temp.addr), reinterpret_cast<socklen_t*>(&sockname_temp.len))) { port = grpc_sockaddr_get_port(&sockname_temp); @@ -559,6 +597,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr6_v4mapped; } + s->handler_factory = handler_factory; /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ if (grpc_sockaddr_is_wildcard(addr, &port)) { grpc_sockaddr_make_wildcards(port, &wild4, &wild6); @@ -569,8 +608,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); allocated_port1 = - add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, - read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -593,8 +631,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, - read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); done: gpr_free(allocated_addr); @@ -602,52 +639,55 @@ done: } int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { - grpc_udp_listener* sp; - if (port_index >= s->nports) { + if (port_index >= s->listeners.size()) { return -1; } - for (sp = s->head; sp && port_index != 0; sp = sp->next) { - --port_index; - } - GPR_ASSERT(sp); // if this fails, our check earlier was bogus - return sp->fd; + return s->listeners[port_index].fd(); } void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { gpr_log(GPR_DEBUG, "grpc_udp_server_start"); - size_t i; gpr_mu_lock(&s->mu); - grpc_udp_listener* sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->user_data = user_data; - sp = s->head; - while (sp != nullptr) { - sp->start_cb(sp->emfd, sp->server->user_data); - for (i = 0; i < pollset_count; i++) { - grpc_pollset_add_fd(pollsets[i], sp->emfd); - } - GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, - grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + for (size_t i = 0; i < s->listeners.size(); ++i) { + s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory); + } - GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, - grpc_schedule_on_exec_ctx); - sp->notify_on_write_armed = true; - grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); + gpr_mu_unlock(&s->mu); +} - /* Registered for both read and write callbacks: increment active_ports - * twice to account for this, and delay free-ing of memory until both - * on_read and on_write have fired. */ - s->active_ports += 2; +void GrpcUdpListener::StartListening(grpc_pollset** pollsets, + size_t pollset_count, + GrpcUdpHandlerFactory* handler_factory) { + gpr_mu_lock(&mutex_); + handler_factory_ = handler_factory; + udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data); + for (size_t i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(pollsets[i], emfd_); + } + GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(emfd_, &read_closure_); + + GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx); + notify_on_write_armed_ = true; + grpc_fd_notify_on_write(emfd_, &write_closure_); + + /* Registered for both read and write callbacks: increment active_ports + * twice to account for this, and delay free-ing of memory until both + * on_read and on_write have fired. */ + server_->active_ports += 2; + gpr_mu_unlock(&mutex_); +} - sp = sp->next; +void GrpcUdpListener::OnDestroy() { + if (udp_handler_ != nullptr) { + handler_factory_->DestroyUdpHandler(udp_handler_); } - - gpr_mu_unlock(&s->mu); } #endif diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 1be4d04dbb..4e384d2cdf 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -21,6 +21,7 @@ #include <grpc/support/port_platform.h> +#include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -32,22 +33,46 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* Called when grpc server starts to listening on the grpc_fd. */ -typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); - -/* Called when data is available to read from the socket. - * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); - -/* Called when the socket is writeable. The given closure should be scheduled - * when the socket becomes blocked next time. */ -typedef void (*grpc_udp_server_write_cb)(grpc_fd* emfd, void* user_data, - grpc_closure* notify_on_write_closure); - -/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ -typedef void (*grpc_udp_server_orphan_cb)(grpc_fd* emfd, - grpc_closure* shutdown_fd_callback, - void* user_data); +/* An interface associated with a socket. udp server delivers I/O event on that + * socket to the subclass of this interface which is created through + * GrpcUdpHandlerFactory. + * Its implementation should do the real IO work, e.g. read packet and write. */ +class GrpcUdpHandler { + public: + GrpcUdpHandler(grpc_fd* emfd, void* user_data) {} + virtual ~GrpcUdpHandler() {} + + // Interfaces to be implemented by subclasses to do the actual setup/tear down + // or I/O. + + // Called when data is available to read from the socket. Returns true if + // there is more data to read after this call. + virtual bool Read() GRPC_ABSTRACT; + // Called when socket becomes write unblocked. The given closure should be + // scheduled when the socket becomes blocked next time. + virtual void OnCanWrite(void* user_data, + grpc_closure* notify_on_write_closure) GRPC_ABSTRACT; + // Called before the gRPC FD is orphaned. Notify udp server to continue + // orphaning fd by scheduling the given closure, afterwards the associated fd + // will be closed. + virtual void OnFdAboutToOrphan(grpc_closure* orphan_fd_closure, + void* user_data) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; + +class GrpcUdpHandlerFactory { + public: + virtual ~GrpcUdpHandlerFactory() {} + /* Called when start to listen on a socket. + * Return an instance of the implementation of GrpcUdpHandler interface which + * will process I/O events for this socket from now on. */ + virtual GrpcUdpHandler* CreateUdpHandler(grpc_fd* emfd, + void* user_data) GRPC_ABSTRACT; + virtual void DestroyUdpHandler(GrpcUdpHandler* handler) GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS +}; /* Create a server, initially not bound to any ports */ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args); @@ -71,10 +96,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, int rcv_buf_size, int snd_buf_size, - grpc_udp_server_start_cb start_cb, - grpc_udp_server_read_cb read_cb, - grpc_udp_server_write_cb write_cb, - grpc_udp_server_orphan_cb orphan_cb); + GrpcUdpHandlerFactory* handler_factory); void grpc_udp_server_destroy(grpc_udp_server* server, grpc_closure* on_done); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index f5aca91f97..5a84428b0e 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -52,14 +52,14 @@ struct ChannelData { }; static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) { - CallData* calld = reinterpret_cast<CallData*>(elem->call_data); + CallData* calld = static_cast<CallData*>(elem->call_data); bool expected = false; if (!calld->filled_metadata.compare_exchange_strong( expected, true, grpc_core::memory_order_relaxed, grpc_core::memory_order_relaxed)) { return; } - ChannelData* chand = reinterpret_cast<ChannelData*>(elem->channel_data); + ChannelData* chand = static_cast<ChannelData*>(elem->channel_data); char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_slices( @@ -78,7 +78,7 @@ static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) { static void lame_start_transport_stream_op_batch( grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - CallData* calld = reinterpret_cast<CallData*>(elem->call_data); + CallData* calld = static_cast<CallData*>(elem->call_data); if (op->recv_initial_metadata) { fill_metadata(elem, op->payload->recv_initial_metadata.recv_initial_metadata); @@ -119,7 +119,7 @@ static void lame_start_transport_op(grpc_channel_element* elem, static grpc_error* init_call_elem(grpc_call_element* elem, const grpc_call_element_args* args) { - CallData* calld = reinterpret_cast<CallData*>(elem->call_data); + CallData* calld = static_cast<CallData*>(elem->call_data); calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -172,7 +172,7 @@ grpc_channel* grpc_lame_client_channel_create(const char* target, "error_message=%s)", 3, (target, (int)error_code, error_message)); GPR_ASSERT(elem->filter == &grpc_lame_filter); - auto chand = reinterpret_cast<grpc_core::ChannelData*>(elem->channel_data); + auto chand = static_cast<grpc_core::ChannelData*>(elem->channel_data); chand->error_code = error_code; chand->error_message = error_message; |