aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/udp_server.cc
diff options
context:
space:
mode:
authorGravatar Dan Zhang <danzh@google.com>2018-02-27 18:27:10 -0500
committerGravatar Dan Zhang <danzh@google.com>2018-02-27 18:27:10 -0500
commitb293e9e8224d1ebe03cc7ef28782c4148542bff0 (patch)
treec0ee27b95b43b14d883ce0210a72a7a51f4c6573 /src/core/lib/iomgr/udp_server.cc
parent0fc97adc9ee41d517ee49ec8e3a8338b793fba7e (diff)
Refactors grpc udp_server_listener to be object oriented. Also adds a mutex to each listener. Instead of sharing the mutex in udp_server for all listeners, this per-listener mutex can make most of the call to different listeners in parallel.
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r--src/core/lib/iomgr/udp_server.cc482
1 files changed, 263 insertions, 219 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index ec65497d79..d829c76034 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -52,6 +52,8 @@
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
@@ -62,41 +64,104 @@
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
-/* one listening port */
-typedef struct grpc_udp_listener grpc_udp_listener;
-struct grpc_udp_listener {
- int fd;
- grpc_fd* emfd;
- grpc_udp_server* server;
- grpc_resolved_address addr;
- grpc_closure read_closure;
- grpc_closure write_closure;
+/* A listener which implements basic features of Listening on a port for
+ * I/O events*/
+class GrpcUdpListener {
+ public:
+ GrpcUdpListener(grpc_udp_server* server, int fd,
+ const grpc_resolved_address* addr);
+ ~GrpcUdpListener();
+
+ /* Called when grpc server starts to listening on the grpc_fd. */
+ void StartListening(grpc_pollset** pollsets, size_t pollset_count,
+ GrpcUdpHandlerFactory* handler_factory);
+
+ /* Called when data is available to read from the socket.
+ * Return true if there is more data to read from fd. */
+ void OnRead(grpc_error* error, void* do_read_arg);
+
+ /* Called when the socket is writeable. The given closure should be scheduled
+ * when the socket becomes blocked next time. */
+ void OnCanWrite(grpc_error* error, void* do_write_arg);
+
+ /* Called when the grpc_fd is about to be orphaned (and the FD closed). */
+ void OnFdAboutToOrphan();
+
+ /* Called to orphan fd of this listener.*/
+ void OrphanFd();
+
+ /* Called when this listener is going to be destroyed. */
+ void OnDestroy();
+
+ int fd() const { return fd_; }
+
+ protected:
+ grpc_fd* emfd() const { return emfd_; }
+
+ gpr_mu* mutex() { return &mutex_; }
+
+ private:
+ /* event manager callback when reads are ready */
+ static void on_read(void* arg, grpc_error* error);
+ static void on_write(void* arg, grpc_error* error);
+
+ static void do_read(void* arg, grpc_error* error);
+ static void do_write(void* arg, grpc_error* error);
+ // Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback
+ // interface.
+ static void fd_notify_on_write_wrapper(void* arg, grpc_error* error);
+
+ static void shutdown_fd(void* args, grpc_error* error);
+
+ int fd_;
+ grpc_fd* emfd_;
+ grpc_udp_server* server_;
+ grpc_resolved_address addr_;
+ grpc_closure read_closure_;
+ grpc_closure write_closure_;
// To be called when corresponding QuicGrpcServer closes all active
// connections.
- grpc_closure orphan_fd_closure;
- grpc_closure destroyed_closure;
- grpc_udp_server_read_cb read_cb;
- grpc_udp_server_write_cb write_cb;
- grpc_udp_server_orphan_cb orphan_cb;
- grpc_udp_server_start_cb start_cb;
- // To be scheduled on another thread to actually read/write.
- grpc_closure do_read_closure;
- grpc_closure do_write_closure;
- grpc_closure notify_on_write_closure;
+ grpc_closure orphan_fd_closure_;
+ grpc_closure destroyed_closure_;
+ // To be scheduled on another thread to actually read/write.
+ grpc_closure do_read_closure_;
+ grpc_closure do_write_closure_;
+ grpc_closure notify_on_write_closure_;
// True if orphan_cb is trigered.
- bool orphan_notified;
+ bool orphan_notified_;
// True if grpc_fd_notify_on_write() is called after on_write() call.
- bool notify_on_write_armed;
+ bool notify_on_write_armed_;
// True if fd has been shutdown.
- bool already_shutdown;
-
- struct grpc_udp_listener* next;
+ bool already_shutdown_;
+ // Object actually handles I/O events. Assigned in StartListening().
+ GrpcUdpHandler* udp_handler_ = nullptr;
+ // To be notified on destruction.
+ GrpcUdpHandlerFactory* handler_factory_ = nullptr;
+ // Required to access above fields.
+ gpr_mu mutex_;
};
-struct shutdown_fd_args {
- grpc_udp_listener* sp;
- gpr_mu* server_mu;
-};
+GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
+ const grpc_resolved_address* addr)
+ : fd_(fd),
+ server_(server),
+ orphan_notified_(false),
+ already_shutdown_(false) {
+ char* addr_str;
+ char* name;
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
+ gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
+ gpr_free(addr_str);
+ emfd_ = grpc_fd_create(fd, name);
+ memcpy(&addr_, addr, sizeof(grpc_resolved_address));
+ GPR_ASSERT(emfd_);
+ gpr_free(name);
+ gpr_mu_init(&mutex_);
+}
+
+GrpcUdpListener::~GrpcUdpListener() {
+ gpr_mu_destroy(&mutex_);
+}
/* the overall server */
struct grpc_udp_server {
@@ -113,10 +178,11 @@ struct grpc_udp_server {
/* is this server shutting down? (boolean) */
int shutdown;
- /* linked list of server ports */
- grpc_udp_listener* head;
- grpc_udp_listener* tail;
- unsigned nports;
+ /* An array of listeners */
+ grpc_core::InlinedVector<GrpcUdpListener, 16> listeners;
+
+ /* factory for use to create udp listeners */
+ GrpcUdpHandlerFactory* handler_factory;
/* shutdown callback */
grpc_closure* shutdown_complete;
@@ -141,8 +207,7 @@ static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) {
}
grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
- grpc_udp_server* s =
- static_cast<grpc_udp_server*>(gpr_malloc(sizeof(grpc_udp_server)));
+ grpc_udp_server* s = grpc_core::New<grpc_udp_server>();
gpr_mu_init(&s->mu);
s->socket_factory = get_socket_factory(args);
if (s->socket_factory) {
@@ -151,34 +216,29 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
- s->head = nullptr;
- s->tail = nullptr;
- s->nports = 0;
-
return s;
}
-static void shutdown_fd(void* args, grpc_error* error) {
- struct shutdown_fd_args* shutdown_args =
- static_cast<struct shutdown_fd_args*>(args);
- grpc_udp_listener* sp = shutdown_args->sp;
- gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
- gpr_mu_lock(shutdown_args->server_mu);
- grpc_fd_shutdown(sp->emfd, GRPC_ERROR_REF(error));
- sp->already_shutdown = true;
- if (!sp->notify_on_write_armed) {
+// static
+void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) {
+ if (args == nullptr) {
+ // No-op if shutdown args are null.
+ return;
+ }
+ auto sp = static_cast<GrpcUdpListener*>(args);
+ gpr_mu_lock(sp->mutex());
+ gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd_);
+ grpc_fd_shutdown(sp->emfd_, GRPC_ERROR_REF(error));
+ sp->already_shutdown_ = true;
+ if (!sp->notify_on_write_armed_) {
// Re-arm write notification to notify listener with error. This is
// necessary to decrement active_ports.
- sp->notify_on_write_armed = true;
- grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ sp->notify_on_write_armed_ = true;
+ grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
}
- gpr_mu_unlock(shutdown_args->server_mu);
- gpr_free(shutdown_args);
+ gpr_mu_unlock(sp->mutex());
}
-static void dummy_cb(void* arg, grpc_error* error) {
- // No-op.
-}
static void finish_shutdown(grpc_udp_server* s) {
if (s->shutdown_complete != nullptr) {
@@ -188,24 +248,23 @@ static void finish_shutdown(grpc_udp_server* s) {
gpr_mu_destroy(&s->mu);
gpr_log(GPR_DEBUG, "Destroy all listeners.");
- while (s->head) {
- grpc_udp_listener* sp = s->head;
- s->head = sp->next;
- gpr_free(sp);
+ for(size_t i = 0; i < s->listeners.size(); ++i) {
+ s->listeners[i].OnDestroy();
}
if (s->socket_factory) {
grpc_socket_factory_unref(s->socket_factory);
}
- gpr_free(s);
+ grpc_core::Delete(s);
+
}
static void destroyed_port(void* server, grpc_error* error) {
grpc_udp_server* s = static_cast<grpc_udp_server*>(server);
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
- if (s->destroyed_ports == s->nports) {
+ if (s->destroyed_ports == s->listeners.size()) {
gpr_mu_unlock(&s->mu);
finish_shutdown(s);
} else {
@@ -222,35 +281,30 @@ static void deactivated_all_ports(grpc_udp_server* s) {
GPR_ASSERT(s->shutdown);
- if (s->head) {
- grpc_udp_listener* sp;
- for (sp = s->head; sp; sp = sp->next) {
- grpc_unlink_if_unix_domain_socket(&sp->addr);
-
- GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
- grpc_schedule_on_exec_ctx);
- if (!sp->orphan_notified) {
- /* Call the orphan_cb to signal that the FD is about to be closed and
- * should no longer be used. Because at this point, all listening ports
- * have been shutdown already, no need to shutdown again.*/
- GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
- grpc_schedule_on_exec_ctx);
- GPR_ASSERT(sp->orphan_cb);
- gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
- sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
- }
- grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
- false /* already_closed */, "udp_listener_shutdown");
- }
- gpr_mu_unlock(&s->mu);
- } else {
- gpr_mu_unlock(&s->mu);
+ if (s->listeners.size() == 0) {
+ gpr_mu_unlock(&s->mu);
finish_shutdown(s);
+ return;
+ }
+ for (size_t i = 0; i < s->listeners.size(); ++i) {
+ s->listeners[i].OrphanFd();
}
+ gpr_mu_unlock(&s->mu);
+}
+
+void GrpcUdpListener::OrphanFd() {
+ gpr_log(GPR_DEBUG, "Orphan fd %d, emfd %p", fd_, emfd_);
+ grpc_unlink_if_unix_domain_socket(&addr_);
+
+ GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_,
+ grpc_schedule_on_exec_ctx);
+ /* Because at this point, all listening sockets have been shutdown already, no
+ * need to call OnFdAboutToOrphan() to notify the handler again. */
+ grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr,
+ false /* already_closed */, "udp_listener_shutdown");
}
void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
- grpc_udp_listener* sp;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@@ -261,16 +315,9 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
gpr_log(GPR_DEBUG, "start to destroy udp_server");
/* shutdown all fd's */
if (s->active_ports) {
- for (sp = s->head; sp; sp = sp->next) {
- GPR_ASSERT(sp->orphan_cb);
- struct shutdown_fd_args* args =
- static_cast<struct shutdown_fd_args*>(gpr_malloc(sizeof(*args)));
- args->sp = sp;
- args->server_mu = &s->mu;
- GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
- grpc_schedule_on_exec_ctx);
- sp->orphan_cb(sp->emfd, &sp->orphan_fd_closure, sp->server->user_data);
- sp->orphan_notified = true;
+ for (size_t i = 0; i < s->listeners.size(); ++i) {
+ GrpcUdpListener* sp = &s->listeners[i];
+ sp->OnFdAboutToOrphan();
}
gpr_mu_unlock(&s->mu);
} else {
@@ -279,6 +326,24 @@ void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) {
}
}
+void GrpcUdpListener::OnFdAboutToOrphan() {
+ gpr_mu_lock(&mutex_);
+ grpc_unlink_if_unix_domain_socket(&addr_);
+
+ GRPC_CLOSURE_INIT(&destroyed_closure_, destroyed_port, server_,
+ grpc_schedule_on_exec_ctx);
+ if (!orphan_notified_ && udp_handler_ != nullptr) {
+ /* Singals udp_handler that the FD is about to be closed and
+ * should no longer be used. */
+ GRPC_CLOSURE_INIT(&orphan_fd_closure_, shutdown_fd, this,
+ grpc_schedule_on_exec_ctx);
+ gpr_log(GPR_DEBUG, "fd %d about to be orphaned", fd_);
+ udp_handler_->OnFdAboutToOrphan(&orphan_fd_closure_, server_->user_data);
+ orphan_notified_ = true;
+ }
+ gpr_mu_unlock(&mutex_);
+}
+
static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,
const grpc_resolved_address* addr) {
return (socket_factory != nullptr)
@@ -364,163 +429,140 @@ error:
return -1;
}
-static void do_read(void* arg, grpc_error* error) {
- grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
+// static
+void GrpcUdpListener::do_read(void* arg, grpc_error* error) {
+ GrpcUdpListener* sp = reinterpret_cast<GrpcUdpListener*>(arg);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
/* TODO: the reason we hold server->mu here is merely to prevent fd
* shutdown while we are reading. However, it blocks do_write(). Switch to
* read lock if available. */
- gpr_mu_lock(&sp->server->mu);
+ gpr_mu_lock(sp->mutex());
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
+ if (!sp->already_shutdown_ && sp->udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
- GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&sp->do_read_closure_, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
* notification with shutdown error. */
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd_, &sp->read_closure_);
}
- gpr_mu_unlock(&sp->server->mu);
+ gpr_mu_unlock(sp->mutex());
}
-/* event manager callback when reads are ready */
-static void on_read(void* arg, grpc_error* error) {
- grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg);
+// static
+void GrpcUdpListener::on_read(void* arg, grpc_error* error) {
+ GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
+ sp->OnRead(error, arg);
+}
- gpr_mu_lock(&sp->server->mu);
+void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) {
if (error != GRPC_ERROR_NONE) {
- if (0 == --sp->server->active_ports && sp->server->shutdown) {
- gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(sp->server);
+ gpr_mu_lock(&server_->mu);
+ if (0 == --server_->active_ports && server_->shutdown) {
+ gpr_mu_unlock(&server_->mu);
+ deactivated_all_ports(server_);
} else {
- gpr_mu_unlock(&sp->server->mu);
+ gpr_mu_unlock(&server_->mu);
}
return;
}
+
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
- GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(sp->emfd)) {
+ if (udp_handler_->Read()) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
- GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
+ GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
- GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE);
} else {
/* Finish reading all the packets, re-arm the notification event so we can
* get another chance to read. Or fd already shutdown, re-arm to get a
* notification with shutdown error. */
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(emfd_, &read_closure_);
}
- gpr_mu_unlock(&sp->server->mu);
}
+// static
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
-void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
- grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&sp->server->mu);
- if (!sp->notify_on_write_armed) {
- grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
- sp->notify_on_write_armed = true;
- }
- gpr_mu_unlock(&sp->server->mu);
+void GrpcUdpListener::fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
+ GrpcUdpListener* sp = reinterpret_cast<GrpcUdpListener*>(arg);
+ gpr_mu_lock(sp->mutex());
+ if (!sp->notify_on_write_armed_) {
+ grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
+ sp->notify_on_write_armed_ = true;
+ }
+ gpr_mu_unlock(sp->mutex());
}
-static void do_write(void* arg, grpc_error* error) {
- grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&sp->server->mu);
- if (sp->already_shutdown) {
+// static
+void GrpcUdpListener::do_write(void* arg, grpc_error* error) {
+ GrpcUdpListener* sp = reinterpret_cast<GrpcUdpListener*>(arg);
+ gpr_mu_lock(sp->mutex());
+ if (sp->already_shutdown_) {
// If fd has been shutdown, don't write any more and re-arm notification.
- grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ grpc_fd_notify_on_write(sp->emfd_, &sp->write_closure_);
} else {
- sp->notify_on_write_armed = false;
+ sp->notify_on_write_armed_ = false;
/* Tell the registered callback that the socket is writeable. */
- GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
- GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&sp->notify_on_write_closure_, fd_notify_on_write_wrapper,
arg, grpc_schedule_on_exec_ctx);
- sp->write_cb(sp->emfd, sp->server->user_data, &sp->notify_on_write_closure);
+ sp->udp_handler_->OnCanWrite(sp->server_->user_data,
+ &sp->notify_on_write_closure_);
}
- gpr_mu_unlock(&sp->server->mu);
+ gpr_mu_unlock(sp->mutex());
}
-static void on_write(void* arg, grpc_error* error) {
- grpc_udp_listener* sp = static_cast<grpc_udp_listener*>(arg);
+// static
+void GrpcUdpListener::on_write(void* arg, grpc_error* error) {
+ GrpcUdpListener* sp = static_cast<GrpcUdpListener*>(arg);
+ sp->OnCanWrite(error, arg);
+}
- gpr_mu_lock(&sp->server->mu);
+void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) {
if (error != GRPC_ERROR_NONE) {
- if (0 == --sp->server->active_ports && sp->server->shutdown) {
- gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(sp->server);
+ gpr_mu_lock(&server_->mu);
+ if (0 == --server_->active_ports && server_->shutdown) {
+ gpr_mu_unlock(&server_->mu);
+ deactivated_all_ports(server_);
} else {
- gpr_mu_unlock(&sp->server->mu);
+ gpr_mu_unlock(&server_->mu);
}
return;
}
/* Schedule actual write in another thread. */
- GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
+ GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
- GRPC_CLOSURE_SCHED(&sp->do_write_closure, GRPC_ERROR_NONE);
- gpr_mu_unlock(&sp->server->mu);
+ GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE);
}
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
- int rcv_buf_size, int snd_buf_size,
- grpc_udp_server_start_cb start_cb,
- grpc_udp_server_read_cb read_cb,
- grpc_udp_server_write_cb write_cb,
- grpc_udp_server_orphan_cb orphan_cb) {
- grpc_udp_listener* sp;
- int port;
- char* addr_str;
- char* name;
+ int rcv_buf_size, int snd_buf_size) {
+ gpr_log(GPR_DEBUG, "add socket %d to server", fd);
- port =
+ int port =
prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size);
if (port >= 0) {
- grpc_sockaddr_to_string(&addr_str, addr, 1);
- gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
- gpr_free(addr_str);
gpr_mu_lock(&s->mu);
- s->nports++;
- sp = static_cast<grpc_udp_listener*>(gpr_malloc(sizeof(grpc_udp_listener)));
- sp->next = nullptr;
- if (s->head == nullptr) {
- s->head = sp;
- } else {
- s->tail->next = sp;
- }
- s->tail = sp;
- sp->server = s;
- sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
- memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
- sp->read_cb = read_cb;
- sp->write_cb = write_cb;
- sp->orphan_cb = orphan_cb;
- sp->start_cb = start_cb;
- sp->orphan_notified = false;
- sp->already_shutdown = false;
- GPR_ASSERT(sp->emfd);
+ s->listeners.emplace_back(s, fd, addr);
+ gpr_log(GPR_DEBUG,
+ "add socket %d to server for port %d, %zu listener(s) in total", fd,
+ port, s->listeners.size());
gpr_mu_unlock(&s->mu);
- gpr_free(name);
}
-
return port;
}
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
- grpc_udp_server_start_cb start_cb,
- grpc_udp_server_read_cb read_cb,
- grpc_udp_server_write_cb write_cb,
- grpc_udp_server_orphan_cb orphan_cb) {
- grpc_udp_listener* sp;
+ GrpcUdpHandlerFactory* handler_factory) {
int allocated_port1 = -1;
int allocated_port2 = -1;
int fd;
@@ -536,10 +578,10 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
- for (sp = s->head; sp; sp = sp->next) {
+ for (size_t i = 0; i < s->listeners.size(); ++i) {
sockname_temp.len = sizeof(struct sockaddr_storage);
if (0 ==
- getsockname(sp->fd,
+ getsockname(s->listeners[i].fd(),
reinterpret_cast<struct sockaddr*>(sockname_temp.addr),
reinterpret_cast<socklen_t*>(&sockname_temp.len))) {
port = grpc_sockaddr_get_port(&sockname_temp);
@@ -559,6 +601,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr6_v4mapped;
}
+ s->handler_factory = handler_factory;
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if (grpc_sockaddr_is_wildcard(addr, &port)) {
grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
@@ -569,8 +612,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
allocated_port1 =
- add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb,
- read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -593,8 +635,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
- add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb,
- read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
done:
gpr_free(allocated_addr);
@@ -602,52 +643,55 @@ done:
}
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
- grpc_udp_listener* sp;
- if (port_index >= s->nports) {
+ if (port_index >= s->listeners.size()) {
return -1;
}
- for (sp = s->head; sp && port_index != 0; sp = sp->next) {
- --port_index;
- }
- GPR_ASSERT(sp); // if this fails, our check earlier was bogus
- return sp->fd;
+ return s->listeners[port_index].fd();
}
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
gpr_log(GPR_DEBUG, "grpc_udp_server_start");
- size_t i;
gpr_mu_lock(&s->mu);
- grpc_udp_listener* sp;
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
s->user_data = user_data;
- sp = s->head;
- while (sp != nullptr) {
- sp->start_cb(sp->emfd, sp->server->user_data);
- for (i = 0; i < pollset_count; i++) {
- grpc_pollset_add_fd(pollsets[i], sp->emfd);
- }
- GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
- grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ for(size_t i = 0; i < s->listeners.size(); ++i) {
+ s->listeners[i].StartListening(pollsets, pollset_count, s->handler_factory);
+ }
- GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
- grpc_schedule_on_exec_ctx);
- sp->notify_on_write_armed = true;
- grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
+ gpr_mu_unlock(&s->mu);
+}
- /* Registered for both read and write callbacks: increment active_ports
- * twice to account for this, and delay free-ing of memory until both
- * on_read and on_write have fired. */
- s->active_ports += 2;
+void GrpcUdpListener::StartListening(grpc_pollset** pollsets,
+ size_t pollset_count,
+ GrpcUdpHandlerFactory* handler_factory) {
+ gpr_mu_lock(&mutex_);
+ handler_factory_ = handler_factory;
+ udp_handler_ = handler_factory->CreateUdpHandler(emfd_, server_->user_data);
+ for (size_t i = 0; i < pollset_count; i++) {
+ grpc_pollset_add_fd(pollsets[i], emfd_);
+ }
+ GRPC_CLOSURE_INIT(&read_closure_, on_read, this, grpc_schedule_on_exec_ctx);
+ grpc_fd_notify_on_read(emfd_, &read_closure_);
+
+ GRPC_CLOSURE_INIT(&write_closure_, on_write, this, grpc_schedule_on_exec_ctx);
+ notify_on_write_armed_ = true;
+ grpc_fd_notify_on_write(emfd_, &write_closure_);
+
+ /* Registered for both read and write callbacks: increment active_ports
+ * twice to account for this, and delay free-ing of memory until both
+ * on_read and on_write have fired. */
+ server_->active_ports += 2;
+ gpr_mu_unlock(&mutex_);
+}
- sp = sp->next;
+void GrpcUdpListener::OnDestroy() {
+ if (udp_handler_ != nullptr) {
+ handler_factory_->DestroyUdpHandler(udp_handler_);
}
-
- gpr_mu_unlock(&s->mu);
}
#endif