diff options
author | 2014-12-09 14:39:16 -0800 | |
---|---|---|
committer | 2014-12-09 16:20:55 -0800 | |
commit | 18b49ab914ea5a57f22ed6d77520cd7d4372749b (patch) | |
tree | c2ec5971eebd10e3ef52c0c084c797b8d06bb267 /src/core/iomgr | |
parent | 98bffb779b8c47f4d76c72c7807d9f1b1074a795 (diff) |
Introducing iomgr.
Move eventmanager and platform dependent endpoint functionality into a single
library called 'iomgr'.
This is primarily to prepare for a Windows port - where posix socket semantics
lead to poor quality code.
Mostly this is a code movement CL, with some small changes to help prepare the
way for porting:
- em style fd objects can only be held internally in iomgr, and own their memory
- added grpc_iomgr_create_endpoint_pair() to accomodate the common pattern of
creating a tcp endpoint from the output of socketpair - this will help keep
our tests portable
- separated em alarm interface into a separate file, as this part of event
manager is needed higher up the stack
- made the eventmanager bits a true singleton, simplifying API's across the
stack as there's no longer a reason to carry a pointer there.
Initial design document is here:
https://docs.google.com/document/d/1VmafcHvvrP5kwtQkz84R5yXF7u7fW-9Pn0bkSUQHDt8/edit?disco=AAAAARNByxg
Change on 2014/12/09 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81716456
Diffstat (limited to 'src/core/iomgr')
25 files changed, 3492 insertions, 0 deletions
diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h new file mode 100644 index 0000000000..5bd00d9736 --- /dev/null +++ b/src/core/iomgr/alarm.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_ALARM_H__ +#define __GRPC_INTERNAL_IOMGR_ALARM_H__ + +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/port_platform.h> +#include <grpc/support/time.h> + +typedef struct grpc_alarm grpc_alarm; + +/* One of the following headers should provide struct grpc_alarm */ +#ifdef GPR_LIBEVENT +#include "src/core/iomgr/iomgr_libevent.h" +#endif + +/* Initialize *alarm. When expired or canceled, alarm_cb will be called with + *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was + canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once, + and application code should check the status to determine how it was + invoked. The application callback is also responsible for maintaining + information about when to free up any user-level state. */ +void grpc_alarm_init(grpc_alarm *alarm, grpc_iomgr_cb_func alarm_cb, + void *alarm_cb_arg); + +/* Note that there is no alarm destroy function. This is because the + alarm is a one-time occurrence with a guarantee that the callback will + be called exactly once, either at expiration or cancellation. Thus, all + the internal alarm event management state is destroyed just before + that callback is invoked. If the user has additional state associated with + the alarm, the user is responsible for determining when it is safe to + destroy that state. */ + +/* Schedule *alarm to expire at deadline. If *alarm is + re-added before expiration, the *delay is simply reset to the new value. + Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure. + Upon failure, caller should abort further operations on *alarm */ +int grpc_alarm_add(grpc_alarm *alarm, gpr_timespec deadline); + +/* Cancel an *alarm. + There are three cases: + 1. We normally cancel the alarm + 2. The alarm has already run + 3. We can't cancel the alarm because it is "in flight". + + In all of these cases, the cancellation is still considered successful. + They are essentially distinguished in that the alarm_cb will be run + exactly once from either the cancellation (with status CANCELLED) + or from the activation (with status SUCCESS) + + Requires: cancel() must happen after add() on a given alarm */ +int grpc_alarm_cancel(grpc_alarm *alarm); + +#endif /* __GRPC_INTERNAL_IOMGR_ALARM_H__ */ diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h new file mode 100644 index 0000000000..4a97ebf0f6 --- /dev/null +++ b/src/core/iomgr/endpoint_pair.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ +#define __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ + +#include "src/core/endpoint/endpoint.h" + +typedef struct { + grpc_endpoint *client; + grpc_endpoint *server; +} grpc_endpoint_pair; + +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size); + +#endif /* __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c new file mode 100644 index 0000000000..f08d1344eb --- /dev/null +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -0,0 +1,61 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/endpoint_pair.h" + +#include <errno.h> +#include <fcntl.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> + +#include "src/core/iomgr/tcp_posix.h" +#include <grpc/support/log.h> + +static void create_sockets(int sv[2]) { + int flags; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) { + int sv[2]; + grpc_endpoint_pair p; + create_sockets(sv); + p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size); + p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size); + return p; +} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h new file mode 100644 index 0000000000..cf39f947bc --- /dev/null +++ b/src/core/iomgr/iomgr.h @@ -0,0 +1,56 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__ +#define __GRPC_INTERNAL_IOMGR_IOMGR_H__ + +/* Status passed to callbacks for grpc_em_fd_notify_on_read and + grpc_em_fd_notify_on_write. */ +typedef enum grpc_em_cb_status { + GRPC_CALLBACK_SUCCESS = 0, + GRPC_CALLBACK_TIMED_OUT, + GRPC_CALLBACK_CANCELLED, + GRPC_CALLBACK_DO_NOT_USE +} grpc_iomgr_cb_status; + +/* gRPC Callback definition */ +typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status); + +void grpc_iomgr_init(); +void grpc_iomgr_shutdown(); + +/* This function is called from within a callback or from anywhere else + and causes the invocation of a callback at some point in the future */ +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_H__ */ diff --git a/src/core/iomgr/iomgr_completion_queue_interface.h b/src/core/iomgr/iomgr_completion_queue_interface.h new file mode 100644 index 0000000000..3c4efe773a --- /dev/null +++ b/src/core/iomgr/iomgr_completion_queue_interface.h @@ -0,0 +1,45 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ + +/* Internals of iomgr that are exposed only to be used for completion queue + implementation */ + +extern gpr_mu grpc_iomgr_mu; +extern gpr_cv grpc_iomgr_cv; + +int grpc_iomgr_work(gpr_timespec deadline); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */ diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c new file mode 100644 index 0000000000..1af03dcf12 --- /dev/null +++ b/src/core/iomgr/iomgr_libevent.c @@ -0,0 +1,676 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/iomgr_libevent.h" + +#include <unistd.h> +#include <fcntl.h> + +#include "src/core/iomgr/alarm.h" +#include <grpc/support/atm.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <event2/event.h> +#include <event2/thread.h> + +#define ALARM_TRIGGER_INIT ((gpr_atm)0) +#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) +#define DONE_SHUTDOWN ((void *)1) + +#define POLLER_ID_INVALID ((gpr_atm)-1) + +/* Global data */ +struct event_base *g_event_base; +gpr_mu grpc_iomgr_mu; +gpr_cv grpc_iomgr_cv; +static grpc_libevent_activation_data *g_activation_queue; +static int g_num_pollers; +static int g_num_fds; +static gpr_timespec g_last_poll_completed; +static int g_shutdown_backup_poller; +static gpr_event g_backup_poller_done; +/* activated to break out of the event loop early */ +static struct event *g_timeout_ev; +static grpc_fd *g_fds_to_free; + +int evthread_use_threads(void); +static void grpc_fd_impl_destroy(grpc_fd *impl); + +/* If anything is in the work queue, process one item and return 1. + Return 0 if there were no work items to complete. + Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ +static int maybe_do_queue_work() { + grpc_libevent_activation_data *work = g_activation_queue; + + if (work == NULL) return 0; + + if (work->next == work) { + g_activation_queue = NULL; + } else { + g_activation_queue = work->next; + g_activation_queue->prev = work->prev; + g_activation_queue->next->prev = g_activation_queue->prev->next = + g_activation_queue; + } + work->next = work->prev = NULL; + gpr_mu_unlock(&grpc_iomgr_mu); + + work->cb(work->arg, work->status); + + gpr_mu_lock(&grpc_iomgr_mu); + return 1; +} + +/* Break out of the event loop on timeout */ +static void timer_callback(int fd, short events, void *context) { + event_base_loopbreak((struct event_base *)context); +} + +static void free_fd_list(grpc_fd *impl) { + while (impl != NULL) { + grpc_fd *current = impl; + impl = impl->next; + grpc_fd_impl_destroy(current); + gpr_free(current); + } +} + +static void maybe_free_fds() { + if (g_fds_to_free) { + free_fd_list(g_fds_to_free); + g_fds_to_free = NULL; + } +} + +/* Spend some time doing polling and libevent maintenance work if no other + thread is. This includes both polling for events and destroying/closing file + descriptor objects. + Returns 1 if polling was performed, 0 otherwise. + Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ +static int maybe_do_polling_work(struct timeval delay) { + int status; + + if (g_num_pollers) return 0; + + g_num_pollers = 1; + + maybe_free_fds(); + + gpr_mu_unlock(&grpc_iomgr_mu); + + event_add(g_timeout_ev, &delay); + status = event_base_loop(g_event_base, EVLOOP_ONCE); + if (status < 0) { + gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); + } + event_del(g_timeout_ev); + + gpr_mu_lock(&grpc_iomgr_mu); + maybe_free_fds(); + + g_num_pollers = 0; + gpr_cv_broadcast(&grpc_iomgr_cv); + return 1; +} + +int grpc_iomgr_work(gpr_timespec deadline) { + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + /* poll for no longer than one second */ + gpr_timespec max_delay = {1, 0}; + struct timeval delay; + + if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { + return 0; + } + + if (gpr_time_cmp(delay_timespec, max_delay) > 0) { + delay_timespec = max_delay; + } + + delay = gpr_timeval_from_timespec(delay_timespec); + + if (maybe_do_queue_work() || maybe_do_polling_work(delay)) { + g_last_poll_completed = gpr_now(); + return 1; + } + + return 0; +} + +static void backup_poller_thread(void *p) { + int backup_poller_engaged = 0; + /* allow no pollers for 100 milliseconds, then engage backup polling */ + gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000); + + gpr_mu_lock(&grpc_iomgr_mu); + while (!g_shutdown_backup_poller) { + if (g_num_pollers == 0) { + gpr_timespec now = gpr_now(); + gpr_timespec time_until_engage = gpr_time_sub( + allow_no_pollers, gpr_time_sub(now, g_last_poll_completed)); + if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { + if (!backup_poller_engaged) { + gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); + backup_poller_engaged = 1; + } + if (!maybe_do_queue_work()) { + struct timeval tv = {1, 0}; + maybe_do_polling_work(tv); + } + } else { + if (backup_poller_engaged) { + gpr_log(GPR_DEBUG, "Backup poller disengaged"); + backup_poller_engaged = 0; + } + gpr_mu_unlock(&grpc_iomgr_mu); + gpr_sleep_until(gpr_time_add(now, time_until_engage)); + gpr_mu_lock(&grpc_iomgr_mu); + } + } else { + if (backup_poller_engaged) { + gpr_log(GPR_DEBUG, "Backup poller disengaged"); + backup_poller_engaged = 0; + } + gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future); + } + } + gpr_mu_unlock(&grpc_iomgr_mu); + + gpr_event_set(&g_backup_poller_done, (void *)1); +} + +void grpc_iomgr_init() { + gpr_thd_id backup_poller_id; + + if (evthread_use_threads() != 0) { + gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); + abort(); + } + + gpr_mu_init(&grpc_iomgr_mu); + gpr_cv_init(&grpc_iomgr_cv); + g_activation_queue = NULL; + g_num_pollers = 0; + g_num_fds = 0; + g_last_poll_completed = gpr_now(); + g_shutdown_backup_poller = 0; + g_fds_to_free = NULL; + + gpr_event_init(&g_backup_poller_done); + + g_event_base = NULL; + g_timeout_ev = NULL; + + g_event_base = event_base_new(); + if (!g_event_base) { + gpr_log(GPR_ERROR, "Failed to create the event base"); + abort(); + } + + if (evthread_make_base_notifiable(g_event_base) != 0) { + gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); + abort(); + } + + g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base); + + gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL); +} + +void grpc_iomgr_shutdown() { + gpr_timespec fd_shutdown_deadline = + gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + + /* broadcast shutdown */ + gpr_mu_lock(&grpc_iomgr_mu); + while (g_num_fds) { + gpr_log(GPR_INFO, + "waiting for %d fds to be destroyed before closing event manager", + g_num_fds); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) { + gpr_log(GPR_ERROR, + "not all fds destroyed before shutdown deadline: memory leaks " + "are likely"); + break; + } else if (g_num_fds == 0) { + gpr_log(GPR_INFO, "all fds closed"); + } + } + + g_shutdown_backup_poller = 1; + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); + + gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + + /* drain pending work */ + gpr_mu_lock(&grpc_iomgr_mu); + while (maybe_do_queue_work()) + ; + gpr_mu_unlock(&grpc_iomgr_mu); + + free_fd_list(g_fds_to_free); + + /* complete shutdown */ + gpr_mu_destroy(&grpc_iomgr_mu); + gpr_cv_destroy(&grpc_iomgr_cv); + + if (g_timeout_ev != NULL) { + event_free(g_timeout_ev); + } + + if (g_event_base != NULL) { + event_base_free(g_event_base); + g_event_base = NULL; + } +} + +static void add_task(grpc_libevent_activation_data *adata) { + gpr_mu_lock(&grpc_iomgr_mu); + if (g_activation_queue) { + adata->next = g_activation_queue; + adata->prev = adata->next->prev; + adata->next->prev = adata->prev->next = adata; + } else { + g_activation_queue = adata; + adata->next = adata->prev = adata; + } + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); +} + +/* ===============grpc_alarm implementation==================== */ + +/* The following function frees up the alarm's libevent structure and + should always be invoked just before calling the alarm's callback */ +static void alarm_ev_destroy(grpc_alarm *alarm) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; + if (adata->ev != NULL) { + /* TODO(klempner): Is this safe to do when we're cancelling? */ + event_free(adata->ev); + adata->ev = NULL; + } +} +/* Proxy callback triggered by alarm->ev to call alarm->cb */ +static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) { + grpc_alarm *alarm = arg; + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; + int trigger_old; + + /* First check if this alarm has been canceled, atomically */ + trigger_old = + gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT); + if (trigger_old == ALARM_TRIGGER_INIT) { + /* Before invoking user callback, destroy the libevent structure */ + alarm_ev_destroy(alarm); + adata->status = GRPC_CALLBACK_SUCCESS; + add_task(adata); + } +} + +void grpc_alarm_init(grpc_alarm *alarm, grpc_iomgr_cb_func alarm_cb, + void *alarm_cb_arg) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; + alarm->task.type = GRPC_EM_TASK_ALARM; + gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); + adata->cb = alarm_cb; + adata->arg = alarm_cb_arg; + adata->prev = NULL; + adata->next = NULL; + adata->ev = NULL; +} + +int grpc_alarm_add(grpc_alarm *alarm, gpr_timespec deadline) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + if (adata->ev) { + event_free(adata->ev); + gpr_log(GPR_INFO, "Adding an alarm that already has an event."); + adata->ev = NULL; + } + adata->ev = evtimer_new(g_event_base, libevent_alarm_cb, alarm); + /* Set the trigger field to untriggered. Do this as the last store since + it is a release of previous stores. */ + gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); + + return adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0; +} + +int grpc_alarm_cancel(grpc_alarm *alarm) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; + int trigger_old; + + /* First check if this alarm has been triggered, atomically */ + trigger_old = + gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT); + if (trigger_old == ALARM_TRIGGER_INIT) { + /* We need to make sure that we only invoke the callback if it hasn't + already been invoked */ + /* First remove this event from libevent. This returns success even if the + event has gone active or invoked its callback. */ + if (evtimer_del(adata->ev) != 0) { + /* The delete was unsuccessful for some reason. */ + gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful"); + return 0; + } + /* Free up the event structure before invoking callback */ + alarm_ev_destroy(alarm); + adata->status = GRPC_CALLBACK_CANCELLED; + add_task(adata); + } + return 1; +} + +static void grpc_fd_impl_destroy(grpc_fd *impl) { + grpc_em_task_activity_type type; + grpc_libevent_activation_data *adata; + + for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { + adata = &(impl->task.activation[type]); + GPR_ASSERT(adata->next == NULL); + if (adata->ev != NULL) { + event_free(adata->ev); + adata->ev = NULL; + } + } + + if (impl->shutdown_ev != NULL) { + event_free(impl->shutdown_ev); + impl->shutdown_ev = NULL; + } + gpr_mu_destroy(&impl->mu); + close(impl->fd); +} + +/* Proxy callback to call a gRPC read/write callback */ +static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { + grpc_fd *em_fd = arg; + grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS; + int run_read_cb = 0; + int run_write_cb = 0; + grpc_libevent_activation_data *rdata, *wdata; + + gpr_mu_lock(&em_fd->mu); + if (em_fd->shutdown_started) { + status = GRPC_CALLBACK_CANCELLED; + } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { + status = GRPC_CALLBACK_TIMED_OUT; + /* TODO(klempner): This is broken if we are monitoring both read and write + events on the same fd -- generating a spurious event is okay, but + generating a spurious timeout is not. */ + what |= (EV_READ | EV_WRITE); + } + + if (what & EV_READ) { + switch (em_fd->read_state) { + case GRPC_FD_WAITING: + run_read_cb = 1; + em_fd->read_state = GRPC_FD_IDLE; + break; + case GRPC_FD_IDLE: + case GRPC_FD_CACHED: + em_fd->read_state = GRPC_FD_CACHED; + } + } + if (what & EV_WRITE) { + switch (em_fd->write_state) { + case GRPC_FD_WAITING: + run_write_cb = 1; + em_fd->write_state = GRPC_FD_IDLE; + break; + case GRPC_FD_IDLE: + case GRPC_FD_CACHED: + em_fd->write_state = GRPC_FD_CACHED; + } + } + + if (run_read_cb) { + rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); + rdata->status = status; + add_task(rdata); + } else if (run_write_cb) { + wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); + wdata->status = status; + add_task(wdata); + } + gpr_mu_unlock(&em_fd->mu); +} + +static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { + /* TODO(klempner): This could just run directly in the calling thread, except + that libevent's handling of event_active() on an event which is already in + flight on a different thread is racy and easily triggers TSAN. + */ + grpc_fd *impl = arg; + gpr_mu_lock(&impl->mu); + impl->shutdown_started = 1; + if (impl->read_state == GRPC_FD_WAITING) { + event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); + } + if (impl->write_state == GRPC_FD_WAITING) { + event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); + } + gpr_mu_unlock(&impl->mu); +} + +grpc_fd *grpc_fd_create(int fd) { + int flags; + grpc_libevent_activation_data *rdata, *wdata; + grpc_fd *impl = gpr_malloc(sizeof(grpc_fd)); + + gpr_mu_lock(&grpc_iomgr_mu); + g_num_fds++; + gpr_mu_unlock(&grpc_iomgr_mu); + + impl->shutdown_ev = NULL; + gpr_mu_init(&impl->mu); + + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT((flags & O_NONBLOCK) != 0); + + impl->task.type = GRPC_EM_TASK_FD; + impl->fd = fd; + + rdata = &(impl->task.activation[GRPC_EM_TA_READ]); + rdata->ev = NULL; + rdata->cb = NULL; + rdata->arg = NULL; + rdata->status = GRPC_CALLBACK_SUCCESS; + rdata->prev = NULL; + rdata->next = NULL; + + wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]); + wdata->ev = NULL; + wdata->cb = NULL; + wdata->arg = NULL; + wdata->status = GRPC_CALLBACK_SUCCESS; + wdata->prev = NULL; + wdata->next = NULL; + + impl->read_state = GRPC_FD_IDLE; + impl->write_state = GRPC_FD_IDLE; + + impl->shutdown_started = 0; + impl->next = NULL; + + /* TODO(chenw): detect platforms where only level trigger is supported, + and set the event to non-persist. */ + rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, + em_fd_cb, impl); + GPR_ASSERT(rdata->ev); + + wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, + em_fd_cb, impl); + GPR_ASSERT(wdata->ev); + + impl->shutdown_ev = + event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl); + GPR_ASSERT(impl->shutdown_ev); + + return impl; +} + +void grpc_fd_destroy(grpc_fd *impl) { + gpr_mu_lock(&grpc_iomgr_mu); + + if (g_num_pollers == 0) { + /* it is safe to simply free it */ + grpc_fd_impl_destroy(impl); + gpr_free(impl); + } else { + /* Put the impl on the list to be destroyed by the poller. */ + impl->next = g_fds_to_free; + g_fds_to_free = impl; + /* TODO(ctiller): kick the poller so it destroys this fd promptly + (currently we may wait up to a second) */ + } + + g_num_fds--; + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); +} + +int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; } + +/* TODO(chenw): should we enforce the contract that notify_on_read cannot be + called when the previously registered callback has not been called yet. */ +int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb, + void *read_cb_arg, gpr_timespec deadline) { + int force_event = 0; + grpc_libevent_activation_data *rdata; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + struct timeval *delayp = + gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; + + rdata = &impl->task.activation[GRPC_EM_TA_READ]; + + gpr_mu_lock(&impl->mu); + rdata->cb = read_cb; + rdata->arg = read_cb_arg; + + force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED); + impl->read_state = GRPC_FD_WAITING; + + if (force_event) { + event_active(rdata->ev, EV_READ, 1); + } else if (event_add(rdata->ev, delayp) == -1) { + gpr_mu_unlock(&impl->mu); + return 0; + } + gpr_mu_unlock(&impl->mu); + return 1; +} + +int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb, + void *write_cb_arg, gpr_timespec deadline) { + int force_event = 0; + grpc_libevent_activation_data *wdata; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + struct timeval *delayp = + gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; + + wdata = &impl->task.activation[GRPC_EM_TA_WRITE]; + + gpr_mu_lock(&impl->mu); + wdata->cb = write_cb; + wdata->arg = write_cb_arg; + + force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED); + impl->write_state = GRPC_FD_WAITING; + + if (force_event) { + event_active(wdata->ev, EV_WRITE, 1); + } else if (event_add(wdata->ev, delayp) == -1) { + gpr_mu_unlock(&impl->mu); + return 0; + } + gpr_mu_unlock(&impl->mu); + return 1; +} + +void grpc_fd_shutdown(grpc_fd *em_fd) { + event_active(em_fd->shutdown_ev, EV_READ, 1); +} + +/* Sometimes we want a followup callback: something to be added from the + current callback for the EM to invoke once this callback is complete. + This is implemented by inserting an entry into an EM queue. */ + +/* The following structure holds the field needed for adding the + followup callback. These are the argument for the followup callback, + the function to use for the followup callback, and the + activation data pointer used for the queues (to free in the CB) */ +struct followup_callback_arg { + grpc_iomgr_cb_func func; + void *cb_arg; + grpc_libevent_activation_data adata; +}; + +static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) { + struct followup_callback_arg *fcb_arg = cb_arg; + /* Invoke the function */ + fcb_arg->func(fcb_arg->cb_arg, status); + gpr_free(fcb_arg); +} + +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { + grpc_libevent_activation_data *adptr; + struct followup_callback_arg *fcb_arg; + + fcb_arg = gpr_malloc(sizeof(*fcb_arg)); + /* Set up the activation data and followup callback argument structures */ + adptr = &fcb_arg->adata; + adptr->ev = NULL; + adptr->cb = followup_proxy_callback; + adptr->arg = fcb_arg; + adptr->status = GRPC_CALLBACK_SUCCESS; + adptr->prev = NULL; + adptr->next = NULL; + + fcb_arg->func = cb; + fcb_arg->cb_arg = cb_arg; + + /* Insert an activation data for the specified em */ + add_task(adptr); +} diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h new file mode 100644 index 0000000000..77e7b59989 --- /dev/null +++ b/src/core/iomgr/iomgr_libevent.h @@ -0,0 +1,207 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ +#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ + +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +typedef struct grpc_fd grpc_fd; + +/* gRPC event manager task "base class". This is pretend-inheritance in C89. + This should be the first member of any actual grpc_em task type. + + Memory warning: expanding this will increase memory usage in any derived + class, so be careful. + + For generality, this base can be on multiple task queues and can have + multiple event callbacks registered. Not all "derived classes" will use + this feature. */ + +typedef enum grpc_libevent_task_type { + GRPC_EM_TASK_ALARM, + GRPC_EM_TASK_FD, + GRPC_EM_TASK_DO_NOT_USE +} grpc_libevent_task_type; + +/* Different activity types to shape the callback and queueing arrays */ +typedef enum grpc_em_task_activity_type { + GRPC_EM_TA_READ, /* use this also for single-type events */ + GRPC_EM_TA_WRITE, + GRPC_EM_TA_COUNT +} grpc_em_task_activity_type; + +/* Include the following #define for convenience for tasks like alarms that + only have a single type */ +#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ + +typedef struct grpc_libevent_activation_data { + struct event *ev; /* event activated on this callback type */ + grpc_iomgr_cb_func cb; /* function pointer for callback */ + void *arg; /* argument passed to cb */ + + /* Hold the status associated with the callback when queued */ + grpc_iomgr_cb_status status; + /* Now set up to link activations into scheduler queues */ + struct grpc_libevent_activation_data *prev; + struct grpc_libevent_activation_data *next; +} grpc_libevent_activation_data; + +typedef struct grpc_libevent_task { + grpc_libevent_task_type type; + + /* Now have an array of activation data elements: one for each activity + type that could get activated */ + grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT]; +} grpc_libevent_task; + +/* Initialize *em_fd. + Requires fd is a non-blocking file descriptor. + + This takes ownership of closing fd. + + Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ +grpc_fd *grpc_fd_create(int fd); + +/* Cause *em_fd no longer to be initialized and closes the underlying fd. + Requires: *em_fd initialized; no outstanding notify_on_read or + notify_on_write. */ +void grpc_fd_destroy(grpc_fd *em_fd); + +/* Returns the file descriptor associated with *em_fd. */ +int grpc_fd_get(grpc_fd *em_fd); + +/* Register read interest, causing read_cb to be called once when em_fd becomes + readable, on deadline specified by deadline, or on shutdown triggered by + grpc_fd_shutdown. + read_cb will be called with read_cb_arg when *em_fd becomes readable. + read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, + GRPC_CALLBACK_TIMED_OUT if the call timed out, + and CANCELLED if the call was cancelled. + + Requires:This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain em_fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e read_cb is called while nothing can be readable from em_fd */ +int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg, gpr_timespec deadline); + +/* Exactly the same semantics as above, except based on writable events. */ +int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg, gpr_timespec deadline); + +/* Cause any current and all future read/write callbacks to error out with + GRPC_CALLBACK_CANCELLED. */ +void grpc_fd_shutdown(grpc_fd *em_fd); + +/* =================== Event caching =================== + In order to not miss or double-return edges in the context of edge triggering + and multithreading, we need a per-fd caching layer in the eventmanager itself + to cache relevant events. + + There are two types of events we care about: calls to notify_on_[read|write] + and readable/writable events for the socket from eventfd. There are separate + event caches for read and write. + + There are three states: + 0. "waiting" -- There's been a call to notify_on_[read|write] which has not + had a corresponding event. In other words, we're waiting for an event so we + can run the callback. + 1. "idle" -- We are neither waiting nor have a cached event. + 2. "cached" -- There has been a read/write event without a waiting callback, + so we want to run the event next time the application calls + notify_on_[read|write]. + + The high level state diagram: + + +--------------------------------------------------------------------+ + | WAITING | IDLE | CACHED | + | | | | + | 1. --*-> 2. --+-> 3. --+\ + | | | <--+/ + | | | | + x+-- 6. 5. <-+-- 4. <-*-- | + | | | | + +--------------------------------------------------------------------+ + + Transitions right occur on read|write events. Transitions left occur on + notify_on_[read|write] events. + State transitions: + 1. Read|Write event while waiting -> run the callback and transition to idle. + 2. Read|Write event while idle -> transition to cached. + 3. Read|Write event with one already cached -> still cached. + 4. notify_on_[read|write] with event cached: run callback and transition to + idle. + 5. notify_on_[read|write] when idle: Store callback and transition to + waiting. + 6. notify_on_[read|write] when waiting: invalid. */ + +typedef enum grpc_fd_state { + GRPC_FD_WAITING = 0, + GRPC_FD_IDLE = 1, + GRPC_FD_CACHED = 2 +} grpc_fd_state; + +/* gRPC file descriptor handle. + The handle is used to register read/write callbacks to a file descriptor */ +struct grpc_fd { + grpc_libevent_task task; /* Base class, callbacks, queues, etc */ + int fd; /* File descriptor */ + + /* Note that the shutdown event is only needed as a workaround for libevent + not properly handling event_active on an in flight event. */ + struct event *shutdown_ev; /* activated to trigger shutdown */ + + /* protect shutdown_started|read_state|write_state and ensure barriers + between notify_on_[read|write] and read|write callbacks */ + gpr_mu mu; + int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ + grpc_fd_state read_state; + grpc_fd_state write_state; + + /* descriptor delete list. These are destroyed during polling. */ + struct grpc_fd *next; +}; + +/* gRPC alarm handle. + The handle is used to add an alarm which expires after specified timeout. */ +struct grpc_alarm { + grpc_libevent_task task; /* Include the base class */ + + gpr_atm triggered; /* To be used atomically if alarm triggered */ +}; + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/iomgr/iomgr_libevent_use_threads.c b/src/core/iomgr/iomgr_libevent_use_threads.c new file mode 100644 index 0000000000..af449342f0 --- /dev/null +++ b/src/core/iomgr/iomgr_libevent_use_threads.c @@ -0,0 +1,56 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Posix grpc event manager support code. */ +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <event2/thread.h> + +static int error_code = 0; +static gpr_once threads_once = GPR_ONCE_INIT; +static void evthread_threads_initialize(void) { + error_code = evthread_use_pthreads(); + if (error_code) { + gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); + } +} + +/* Notify LibEvent that Posix pthread is used. */ +int evthread_use_threads() { + gpr_once_init(&threads_once, &evthread_threads_initialize); + /* For Pthreads or Windows threads, Libevent provides simple APIs to set + mutexes and conditional variables to support cross thread operations. + For other platforms, LibEvent provide callback APIs to hook mutexes and + conditional variables. */ + return error_code; +} diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h new file mode 100644 index 0000000000..37ec0f0335 --- /dev/null +++ b/src/core/iomgr/resolve_address.h @@ -0,0 +1,67 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ +#define __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ + +#include <sys/socket.h> + +typedef struct { + struct sockaddr_storage addr; + int len; +} grpc_resolved_address; + +typedef struct { + size_t naddrs; + grpc_resolved_address *addrs; +} grpc_resolved_addresses; + +/* Async result callback: + On success: addresses is the result, and the callee must call + grpc_resolved_addresses_destroy when it's done with them + On failure: addresses is NULL */ +typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses); +/* Asynchronously resolve addr. Use default_port if a port isn't designated + in addr, otherwise use the port in addr. */ +/* TODO(ctiller): add a timeout here */ +void grpc_resolve_address(const char *addr, const char *default_port, + grpc_resolve_cb cb, void *arg); +/* Destroy resolved addresses */ +void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); + +/* Resolve addr in a blocking fashion. Returns NULL on failure. On success, + result must be freed with grpc_resolved_addresses_destroy. */ +grpc_resolved_addresses *grpc_blocking_resolve_address( + const char *addr, const char *default_port); + +#endif /* __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c new file mode 100644 index 0000000000..d3ea3780ce --- /dev/null +++ b/src/core/iomgr/resolve_address_posix.c @@ -0,0 +1,215 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define _POSIX_SOURCE + +#include "src/core/iomgr/resolve_address.h" + +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> +#include <unistd.h> +#include <string.h> + +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +typedef struct { + char *name; + char *default_port; + grpc_resolve_cb cb; + void *arg; +} request; + +static void split_host_port(const char *name, char **host, char **port) { + const char *host_start; + size_t host_len; + const char *port_start; + + *host = NULL; + *port = NULL; + + if (name[0] == '[') { + /* Parse a bracketed host, typically an IPv6 literal. */ + const char *rbracket = strchr(name, ']'); + if (rbracket == NULL) { + /* Unmatched [ */ + return; + } + if (rbracket[1] == '\0') { + /* ]<end> */ + port_start = NULL; + } else if (rbracket[1] == ':') { + /* ]:<port?> */ + port_start = rbracket + 2; + } else { + /* ]<invalid> */ + return; + } + host_start = name + 1; + host_len = rbracket - host_start; + if (memchr(host_start, ':', host_len) == NULL) { + /* Require all bracketed hosts to contain a colon, because a hostname or + IPv4 address should never use brackets. */ + return; + } + } else { + const char *colon = strchr(name, ':'); + if (colon != NULL && strchr(colon + 1, ':') == NULL) { + /* Exactly 1 colon. Split into host:port. */ + host_start = name; + host_len = colon - name; + port_start = colon + 1; + } else { + /* 0 or 2+ colons. Bare hostname or IPv6 litearal. */ + host_start = name; + host_len = strlen(name); + port_start = NULL; + } + } + + /* Allocate return values. */ + *host = gpr_malloc(host_len + 1); + memcpy(*host, host_start, host_len); + (*host)[host_len] = '\0'; + + if (port_start != NULL) { + *port = gpr_strdup(port_start); + } +} + +grpc_resolved_addresses *grpc_blocking_resolve_address( + const char *name, const char *default_port) { + struct addrinfo hints; + struct addrinfo *result = NULL, *resp; + char *host; + char *port; + int s; + size_t i; + grpc_resolved_addresses *addrs = NULL; + const gpr_timespec start_time = gpr_now(); + + /* parse name, splitting it into host and port parts */ + split_host_port(name, &host, &port); + if (host == NULL) { + gpr_log(GPR_ERROR, "unparseable host:port: '%s'", name); + goto done; + } + if (port == NULL) { + if (default_port == NULL) { + gpr_log(GPR_ERROR, "no port in name '%s'", name); + goto done; + } + port = gpr_strdup(default_port); + } + + /* Call getaddrinfo */ + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ + hints.ai_socktype = SOCK_STREAM; /* stream socket */ + hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + + s = getaddrinfo(host, port, &hints, &result); + if (s != 0) { + gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); + goto done; + } + + /* Success path: set addrs non-NULL, fill it in */ + addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + addrs->naddrs = 0; + for (resp = result; resp != NULL; resp = resp->ai_next) { + addrs->naddrs++; + } + addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address) * addrs->naddrs); + i = 0; + for (resp = result; resp != NULL; resp = resp->ai_next) { + memcpy(&addrs->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); + addrs->addrs[i].len = resp->ai_addrlen; + i++; + } + + /* Temporary logging, to help identify flakiness in dualstack_socket_test. */ + { + const gpr_timespec delay = gpr_time_sub(gpr_now(), start_time); + const int delay_ms = + delay.tv_sec * GPR_MS_PER_SEC + delay.tv_nsec / GPR_NS_PER_MS; + gpr_log(GPR_INFO, "logspam: getaddrinfo(%s, %s) resolved %d addrs in %dms:", + host, port, addrs->naddrs, delay_ms); + for (i = 0; i < addrs->naddrs; i++) { + char *buf; + grpc_sockaddr_to_string(&buf, (struct sockaddr *)&addrs->addrs[i].addr, + 0); + gpr_log(GPR_INFO, "logspam: [%d] %s", i, buf); + gpr_free(buf); + } + } + +done: + gpr_free(host); + gpr_free(port); + if (result) { + freeaddrinfo(result); + } + return addrs; +} + +/* Thread function to asynch-ify grpc_blocking_resolve_address */ +static void do_request(void *rp) { + request *r = rp; + r->cb(r->arg, grpc_blocking_resolve_address(r->name, r->default_port)); + gpr_free(r->name); + gpr_free(r->default_port); + gpr_free(r); +} + +void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { + gpr_free(addrs->addrs); + gpr_free(addrs); +} + +void grpc_resolve_address(const char *name, const char *default_port, + grpc_resolve_cb cb, void *arg) { + request *r = gpr_malloc(sizeof(request)); + gpr_thd_id id; + r->name = gpr_strdup(name); + r->default_port = gpr_strdup(default_port); + r->cb = cb; + r->arg = arg; + gpr_thd_new(&id, do_request, r, NULL); +} diff --git a/src/core/iomgr/sockaddr.h b/src/core/iomgr/sockaddr.h new file mode 100644 index 0000000000..b980b3029f --- /dev/null +++ b/src/core/iomgr/sockaddr.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WIN32 +#include "src/core/iomgr/sockaddr_win32.h" +#endif + +#ifdef GPR_POSIX_SOCKETADDR +#include "src/core/iomgr/sockaddr_posix.h" +#endif + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ */ diff --git a/src/core/iomgr/sockaddr_posix.h b/src/core/iomgr/sockaddr_posix.h new file mode 100644 index 0000000000..79ef3ca3cf --- /dev/null +++ b/src/core/iomgr/sockaddr_posix.h @@ -0,0 +1,40 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ + +#include <sys/socket.h> +#include <netinet/in.h> + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ */ diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c new file mode 100644 index 0000000000..f709d35162 --- /dev/null +++ b/src/core/iomgr/sockaddr_utils.c @@ -0,0 +1,155 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/sockaddr_utils.h" + +#include <arpa/inet.h> +#include <errno.h> +#include <string.h> + +#include <grpc/support/host_port.h> +#include <grpc/support/string.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0xff, 0xff}; + +int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, + struct sockaddr_in *addr4_out) { + GPR_ASSERT(addr != (struct sockaddr *)addr4_out); + if (addr->sa_family == AF_INET6) { + const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; + if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix, + sizeof(kV4MappedPrefix)) == 0) { + if (addr4_out != NULL) { + /* Normalize ::ffff:0.0.0.0/96 to IPv4. */ + memset(addr4_out, 0, sizeof(*addr4_out)); + addr4_out->sin_family = AF_INET; + /* s6_addr32 would be nice, but it's non-standard. */ + memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4); + addr4_out->sin_port = addr6->sin6_port; + } + return 1; + } + } + return 0; +} + +int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr, + struct sockaddr_in6 *addr6_out) { + GPR_ASSERT(addr != (struct sockaddr *)addr6_out); + if (addr->sa_family == AF_INET) { + const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; + memset(addr6_out, 0, sizeof(*addr6_out)); + addr6_out->sin6_family = AF_INET6; + memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12); + memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4); + addr6_out->sin6_port = addr4->sin_port; + return 1; + } + return 0; +} + +int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) { + struct sockaddr_in addr4_normalized; + if (grpc_sockaddr_is_v4mapped(addr, &addr4_normalized)) { + addr = (struct sockaddr *)&addr4_normalized; + } + if (addr->sa_family == AF_INET) { + /* Check for 0.0.0.0 */ + const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; + if (addr4->sin_addr.s_addr != 0) { + return 0; + } + *port_out = ntohs(addr4->sin_port); + return 1; + } else if (addr->sa_family == AF_INET6) { + /* Check for :: */ + const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; + int i; + for (i = 0; i < 16; i++) { + if (addr6->sin6_addr.s6_addr[i] != 0) { + return 0; + } + } + *port_out = ntohs(addr6->sin6_port); + return 1; + } else { + return 0; + } +} + +void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, + struct sockaddr_in6 *wild6_out) { + memset(wild4_out, 0, sizeof(*wild4_out)); + wild4_out->sin_family = AF_INET; + wild4_out->sin_port = htons(port); + + memset(wild6_out, 0, sizeof(*wild6_out)); + wild6_out->sin6_family = AF_INET6; + wild6_out->sin6_port = htons(port); +} + +int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, + int normalize) { + const int save_errno = errno; + struct sockaddr_in addr_normalized; + char ntop_buf[INET6_ADDRSTRLEN]; + const void *ip = NULL; + int port; + int ret; + + *out = NULL; + if (normalize && grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) { + addr = (const struct sockaddr *)&addr_normalized; + } + if (addr->sa_family == AF_INET) { + const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; + ip = &addr4->sin_addr; + port = ntohs(addr4->sin_port); + } else if (addr->sa_family == AF_INET6) { + const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; + ip = &addr6->sin6_addr; + port = ntohs(addr6->sin6_port); + } + if (ip != NULL && + inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) { + ret = gpr_join_host_port(out, ntop_buf, port); + } else { + ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family); + } + /* This is probably redundant, but we wouldn't want to log the wrong error. */ + errno = save_errno; + return ret; +} diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h new file mode 100644 index 0000000000..753d0c824a --- /dev/null +++ b/src/core/iomgr/sockaddr_utils.h @@ -0,0 +1,75 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ + +#include "src/core/iomgr/sockaddr.h" + +/* Returns true if addr is an IPv4-mapped IPv6 address within the + ::ffff:0.0.0.0/96 range, or false otherwise. + + If addr4_out is non-NULL, the inner IPv4 address will be copied here when + returning true. */ +int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, + struct sockaddr_in *addr4_out); + +/* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96 + address to addr6_out and returns true. Otherwise returns false. */ +int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr, + struct sockaddr_in6 *addr6_out); + +/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to + *port_out (if not NULL) and returns true, otherwise returns false. */ +int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out); + +/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */ +void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, + struct sockaddr_in6 *wild6_out); + +/* Converts a sockaddr into a newly-allocated human-readable string. + + Currently, only the AF_INET and AF_INET6 families are recognized. + If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are + displayed as plain IPv4. + + Usage is similar to gpr_asprintf: returns the number of bytes written + (excluding the final '\0'), and *out points to a string which must later be + destroyed using gpr_free(). + + In the unlikely event of an error, returns -1 and sets *out to NULL. + The existing value of errno is always preserved. */ +int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, + int normalize); + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ */ diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h new file mode 100644 index 0000000000..751ac3d2e7 --- /dev/null +++ b/src/core/iomgr/sockaddr_win32.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ + +#endif // __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ diff --git a/src/core/iomgr/socket_utils_common_posix.c b/src/core/iomgr/socket_utils_common_posix.c new file mode 100644 index 0000000000..0767d6f918 --- /dev/null +++ b/src/core/iomgr/socket_utils_common_posix.c @@ -0,0 +1,154 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/socket_utils_posix.h" + +#include <arpa/inet.h> +#include <limits.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +#include "src/core/iomgr/sockaddr_utils.h" +#include <grpc/support/host_port.h> +#include <grpc/support/string.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +/* set a socket to non blocking mode */ +int grpc_set_socket_nonblocking(int fd, int non_blocking) { + int oldflags = fcntl(fd, F_GETFL, 0); + if (oldflags < 0) { + return 0; + } + + if (non_blocking) { + oldflags |= O_NONBLOCK; + } else { + oldflags &= ~O_NONBLOCK; + } + + if (fcntl(fd, F_SETFL, oldflags) != 0) { + return 0; + } + + return 1; +} + +/* set a socket to close on exec */ +int grpc_set_socket_cloexec(int fd, int close_on_exec) { + int oldflags = fcntl(fd, F_GETFD, 0); + if (oldflags < 0) { + return 0; + } + + if (close_on_exec) { + oldflags |= FD_CLOEXEC; + } else { + oldflags &= ~FD_CLOEXEC; + } + + if (fcntl(fd, F_SETFD, oldflags) != 0) { + return 0; + } + + return 1; +} + +/* set a socket to reuse old addresses */ +int grpc_set_socket_reuse_addr(int fd, int reuse) { + int val = (reuse != 0); + int newval; + socklen_t intlen = sizeof(newval); + return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) && + 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) && + newval == val; +} + +/* disable nagle */ +int grpc_set_socket_low_latency(int fd, int low_latency) { + int val = (low_latency != 0); + int newval; + socklen_t intlen = sizeof(newval); + return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) && + 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) && + newval == val; +} + +/* This should be 0 in production, but it may be enabled for testing or + debugging purposes, to simulate an environment where IPv6 sockets can't + also speak IPv4. */ +int grpc_forbid_dualstack_sockets_for_testing = 0; + +static int set_socket_dualstack(int fd) { + if (!grpc_forbid_dualstack_sockets_for_testing) { + const int off = 0; + return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off)); + } else { + /* Force an IPv6-only socket, for testing purposes. */ + const int on = 1; + setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); + return 0; + } +} + +int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, grpc_dualstack_mode *dsmode) { + int family = addr->sa_family; + if (family == AF_INET6) { + int fd = socket(family, type, protocol); + /* Check if we've got a valid dualstack socket. */ + if (fd >= 0 && set_socket_dualstack(fd)) { + *dsmode = GRPC_DSMODE_DUALSTACK; + return fd; + } + /* If this isn't an IPv4 address, then return whatever we've got. */ + if (!grpc_sockaddr_is_v4mapped(addr, NULL)) { + *dsmode = GRPC_DSMODE_IPV6; + return fd; + } + /* Fall back to AF_INET. */ + if (fd >= 0) { + close(fd); + } + family = AF_INET; + } + *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; + return socket(family, type, protocol); +} diff --git a/src/core/iomgr/socket_utils_linux.c b/src/core/iomgr/socket_utils_linux.c new file mode 100644 index 0000000000..f971cb33bc --- /dev/null +++ b/src/core/iomgr/socket_utils_linux.c @@ -0,0 +1,52 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define _GNU_SOURCE +#include <grpc/support/port_platform.h> + +#ifdef GPR_LINUX + +#include "src/core/iomgr/socket_utils_posix.h" + +#include <sys/types.h> +#include <sys/socket.h> + +int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, + int nonblock, int cloexec) { + int flags = 0; + flags |= nonblock ? SOCK_NONBLOCK : 0; + flags |= cloexec ? SOCK_CLOEXEC : 0; + return accept4(sockfd, addr, addrlen, flags); +} + +#endif diff --git a/src/core/iomgr/socket_utils_posix.c b/src/core/iomgr/socket_utils_posix.c new file mode 100644 index 0000000000..262d606af9 --- /dev/null +++ b/src/core/iomgr/socket_utils_posix.c @@ -0,0 +1,61 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKETUTILS + +#define _BSD_SOURCE +#include "src/core/endpoint/socket_utils.h" + +#include <fcntl.h> +#include <sys/socket.h> +#include <unistd.h> + +#include <grpc/support/log.h> + +int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, + int nonblock, int cloexec) { + int fd, flags; + + fd = accept(sockfd, addr, addrlen); + if (fd >= 0) { + flags = fcntl(fd, F_GETFL, 0); + flags |= nonblock ? O_NONBLOCK : 0; + flags |= cloexec ? FD_CLOEXEC : 0; + GPR_ASSERT(fcntl(fd, F_SETFL, flags) == 0); + } + return fd; +} + +#endif /* GPR_POSIX_SOCKETUTILS */ diff --git a/src/core/iomgr/socket_utils_posix.h b/src/core/iomgr/socket_utils_posix.h new file mode 100644 index 0000000000..5c31e5e6d8 --- /dev/null +++ b/src/core/iomgr/socket_utils_posix.h @@ -0,0 +1,98 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ +#define __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ + +#include <unistd.h> +#include <sys/socket.h> + +/* a wrapper for accept or accept4 */ +int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, + int nonblock, int cloexec); + +/* set a socket to non blocking mode */ +int grpc_set_socket_nonblocking(int fd, int non_blocking); + +/* set a socket to close on exec */ +int grpc_set_socket_cloexec(int fd, int close_on_exec); + +/* set a socket to reuse old addresses */ +int grpc_set_socket_reuse_addr(int fd, int reuse); + +/* disable nagle */ +int grpc_set_socket_low_latency(int fd, int low_latency); + +/* An enum to keep track of IPv4/IPv6 socket modes. + + Currently, this information is only used when a socket is first created, but + in the future we may wish to store it alongside the fd. This would let calls + like sendto() know which family to use without asking the kernel first. */ +typedef enum grpc_dualstack_mode { + /* Uninitialized, or a non-IP socket. */ + GRPC_DSMODE_NONE, + /* AF_INET only. */ + GRPC_DSMODE_IPV4, + /* AF_INET6 only, because IPV6_V6ONLY could not be cleared. */ + GRPC_DSMODE_IPV6, + /* AF_INET6, which also supports ::ffff-mapped IPv4 addresses. */ + GRPC_DSMODE_DUALSTACK +} grpc_dualstack_mode; + +/* Only tests should use this flag. */ +extern int grpc_forbid_dualstack_sockets_for_testing; + +/* Creates a new socket for connecting to (or listening on) an address. + + If addr is AF_INET6, this creates an IPv6 socket first. If that fails, + and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to + an IPv4 socket. + + If addr is AF_INET, AF_UNIX, or anything else, then this is similar to + calling socket() directly. + + Returns an fd on success, otherwise returns -1 with errno set to the result + of a failed socket() call. + + The *dsmode output indicates which address family was actually created. + The recommended way to use this is: + - First convert to IPv6 using grpc_sockaddr_to_v4mapped(). + - Create the socket. + - If *dsmode is IPV4, use grpc_sockaddr_is_v4mapped() to convert back to + IPv4, so that bind() or connect() see the correct family. + Also, it's important to distinguish between DUALSTACK and IPV6 when + listening on the [::] wildcard address. */ +int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, grpc_dualstack_mode *dsmode); + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ */ diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h new file mode 100644 index 0000000000..a4632d81cf --- /dev/null +++ b/src/core/iomgr/tcp_client.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ + +#include "src/core/endpoint/endpoint.h" +#include "src/core/iomgr/sockaddr.h" +#include <grpc/support/time.h> + +/* Asynchronously connect to an address (specified as (addr, len)), and call + cb with arg and the completed connection when done (or call cb with arg and + NULL on failure) */ +void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), + void *arg, const struct sockaddr *addr, + int addr_len, gpr_timespec deadline); + +#endif /* __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c new file mode 100644 index 0000000000..8d2d7ab081 --- /dev/null +++ b/src/core/iomgr/tcp_client_posix.c @@ -0,0 +1,189 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/tcp_client.h" + +#include <errno.h> +#include <netinet/in.h> +#include <string.h> +#include <unistd.h> + +#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/tcp_posix.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + +typedef struct { + void (*cb)(void *arg, grpc_endpoint *tcp); + void *cb_arg; + grpc_fd *fd; + gpr_timespec deadline; +} async_connect; + +static int prepare_socket(int fd) { + if (fd < 0) { + goto error; + } + + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || + !grpc_set_socket_low_latency(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + goto error; + } + + return 1; + +error: + if (fd >= 0) { + close(fd); + } + return 0; +} + +static void on_writable(void *acp, grpc_iomgr_cb_status status) { + async_connect *ac = acp; + int so_error = 0; + socklen_t so_error_size; + int err; + int fd = grpc_fd_get(ac->fd); + + if (status == GRPC_CALLBACK_SUCCESS) { + do { + so_error_size = sizeof(so_error); + err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); + } while (err < 0 && errno == EINTR); + if (err < 0) { + gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); + goto error; + } else if (so_error != 0) { + if (so_error == ENOBUFS) { + /* We will get one of these errors if we have run out of + memory in the kernel for the data structures allocated + when you connect a socket. If this happens it is very + likely that if we wait a little bit then try again the + connection will work (since other programs or this + program will close their network connections and free up + memory). This does _not_ indicate that there is anything + wrong with the server we are connecting to, this is a + local problem. + + If you are looking at this code, then chances are that + your program or another program on the same computer + opened too many network connections. The "easy" fix: + don't do that! */ + gpr_log(GPR_ERROR, "kernel out of buffers"); + grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); + return; + } else { + goto error; + } + } else { + goto great_success; + } + } else { + gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); + goto error; + } + + abort(); + +error: + ac->cb(ac->cb_arg, NULL); + grpc_fd_destroy(ac->fd); + gpr_free(ac); + return; + +great_success: + ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + gpr_free(ac); +} + +void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), + void *arg, const struct sockaddr *addr, + int addr_len, gpr_timespec deadline) { + int fd; + grpc_dualstack_mode dsmode; + int err; + async_connect *ac; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in addr4_copy; + + /* Use dualstack sockets where available. */ + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + } + if (dsmode == GRPC_DSMODE_IPV4) { + /* If we got an AF_INET socket, map the address back to IPv4. */ + GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy)); + addr = (struct sockaddr *)&addr4_copy; + addr_len = sizeof(addr4_copy); + } + if (!prepare_socket(fd)) { + cb(arg, NULL); + return; + } + + do { + err = connect(fd, addr, addr_len); + } while (err < 0 && errno == EINTR); + + if (err >= 0) { + cb(arg, + grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + return; + } + + if (errno != EWOULDBLOCK && errno != EINPROGRESS) { + gpr_log(GPR_ERROR, "connect error: %s", strerror(errno)); + close(fd); + cb(arg, NULL); + return; + } + + ac = gpr_malloc(sizeof(async_connect)); + ac->cb = cb; + ac->cb_arg = arg; + ac->deadline = deadline; + ac->fd = grpc_fd_create(fd); + grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); +} diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c new file mode 100644 index 0000000000..8f63f75612 --- /dev/null +++ b/src/core/iomgr/tcp_posix.c @@ -0,0 +1,561 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/tcp_posix.h" + +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice.h> +#include <grpc/support/string.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +/* Holds a slice array and associated state. */ +typedef struct grpc_tcp_slice_state { + gpr_slice *slices; /* Array of slices */ + size_t nslices; /* Size of slices array. */ + ssize_t first_slice; /* First valid slice in array */ + ssize_t last_slice; /* Last valid slice in array */ + gpr_slice working_slice; /* pointer to original final slice */ + int working_slice_valid; /* True if there is a working slice */ + int memory_owned; /* True if slices array is owned */ +} grpc_tcp_slice_state; + +static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices, + size_t nslices, size_t valid_slices) { + state->slices = slices; + state->nslices = nslices; + if (valid_slices == 0) { + state->first_slice = -1; + } else { + state->first_slice = 0; + } + state->last_slice = valid_slices - 1; + state->working_slice_valid = 0; + state->memory_owned = 0; +} + +/* Returns true if there is still available data */ +static int slice_state_has_available(grpc_tcp_slice_state *state) { + return state->first_slice != -1 && state->last_slice >= state->first_slice; +} + +static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) { + if (state->first_slice == -1) { + return 0; + } else { + return state->last_slice - state->first_slice + 1; + } +} + +static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) { + /* TODO(klempner): use realloc instead when first_slice is 0 */ + /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */ + gpr_slice *slices = state->slices; + size_t original_size = slice_state_slices_allocated(state); + size_t i; + gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size); + + for (i = 0; i < original_size; ++i) { + new_slices[i] = slices[i + state->first_slice]; + } + + state->slices = new_slices; + state->last_slice = original_size - 1; + if (original_size > 0) { + state->first_slice = 0; + } else { + state->first_slice = -1; + } + state->nslices = new_size; + + if (state->memory_owned) { + gpr_free(slices); + } + state->memory_owned = 1; +} + +static void slice_state_remove_prefix(grpc_tcp_slice_state *state, + size_t prefix_bytes) { + gpr_slice *current_slice = &state->slices[state->first_slice]; + size_t current_slice_size; + + while (slice_state_has_available(state)) { + current_slice_size = GPR_SLICE_LENGTH(*current_slice); + if (current_slice_size > prefix_bytes) { + /* TODO(klempner): Get rid of the extra refcount created here by adding a + native "trim the first N bytes" operation to splice */ + /* TODO(klempner): This really shouldn't be modifying the current slice + unless we own the slices array. */ + *current_slice = gpr_slice_split_tail(current_slice, prefix_bytes); + gpr_slice_unref(*current_slice); + return; + } else { + gpr_slice_unref(*current_slice); + ++state->first_slice; + ++current_slice; + prefix_bytes -= current_slice_size; + } + } +} + +static void slice_state_destroy(grpc_tcp_slice_state *state) { + while (slice_state_has_available(state)) { + gpr_slice_unref(state->slices[state->first_slice]); + ++state->first_slice; + } + + if (state->memory_owned) { + gpr_free(state->slices); + state->memory_owned = 0; + } +} + +void slice_state_transfer_ownership(grpc_tcp_slice_state *state, + gpr_slice **slices, size_t *nslices) { + *slices = state->slices + state->first_slice; + *nslices = state->last_slice - state->first_slice + 1; + + state->first_slice = -1; + state->last_slice = -1; +} + +/* Fills iov with the first min(iov_size, available) slices, returns number + filled */ +static size_t slice_state_to_iovec(grpc_tcp_slice_state *state, + struct iovec *iov, size_t iov_size) { + size_t nslices = state->last_slice - state->first_slice + 1; + gpr_slice *slices = state->slices + state->first_slice; + size_t i; + if (nslices < iov_size) { + iov_size = nslices; + } + + for (i = 0; i < iov_size; ++i) { + iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]); + iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]); + } + return iov_size; +} + +/* Makes n blocks available at the end of state, writes them into iov, and + returns the number of bytes allocated */ +static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state, + struct iovec *iov, size_t n, + size_t slice_size) { + size_t target_size; + size_t i; + size_t allocated_bytes; + ssize_t allocated_slices = slice_state_slices_allocated(state); + + if (n - state->working_slice_valid >= state->nslices - state->last_slice) { + /* Need to grow the slice array */ + target_size = state->nslices; + do { + target_size = target_size * 2; + } while (target_size < allocated_slices + n - state->working_slice_valid); + /* TODO(klempner): If this ever needs to support both prefix removal and + append, we should be smarter about the growth logic here */ + slice_state_realloc(state, target_size); + } + + i = 0; + allocated_bytes = 0; + + if (state->working_slice_valid) { + iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]); + iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) - + GPR_SLICE_LENGTH(state->slices[state->last_slice]); + allocated_bytes += iov[0].iov_len; + ++i; + state->slices[state->last_slice] = state->working_slice; + state->working_slice_valid = 0; + } + + for (; i < n; ++i) { + ++state->last_slice; + state->slices[state->last_slice] = gpr_slice_malloc(slice_size); + iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]); + iov[i].iov_len = slice_size; + allocated_bytes += slice_size; + } + if (state->first_slice == -1) { + state->first_slice = 0; + } + return allocated_bytes; +} + +/* Remove the last n bytes from state */ +/* TODO(klempner): Consider having this defer actual deletion until later */ +static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) { + while (bytes > 0 && slice_state_has_available(state)) { + if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) { + state->working_slice = state->slices[state->last_slice]; + state->working_slice_valid = 1; + /* TODO(klempner): Combine these into a single operation that doesn't need + to refcount */ + gpr_slice_unref(gpr_slice_split_tail( + &state->slices[state->last_slice], + GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes)); + bytes = 0; + } else { + bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]); + gpr_slice_unref(state->slices[state->last_slice]); + --state->last_slice; + if (state->last_slice == -1) { + state->first_slice = -1; + } + } + } +} + +typedef struct { + grpc_endpoint base; + grpc_fd *em_fd; + int fd; + size_t slice_size; + gpr_refcount refcount; + + grpc_endpoint_read_cb read_cb; + void *read_user_data; + gpr_timespec read_deadline; + grpc_endpoint_write_cb write_cb; + void *write_user_data; + gpr_timespec write_deadline; + + grpc_tcp_slice_state write_state; +} grpc_tcp; + +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status); +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status); + +static void grpc_tcp_shutdown(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_fd_shutdown(tcp->em_fd); +} + +static void grpc_tcp_unref(grpc_tcp *tcp) { + int refcount_zero = gpr_unref(&tcp->refcount); + if (refcount_zero) { + grpc_fd_destroy(tcp->em_fd); + gpr_free(tcp); + } +} + +static void grpc_tcp_destroy(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_tcp_unref(tcp); +} + +static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, + grpc_endpoint_cb_status status) { + grpc_endpoint_read_cb cb = tcp->read_cb; + +#ifdef GRPC_TRACE_TCP + size_t i; + gpr_log(GPR_DEBUG, "read: status=%d", status); + for (i = 0; i < nslices; i++) { + char *dump = + gpr_hexdump((char *)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "READ: %s", dump); + gpr_free(dump); + } +#endif + + tcp->read_cb = NULL; + cb(tcp->read_user_data, slices, nslices, status); +} + +#define INLINE_SLICE_BUFFER_SIZE 8 +#define MAX_READ_IOVEC 4 +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status) { + grpc_tcp *tcp = (grpc_tcp *)arg; + int iov_size = 1; + gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; + struct msghdr msg; + struct iovec iov[MAX_READ_IOVEC]; + ssize_t read_bytes; + ssize_t allocated_bytes; + struct grpc_tcp_slice_state read_state; + gpr_slice *final_slices; + size_t final_nslices; + + slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, + 0); + + if (status == GRPC_CALLBACK_CANCELLED) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + grpc_tcp_unref(tcp); + return; + } + + if (status == GRPC_CALLBACK_TIMED_OUT) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT); + grpc_tcp_unref(tcp); + return; + } + + /* TODO(klempner): Limit the amount we read at once. */ + for (;;) { + allocated_bytes = slice_state_append_blocks_into_iovec( + &read_state, iov, iov_size, tcp->slice_size); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + do { + read_bytes = recvmsg(tcp->fd, &msg, 0); + } while (read_bytes < 0 && errno == EINTR); + + if (read_bytes < allocated_bytes) { + /* TODO(klempner): Consider a second read first, in hopes of getting a + * quick EAGAIN and saving a bunch of allocations. */ + slice_state_remove_last(&read_state, read_bytes < 0 + ? allocated_bytes + : allocated_bytes - read_bytes); + } + + if (read_bytes < 0) { + /* NB: After calling the user_cb a parallel call of the read handler may + * be running. */ + if (errno == EAGAIN) { + if (slice_state_has_available(&read_state)) { + /* TODO(klempner): We should probably do the call into the application + without all this junk on the stack */ + /* FIXME(klempner): Refcount properly */ + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + } else { + /* Spurious read event, consume it here */ + slice_state_destroy(&read_state); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, + tcp->read_deadline); + } + } else { + /* TODO(klempner): Log interesting errors */ + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + } + return; + } else if (read_bytes == 0) { + /* 0 read size ==> end of stream */ + if (slice_state_has_available(&read_state)) { + /* there were bytes already read: pass them up to the application */ + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); + } else { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); + } + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + return; + } else if (iov_size < MAX_READ_IOVEC) { + ++iov_size; + } + } +} + +static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, + void *user_data, gpr_timespec deadline) { + grpc_tcp *tcp = (grpc_tcp *)ep; + GPR_ASSERT(tcp->read_cb == NULL); + tcp->read_cb = cb; + tcp->read_user_data = user_data; + tcp->read_deadline = deadline; + gpr_ref(&tcp->refcount); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); +} + +#define MAX_WRITE_IOVEC 16 +static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { + struct msghdr msg; + struct iovec iov[MAX_WRITE_IOVEC]; + int iov_size; + ssize_t sent_length; + grpc_tcp_slice_state *state = &tcp->write_state; + + for (;;) { + iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + do { + /* TODO(klempner): Cork if this is a partial write */ + sent_length = sendmsg(tcp->fd, &msg, 0); + } while (sent_length < 0 && errno == EINTR); + + if (sent_length < 0) { + if (errno == EAGAIN) { + return GRPC_ENDPOINT_WRITE_PENDING; + } else { + /* TODO(klempner): Log some of these */ + slice_state_destroy(state); + return GRPC_ENDPOINT_WRITE_ERROR; + } + } + + /* TODO(klempner): Probably better to batch this after we finish flushing */ + slice_state_remove_prefix(state, sent_length); + + if (!slice_state_has_available(state)) { + return GRPC_ENDPOINT_WRITE_DONE; + } + }; +} + +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, + grpc_iomgr_cb_status status) { + grpc_tcp *tcp = (grpc_tcp *)arg; + grpc_endpoint_write_status write_status; + grpc_endpoint_cb_status cb_status; + grpc_endpoint_write_cb cb; + + cb_status = GRPC_ENDPOINT_CB_OK; + + if (status == GRPC_CALLBACK_CANCELLED) { + cb_status = GRPC_ENDPOINT_CB_SHUTDOWN; + } else if (status == GRPC_CALLBACK_TIMED_OUT) { + cb_status = GRPC_ENDPOINT_CB_TIMED_OUT; + } + + if (cb_status != GRPC_ENDPOINT_CB_OK) { + slice_state_destroy(&tcp->write_state); + cb = tcp->write_cb; + tcp->write_cb = NULL; + cb(tcp->write_user_data, cb_status); + grpc_tcp_unref(tcp); + return; + } + + write_status = grpc_tcp_flush(tcp); + if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); + } else { + slice_state_destroy(&tcp->write_state); + if (write_status == GRPC_ENDPOINT_WRITE_DONE) { + cb_status = GRPC_ENDPOINT_CB_OK; + } else { + cb_status = GRPC_ENDPOINT_CB_ERROR; + } + cb = tcp->write_cb; + tcp->write_cb = NULL; + cb(tcp->write_user_data, cb_status); + grpc_tcp_unref(tcp); + } +} + +static grpc_endpoint_write_status grpc_tcp_write( + grpc_endpoint *ep, gpr_slice *slices, size_t nslices, + grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_endpoint_write_status status; + +#ifdef GRPC_TRACE_TCP + size_t i; + + for (i = 0; i < nslices; i++) { + char *data = + gpr_hexdump((char *)GPR_SLICE_START_PTR(slices[i]), + GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT); + gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data); + gpr_free(data); + } +#endif + + GPR_ASSERT(tcp->write_cb == NULL); + slice_state_init(&tcp->write_state, slices, nslices, nslices); + + status = grpc_tcp_flush(tcp); + if (status == GRPC_ENDPOINT_WRITE_PENDING) { + /* TODO(klempner): Consider inlining rather than malloc for small nslices */ + slice_state_realloc(&tcp->write_state, nslices); + gpr_ref(&tcp->refcount); + tcp->write_cb = cb; + tcp->write_user_data = user_data; + tcp->write_deadline = deadline; + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); + } + + return status; +} + +static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read, + grpc_tcp_write, grpc_tcp_shutdown, + grpc_tcp_destroy}; + +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + tcp->base.vtable = &vtable; + tcp->fd = grpc_fd_get(em_fd); + tcp->read_cb = NULL; + tcp->write_cb = NULL; + tcp->read_user_data = NULL; + tcp->write_user_data = NULL; + tcp->slice_size = slice_size; + tcp->read_deadline = gpr_inf_future; + tcp->write_deadline = gpr_inf_future; + slice_state_init(&tcp->write_state, NULL, 0, 0); + /* paired with unref in grpc_tcp_destroy */ + gpr_ref_init(&tcp->refcount, 1); + tcp->em_fd = em_fd; + return &tcp->base; +} diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h new file mode 100644 index 0000000000..8a3c52894c --- /dev/null +++ b/src/core/iomgr/tcp_posix.h @@ -0,0 +1,56 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include "src/core/endpoint/endpoint.h" +#include "src/core/iomgr/iomgr_libevent.h" + +#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 + +/* Create a tcp endpoint given a file desciptor and a read slice size. + Takes ownership of fd. */ +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size); + +#endif /* __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ */ diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h new file mode 100644 index 0000000000..bd6b46f538 --- /dev/null +++ b/src/core/iomgr/tcp_server.h @@ -0,0 +1,75 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ + +#include <sys/types.h> +#include <sys/socket.h> + +#include "src/core/endpoint/endpoint.h" + +/* Forward decl of grpc_tcp_server */ +typedef struct grpc_tcp_server grpc_tcp_server; + +/* New server callback: tcp is the newly connected tcp connection */ +typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); + +/* Create a server, initially not bound to any ports */ +grpc_tcp_server *grpc_tcp_server_create(); + +/* Start listening to bound ports */ +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, + void *cb_arg); + +/* Add a port to the server, returning true on success, or false otherwise. + + The :: and 0.0.0.0 wildcard addresses are treated identically, accepting + both IPv4 and IPv6 connections, but :: is the preferred style. This usually + creates one socket, but possibly two on systems which support IPv6, + but not dualstack sockets. + + For raw access to the underlying sockets, see grpc_tcp_server_get_fd(). */ +int grpc_tcp_server_add_port(grpc_tcp_server *s, const struct sockaddr *addr, + int addr_len); + +/* Returns the file descriptor of the Nth listening socket on this server, + or -1 if the index is out of bounds. + + The file descriptor remains owned by the server, and will be cleaned + up when grpc_tcp_server_destroy is called. */ +int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index); + +void grpc_tcp_server_destroy(grpc_tcp_server *server); + +#endif /* __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c new file mode 100644 index 0000000000..22bbd45351 --- /dev/null +++ b/src/core/iomgr/tcp_server_posix.c @@ -0,0 +1,330 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#define _GNU_SOURCE +#include "src/core/iomgr/tcp_server.h" + +#include <limits.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/tcp_posix.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#define INIT_PORT_CAP 2 +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* one listening port */ +typedef struct { + int fd; + grpc_fd *emfd; + grpc_tcp_server *server; +} server_port; + +/* the overall server */ +struct grpc_tcp_server { + grpc_tcp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + int active_ports; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; +}; + +grpc_tcp_server *grpc_tcp_server_create() { + grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + return s; +} + +void grpc_tcp_server_destroy(grpc_tcp_server *s) { + size_t i; + gpr_mu_lock(&s->mu); + /* shutdown all fd's */ + for (i = 0; i < s->nports; i++) { + grpc_fd_shutdown(s->ports[i].emfd); + } + /* wait while that happens */ + while (s->active_ports) { + gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); + } + gpr_mu_unlock(&s->mu); + + /* delete ALL the things */ + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + grpc_fd_destroy(sp->emfd); + } + gpr_free(s->ports); + gpr_free(s); +} + +/* get max listen queue size on linux */ +static void init_max_accept_queue_size() { + int n = SOMAXCONN; + char buf[64]; + FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); + if (fp == NULL) { + /* 2.4 kernel. */ + s_max_accept_queue_size = SOMAXCONN; + return; + } + if (fgets(buf, sizeof buf, fp)) { + char *end; + long i = strtol(buf, &end, 10); + if (i > 0 && i <= INT_MAX && end && *end == 0) { + n = i; + } + } + fclose(fp); + s_max_accept_queue_size = n; + + if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { + gpr_log(GPR_INFO, + "Suspiciously small accept queue (%d) will probably lead to " + "connection drops", + s_max_accept_queue_size); + } +} + +static int get_max_accept_queue_size() { + gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); + return s_max_accept_queue_size; +} + +/* Prepare a recently-created socket for listening. */ +static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { + if (fd < 0) { + goto error; + } + + if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || + !grpc_set_socket_low_latency(fd, 1) || + !grpc_set_socket_reuse_addr(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + goto error; + } + + if (bind(fd, addr, addr_len) < 0) { + char *addr_str; + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); + gpr_free(addr_str); + goto error; + } + + if (listen(fd, get_max_accept_queue_size()) < 0) { + gpr_log(GPR_ERROR, "listen: %s", strerror(errno)); + goto error; + } + + return 1; + +error: + if (fd >= 0) { + close(fd); + } + return 0; +} + +/* event manager callback when reads are ready */ +static void on_read(void *arg, grpc_iomgr_cb_status status) { + server_port *sp = arg; + + if (status != GRPC_CALLBACK_SUCCESS) { + goto error; + } + + /* loop until accept4 returns EAGAIN, and then re-arm notification */ + for (;;) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + /* Note: If we ever decide to return this address to the user, remember to + strip off the ::ffff:0.0.0.0/96 prefix first. */ + int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1); + if (fd < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future); + return; + default: + gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); + goto error; + } + } + + sp->server->cb( + sp->server->cb_arg, + grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + } + + abort(); + +error: + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); +} + +static int add_socket_to_server(grpc_tcp_server *s, int fd, + const struct sockaddr *addr, int addr_len) { + server_port *sp; + + if (!prepare_socket(fd, addr, addr_len)) { + return 0; + } + + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->emfd = grpc_fd_create(fd); + sp->fd = fd; + sp->server = s; + /* initialize the em desc */ + if (sp->emfd == NULL) { + s->nports--; + gpr_mu_unlock(&s->mu); + return 0; + } + gpr_mu_unlock(&s->mu); + + return 1; +} + +int grpc_tcp_server_add_port(grpc_tcp_server *s, const struct sockaddr *addr, + int addr_len) { + int ok = 0; + int fd; + grpc_dualstack_mode dsmode; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in wild4; + struct sockaddr_in6 wild6; + struct sockaddr_in addr4_copy; + int port; + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcards(port, &wild4, &wild6); + + /* Try listening on IPv6 first. */ + addr = (struct sockaddr *)&wild6; + addr_len = sizeof(wild6); + fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); + ok |= add_socket_to_server(s, fd, addr, addr_len); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + return ok; + } + + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + addr = (struct sockaddr *)&wild4; + addr_len = sizeof(wild4); + } + + fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + } + if (dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = (struct sockaddr *)&addr4_copy; + addr_len = sizeof(addr4_copy); + } + ok |= add_socket_to_server(s, fd, addr, addr_len); + return ok; +} + +int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) { + return (0 <= index && index < s->nports) ? s->ports[index].fd : -1; +} + +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, + void *cb_arg) { + size_t i; + GPR_ASSERT(cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = cb; + s->cb_arg = cb_arg; + for (i = 0; i < s->nports; i++) { + grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], + gpr_inf_future); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} |