diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_setup.c | 17 | ||||
-rw-r--r-- | src/core/channel/client_setup.h | 3 | ||||
-rw-r--r-- | src/core/eventmanager/em.h | 344 | ||||
-rw-r--r-- | src/core/httpcli/httpcli.c | 16 | ||||
-rw-r--r-- | src/core/httpcli/httpcli.h | 5 | ||||
-rw-r--r-- | src/core/iomgr/alarm.h | 85 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair.h (renamed from src/core/surface/surface_em.h) | 19 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 61 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 56 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_completion_queue_interface.h (renamed from src/core/surface/surface_em.c) | 26 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_libevent.c (renamed from src/core/eventmanager/em.c) | 508 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_libevent.h | 207 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_libevent_use_threads.c (renamed from src/core/eventmanager/em_posix.c) | 0 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address.h (renamed from src/core/endpoint/resolve_address.h) | 6 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c (renamed from src/core/endpoint/resolve_address.c) | 7 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr.h | 47 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_posix.h | 40 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_utils.c (renamed from src/core/endpoint/socket_utils.c) | 114 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_utils.h | 75 | ||||
-rw-r--r-- | src/core/iomgr/sockaddr_win32.h (renamed from src/core/eventmanager/em_win32.c) | 7 | ||||
-rw-r--r-- | src/core/iomgr/socket_utils_common_posix.c | 154 | ||||
-rw-r--r-- | src/core/iomgr/socket_utils_linux.c (renamed from src/core/endpoint/socket_utils_linux.c) | 2 | ||||
-rw-r--r-- | src/core/iomgr/socket_utils_posix.c (renamed from src/core/endpoint/socket_utils_posix.c) | 0 | ||||
-rw-r--r-- | src/core/iomgr/socket_utils_posix.h (renamed from src/core/endpoint/socket_utils.h) | 46 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client.h (renamed from src/core/endpoint/tcp_client.h) | 17 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c (renamed from src/core/endpoint/tcp_client.c) | 33 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c (renamed from src/core/endpoint/tcp.c) | 54 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.h (renamed from src/core/endpoint/tcp.h) | 19 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server.h (renamed from src/core/endpoint/tcp_server.h) | 11 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c (renamed from src/core/endpoint/tcp_server.c) | 40 | ||||
-rw-r--r-- | src/core/security/credentials.c | 14 | ||||
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 7 | ||||
-rw-r--r-- | src/core/surface/call.c | 18 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 15 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 65 | ||||
-rw-r--r-- | src/core/surface/init.c | 6 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 14 | ||||
-rw-r--r-- | src/core/surface/server.c | 16 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 7 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.h | 2 |
40 files changed, 1150 insertions, 1033 deletions
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index ea256706fd..29fe915add 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -34,6 +34,7 @@ #include "src/core/channel/client_setup.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/channel_stack.h" +#include "src/core/iomgr/alarm.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -45,8 +46,7 @@ struct grpc_client_setup { void *user_data; grpc_channel_args *args; grpc_mdctx *mdctx; - grpc_em *em; - grpc_em_alarm backoff_alarm; + grpc_alarm backoff_alarm; gpr_timespec current_backoff_interval; int in_alarm; @@ -115,7 +115,7 @@ static void setup_cancel(grpc_transport_setup *sp) { /* effectively cancels the current request (if any) */ s->active_request = NULL; if (s->in_alarm) { - grpc_em_alarm_cancel(&s->backoff_alarm); + grpc_alarm_cancel(&s->backoff_alarm); } if (--s->refs == 0) { gpr_mu_unlock(&s->mu); @@ -133,7 +133,7 @@ void grpc_client_setup_create_and_attach( grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args, grpc_mdctx *mdctx, void (*initiate)(void *user_data, grpc_client_setup_request *request), - void (*done)(void *user_data), void *user_data, grpc_em *em) { + void (*done)(void *user_data), void *user_data) { grpc_client_setup *s = gpr_malloc(sizeof(grpc_client_setup)); s->base.vtable = &setup_vtable; @@ -143,7 +143,6 @@ void grpc_client_setup_create_and_attach( s->initiate = initiate; s->done = done; s->user_data = user_data; - s->em = em; s->active_request = NULL; s->args = grpc_channel_args_copy(args); s->current_backoff_interval = gpr_time_from_micros(1000000); @@ -164,7 +163,7 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) { } static void backoff_alarm_done(void *arg /* grpc_client_setup */, - grpc_em_cb_status status) { + grpc_iomgr_cb_status status) { grpc_client_setup *s = arg; grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request)); r->setup = s; @@ -215,9 +214,9 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r, gpr_timespec max_backoff = gpr_time_from_micros(120000000); GPR_ASSERT(!s->in_alarm); s->in_alarm = 1; - grpc_em_alarm_init(&s->backoff_alarm, s->em, backoff_alarm_done, s); - grpc_em_alarm_add(&s->backoff_alarm, - gpr_time_add(s->current_backoff_interval, gpr_now())); + grpc_alarm_init(&s->backoff_alarm, backoff_alarm_done, s); + grpc_alarm_add(&s->backoff_alarm, + gpr_time_add(s->current_backoff_interval, gpr_now())); s->current_backoff_interval = gpr_time_add(s->current_backoff_interval, s->current_backoff_interval); if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) { diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h index 862c1325a3..a508785e60 100644 --- a/src/core/channel/client_setup.h +++ b/src/core/channel/client_setup.h @@ -35,7 +35,6 @@ #define __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__ #include "src/core/channel/client_channel.h" -#include "src/core/eventmanager/em.h" #include "src/core/transport/metadata.h" #include <grpc/support/time.h> @@ -48,7 +47,7 @@ void grpc_client_setup_create_and_attach( grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args, grpc_mdctx *mdctx, void (*initiate)(void *user_data, grpc_client_setup_request *request), - void (*done)(void *user_data), void *user_data, grpc_em *em); + void (*done)(void *user_data), void *user_data); /* Check that r is the active request: needs to be performed at each callback. If this races, we'll have two connection attempts running at once and the diff --git a/src/core/eventmanager/em.h b/src/core/eventmanager/em.h deleted file mode 100644 index f190bc8743..0000000000 --- a/src/core/eventmanager/em.h +++ /dev/null @@ -1,344 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_EVENTMANAGER_EM_H__ -#define __GRPC_INTERNAL_EVENTMANAGER_EM_H__ -/* grpc_em is an event manager wrapping event loop with multithread support. - It executes a callback function when a specific event occurs on a file - descriptor or after a timeout has passed. - All methods are threadsafe and can be called from any thread. - - To use the event manager, a grpc_em instance needs to be initialized to - maintains the internal states. The grpc_em instance can be used to - initialize file descriptor instance of grpc_em_fd, or alarm instance of - grpc_em_alarm. The former is used to register a callback with a IO event. - The later is used to schedule an alarm. - - Instantiating any of these data structures requires including em_internal.h - A typical usage example is shown in the end of that header file. */ - -#include <grpc/support/atm.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/time.h> - -/* =============== Enums used in GRPC event manager API ==================== */ - -/* Result of a grpc_em operation */ -typedef enum grpc_em_error { - GRPC_EM_OK = 0, /* everything went ok */ - GRPC_EM_ERROR, /* internal errors not caused by the caller */ - GRPC_EM_INVALID_ARGUMENTS /* invalid arguments from the caller */ -} grpc_em_error; - -/* Status passed to callbacks for grpc_em_fd_notify_on_read and - grpc_em_fd_notify_on_write. */ -typedef enum grpc_em_cb_status { - GRPC_CALLBACK_SUCCESS = 0, - GRPC_CALLBACK_TIMED_OUT, - GRPC_CALLBACK_CANCELLED, - GRPC_CALLBACK_DO_NOT_USE -} grpc_em_cb_status; - -/* ======= Useful forward struct typedefs for GRPC event manager API ======= */ - -struct grpc_em; -struct grpc_em_alarm; -struct grpc_fd; - -typedef struct grpc_em grpc_em; -typedef struct grpc_em_alarm grpc_em_alarm; -typedef struct grpc_em_fd grpc_em_fd; - -/* gRPC Callback definition */ -typedef void (*grpc_em_cb_func)(void *arg, grpc_em_cb_status status); - -/* ============================ grpc_em =============================== */ -/* Initialize *em and start polling, return GRPC_EM_OK on success, return - GRPC_EM_ERROR on failure. Upon failure, caller should call grpc_em_destroy() - to clean partially initialized *em. - - Requires: *em uninitialized. */ -grpc_em_error grpc_em_init(grpc_em *em); - -/* Stop polling and cause *em no longer to be initialized. - Return GRPC_EM_OK if event polling is cleanly stopped. - Otherwise, return GRPC_EM_ERROR if polling is shutdown with errors. - Requires: *em initialized; no other concurrent operation on *em. */ -grpc_em_error grpc_em_destroy(grpc_em *em); - -/* do some work; assumes em->mu locked; may unlock and relock em->mu */ -int grpc_em_work(grpc_em *em, gpr_timespec deadline); - -/* =========================== grpc_em_am ============================== */ -/* Initialize *alarm. When expired or canceled, alarm_cb will be called with - *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was - canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once, - and application code should check the status to determine how it was - invoked. The application callback is also responsible for maintaining - information about when to free up any user-level state. */ -grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em, - grpc_em_cb_func alarm_cb, void *alarm_cb_arg); - -/* Note that there is no alarm destroy function. This is because the - alarm is a one-time occurrence with a guarantee that the callback will - be called exactly once, either at expiration or cancellation. Thus, all - the internal alarm event management state is destroyed just before - that callback is invoked. If the user has additional state associated with - the alarm, the user is responsible for determining when it is safe to - destroy that state. */ - -/* Schedule *alarm to expire at deadline. If *alarm is - re-added before expiration, the *delay is simply reset to the new value. - Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure. - Upon failure, caller should abort further operations on *alarm */ -grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline); - -/* Cancel an *alarm. - There are three cases: - 1. We normally cancel the alarm - 2. The alarm has already run - 3. We can't cancel the alarm because it is "in flight". - - In all of these cases, the cancellation is still considered successful. - They are essentially distinguished in that the alarm_cb will be run - exactly once from either the cancellation (with status CANCELLED) - or from the activation (with status SUCCESS) - - Requires: cancel() must happen after add() on a given alarm */ -grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm); - -/* ========================== grpc_em_fd ============================= */ - -/* Initialize *em_fd, return GRPM_EM_OK on success, GRPC_EM_ERROR on internal - errors, or GRPC_EM_INVALID_ARGUMENTS if fd is a blocking file descriptor. - Upon failure, caller should call grpc_em_fd_destroy() to clean partially - initialized *em_fd. - fd is a non-blocking file descriptor. - - This takes ownership of closing fd. - - Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ -grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd); - -/* Cause *em_fd no longer to be initialized and closes the underlying fd. - Requires: *em_fd initialized; no outstanding notify_on_read or - notify_on_write. */ -void grpc_em_fd_destroy(grpc_em_fd *em_fd); - -/* Returns the file descriptor associated with *em_fd. */ -int grpc_em_fd_get(grpc_em_fd *em_fd); - -/* Returns the event manager associated with *em_fd. */ -grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd); - -/* Register read interest, causing read_cb to be called once when em_fd becomes - readable, on deadline specified by deadline, or on shutdown triggered by - grpc_em_fd_shutdown. - Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure. - Upon Failure, caller should abort further operations on *em_fd except - grpc_em_fd_shutdown(). - read_cb will be called with read_cb_arg when *em_fd becomes readable. - read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, - GRPC_CALLBACK_TIMED_OUT if the call timed out, - and CANCELLED if the call was cancelled. - - Requires:This method must not be called before the read_cb for any previous - call runs. Edge triggered events are used whenever they are supported by the - underlying platform. This means that users must drain em_fd in read_cb before - calling notify_on_read again. Users are also expected to handle spurious - events, i.e read_cb is called while nothing can be readable from em_fd */ -grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, - grpc_em_cb_func read_cb, - void *read_cb_arg, - gpr_timespec deadline); - -/* Exactly the same semantics as above, except based on writable events. */ -grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *fd, - grpc_em_cb_func write_cb, - void *write_cb_arg, - gpr_timespec deadline); - -/* Cause any current and all future read/write callbacks to error out with - GRPC_CALLBACK_CANCELLED. */ -void grpc_em_fd_shutdown(grpc_em_fd *em_fd); - -/* ================== Other functions =================== */ - -/* This function is called from within a callback or from anywhere else - and causes the invocation of a callback at some point in the future */ -grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb, - void *cb_arg); - -/* ========== Declarations related to queue management (non-API) =========== */ - -/* Forward declarations */ -struct grpc_em_activation_data; -struct grpc_em_fd_impl; - -/* ================== Actual structure definitions ========================= */ -/* gRPC event manager handle. - The handle is used to initialize both grpc_em_alarm and grpc_em_fd. */ -struct em_thread_arg; - -struct grpc_em { - struct event_base *event_base; - - gpr_mu mu; - gpr_cv cv; - struct grpc_em_activation_data *q; - int num_pollers; - int num_fds; - gpr_timespec last_poll_completed; - - int shutdown_backup_poller; - gpr_event backup_poller_done; - - struct grpc_em_fd_impl *fds_to_free; - - struct event *timeout_ev; /* activated to break out of the event loop early */ -}; - -/* gRPC event manager task "base class". This is pretend-inheritance in C89. - This should be the first member of any actual grpc_em task type. - - Memory warning: expanding this will increase memory usage in any derived - class, so be careful. - - For generality, this base can be on multiple task queues and can have - multiple event callbacks registered. Not all "derived classes" will use - this feature. */ - -typedef enum grpc_em_task_type { - GRPC_EM_TASK_ALARM, - GRPC_EM_TASK_FD, - GRPC_EM_TASK_DO_NOT_USE -} grpc_em_task_type; - -/* Different activity types to shape the callback and queueing arrays */ -typedef enum grpc_em_task_activity_type { - GRPC_EM_TA_READ, /* use this also for single-type events */ - GRPC_EM_TA_WRITE, - GRPC_EM_TA_COUNT -} grpc_em_task_activity_type; - -/* Include the following #define for convenience for tasks like alarms that - only have a single type */ -#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ - -typedef struct grpc_em_activation_data { - struct event *ev; /* event activated on this callback type */ - grpc_em_cb_func cb; /* function pointer for callback */ - void *arg; /* argument passed to cb */ - - /* Hold the status associated with the callback when queued */ - grpc_em_cb_status status; - /* Now set up to link activations into scheduler queues */ - struct grpc_em_activation_data *prev; - struct grpc_em_activation_data *next; -} grpc_em_activation_data; - -typedef struct grpc_em_task { - grpc_em_task_type type; - grpc_em *em; - - /* Now have an array of activation data elements: one for each activity - type that could get activated */ - grpc_em_activation_data activation[GRPC_EM_TA_COUNT]; -} grpc_em_task; - -/* gRPC alarm handle. - The handle is used to add an alarm which expires after specified timeout. */ -struct grpc_em_alarm { - grpc_em_task task; /* Include the base class */ - - gpr_atm triggered; /* To be used atomically if alarm triggered */ -}; - -/* =================== Event caching =================== - In order to not miss or double-return edges in the context of edge triggering - and multithreading, we need a per-fd caching layer in the eventmanager itself - to cache relevant events. - - There are two types of events we care about: calls to notify_on_[read|write] - and readable/writable events for the socket from eventfd. There are separate - event caches for read and write. - - There are three states: - 0. "waiting" -- There's been a call to notify_on_[read|write] which has not - had a corresponding event. In other words, we're waiting for an event so we - can run the callback. - 1. "idle" -- We are neither waiting nor have a cached event. - 2. "cached" -- There has been a read/write event without a waiting callback, - so we want to run the event next time the application calls - notify_on_[read|write]. - - The high level state diagram: - - +--------------------------------------------------------------------+ - | WAITING | IDLE | CACHED | - | | | | - | 1. --*-> 2. --+-> 3. --+\ - | | | <--+/ - | | | | - x+-- 6. 5. <-+-- 4. <-*-- | - | | | | - +--------------------------------------------------------------------+ - - Transitions right occur on read|write events. Transitions left occur on - notify_on_[read|write] events. - State transitions: - 1. Read|Write event while waiting -> run the callback and transition to idle. - 2. Read|Write event while idle -> transition to cached. - 3. Read|Write event with one already cached -> still cached. - 4. notify_on_[read|write] with event cached: run callback and transition to - idle. - 5. notify_on_[read|write] when idle: Store callback and transition to - waiting. - 6. notify_on_[read|write] when waiting: invalid. */ - -typedef enum grpc_em_fd_state { - GRPC_EM_FD_WAITING = 0, - GRPC_EM_FD_IDLE = 1, - GRPC_EM_FD_CACHED = 2 -} grpc_em_fd_state; - -struct grpc_em_fd_impl; - -/* gRPC file descriptor handle. - The handle is used to register read/write callbacks to a file descriptor */ -struct grpc_em_fd { - struct grpc_em_fd_impl *impl; -}; - -#endif /* __GRPC_INTERNAL_EVENTMANAGER_EM_H__ */ diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 6c0a688390..84a97a4783 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -36,8 +36,8 @@ #include <string.h> #include "src/core/endpoint/endpoint.h" -#include "src/core/endpoint/resolve_address.h" -#include "src/core/endpoint/tcp_client.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/tcp_client.h" #include "src/core/httpcli/format_request.h" #include "src/core/httpcli/httpcli_security_context.h" #include "src/core/httpcli/parser.h" @@ -54,7 +54,6 @@ typedef struct { grpc_resolved_addresses *addresses; size_t next_address; grpc_endpoint *ep; - grpc_em *em; char *host; gpr_timespec deadline; int have_read_byte; @@ -200,9 +199,8 @@ static void next_address(internal_request *req) { return; } addr = &req->addresses->addrs[req->next_address++]; - grpc_tcp_client_connect(on_connected, req, req->em, - (struct sockaddr *)&addr->addr, addr->len, - req->deadline); + grpc_tcp_client_connect(on_connected, req, (struct sockaddr *)&addr->addr, + addr->len, req->deadline); } static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { @@ -217,7 +215,7 @@ static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { } void grpc_httpcli_get(const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_em *em, + gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { internal_request *req = gpr_malloc(sizeof(internal_request)); memset(req, 0, sizeof(*req)); @@ -225,7 +223,6 @@ void grpc_httpcli_get(const grpc_httpcli_request *request, grpc_httpcli_parser_init(&req->parser); req->on_response = on_response; req->user_data = user_data; - req->em = em; req->deadline = deadline; req->use_ssl = request->use_ssl; if (req->use_ssl) { @@ -238,7 +235,7 @@ void grpc_httpcli_get(const grpc_httpcli_request *request, void grpc_httpcli_post(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_em *em, + gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data) { internal_request *req = gpr_malloc(sizeof(internal_request)); memset(req, 0, sizeof(*req)); @@ -247,7 +244,6 @@ void grpc_httpcli_post(const grpc_httpcli_request *request, grpc_httpcli_parser_init(&req->parser); req->on_response = on_response; req->user_data = user_data; - req->em = em; req->deadline = deadline; req->use_ssl = request->use_ssl; if (req->use_ssl) { diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index aef0edfdd4..56eebe951e 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -36,7 +36,6 @@ #include <stddef.h> -#include "src/core/eventmanager/em.h" #include <grpc/support/time.h> /* User agent this library reports */ @@ -90,7 +89,7 @@ typedef void (*grpc_httpcli_response_cb)(void *user_data, 'on_response' is a callback to report results to (and 'user_data' is a user supplied pointer to pass to said call) */ void grpc_httpcli_get(const grpc_httpcli_request *request, - gpr_timespec deadline, grpc_em *em, + gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data); /* Asynchronously perform a HTTP POST. @@ -98,7 +97,7 @@ void grpc_httpcli_get(const grpc_httpcli_request *request, Does not support ?var1=val1&var2=val2 in the path. */ void grpc_httpcli_post(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, - gpr_timespec deadline, grpc_em *em, + gpr_timespec deadline, grpc_httpcli_response_cb on_response, void *user_data); #endif /* __GRPC_INTERNAL_HTTPCLI_HTTPCLI_H__ */ diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h new file mode 100644 index 0000000000..5bd00d9736 --- /dev/null +++ b/src/core/iomgr/alarm.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_ALARM_H__ +#define __GRPC_INTERNAL_IOMGR_ALARM_H__ + +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/port_platform.h> +#include <grpc/support/time.h> + +typedef struct grpc_alarm grpc_alarm; + +/* One of the following headers should provide struct grpc_alarm */ +#ifdef GPR_LIBEVENT +#include "src/core/iomgr/iomgr_libevent.h" +#endif + +/* Initialize *alarm. When expired or canceled, alarm_cb will be called with + *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was + canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once, + and application code should check the status to determine how it was + invoked. The application callback is also responsible for maintaining + information about when to free up any user-level state. */ +void grpc_alarm_init(grpc_alarm *alarm, grpc_iomgr_cb_func alarm_cb, + void *alarm_cb_arg); + +/* Note that there is no alarm destroy function. This is because the + alarm is a one-time occurrence with a guarantee that the callback will + be called exactly once, either at expiration or cancellation. Thus, all + the internal alarm event management state is destroyed just before + that callback is invoked. If the user has additional state associated with + the alarm, the user is responsible for determining when it is safe to + destroy that state. */ + +/* Schedule *alarm to expire at deadline. If *alarm is + re-added before expiration, the *delay is simply reset to the new value. + Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure. + Upon failure, caller should abort further operations on *alarm */ +int grpc_alarm_add(grpc_alarm *alarm, gpr_timespec deadline); + +/* Cancel an *alarm. + There are three cases: + 1. We normally cancel the alarm + 2. The alarm has already run + 3. We can't cancel the alarm because it is "in flight". + + In all of these cases, the cancellation is still considered successful. + They are essentially distinguished in that the alarm_cb will be run + exactly once from either the cancellation (with status CANCELLED) + or from the activation (with status SUCCESS) + + Requires: cancel() must happen after add() on a given alarm */ +int grpc_alarm_cancel(grpc_alarm *alarm); + +#endif /* __GRPC_INTERNAL_IOMGR_ALARM_H__ */ diff --git a/src/core/surface/surface_em.h b/src/core/iomgr/endpoint_pair.h index 165f42f868..4a97ebf0f6 100644 --- a/src/core/surface/surface_em.h +++ b/src/core/iomgr/endpoint_pair.h @@ -31,17 +31,16 @@ * */ -#ifndef __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ -#define __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ +#ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ +#define __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ -#include "src/core/eventmanager/em.h" +#include "src/core/endpoint/endpoint.h" -/* Returns a global singleton event manager for - the surface apis, and is passed down to channels and - transports as needed. */ -grpc_em *grpc_surface_em(); +typedef struct { + grpc_endpoint *client; + grpc_endpoint *server; +} grpc_endpoint_pair; -void grpc_surface_em_init(); -void grpc_surface_em_shutdown(); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size); -#endif /* __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ */ +#endif /* __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c new file mode 100644 index 0000000000..f08d1344eb --- /dev/null +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -0,0 +1,61 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/endpoint_pair.h" + +#include <errno.h> +#include <fcntl.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> + +#include "src/core/iomgr/tcp_posix.h" +#include <grpc/support/log.h> + +static void create_sockets(int sv[2]) { + int flags; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) { + int sv[2]; + grpc_endpoint_pair p; + create_sockets(sv); + p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size); + p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size); + return p; +} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h new file mode 100644 index 0000000000..cf39f947bc --- /dev/null +++ b/src/core/iomgr/iomgr.h @@ -0,0 +1,56 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__ +#define __GRPC_INTERNAL_IOMGR_IOMGR_H__ + +/* Status passed to callbacks for grpc_em_fd_notify_on_read and + grpc_em_fd_notify_on_write. */ +typedef enum grpc_em_cb_status { + GRPC_CALLBACK_SUCCESS = 0, + GRPC_CALLBACK_TIMED_OUT, + GRPC_CALLBACK_CANCELLED, + GRPC_CALLBACK_DO_NOT_USE +} grpc_iomgr_cb_status; + +/* gRPC Callback definition */ +typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status); + +void grpc_iomgr_init(); +void grpc_iomgr_shutdown(); + +/* This function is called from within a callback or from anywhere else + and causes the invocation of a callback at some point in the future */ +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_H__ */ diff --git a/src/core/surface/surface_em.c b/src/core/iomgr/iomgr_completion_queue_interface.h index e1785d1a44..3c4efe773a 100644 --- a/src/core/surface/surface_em.c +++ b/src/core/iomgr/iomgr_completion_queue_interface.h @@ -31,25 +31,15 @@ * */ -#include "src/core/surface/surface_em.h" -#include <grpc/support/log.h> +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ -static int initialized = 0; -static grpc_em em; +/* Internals of iomgr that are exposed only to be used for completion queue + implementation */ -grpc_em *grpc_surface_em() { - GPR_ASSERT(initialized && "call grpc_init()"); - return &em; -} +extern gpr_mu grpc_iomgr_mu; +extern gpr_cv grpc_iomgr_cv; -void grpc_surface_em_init() { - GPR_ASSERT(!initialized); - initialized = 1; - grpc_em_init(&em); -} +int grpc_iomgr_work(gpr_timespec deadline); -void grpc_surface_em_shutdown() { - GPR_ASSERT(initialized); - grpc_em_destroy(&em); - initialized = 0; -} +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */ diff --git a/src/core/eventmanager/em.c b/src/core/iomgr/iomgr_libevent.c index 0dc6c6a6d0..1af03dcf12 100644 --- a/src/core/eventmanager/em.c +++ b/src/core/iomgr/iomgr_libevent.c @@ -31,71 +31,66 @@ * */ -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/iomgr_libevent.h" #include <unistd.h> #include <fcntl.h> +#include "src/core/iomgr/alarm.h" #include <grpc/support/atm.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include <grpc/support/time.h> #include <event2/event.h> #include <event2/thread.h> -int evthread_use_threads(void); - -static void grpc_em_fd_impl_destroy(struct grpc_em_fd_impl *impl); - #define ALARM_TRIGGER_INIT ((gpr_atm)0) #define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) #define DONE_SHUTDOWN ((void *)1) #define POLLER_ID_INVALID ((gpr_atm)-1) -typedef struct grpc_em_fd_impl { - grpc_em_task task; /* Base class, callbacks, queues, etc */ - int fd; /* File descriptor */ - - /* Note that the shutdown event is only needed as a workaround for libevent - not properly handling event_active on an in flight event. */ - struct event *shutdown_ev; /* activated to trigger shutdown */ - - /* protect shutdown_started|read_state|write_state and ensure barriers - between notify_on_[read|write] and read|write callbacks */ - gpr_mu mu; - int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ - grpc_em_fd_state read_state; - grpc_em_fd_state write_state; +/* Global data */ +struct event_base *g_event_base; +gpr_mu grpc_iomgr_mu; +gpr_cv grpc_iomgr_cv; +static grpc_libevent_activation_data *g_activation_queue; +static int g_num_pollers; +static int g_num_fds; +static gpr_timespec g_last_poll_completed; +static int g_shutdown_backup_poller; +static gpr_event g_backup_poller_done; +/* activated to break out of the event loop early */ +static struct event *g_timeout_ev; +static grpc_fd *g_fds_to_free; - /* descriptor delete list. These are destroyed during polling. */ - struct grpc_em_fd_impl *next; -} grpc_em_fd_impl; - -/* ================== grpc_em implementation ===================== */ +int evthread_use_threads(void); +static void grpc_fd_impl_destroy(grpc_fd *impl); /* If anything is in the work queue, process one item and return 1. Return 0 if there were no work items to complete. - Requires em->mu locked, may unlock and relock during the call. */ -static int maybe_do_queue_work(grpc_em *em) { - grpc_em_activation_data *work = em->q; + Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ +static int maybe_do_queue_work() { + grpc_libevent_activation_data *work = g_activation_queue; if (work == NULL) return 0; if (work->next == work) { - em->q = NULL; + g_activation_queue = NULL; } else { - em->q = work->next; - em->q->prev = work->prev; - em->q->next->prev = em->q->prev->next = em->q; + g_activation_queue = work->next; + g_activation_queue->prev = work->prev; + g_activation_queue->next->prev = g_activation_queue->prev->next = + g_activation_queue; } work->next = work->prev = NULL; - gpr_mu_unlock(&em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); work->cb(work->arg, work->status); - gpr_mu_lock(&em->mu); + gpr_mu_lock(&grpc_iomgr_mu); return 1; } @@ -104,58 +99,59 @@ static void timer_callback(int fd, short events, void *context) { event_base_loopbreak((struct event_base *)context); } -static void free_fd_list(grpc_em_fd_impl *impl) { +static void free_fd_list(grpc_fd *impl) { while (impl != NULL) { - grpc_em_fd_impl *current = impl; + grpc_fd *current = impl; impl = impl->next; - grpc_em_fd_impl_destroy(current); + grpc_fd_impl_destroy(current); gpr_free(current); } } +static void maybe_free_fds() { + if (g_fds_to_free) { + free_fd_list(g_fds_to_free); + g_fds_to_free = NULL; + } +} + /* Spend some time doing polling and libevent maintenance work if no other thread is. This includes both polling for events and destroying/closing file descriptor objects. Returns 1 if polling was performed, 0 otherwise. - Requires em->mu locked, may unlock and relock during the call. */ -static int maybe_do_polling_work(grpc_em *em, struct timeval delay) { + Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ +static int maybe_do_polling_work(struct timeval delay) { int status; - if (em->num_pollers) return 0; + if (g_num_pollers) return 0; - em->num_pollers = 1; + g_num_pollers = 1; - free_fd_list(em->fds_to_free); - em->fds_to_free = NULL; + maybe_free_fds(); - gpr_mu_unlock(&em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); - event_add(em->timeout_ev, &delay); - status = event_base_loop(em->event_base, EVLOOP_ONCE); + event_add(g_timeout_ev, &delay); + status = event_base_loop(g_event_base, EVLOOP_ONCE); if (status < 0) { gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); } - event_del(em->timeout_ev); + event_del(g_timeout_ev); - gpr_mu_lock(&em->mu); - if (em->fds_to_free) { - free_fd_list(em->fds_to_free); - em->fds_to_free = NULL; - } + gpr_mu_lock(&grpc_iomgr_mu); + maybe_free_fds(); - em->num_pollers = 0; - gpr_cv_broadcast(&em->cv); + g_num_pollers = 0; + gpr_cv_broadcast(&grpc_iomgr_cv); return 1; } -int grpc_em_work(grpc_em *em, gpr_timespec deadline) { +int grpc_iomgr_work(gpr_timespec deadline) { gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); /* poll for no longer than one second */ gpr_timespec max_delay = {1, 0}; struct timeval delay; - GPR_ASSERT(em); - if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { return 0; } @@ -166,8 +162,8 @@ int grpc_em_work(grpc_em *em, gpr_timespec deadline) { delay = gpr_timeval_from_timespec(delay_timespec); - if (maybe_do_queue_work(em) || maybe_do_polling_work(em, delay)) { - em->last_poll_completed = gpr_now(); + if (maybe_do_queue_work() || maybe_do_polling_work(delay)) { + g_last_poll_completed = gpr_now(); return 1; } @@ -175,158 +171,154 @@ int grpc_em_work(grpc_em *em, gpr_timespec deadline) { } static void backup_poller_thread(void *p) { - grpc_em *em = p; int backup_poller_engaged = 0; /* allow no pollers for 100 milliseconds, then engage backup polling */ gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000); - gpr_mu_lock(&em->mu); - while (!em->shutdown_backup_poller) { - if (em->num_pollers == 0) { + gpr_mu_lock(&grpc_iomgr_mu); + while (!g_shutdown_backup_poller) { + if (g_num_pollers == 0) { gpr_timespec now = gpr_now(); gpr_timespec time_until_engage = gpr_time_sub( - allow_no_pollers, gpr_time_sub(now, em->last_poll_completed)); + allow_no_pollers, gpr_time_sub(now, g_last_poll_completed)); if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { if (!backup_poller_engaged) { gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); backup_poller_engaged = 1; } - if (!maybe_do_queue_work(em)) { + if (!maybe_do_queue_work()) { struct timeval tv = {1, 0}; - maybe_do_polling_work(em, tv); + maybe_do_polling_work(tv); } } else { if (backup_poller_engaged) { gpr_log(GPR_DEBUG, "Backup poller disengaged"); backup_poller_engaged = 0; } - gpr_mu_unlock(&em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); gpr_sleep_until(gpr_time_add(now, time_until_engage)); - gpr_mu_lock(&em->mu); + gpr_mu_lock(&grpc_iomgr_mu); } } else { if (backup_poller_engaged) { gpr_log(GPR_DEBUG, "Backup poller disengaged"); backup_poller_engaged = 0; } - gpr_cv_wait(&em->cv, &em->mu, gpr_inf_future); + gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future); } } - gpr_mu_unlock(&em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); - gpr_event_set(&em->backup_poller_done, (void *)1); + gpr_event_set(&g_backup_poller_done, (void *)1); } -grpc_em_error grpc_em_init(grpc_em *em) { +void grpc_iomgr_init() { gpr_thd_id backup_poller_id; if (evthread_use_threads() != 0) { gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); - return GRPC_EM_ERROR; + abort(); } - gpr_mu_init(&em->mu); - gpr_cv_init(&em->cv); - em->q = NULL; - em->num_pollers = 0; - em->num_fds = 0; - em->last_poll_completed = gpr_now(); - em->shutdown_backup_poller = 0; - em->fds_to_free = NULL; + gpr_mu_init(&grpc_iomgr_mu); + gpr_cv_init(&grpc_iomgr_cv); + g_activation_queue = NULL; + g_num_pollers = 0; + g_num_fds = 0; + g_last_poll_completed = gpr_now(); + g_shutdown_backup_poller = 0; + g_fds_to_free = NULL; - gpr_event_init(&em->backup_poller_done); + gpr_event_init(&g_backup_poller_done); - em->event_base = NULL; - em->timeout_ev = NULL; + g_event_base = NULL; + g_timeout_ev = NULL; - em->event_base = event_base_new(); - if (!em->event_base) { + g_event_base = event_base_new(); + if (!g_event_base) { gpr_log(GPR_ERROR, "Failed to create the event base"); - return GRPC_EM_ERROR; + abort(); } - if (evthread_make_base_notifiable(em->event_base) != 0) { + if (evthread_make_base_notifiable(g_event_base) != 0) { gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); - return GRPC_EM_ERROR; + abort(); } - em->timeout_ev = evtimer_new(em->event_base, timer_callback, em->event_base); + g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base); - gpr_thd_new(&backup_poller_id, backup_poller_thread, em, NULL); - - return GRPC_EM_OK; + gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL); } -grpc_em_error grpc_em_destroy(grpc_em *em) { +void grpc_iomgr_shutdown() { gpr_timespec fd_shutdown_deadline = - gpr_time_add(gpr_now(), gpr_time_from_micros(10 * 1000 * 1000)); + gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); /* broadcast shutdown */ - gpr_mu_lock(&em->mu); - while (em->num_fds) { + gpr_mu_lock(&grpc_iomgr_mu); + while (g_num_fds) { gpr_log(GPR_INFO, "waiting for %d fds to be destroyed before closing event manager", - em->num_fds); - if (gpr_cv_wait(&em->cv, &em->mu, fd_shutdown_deadline)) { + g_num_fds); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) { gpr_log(GPR_ERROR, "not all fds destroyed before shutdown deadline: memory leaks " "are likely"); break; - } else if (em->num_fds == 0) { + } else if (g_num_fds == 0) { gpr_log(GPR_INFO, "all fds closed"); } } - em->shutdown_backup_poller = 1; - gpr_cv_broadcast(&em->cv); - gpr_mu_unlock(&em->mu); + g_shutdown_backup_poller = 1; + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); - gpr_event_wait(&em->backup_poller_done, gpr_inf_future); + gpr_event_wait(&g_backup_poller_done, gpr_inf_future); /* drain pending work */ - gpr_mu_lock(&em->mu); - while (maybe_do_queue_work(em)) + gpr_mu_lock(&grpc_iomgr_mu); + while (maybe_do_queue_work()) ; - gpr_mu_unlock(&em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); - free_fd_list(em->fds_to_free); + free_fd_list(g_fds_to_free); /* complete shutdown */ - gpr_mu_destroy(&em->mu); - gpr_cv_destroy(&em->cv); + gpr_mu_destroy(&grpc_iomgr_mu); + gpr_cv_destroy(&grpc_iomgr_cv); - if (em->timeout_ev != NULL) { - event_free(em->timeout_ev); + if (g_timeout_ev != NULL) { + event_free(g_timeout_ev); } - if (em->event_base != NULL) { - event_base_free(em->event_base); - em->event_base = NULL; + if (g_event_base != NULL) { + event_base_free(g_event_base); + g_event_base = NULL; } - - return GRPC_EM_OK; } -static void add_task(grpc_em *em, grpc_em_activation_data *adata) { - gpr_mu_lock(&em->mu); - if (em->q) { - adata->next = em->q; +static void add_task(grpc_libevent_activation_data *adata) { + gpr_mu_lock(&grpc_iomgr_mu); + if (g_activation_queue) { + adata->next = g_activation_queue; adata->prev = adata->next->prev; adata->next->prev = adata->prev->next = adata; } else { - em->q = adata; + g_activation_queue = adata; adata->next = adata->prev = adata; } - gpr_cv_broadcast(&em->cv); - gpr_mu_unlock(&em->mu); + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); } -/* ===============grpc_em_alarm implementation==================== */ +/* ===============grpc_alarm implementation==================== */ /* The following function frees up the alarm's libevent structure and should always be invoked just before calling the alarm's callback */ -static void alarm_ev_destroy(grpc_em_alarm *alarm) { - grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; +static void alarm_ev_destroy(grpc_alarm *alarm) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; if (adata->ev != NULL) { /* TODO(klempner): Is this safe to do when we're cancelling? */ event_free(adata->ev); @@ -335,8 +327,9 @@ static void alarm_ev_destroy(grpc_em_alarm *alarm) { } /* Proxy callback triggered by alarm->ev to call alarm->cb */ static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) { - grpc_em_alarm *alarm = arg; - grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; + grpc_alarm *alarm = arg; + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; int trigger_old; /* First check if this alarm has been canceled, atomically */ @@ -346,26 +339,26 @@ static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) { /* Before invoking user callback, destroy the libevent structure */ alarm_ev_destroy(alarm); adata->status = GRPC_CALLBACK_SUCCESS; - add_task(alarm->task.em, adata); + add_task(adata); } } -grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em, - grpc_em_cb_func alarm_cb, void *alarm_cb_arg) { - grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; +void grpc_alarm_init(grpc_alarm *alarm, grpc_iomgr_cb_func alarm_cb, + void *alarm_cb_arg) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; alarm->task.type = GRPC_EM_TASK_ALARM; - alarm->task.em = em; gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); adata->cb = alarm_cb; adata->arg = alarm_cb_arg; adata->prev = NULL; adata->next = NULL; adata->ev = NULL; - return GRPC_EM_OK; } -grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) { - grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; +int grpc_alarm_add(grpc_alarm *alarm, gpr_timespec deadline) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); struct timeval delay = gpr_timeval_from_timespec(delay_timespec); if (adata->ev) { @@ -373,20 +366,17 @@ grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) { gpr_log(GPR_INFO, "Adding an alarm that already has an event."); adata->ev = NULL; } - adata->ev = evtimer_new(alarm->task.em->event_base, libevent_alarm_cb, alarm); + adata->ev = evtimer_new(g_event_base, libevent_alarm_cb, alarm); /* Set the trigger field to untriggered. Do this as the last store since it is a release of previous stores. */ gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); - if (adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0) { - return GRPC_EM_OK; - } else { - return GRPC_EM_ERROR; - } + return adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0; } -grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) { - grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; +int grpc_alarm_cancel(grpc_alarm *alarm) { + grpc_libevent_activation_data *adata = + &alarm->task.activation[GRPC_EM_TA_ONLY]; int trigger_old; /* First check if this alarm has been triggered, atomically */ @@ -400,25 +390,44 @@ grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) { if (evtimer_del(adata->ev) != 0) { /* The delete was unsuccessful for some reason. */ gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful"); - return GRPC_EM_ERROR; + return 0; } /* Free up the event structure before invoking callback */ alarm_ev_destroy(alarm); adata->status = GRPC_CALLBACK_CANCELLED; - add_task(alarm->task.em, adata); + add_task(adata); } - return GRPC_EM_OK; + return 1; } -/* ==================== grpc_em_fd implementation =================== */ +static void grpc_fd_impl_destroy(grpc_fd *impl) { + grpc_em_task_activity_type type; + grpc_libevent_activation_data *adata; + + for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { + adata = &(impl->task.activation[type]); + GPR_ASSERT(adata->next == NULL); + if (adata->ev != NULL) { + event_free(adata->ev); + adata->ev = NULL; + } + } + + if (impl->shutdown_ev != NULL) { + event_free(impl->shutdown_ev); + impl->shutdown_ev = NULL; + } + gpr_mu_destroy(&impl->mu); + close(impl->fd); +} /* Proxy callback to call a gRPC read/write callback */ -static void em_fd_cb(int fd, short what, void *arg /*=em_fd_impl*/) { - grpc_em_fd_impl *em_fd = arg; - grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS; +static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { + grpc_fd *em_fd = arg; + grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS; int run_read_cb = 0; int run_write_cb = 0; - grpc_em_activation_data *rdata, *wdata; + grpc_libevent_activation_data *rdata, *wdata; gpr_mu_lock(&em_fd->mu); if (em_fd->shutdown_started) { @@ -433,35 +442,35 @@ static void em_fd_cb(int fd, short what, void *arg /*=em_fd_impl*/) { if (what & EV_READ) { switch (em_fd->read_state) { - case GRPC_EM_FD_WAITING: + case GRPC_FD_WAITING: run_read_cb = 1; - em_fd->read_state = GRPC_EM_FD_IDLE; + em_fd->read_state = GRPC_FD_IDLE; break; - case GRPC_EM_FD_IDLE: - case GRPC_EM_FD_CACHED: - em_fd->read_state = GRPC_EM_FD_CACHED; + case GRPC_FD_IDLE: + case GRPC_FD_CACHED: + em_fd->read_state = GRPC_FD_CACHED; } } if (what & EV_WRITE) { switch (em_fd->write_state) { - case GRPC_EM_FD_WAITING: + case GRPC_FD_WAITING: run_write_cb = 1; - em_fd->write_state = GRPC_EM_FD_IDLE; + em_fd->write_state = GRPC_FD_IDLE; break; - case GRPC_EM_FD_IDLE: - case GRPC_EM_FD_CACHED: - em_fd->write_state = GRPC_EM_FD_CACHED; + case GRPC_FD_IDLE: + case GRPC_FD_CACHED: + em_fd->write_state = GRPC_FD_CACHED; } } if (run_read_cb) { rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); rdata->status = status; - add_task(em_fd->task.em, rdata); + add_task(rdata); } else if (run_write_cb) { wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); wdata->status = status; - add_task(em_fd->task.em, wdata); + add_task(wdata); } gpr_mu_unlock(&em_fd->mu); } @@ -471,41 +480,34 @@ static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { that libevent's handling of event_active() on an event which is already in flight on a different thread is racy and easily triggers TSAN. */ - grpc_em_fd_impl *impl = arg; + grpc_fd *impl = arg; gpr_mu_lock(&impl->mu); impl->shutdown_started = 1; - if (impl->read_state == GRPC_EM_FD_WAITING) { + if (impl->read_state == GRPC_FD_WAITING) { event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); } - if (impl->write_state == GRPC_EM_FD_WAITING) { + if (impl->write_state == GRPC_FD_WAITING) { event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); } gpr_mu_unlock(&impl->mu); } -grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { +grpc_fd *grpc_fd_create(int fd) { int flags; - grpc_em_activation_data *rdata, *wdata; - grpc_em_fd_impl *impl = gpr_malloc(sizeof(grpc_em_fd_impl)); + grpc_libevent_activation_data *rdata, *wdata; + grpc_fd *impl = gpr_malloc(sizeof(grpc_fd)); - gpr_mu_lock(&em->mu); - em->num_fds++; - - gpr_mu_unlock(&em->mu); - - em_fd->impl = impl; + gpr_mu_lock(&grpc_iomgr_mu); + g_num_fds++; + gpr_mu_unlock(&grpc_iomgr_mu); impl->shutdown_ev = NULL; gpr_mu_init(&impl->mu); flags = fcntl(fd, F_GETFL, 0); - if ((flags & O_NONBLOCK) == 0) { - gpr_log(GPR_ERROR, "File descriptor %d is blocking", fd); - return GRPC_EM_INVALID_ARGUMENTS; - } + GPR_ASSERT((flags & O_NONBLOCK) != 0); impl->task.type = GRPC_EM_TASK_FD; - impl->task.em = em; impl->fd = fd; rdata = &(impl->task.activation[GRPC_EM_TA_READ]); @@ -524,100 +526,57 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { wdata->prev = NULL; wdata->next = NULL; - impl->read_state = GRPC_EM_FD_IDLE; - impl->write_state = GRPC_EM_FD_IDLE; + impl->read_state = GRPC_FD_IDLE; + impl->write_state = GRPC_FD_IDLE; impl->shutdown_started = 0; impl->next = NULL; /* TODO(chenw): detect platforms where only level trigger is supported, and set the event to non-persist. */ - rdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, + rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, em_fd_cb, impl); - if (!rdata->ev) { - gpr_log(GPR_ERROR, "Failed to create read event"); - return GRPC_EM_ERROR; - } + GPR_ASSERT(rdata->ev); - wdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, + wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, em_fd_cb, impl); - if (!wdata->ev) { - gpr_log(GPR_ERROR, "Failed to create write event"); - return GRPC_EM_ERROR; - } + GPR_ASSERT(wdata->ev); impl->shutdown_ev = - event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, impl); - - if (!impl->shutdown_ev) { - gpr_log(GPR_ERROR, "Failed to create shutdown event"); - return GRPC_EM_ERROR; - } - - return GRPC_EM_OK; -} - -static void grpc_em_fd_impl_destroy(grpc_em_fd_impl *impl) { - grpc_em_task_activity_type type; - grpc_em_activation_data *adata; - - for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { - adata = &(impl->task.activation[type]); - GPR_ASSERT(adata->next == NULL); - if (adata->ev != NULL) { - event_free(adata->ev); - adata->ev = NULL; - } - } + event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl); + GPR_ASSERT(impl->shutdown_ev); - if (impl->shutdown_ev != NULL) { - event_free(impl->shutdown_ev); - impl->shutdown_ev = NULL; - } - gpr_mu_destroy(&impl->mu); - close(impl->fd); + return impl; } -void grpc_em_fd_destroy(grpc_em_fd *em_fd) { - grpc_em_fd_impl *impl = em_fd->impl; - grpc_em *em = impl->task.em; +void grpc_fd_destroy(grpc_fd *impl) { + gpr_mu_lock(&grpc_iomgr_mu); - gpr_mu_lock(&em->mu); - - if (em->num_pollers == 0) { + if (g_num_pollers == 0) { /* it is safe to simply free it */ - grpc_em_fd_impl_destroy(impl); + grpc_fd_impl_destroy(impl); gpr_free(impl); } else { /* Put the impl on the list to be destroyed by the poller. */ - impl->next = em->fds_to_free; - em->fds_to_free = impl; - /* Kick the poller so it closes the fd promptly. - * TODO(klempner): maybe this should be a different event. - */ - event_active(em_fd->impl->shutdown_ev, EV_READ, 1); + impl->next = g_fds_to_free; + g_fds_to_free = impl; + /* TODO(ctiller): kick the poller so it destroys this fd promptly + (currently we may wait up to a second) */ } - em->num_fds--; - gpr_cv_broadcast(&em->cv); - gpr_mu_unlock(&em->mu); + g_num_fds--; + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); } -int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->impl->fd; } - -/* Returns the event manager associated with *em_fd. */ -grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->impl->task.em; } +int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; } /* TODO(chenw): should we enforce the contract that notify_on_read cannot be called when the previously registered callback has not been called yet. */ -grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, - grpc_em_cb_func read_cb, - void *read_cb_arg, - gpr_timespec deadline) { - grpc_em_fd_impl *impl = em_fd->impl; +int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb, + void *read_cb_arg, gpr_timespec deadline) { int force_event = 0; - grpc_em_activation_data *rdata; - grpc_em_error result = GRPC_EM_OK; + grpc_libevent_activation_data *rdata; gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); struct timeval delay = gpr_timeval_from_timespec(delay_timespec); struct timeval *delayp = @@ -629,27 +588,23 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, rdata->cb = read_cb; rdata->arg = read_cb_arg; - force_event = - (impl->shutdown_started || impl->read_state == GRPC_EM_FD_CACHED); - impl->read_state = GRPC_EM_FD_WAITING; + force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED); + impl->read_state = GRPC_FD_WAITING; if (force_event) { event_active(rdata->ev, EV_READ, 1); } else if (event_add(rdata->ev, delayp) == -1) { - result = GRPC_EM_ERROR; + gpr_mu_unlock(&impl->mu); + return 0; } gpr_mu_unlock(&impl->mu); - return result; + return 1; } -grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd, - grpc_em_cb_func write_cb, - void *write_cb_arg, - gpr_timespec deadline) { - grpc_em_fd_impl *impl = em_fd->impl; +int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb, + void *write_cb_arg, gpr_timespec deadline) { int force_event = 0; - grpc_em_activation_data *wdata; - grpc_em_error result = GRPC_EM_OK; + grpc_libevent_activation_data *wdata; gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); struct timeval delay = gpr_timeval_from_timespec(delay_timespec); struct timeval *delayp = @@ -661,25 +616,23 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd, wdata->cb = write_cb; wdata->arg = write_cb_arg; - force_event = - (impl->shutdown_started || impl->write_state == GRPC_EM_FD_CACHED); - impl->write_state = GRPC_EM_FD_WAITING; + force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED); + impl->write_state = GRPC_FD_WAITING; if (force_event) { event_active(wdata->ev, EV_WRITE, 1); } else if (event_add(wdata->ev, delayp) == -1) { - result = GRPC_EM_ERROR; + gpr_mu_unlock(&impl->mu); + return 0; } gpr_mu_unlock(&impl->mu); - return result; + return 1; } -void grpc_em_fd_shutdown(grpc_em_fd *em_fd) { - event_active(em_fd->impl->shutdown_ev, EV_READ, 1); +void grpc_fd_shutdown(grpc_fd *em_fd) { + event_active(em_fd->shutdown_ev, EV_READ, 1); } -/*====================== Other callback functions ======================*/ - /* Sometimes we want a followup callback: something to be added from the current callback for the EM to invoke once this callback is complete. This is implemented by inserting an entry into an EM queue. */ @@ -689,27 +642,23 @@ void grpc_em_fd_shutdown(grpc_em_fd *em_fd) { the function to use for the followup callback, and the activation data pointer used for the queues (to free in the CB) */ struct followup_callback_arg { - grpc_em_cb_func func; + grpc_iomgr_cb_func func; void *cb_arg; - grpc_em_activation_data adata; + grpc_libevent_activation_data adata; }; -static void followup_proxy_callback(void *cb_arg, grpc_em_cb_status status) { +static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) { struct followup_callback_arg *fcb_arg = cb_arg; /* Invoke the function */ fcb_arg->func(fcb_arg->cb_arg, status); gpr_free(fcb_arg); } -grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb, - void *cb_arg) { - grpc_em_activation_data *adptr; +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { + grpc_libevent_activation_data *adptr; struct followup_callback_arg *fcb_arg; fcb_arg = gpr_malloc(sizeof(*fcb_arg)); - if (fcb_arg == NULL) { - return GRPC_EM_ERROR; - } /* Set up the activation data and followup callback argument structures */ adptr = &fcb_arg->adata; adptr->ev = NULL; @@ -723,6 +672,5 @@ grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb, fcb_arg->cb_arg = cb_arg; /* Insert an activation data for the specified em */ - add_task(em, adptr); - return GRPC_EM_OK; + add_task(adptr); } diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h new file mode 100644 index 0000000000..77e7b59989 --- /dev/null +++ b/src/core/iomgr/iomgr_libevent.h @@ -0,0 +1,207 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ +#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ + +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +typedef struct grpc_fd grpc_fd; + +/* gRPC event manager task "base class". This is pretend-inheritance in C89. + This should be the first member of any actual grpc_em task type. + + Memory warning: expanding this will increase memory usage in any derived + class, so be careful. + + For generality, this base can be on multiple task queues and can have + multiple event callbacks registered. Not all "derived classes" will use + this feature. */ + +typedef enum grpc_libevent_task_type { + GRPC_EM_TASK_ALARM, + GRPC_EM_TASK_FD, + GRPC_EM_TASK_DO_NOT_USE +} grpc_libevent_task_type; + +/* Different activity types to shape the callback and queueing arrays */ +typedef enum grpc_em_task_activity_type { + GRPC_EM_TA_READ, /* use this also for single-type events */ + GRPC_EM_TA_WRITE, + GRPC_EM_TA_COUNT +} grpc_em_task_activity_type; + +/* Include the following #define for convenience for tasks like alarms that + only have a single type */ +#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ + +typedef struct grpc_libevent_activation_data { + struct event *ev; /* event activated on this callback type */ + grpc_iomgr_cb_func cb; /* function pointer for callback */ + void *arg; /* argument passed to cb */ + + /* Hold the status associated with the callback when queued */ + grpc_iomgr_cb_status status; + /* Now set up to link activations into scheduler queues */ + struct grpc_libevent_activation_data *prev; + struct grpc_libevent_activation_data *next; +} grpc_libevent_activation_data; + +typedef struct grpc_libevent_task { + grpc_libevent_task_type type; + + /* Now have an array of activation data elements: one for each activity + type that could get activated */ + grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT]; +} grpc_libevent_task; + +/* Initialize *em_fd. + Requires fd is a non-blocking file descriptor. + + This takes ownership of closing fd. + + Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ +grpc_fd *grpc_fd_create(int fd); + +/* Cause *em_fd no longer to be initialized and closes the underlying fd. + Requires: *em_fd initialized; no outstanding notify_on_read or + notify_on_write. */ +void grpc_fd_destroy(grpc_fd *em_fd); + +/* Returns the file descriptor associated with *em_fd. */ +int grpc_fd_get(grpc_fd *em_fd); + +/* Register read interest, causing read_cb to be called once when em_fd becomes + readable, on deadline specified by deadline, or on shutdown triggered by + grpc_fd_shutdown. + read_cb will be called with read_cb_arg when *em_fd becomes readable. + read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, + GRPC_CALLBACK_TIMED_OUT if the call timed out, + and CANCELLED if the call was cancelled. + + Requires:This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain em_fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e read_cb is called while nothing can be readable from em_fd */ +int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg, gpr_timespec deadline); + +/* Exactly the same semantics as above, except based on writable events. */ +int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg, gpr_timespec deadline); + +/* Cause any current and all future read/write callbacks to error out with + GRPC_CALLBACK_CANCELLED. */ +void grpc_fd_shutdown(grpc_fd *em_fd); + +/* =================== Event caching =================== + In order to not miss or double-return edges in the context of edge triggering + and multithreading, we need a per-fd caching layer in the eventmanager itself + to cache relevant events. + + There are two types of events we care about: calls to notify_on_[read|write] + and readable/writable events for the socket from eventfd. There are separate + event caches for read and write. + + There are three states: + 0. "waiting" -- There's been a call to notify_on_[read|write] which has not + had a corresponding event. In other words, we're waiting for an event so we + can run the callback. + 1. "idle" -- We are neither waiting nor have a cached event. + 2. "cached" -- There has been a read/write event without a waiting callback, + so we want to run the event next time the application calls + notify_on_[read|write]. + + The high level state diagram: + + +--------------------------------------------------------------------+ + | WAITING | IDLE | CACHED | + | | | | + | 1. --*-> 2. --+-> 3. --+\ + | | | <--+/ + | | | | + x+-- 6. 5. <-+-- 4. <-*-- | + | | | | + +--------------------------------------------------------------------+ + + Transitions right occur on read|write events. Transitions left occur on + notify_on_[read|write] events. + State transitions: + 1. Read|Write event while waiting -> run the callback and transition to idle. + 2. Read|Write event while idle -> transition to cached. + 3. Read|Write event with one already cached -> still cached. + 4. notify_on_[read|write] with event cached: run callback and transition to + idle. + 5. notify_on_[read|write] when idle: Store callback and transition to + waiting. + 6. notify_on_[read|write] when waiting: invalid. */ + +typedef enum grpc_fd_state { + GRPC_FD_WAITING = 0, + GRPC_FD_IDLE = 1, + GRPC_FD_CACHED = 2 +} grpc_fd_state; + +/* gRPC file descriptor handle. + The handle is used to register read/write callbacks to a file descriptor */ +struct grpc_fd { + grpc_libevent_task task; /* Base class, callbacks, queues, etc */ + int fd; /* File descriptor */ + + /* Note that the shutdown event is only needed as a workaround for libevent + not properly handling event_active on an in flight event. */ + struct event *shutdown_ev; /* activated to trigger shutdown */ + + /* protect shutdown_started|read_state|write_state and ensure barriers + between notify_on_[read|write] and read|write callbacks */ + gpr_mu mu; + int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ + grpc_fd_state read_state; + grpc_fd_state write_state; + + /* descriptor delete list. These are destroyed during polling. */ + struct grpc_fd *next; +}; + +/* gRPC alarm handle. + The handle is used to add an alarm which expires after specified timeout. */ +struct grpc_alarm { + grpc_libevent_task task; /* Include the base class */ + + gpr_atm triggered; /* To be used atomically if alarm triggered */ +}; + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/eventmanager/em_posix.c b/src/core/iomgr/iomgr_libevent_use_threads.c index af449342f0..af449342f0 100644 --- a/src/core/eventmanager/em_posix.c +++ b/src/core/iomgr/iomgr_libevent_use_threads.c diff --git a/src/core/endpoint/resolve_address.h b/src/core/iomgr/resolve_address.h index cc32c47cef..37ec0f0335 100644 --- a/src/core/endpoint/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -31,8 +31,8 @@ * */ -#ifndef __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ -#define __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ +#ifndef __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ +#define __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ #include <sys/socket.h> @@ -64,4 +64,4 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); grpc_resolved_addresses *grpc_blocking_resolve_address( const char *addr, const char *default_port); -#endif /* __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ */ +#endif /* __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ */ diff --git a/src/core/endpoint/resolve_address.c b/src/core/iomgr/resolve_address_posix.c index 1993b9bdc5..d3ea3780ce 100644 --- a/src/core/endpoint/resolve_address.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -33,7 +33,7 @@ #define _POSIX_SOURCE -#include "src/core/endpoint/resolve_address.h" +#include "src/core/iomgr/resolve_address.h" #include <sys/types.h> #include <sys/socket.h> @@ -41,10 +41,11 @@ #include <unistd.h> #include <string.h> -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/alloc.h> -#include <grpc/support/string.h> #include <grpc/support/log.h> +#include <grpc/support/string.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> diff --git a/src/core/iomgr/sockaddr.h b/src/core/iomgr/sockaddr.h new file mode 100644 index 0000000000..b980b3029f --- /dev/null +++ b/src/core/iomgr/sockaddr.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WIN32 +#include "src/core/iomgr/sockaddr_win32.h" +#endif + +#ifdef GPR_POSIX_SOCKETADDR +#include "src/core/iomgr/sockaddr_posix.h" +#endif + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ */ diff --git a/src/core/iomgr/sockaddr_posix.h b/src/core/iomgr/sockaddr_posix.h new file mode 100644 index 0000000000..79ef3ca3cf --- /dev/null +++ b/src/core/iomgr/sockaddr_posix.h @@ -0,0 +1,40 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ + +#include <sys/socket.h> +#include <netinet/in.h> + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ */ diff --git a/src/core/endpoint/socket_utils.c b/src/core/iomgr/sockaddr_utils.c index ef160d7ea4..f709d35162 100644 --- a/src/core/endpoint/socket_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -31,127 +31,17 @@ * */ -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/sockaddr_utils.h" #include <arpa/inet.h> -#include <limits.h> -#include <fcntl.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <stdio.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <unistd.h> -#include <string.h> #include <errno.h> +#include <string.h> #include <grpc/support/host_port.h> #include <grpc/support/string.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> -/* set a socket to non blocking mode */ -int grpc_set_socket_nonblocking(int fd, int non_blocking) { - int oldflags = fcntl(fd, F_GETFL, 0); - if (oldflags < 0) { - return 0; - } - - if (non_blocking) { - oldflags |= O_NONBLOCK; - } else { - oldflags &= ~O_NONBLOCK; - } - - if (fcntl(fd, F_SETFL, oldflags) != 0) { - return 0; - } - - return 1; -} - -/* set a socket to close on exec */ -int grpc_set_socket_cloexec(int fd, int close_on_exec) { - int oldflags = fcntl(fd, F_GETFD, 0); - if (oldflags < 0) { - return 0; - } - - if (close_on_exec) { - oldflags |= FD_CLOEXEC; - } else { - oldflags &= ~FD_CLOEXEC; - } - - if (fcntl(fd, F_SETFD, oldflags) != 0) { - return 0; - } - - return 1; -} - -/* set a socket to reuse old addresses */ -int grpc_set_socket_reuse_addr(int fd, int reuse) { - int val = (reuse != 0); - int newval; - socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) && - 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) && - newval == val; -} - -/* disable nagle */ -int grpc_set_socket_low_latency(int fd, int low_latency) { - int val = (low_latency != 0); - int newval; - socklen_t intlen = sizeof(newval); - return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) && - 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) && - newval == val; -} - -/* This should be 0 in production, but it may be enabled for testing or - debugging purposes, to simulate an environment where IPv6 sockets can't - also speak IPv4. */ -int grpc_forbid_dualstack_sockets_for_testing = 0; - -static int set_socket_dualstack(int fd) { - if (!grpc_forbid_dualstack_sockets_for_testing) { - const int off = 0; - return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off)); - } else { - /* Force an IPv6-only socket, for testing purposes. */ - const int on = 1; - setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); - return 0; - } -} - -int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, grpc_dualstack_mode *dsmode) { - int family = addr->sa_family; - if (family == AF_INET6) { - int fd = socket(family, type, protocol); - /* Check if we've got a valid dualstack socket. */ - if (fd >= 0 && set_socket_dualstack(fd)) { - *dsmode = GRPC_DSMODE_DUALSTACK; - return fd; - } - /* If this isn't an IPv4 address, then return whatever we've got. */ - if (!grpc_sockaddr_is_v4mapped(addr, NULL)) { - *dsmode = GRPC_DSMODE_IPV6; - return fd; - } - /* Fall back to AF_INET. */ - if (fd >= 0) { - close(fd); - } - family = AF_INET; - } - *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; - return socket(family, type, protocol); -} - static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h new file mode 100644 index 0000000000..753d0c824a --- /dev/null +++ b/src/core/iomgr/sockaddr_utils.h @@ -0,0 +1,75 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ + +#include "src/core/iomgr/sockaddr.h" + +/* Returns true if addr is an IPv4-mapped IPv6 address within the + ::ffff:0.0.0.0/96 range, or false otherwise. + + If addr4_out is non-NULL, the inner IPv4 address will be copied here when + returning true. */ +int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, + struct sockaddr_in *addr4_out); + +/* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96 + address to addr6_out and returns true. Otherwise returns false. */ +int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr, + struct sockaddr_in6 *addr6_out); + +/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to + *port_out (if not NULL) and returns true, otherwise returns false. */ +int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out); + +/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */ +void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, + struct sockaddr_in6 *wild6_out); + +/* Converts a sockaddr into a newly-allocated human-readable string. + + Currently, only the AF_INET and AF_INET6 families are recognized. + If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are + displayed as plain IPv4. + + Usage is similar to gpr_asprintf: returns the number of bytes written + (excluding the final '\0'), and *out points to a string which must later be + destroyed using gpr_free(). + + In the unlikely event of an error, returns -1 and sets *out to NULL. + The existing value of errno is always preserved. */ +int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, + int normalize); + +#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ */ diff --git a/src/core/eventmanager/em_win32.c b/src/core/iomgr/sockaddr_win32.h index 4d5c3b5126..751ac3d2e7 100644 --- a/src/core/eventmanager/em_win32.c +++ b/src/core/iomgr/sockaddr_win32.h @@ -31,8 +31,7 @@ * */ -/* Windows event manager support code. */ -#include <event2/thread.h> +#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ +#define __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ -/* Notify LibEvent that Windows thread is used. */ -int evthread_use_threads() { return evthread_use_windows_threads(); } +#endif // __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_ diff --git a/src/core/iomgr/socket_utils_common_posix.c b/src/core/iomgr/socket_utils_common_posix.c new file mode 100644 index 0000000000..0767d6f918 --- /dev/null +++ b/src/core/iomgr/socket_utils_common_posix.c @@ -0,0 +1,154 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/socket_utils_posix.h" + +#include <arpa/inet.h> +#include <limits.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> + +#include "src/core/iomgr/sockaddr_utils.h" +#include <grpc/support/host_port.h> +#include <grpc/support/string.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +/* set a socket to non blocking mode */ +int grpc_set_socket_nonblocking(int fd, int non_blocking) { + int oldflags = fcntl(fd, F_GETFL, 0); + if (oldflags < 0) { + return 0; + } + + if (non_blocking) { + oldflags |= O_NONBLOCK; + } else { + oldflags &= ~O_NONBLOCK; + } + + if (fcntl(fd, F_SETFL, oldflags) != 0) { + return 0; + } + + return 1; +} + +/* set a socket to close on exec */ +int grpc_set_socket_cloexec(int fd, int close_on_exec) { + int oldflags = fcntl(fd, F_GETFD, 0); + if (oldflags < 0) { + return 0; + } + + if (close_on_exec) { + oldflags |= FD_CLOEXEC; + } else { + oldflags &= ~FD_CLOEXEC; + } + + if (fcntl(fd, F_SETFD, oldflags) != 0) { + return 0; + } + + return 1; +} + +/* set a socket to reuse old addresses */ +int grpc_set_socket_reuse_addr(int fd, int reuse) { + int val = (reuse != 0); + int newval; + socklen_t intlen = sizeof(newval); + return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) && + 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) && + newval == val; +} + +/* disable nagle */ +int grpc_set_socket_low_latency(int fd, int low_latency) { + int val = (low_latency != 0); + int newval; + socklen_t intlen = sizeof(newval); + return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) && + 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) && + newval == val; +} + +/* This should be 0 in production, but it may be enabled for testing or + debugging purposes, to simulate an environment where IPv6 sockets can't + also speak IPv4. */ +int grpc_forbid_dualstack_sockets_for_testing = 0; + +static int set_socket_dualstack(int fd) { + if (!grpc_forbid_dualstack_sockets_for_testing) { + const int off = 0; + return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off)); + } else { + /* Force an IPv6-only socket, for testing purposes. */ + const int on = 1; + setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); + return 0; + } +} + +int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, + int protocol, grpc_dualstack_mode *dsmode) { + int family = addr->sa_family; + if (family == AF_INET6) { + int fd = socket(family, type, protocol); + /* Check if we've got a valid dualstack socket. */ + if (fd >= 0 && set_socket_dualstack(fd)) { + *dsmode = GRPC_DSMODE_DUALSTACK; + return fd; + } + /* If this isn't an IPv4 address, then return whatever we've got. */ + if (!grpc_sockaddr_is_v4mapped(addr, NULL)) { + *dsmode = GRPC_DSMODE_IPV6; + return fd; + } + /* Fall back to AF_INET. */ + if (fd >= 0) { + close(fd); + } + family = AF_INET; + } + *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; + return socket(family, type, protocol); +} diff --git a/src/core/endpoint/socket_utils_linux.c b/src/core/iomgr/socket_utils_linux.c index 479675ec7d..f971cb33bc 100644 --- a/src/core/endpoint/socket_utils_linux.c +++ b/src/core/iomgr/socket_utils_linux.c @@ -36,7 +36,7 @@ #ifdef GPR_LINUX -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" #include <sys/types.h> #include <sys/socket.h> diff --git a/src/core/endpoint/socket_utils_posix.c b/src/core/iomgr/socket_utils_posix.c index 262d606af9..262d606af9 100644 --- a/src/core/endpoint/socket_utils_posix.c +++ b/src/core/iomgr/socket_utils_posix.c diff --git a/src/core/endpoint/socket_utils.h b/src/core/iomgr/socket_utils_posix.h index 23fa19284a..5c31e5e6d8 100644 --- a/src/core/endpoint/socket_utils.h +++ b/src/core/iomgr/socket_utils_posix.h @@ -31,16 +31,12 @@ * */ -#ifndef __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ -#define __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ +#ifndef __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ +#define __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ #include <unistd.h> #include <sys/socket.h> -struct sockaddr; -struct sockaddr_in; -struct sockaddr_in6; - /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int nonblock, int cloexec); @@ -99,40 +95,4 @@ extern int grpc_forbid_dualstack_sockets_for_testing; int grpc_create_dualstack_socket(const struct sockaddr *addr, int type, int protocol, grpc_dualstack_mode *dsmode); -/* Returns true if addr is an IPv4-mapped IPv6 address within the - ::ffff:0.0.0.0/96 range, or false otherwise. - - If addr4_out is non-NULL, the inner IPv4 address will be copied here when - returning true. */ -int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, - struct sockaddr_in *addr4_out); - -/* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96 - address to addr6_out and returns true. Otherwise returns false. */ -int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr, - struct sockaddr_in6 *addr6_out); - -/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to - *port_out (if not NULL) and returns true, otherwise returns false. */ -int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out); - -/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */ -void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, - struct sockaddr_in6 *wild6_out); - -/* Converts a sockaddr into a newly-allocated human-readable string. - - Currently, only the AF_INET and AF_INET6 families are recognized. - If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are - displayed as plain IPv4. - - Usage is similar to gpr_asprintf: returns the number of bytes written - (excluding the final '\0'), and *out points to a string which must later be - destroyed using gpr_free(). - - In the unlikely event of an error, returns -1 and sets *out to NULL. - The existing value of errno is always preserved. */ -int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, - int normalize); - -#endif /* __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ */ +#endif /* __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ */ diff --git a/src/core/endpoint/tcp_client.h b/src/core/iomgr/tcp_client.h index 69b1b62f37..a4632d81cf 100644 --- a/src/core/endpoint/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -31,21 +31,18 @@ * */ -#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ -#define __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ +#ifndef __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ -#include "src/core/endpoint/tcp.h" +#include "src/core/endpoint/endpoint.h" +#include "src/core/iomgr/sockaddr.h" #include <grpc/support/time.h> -#include <sys/types.h> -#include <sys/socket.h> - /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), - void *arg, grpc_em *em, - const struct sockaddr *addr, int addr_len, - gpr_timespec deadline); + void *arg, const struct sockaddr *addr, + int addr_len, gpr_timespec deadline); -#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ */ +#endif /* __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ */ diff --git a/src/core/endpoint/tcp_client.c b/src/core/iomgr/tcp_client_posix.c index c6f470ba88..8d2d7ab081 100644 --- a/src/core/endpoint/tcp_client.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -31,14 +31,17 @@ * */ -#include "src/core/endpoint/tcp_client.h" +#include "src/core/iomgr/tcp_client.h" #include <errno.h> #include <netinet/in.h> #include <string.h> #include <unistd.h> -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/tcp_posix.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -46,7 +49,7 @@ typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; - grpc_em_fd *fd; + grpc_fd *fd; gpr_timespec deadline; } async_connect; @@ -71,12 +74,12 @@ error: return 0; } -static void on_writable(void *acp, grpc_em_cb_status status) { +static void on_writable(void *acp, grpc_iomgr_cb_status status) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; - int fd = grpc_em_fd_get(ac->fd); + int fd = grpc_fd_get(ac->fd); if (status == GRPC_CALLBACK_SUCCESS) { do { @@ -103,7 +106,7 @@ static void on_writable(void *acp, grpc_em_cb_status status) { opened too many network connections. The "easy" fix: don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_em_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); + grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); return; } else { goto error; @@ -120,20 +123,18 @@ static void on_writable(void *acp, grpc_em_cb_status status) { error: ac->cb(ac->cb_arg, NULL); - grpc_em_fd_destroy(ac->fd); - gpr_free(ac->fd); + grpc_fd_destroy(ac->fd); gpr_free(ac); return; great_success: - ac->cb(ac->cb_arg, grpc_tcp_create_emfd(ac->fd)); + ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); gpr_free(ac); } void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), - void *arg, grpc_em *em, - const struct sockaddr *addr, int addr_len, - gpr_timespec deadline) { + void *arg, const struct sockaddr *addr, + int addr_len, gpr_timespec deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -167,7 +168,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } while (err < 0 && errno == EINTR); if (err >= 0) { - cb(arg, grpc_tcp_create(fd, em)); + cb(arg, + grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); return; } @@ -182,7 +184,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->cb = cb; ac->cb_arg = arg; ac->deadline = deadline; - ac->fd = gpr_malloc(sizeof(grpc_em_fd)); - grpc_em_fd_init(ac->fd, em, fd); - grpc_em_fd_notify_on_write(ac->fd, on_writable, ac, deadline); + ac->fd = grpc_fd_create(fd); + grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); } diff --git a/src/core/endpoint/tcp.c b/src/core/iomgr/tcp_posix.c index 482344d265..8f63f75612 100644 --- a/src/core/endpoint/tcp.c +++ b/src/core/iomgr/tcp_posix.c @@ -31,7 +31,7 @@ * */ -#include "src/core/endpoint/tcp.h" +#include "src/core/iomgr/tcp_posix.h" #include <errno.h> #include <stdlib.h> @@ -40,7 +40,6 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/eventmanager/em.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> @@ -249,8 +248,7 @@ static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) { typedef struct { grpc_endpoint base; - grpc_em *em; - grpc_em_fd *em_fd; + grpc_fd *em_fd; int fd; size_t slice_size; gpr_refcount refcount; @@ -266,25 +264,19 @@ typedef struct { } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_em_cb_status status); + grpc_iomgr_cb_status status); static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_em_cb_status status); - -#define DEFAULT_SLICE_SIZE 8192 -grpc_endpoint *grpc_tcp_create(int fd, grpc_em *em) { - return grpc_tcp_create_dbg(fd, em, DEFAULT_SLICE_SIZE); -} + grpc_iomgr_cb_status status); static void grpc_tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_em_fd_shutdown(tcp->em_fd); + grpc_fd_shutdown(tcp->em_fd); } static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_em_fd_destroy(tcp->em_fd); - gpr_free(tcp->em_fd); + grpc_fd_destroy(tcp->em_fd); gpr_free(tcp); } } @@ -317,7 +309,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_em_cb_status status) { + grpc_iomgr_cb_status status) { grpc_tcp *tcp = (grpc_tcp *)arg; int iov_size = 1; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; @@ -385,8 +377,8 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } else { /* Spurious read event, consume it here */ slice_state_destroy(&read_state); - grpc_em_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, - tcp->read_deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, + tcp->read_deadline); } } else { /* TODO(klempner): Log interesting errors */ @@ -422,7 +414,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->read_user_data = user_data; tcp->read_deadline = deadline; gpr_ref(&tcp->refcount); - grpc_em_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); } #define MAX_WRITE_IOVEC 16 @@ -469,7 +461,7 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { } static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_em_cb_status status) { + grpc_iomgr_cb_status status) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_write_status write_status; grpc_endpoint_cb_status cb_status; @@ -494,8 +486,8 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - grpc_em_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); } else { slice_state_destroy(&tcp->write_state); if (write_status == GRPC_ENDPOINT_WRITE_DONE) { @@ -539,8 +531,8 @@ static grpc_endpoint_write_status grpc_tcp_write( tcp->write_cb = cb; tcp->write_user_data = user_data; tcp->write_deadline = deadline; - grpc_em_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, + tcp->write_deadline); } return status; @@ -550,12 +542,10 @@ static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_shutdown, grpc_tcp_destroy}; -static grpc_endpoint *grpc_tcp_create_generic(grpc_em_fd *em_fd, - size_t slice_size) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; - tcp->fd = grpc_em_fd_get(em_fd); - tcp->em = grpc_em_fd_get_em(em_fd); + tcp->fd = grpc_fd_get(em_fd); tcp->read_cb = NULL; tcp->write_cb = NULL; tcp->read_user_data = NULL; @@ -569,13 +559,3 @@ static grpc_endpoint *grpc_tcp_create_generic(grpc_em_fd *em_fd, tcp->em_fd = em_fd; return &tcp->base; } - -grpc_endpoint *grpc_tcp_create_dbg(int fd, grpc_em *em, size_t slice_size) { - grpc_em_fd *em_fd = gpr_malloc(sizeof(grpc_em_fd)); - grpc_em_fd_init(em_fd, em, fd); - return grpc_tcp_create_generic(em_fd, slice_size); -} - -grpc_endpoint *grpc_tcp_create_emfd(grpc_em_fd *em_fd) { - return grpc_tcp_create_generic(em_fd, DEFAULT_SLICE_SIZE); -} diff --git a/src/core/endpoint/tcp.h b/src/core/iomgr/tcp_posix.h index f6a2a19ec4..8a3c52894c 100644 --- a/src/core/endpoint/tcp.h +++ b/src/core/iomgr/tcp_posix.h @@ -31,8 +31,8 @@ * */ -#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_H__ -#define __GRPC_INTERNAL_ENDPOINT_TCP_H__ +#ifndef __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ /* Low level TCP "bottom half" implementation, for use by transports built on top of a TCP connection. @@ -45,15 +45,12 @@ */ #include "src/core/endpoint/endpoint.h" -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/iomgr_libevent.h" -/* Create a tcp from an already connected file descriptor. */ -grpc_endpoint *grpc_tcp_create(int fd, grpc_em *em); -/* Special version for debugging slice changes */ -grpc_endpoint *grpc_tcp_create_dbg(int fd, grpc_em *em, size_t slice_size); +#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 -/* Special version for handing off ownership of an existing already created - eventmanager fd. Must not have any outstanding callbacks. */ -grpc_endpoint *grpc_tcp_create_emfd(grpc_em_fd *em_fd); +/* Create a tcp endpoint given a file desciptor and a read slice size. + Takes ownership of fd. */ +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size); -#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_H__ */ +#endif /* __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ */ diff --git a/src/core/endpoint/tcp_server.h b/src/core/iomgr/tcp_server.h index d81cdd000e..bd6b46f538 100644 --- a/src/core/endpoint/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -31,14 +31,13 @@ * */ -#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ -#define __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ +#ifndef __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ +#define __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ #include <sys/types.h> #include <sys/socket.h> -#include "src/core/endpoint/tcp.h" -#include "src/core/eventmanager/em.h" +#include "src/core/endpoint/endpoint.h" /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; @@ -47,7 +46,7 @@ typedef struct grpc_tcp_server grpc_tcp_server; typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); /* Create a server, initially not bound to any ports */ -grpc_tcp_server *grpc_tcp_server_create(grpc_em *em); +grpc_tcp_server *grpc_tcp_server_create(); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, @@ -73,4 +72,4 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index); void grpc_tcp_server_destroy(grpc_tcp_server *server); -#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ */ +#endif /* __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ */ diff --git a/src/core/endpoint/tcp_server.c b/src/core/iomgr/tcp_server_posix.c index efd3dede50..22bbd45351 100644 --- a/src/core/endpoint/tcp_server.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -32,7 +32,7 @@ */ #define _GNU_SOURCE -#include "src/core/endpoint/tcp_server.h" +#include "src/core/iomgr/tcp_server.h" #include <limits.h> #include <fcntl.h> @@ -45,7 +45,10 @@ #include <string.h> #include <errno.h> -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/tcp_posix.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -60,13 +63,12 @@ static int s_max_accept_queue_size; /* one listening port */ typedef struct { int fd; - grpc_em_fd *emfd; + grpc_fd *emfd; grpc_tcp_server *server; } server_port; /* the overall server */ struct grpc_tcp_server { - grpc_em *em; grpc_tcp_server_cb cb; void *cb_arg; @@ -82,12 +84,11 @@ struct grpc_tcp_server { size_t port_capacity; }; -grpc_tcp_server *grpc_tcp_server_create(grpc_em *em) { +grpc_tcp_server *grpc_tcp_server_create() { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); gpr_cv_init(&s->cv); s->active_ports = 0; - s->em = em; s->cb = NULL; s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); @@ -101,7 +102,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { gpr_mu_lock(&s->mu); /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { - grpc_em_fd_shutdown(s->ports[i].emfd); + grpc_fd_shutdown(s->ports[i].emfd); } /* wait while that happens */ while (s->active_ports) { @@ -112,8 +113,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { /* delete ALL the things */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; - grpc_em_fd_destroy(sp->emfd); - gpr_free(sp->emfd); + grpc_fd_destroy(sp->emfd); } gpr_free(s->ports); gpr_free(s); @@ -189,7 +189,7 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, grpc_em_cb_status status) { +static void on_read(void *arg, grpc_iomgr_cb_status status) { server_port *sp = arg; if (status != GRPC_CALLBACK_SUCCESS) { @@ -208,11 +208,7 @@ static void on_read(void *arg, grpc_em_cb_status status) { case EINTR: continue; case EAGAIN: - if (GRPC_EM_OK != grpc_em_fd_notify_on_read(sp->emfd, on_read, sp, - gpr_inf_future)) { - gpr_log(GPR_ERROR, "Failed to register read request with em"); - goto error; - } + grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -220,7 +216,9 @@ static void on_read(void *arg, grpc_em_cb_status status) { } } - sp->server->cb(sp->server->cb_arg, grpc_tcp_create(fd, sp->server->em)); + sp->server->cb( + sp->server->cb_arg, + grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); } abort(); @@ -249,13 +247,11 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); } sp = &s->ports[s->nports++]; - sp->emfd = gpr_malloc(sizeof(grpc_em_fd)); + sp->emfd = grpc_fd_create(fd); sp->fd = fd; sp->server = s; /* initialize the em desc */ - if (GRPC_EM_OK != grpc_em_fd_init(sp->emfd, s->em, fd)) { - grpc_em_fd_destroy(sp->emfd); - gpr_free(sp->emfd); + if (sp->emfd == NULL) { s->nports--; gpr_mu_unlock(&s->mu); return 0; @@ -326,8 +322,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - grpc_em_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], - gpr_inf_future); + grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], + gpr_inf_future); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index 7ff48f9123..bfc2e3361a 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -34,7 +34,7 @@ #include "src/core/security/credentials.h" #include "src/core/httpcli/httpcli.h" -#include "src/core/surface/surface_em.h" +#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string.h> @@ -379,7 +379,7 @@ static void compute_engine_get_request_metadata(grpc_credentials *creds, request.hdr_count = 1; request.hdrs = &header; grpc_httpcli_get( - &request, gpr_time_add(gpr_now(), refresh_threshold), grpc_surface_em(), + &request, gpr_time_add(gpr_now(), refresh_threshold), on_compute_engine_token_response, grpc_credentials_metadata_request_create(creds, cb, user_data)); } else { @@ -433,7 +433,8 @@ static int fake_oauth2_has_request_metadata_only( return 1; } -void on_simulated_token_fetch_done(void *user_data, grpc_em_cb_status status) { +void on_simulated_token_fetch_done(void *user_data, + grpc_iomgr_cb_status status) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds; @@ -448,10 +449,9 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds, grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds; if (c->is_async) { - GPR_ASSERT(grpc_em_add_callback(grpc_surface_em(), - on_simulated_token_fetch_done, - grpc_credentials_metadata_request_create( - creds, cb, user_data)) == GRPC_EM_OK); + grpc_iomgr_add_callback( + on_simulated_token_fetch_done, + grpc_credentials_metadata_request_create(creds, cb, user_data)); } else { cb(user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 335d502217..28b56dd4c9 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -35,12 +35,11 @@ #include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/endpoint/resolve_address.h" -#include "src/core/endpoint/tcp_server.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/tcp_server.h" #include "src/core/security/security_context.h" #include "src/core/security/secure_transport_setup.h" #include "src/core/surface/server.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -101,7 +100,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr) { goto error; } - tcp = grpc_tcp_server_create(grpc_surface_em()); + tcp = grpc_tcp_server_create(); if (!tcp) { goto error; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a731c7cab7..1cffe3d32a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -34,12 +34,12 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/channel/metadata_buffer.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string.h> -#include "src/core/surface/channel.h" -#include "src/core/surface/completion_queue.h" -#include "src/core/surface/surface_em.h" #include <stdio.h> #include <stdlib.h> @@ -184,7 +184,7 @@ struct grpc_call { void *finished_tag; pending_read_queue prq; - grpc_em_alarm alarm; + grpc_alarm alarm; /* The current outstanding send message/context/invoke/end tag (only valid if have_write == 1) */ @@ -258,7 +258,7 @@ void grpc_call_internal_unref(grpc_call *c) { void grpc_call_destroy(grpc_call *c) { gpr_mu_lock(&c->read_mu); if (c->have_alarm) { - grpc_em_alarm_cancel(&c->alarm); + grpc_alarm_cancel(&c->alarm); c->have_alarm = 0; } gpr_mu_unlock(&c->read_mu); @@ -813,7 +813,7 @@ void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) { } if (is_full_close) { if (call->have_alarm) { - grpc_em_alarm_cancel(&call->alarm); + grpc_alarm_cancel(&call->alarm); call->have_alarm = 0; } call->received_finish = 1; @@ -852,7 +852,7 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { return &call->incoming_metadata; } -static void call_alarm(void *arg, grpc_em_cb_status status) { +static void call_alarm(void *arg, grpc_iomgr_cb_status status) { grpc_call *call = arg; if (status == GRPC_CALLBACK_SUCCESS) { grpc_call_cancel(call); @@ -868,6 +868,6 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { } grpc_call_internal_ref(call); call->have_alarm = 1; - grpc_em_alarm_init(&call->alarm, grpc_surface_em(), call_alarm, call); - grpc_em_alarm_add(&call->alarm, deadline); + grpc_alarm_init(&call->alarm, call_alarm, call); + grpc_alarm_add(&call->alarm, deadline); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index ec1c8477fa..7d30b64204 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -43,12 +43,11 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_filter.h" -#include "src/core/endpoint/resolve_address.h" -#include "src/core/endpoint/tcp.h" -#include "src/core/endpoint/tcp_client.h" +#include "src/core/endpoint/endpoint.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/tcp_client.h" #include "src/core/surface/channel.h" #include "src/core/surface/client.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -74,7 +73,6 @@ struct setup { const char *target; grpc_transport_setup_callback setup_callback; void *setup_user_data; - grpc_em *em; }; static int maybe_try_next_resolved(request *r); @@ -123,8 +121,8 @@ static int maybe_try_next_resolved(request *r) { if (!r->resolved) return 0; if (r->resolved_index == r->resolved->naddrs) return 0; addr = &r->resolved->addrs[r->resolved_index++]; - grpc_tcp_client_connect(on_connect, r, r->setup->em, - (struct sockaddr *)&addr->addr, addr->len, + grpc_tcp_client_connect(on_connect, r, (struct sockaddr *)&addr->addr, + addr->len, grpc_client_setup_request_deadline(r->cs_request)); return 1; } @@ -201,13 +199,12 @@ grpc_channel *grpc_channel_create(const char *target, channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); s->target = gpr_strdup(target); - s->em = grpc_surface_em(); s->setup_callback = complete_setup; s->setup_user_data = grpc_channel_get_channel_stack(channel); grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), args, mdctx, initiate_setup, done_setup, - s, s->em); + s); return channel; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 2002476777..1f3074fd42 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,10 +36,9 @@ #include <stdio.h> #include <string.h> -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/iomgr_completion_queue_interface.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" -#include "src/core/surface/surface_em.h" #include "src/core/surface/surface_trace.h" #include <grpc/support/alloc.h> #include <grpc/support/atm.h> @@ -62,7 +61,6 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { - grpc_em *em; int allow_polling; /* When refs drops to zero, we are in shutdown mode, and will be destroyable @@ -89,7 +87,6 @@ grpc_completion_queue *grpc_completion_queue_create() { memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->refs, 1); - cc->em = grpc_surface_em(); cc->allow_polling = 1; return cc; } @@ -100,7 +97,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { /* Create and append an event to the queue. Returns the event so that its data members can be filled in. - Requires cc->em->mu locked. */ + Requires grpc_iomgr_mu locked. */ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data) { @@ -126,7 +123,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(&cc->em->cv); + gpr_cv_broadcast(&grpc_iomgr_cv); return ev; } @@ -149,7 +146,7 @@ static void end_op_locked(grpc_completion_queue *cc, if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(&cc->em->cv); + gpr_cv_broadcast(&grpc_iomgr_cv); } } @@ -157,11 +154,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_byte_buffer *read) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); ev->base.data.read = read; end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, @@ -169,11 +166,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.invoke_accepted = error; end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, @@ -181,11 +178,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, @@ -193,11 +190,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.finish_accepted = error; end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, @@ -206,24 +203,24 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, void *user_data, size_t count, grpc_metadata *elements) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, user_data); ev->base.data.client_metadata_read.count = count; ev->base.data.client_metadata_read.elements = elements; end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_status status) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); ev->base.data.finished = status; end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -232,7 +229,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements) { event *ev; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); ev->base.data.server_rpc_new.method = method; ev->base.data.server_rpc_new.host = host; @@ -240,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, ev->base.data.server_rpc_new.metadata_count = metadata_count; ev->base.data.server_rpc_new.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); } /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ @@ -257,7 +254,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); for (;;) { if (cc->queue != NULL) { gpr_uintptr bucket; @@ -283,15 +280,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_em_work(cc->em, deadline)) { + if (cc->allow_polling && grpc_iomgr_work(deadline)) { continue; } - if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) { - gpr_mu_unlock(&cc->em->mu); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { + gpr_mu_unlock(&grpc_iomgr_mu); return NULL; } } - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -329,7 +326,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); for (;;) { if ((ev = pluck_event(cc, tag))) { break; @@ -338,15 +335,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_em_work(cc->em, deadline)) { + if (cc->allow_polling && grpc_iomgr_work(deadline)) { continue; } - if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) { - gpr_mu_unlock(&cc->em->mu); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { + gpr_mu_unlock(&grpc_iomgr_mu); return NULL; } } - gpr_mu_unlock(&cc->em->mu); + gpr_mu_unlock(&grpc_iomgr_mu); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -355,11 +352,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->refs)) { - gpr_mu_lock(&cc->em->mu); + gpr_mu_lock(&grpc_iomgr_mu); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(&cc->em->cv); - gpr_mu_unlock(&cc->em->mu); + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); } } diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 92c0ac880d..832ec085c7 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -33,14 +33,14 @@ #include <grpc/grpc.h> #include "src/core/statistics/census_interface.h" -#include "src/core/surface/surface_em.h" +#include "src/core/iomgr/iomgr.h" void grpc_init() { - grpc_surface_em_init(); + grpc_iomgr_init(); census_init(); } void grpc_shutdown() { - grpc_surface_em_shutdown(); + grpc_iomgr_shutdown(); census_shutdown(); } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index f330b83521..3d5727927d 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -43,15 +43,13 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_filter.h" -#include "src/core/endpoint/resolve_address.h" -#include "src/core/endpoint/tcp.h" -#include "src/core/endpoint/tcp_client.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/tcp_client.h" #include "src/core/security/auth.h" #include "src/core/security/security_context.h" #include "src/core/security/secure_transport_setup.h" #include "src/core/surface/channel.h" #include "src/core/surface/client.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> @@ -78,7 +76,6 @@ struct setup { const char *target; grpc_transport_setup_callback setup_callback; void *setup_user_data; - grpc_em *em; }; static int maybe_try_next_resolved(request *r); @@ -139,8 +136,8 @@ static int maybe_try_next_resolved(request *r) { if (!r->resolved) return 0; if (r->resolved_index == r->resolved->naddrs) return 0; addr = &r->resolved->addrs[r->resolved_index++]; - grpc_tcp_client_connect(on_connect, r, r->setup->em, - (struct sockaddr *)&addr->addr, addr->len, + grpc_tcp_client_connect(on_connect, r, (struct sockaddr *)&addr->addr, + addr->len, grpc_client_setup_request_deadline(r->cs_request)); return 1; } @@ -230,7 +227,6 @@ grpc_channel *grpc_secure_channel_create_internal( grpc_channel_args_destroy(args_copy); s->target = gpr_strdup(target); - s->em = grpc_surface_em(); s->setup_callback = complete_setup; s->setup_user_data = grpc_channel_get_channel_stack(channel); s->security_context = @@ -238,6 +234,6 @@ grpc_channel *grpc_secure_channel_create_internal( &context->base); grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), args, mdctx, initiate_setup, done_setup, - s, s->em); + s); return channel; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index d8d5a7adf1..2c859060eb 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -39,10 +39,10 @@ #include "src/core/channel/census_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/iomgr/iomgr.h" #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" -#include "src/core/surface/surface_em.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string.h> @@ -73,7 +73,6 @@ struct grpc_server { const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; grpc_completion_queue *cq; - grpc_em *em; gpr_mu mu; @@ -193,7 +192,7 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void *cd, grpc_em_cb_status status) { +static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) { channel_data *chand = cd; grpc_server *server = chand->server; /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ @@ -206,7 +205,7 @@ static void destroy_channel(channel_data *chand) { GPR_ASSERT(chand->server != NULL); orphan_channel(chand); server_ref(chand->server); - grpc_em_add_callback(chand->server->em, finish_destroy_channel, chand); + grpc_iomgr_add_callback(finish_destroy_channel, chand); } static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) { @@ -254,7 +253,7 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_mu_unlock(&server->mu); } -static void kill_zombie(void *elem, grpc_em_cb_status status) { +static void kill_zombie(void *elem, grpc_iomgr_cb_status status) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -275,7 +274,7 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) { /* fallthrough intended */ case NOT_STARTED: calld->state = ZOMBIED; - grpc_em_add_callback(chand->server->em, kill_zombie, elem); + grpc_iomgr_add_callback(kill_zombie, elem); break; case ZOMBIED: break; @@ -341,7 +340,7 @@ static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { } } -static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) { +static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) { channel_data *chand = cd; grpc_channel_op op; op.type = GRPC_CHANNEL_DISCONNECT; @@ -354,7 +353,7 @@ static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) { static void shutdown_channel(channel_data *chand) { grpc_channel_internal_ref(chand->channel); - grpc_em_add_callback(chand->server->em, finish_shutdown_channel, chand); + grpc_iomgr_add_callback(finish_shutdown_channel, chand); } static void init_call_elem(grpc_call_element *elem, @@ -442,7 +441,6 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, gpr_mu_init(&server->mu); server->cq = cq; - server->em = grpc_surface_em(); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); server->root_channel_data.next = server->root_channel_data.prev = diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index db8924efea..a5fdd03774 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -35,10 +35,9 @@ #include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/endpoint/resolve_address.h" -#include "src/core/endpoint/tcp_server.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/tcp_server.h" #include "src/core/surface/server.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -83,7 +82,7 @@ int grpc_server_add_http2_port(grpc_server *server, const char *addr) { goto error; } - tcp = grpc_tcp_server_create(grpc_surface_em()); + tcp = grpc_tcp_server_create(); if (!tcp) { goto error; } diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index 37eb84ed02..24f26068bc 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -34,7 +34,7 @@ #ifndef __GRPC_INTERNAL_TRANSPORT_CHTTP2_TRANSPORT_H__ #define __GRPC_INTERNAL_TRANSPORT_CHTTP2_TRANSPORT_H__ -#include "src/core/endpoint/tcp.h" +#include "src/core/endpoint/endpoint.h" #include "src/core/transport/transport.h" void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, |