aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_setup.c17
-rw-r--r--src/core/channel/client_setup.h3
-rw-r--r--src/core/eventmanager/em.h344
-rw-r--r--src/core/httpcli/httpcli.c16
-rw-r--r--src/core/httpcli/httpcli.h5
-rw-r--r--src/core/iomgr/alarm.h85
-rw-r--r--src/core/iomgr/endpoint_pair.h (renamed from src/core/surface/surface_em.h)19
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c61
-rw-r--r--src/core/iomgr/iomgr.h56
-rw-r--r--src/core/iomgr/iomgr_completion_queue_interface.h (renamed from src/core/surface/surface_em.c)26
-rw-r--r--src/core/iomgr/iomgr_libevent.c (renamed from src/core/eventmanager/em.c)508
-rw-r--r--src/core/iomgr/iomgr_libevent.h207
-rw-r--r--src/core/iomgr/iomgr_libevent_use_threads.c (renamed from src/core/eventmanager/em_posix.c)0
-rw-r--r--src/core/iomgr/resolve_address.h (renamed from src/core/endpoint/resolve_address.h)6
-rw-r--r--src/core/iomgr/resolve_address_posix.c (renamed from src/core/endpoint/resolve_address.c)7
-rw-r--r--src/core/iomgr/sockaddr.h47
-rw-r--r--src/core/iomgr/sockaddr_posix.h40
-rw-r--r--src/core/iomgr/sockaddr_utils.c (renamed from src/core/endpoint/socket_utils.c)114
-rw-r--r--src/core/iomgr/sockaddr_utils.h75
-rw-r--r--src/core/iomgr/sockaddr_win32.h (renamed from src/core/eventmanager/em_win32.c)7
-rw-r--r--src/core/iomgr/socket_utils_common_posix.c154
-rw-r--r--src/core/iomgr/socket_utils_linux.c (renamed from src/core/endpoint/socket_utils_linux.c)2
-rw-r--r--src/core/iomgr/socket_utils_posix.c (renamed from src/core/endpoint/socket_utils_posix.c)0
-rw-r--r--src/core/iomgr/socket_utils_posix.h (renamed from src/core/endpoint/socket_utils.h)46
-rw-r--r--src/core/iomgr/tcp_client.h (renamed from src/core/endpoint/tcp_client.h)17
-rw-r--r--src/core/iomgr/tcp_client_posix.c (renamed from src/core/endpoint/tcp_client.c)33
-rw-r--r--src/core/iomgr/tcp_posix.c (renamed from src/core/endpoint/tcp.c)54
-rw-r--r--src/core/iomgr/tcp_posix.h (renamed from src/core/endpoint/tcp.h)19
-rw-r--r--src/core/iomgr/tcp_server.h (renamed from src/core/endpoint/tcp_server.h)11
-rw-r--r--src/core/iomgr/tcp_server_posix.c (renamed from src/core/endpoint/tcp_server.c)40
-rw-r--r--src/core/security/credentials.c14
-rw-r--r--src/core/security/server_secure_chttp2.c7
-rw-r--r--src/core/surface/call.c18
-rw-r--r--src/core/surface/channel_create.c15
-rw-r--r--src/core/surface/completion_queue.c65
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/secure_channel_create.c14
-rw-r--r--src/core/surface/server.c16
-rw-r--r--src/core/surface/server_chttp2.c7
-rw-r--r--src/core/transport/chttp2_transport.h2
40 files changed, 1150 insertions, 1033 deletions
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index ea256706fd..29fe915add 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -34,6 +34,7 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/channel_stack.h"
+#include "src/core/iomgr/alarm.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -45,8 +46,7 @@ struct grpc_client_setup {
void *user_data;
grpc_channel_args *args;
grpc_mdctx *mdctx;
- grpc_em *em;
- grpc_em_alarm backoff_alarm;
+ grpc_alarm backoff_alarm;
gpr_timespec current_backoff_interval;
int in_alarm;
@@ -115,7 +115,7 @@ static void setup_cancel(grpc_transport_setup *sp) {
/* effectively cancels the current request (if any) */
s->active_request = NULL;
if (s->in_alarm) {
- grpc_em_alarm_cancel(&s->backoff_alarm);
+ grpc_alarm_cancel(&s->backoff_alarm);
}
if (--s->refs == 0) {
gpr_mu_unlock(&s->mu);
@@ -133,7 +133,7 @@ void grpc_client_setup_create_and_attach(
grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
grpc_mdctx *mdctx,
void (*initiate)(void *user_data, grpc_client_setup_request *request),
- void (*done)(void *user_data), void *user_data, grpc_em *em) {
+ void (*done)(void *user_data), void *user_data) {
grpc_client_setup *s = gpr_malloc(sizeof(grpc_client_setup));
s->base.vtable = &setup_vtable;
@@ -143,7 +143,6 @@ void grpc_client_setup_create_and_attach(
s->initiate = initiate;
s->done = done;
s->user_data = user_data;
- s->em = em;
s->active_request = NULL;
s->args = grpc_channel_args_copy(args);
s->current_backoff_interval = gpr_time_from_micros(1000000);
@@ -164,7 +163,7 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) {
}
static void backoff_alarm_done(void *arg /* grpc_client_setup */,
- grpc_em_cb_status status) {
+ grpc_iomgr_cb_status status) {
grpc_client_setup *s = arg;
grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
r->setup = s;
@@ -215,9 +214,9 @@ void grpc_client_setup_request_finish(grpc_client_setup_request *r,
gpr_timespec max_backoff = gpr_time_from_micros(120000000);
GPR_ASSERT(!s->in_alarm);
s->in_alarm = 1;
- grpc_em_alarm_init(&s->backoff_alarm, s->em, backoff_alarm_done, s);
- grpc_em_alarm_add(&s->backoff_alarm,
- gpr_time_add(s->current_backoff_interval, gpr_now()));
+ grpc_alarm_init(&s->backoff_alarm, backoff_alarm_done, s);
+ grpc_alarm_add(&s->backoff_alarm,
+ gpr_time_add(s->current_backoff_interval, gpr_now()));
s->current_backoff_interval =
gpr_time_add(s->current_backoff_interval, s->current_backoff_interval);
if (gpr_time_cmp(s->current_backoff_interval, max_backoff) > 0) {
diff --git a/src/core/channel/client_setup.h b/src/core/channel/client_setup.h
index 862c1325a3..a508785e60 100644
--- a/src/core/channel/client_setup.h
+++ b/src/core/channel/client_setup.h
@@ -35,7 +35,6 @@
#define __GRPC_INTERNAL_CHANNEL_CLIENT_SETUP_H__
#include "src/core/channel/client_channel.h"
-#include "src/core/eventmanager/em.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/time.h>
@@ -48,7 +47,7 @@ void grpc_client_setup_create_and_attach(
grpc_channel_stack *newly_minted_channel, const grpc_channel_args *args,
grpc_mdctx *mdctx,
void (*initiate)(void *user_data, grpc_client_setup_request *request),
- void (*done)(void *user_data), void *user_data, grpc_em *em);
+ void (*done)(void *user_data), void *user_data);
/* Check that r is the active request: needs to be performed at each callback.
If this races, we'll have two connection attempts running at once and the
diff --git a/src/core/eventmanager/em.h b/src/core/eventmanager/em.h
deleted file mode 100644
index f190bc8743..0000000000
--- a/src/core/eventmanager/em.h
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPC_INTERNAL_EVENTMANAGER_EM_H__
-#define __GRPC_INTERNAL_EVENTMANAGER_EM_H__
-/* grpc_em is an event manager wrapping event loop with multithread support.
- It executes a callback function when a specific event occurs on a file
- descriptor or after a timeout has passed.
- All methods are threadsafe and can be called from any thread.
-
- To use the event manager, a grpc_em instance needs to be initialized to
- maintains the internal states. The grpc_em instance can be used to
- initialize file descriptor instance of grpc_em_fd, or alarm instance of
- grpc_em_alarm. The former is used to register a callback with a IO event.
- The later is used to schedule an alarm.
-
- Instantiating any of these data structures requires including em_internal.h
- A typical usage example is shown in the end of that header file. */
-
-#include <grpc/support/atm.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/time.h>
-
-/* =============== Enums used in GRPC event manager API ==================== */
-
-/* Result of a grpc_em operation */
-typedef enum grpc_em_error {
- GRPC_EM_OK = 0, /* everything went ok */
- GRPC_EM_ERROR, /* internal errors not caused by the caller */
- GRPC_EM_INVALID_ARGUMENTS /* invalid arguments from the caller */
-} grpc_em_error;
-
-/* Status passed to callbacks for grpc_em_fd_notify_on_read and
- grpc_em_fd_notify_on_write. */
-typedef enum grpc_em_cb_status {
- GRPC_CALLBACK_SUCCESS = 0,
- GRPC_CALLBACK_TIMED_OUT,
- GRPC_CALLBACK_CANCELLED,
- GRPC_CALLBACK_DO_NOT_USE
-} grpc_em_cb_status;
-
-/* ======= Useful forward struct typedefs for GRPC event manager API ======= */
-
-struct grpc_em;
-struct grpc_em_alarm;
-struct grpc_fd;
-
-typedef struct grpc_em grpc_em;
-typedef struct grpc_em_alarm grpc_em_alarm;
-typedef struct grpc_em_fd grpc_em_fd;
-
-/* gRPC Callback definition */
-typedef void (*grpc_em_cb_func)(void *arg, grpc_em_cb_status status);
-
-/* ============================ grpc_em =============================== */
-/* Initialize *em and start polling, return GRPC_EM_OK on success, return
- GRPC_EM_ERROR on failure. Upon failure, caller should call grpc_em_destroy()
- to clean partially initialized *em.
-
- Requires: *em uninitialized. */
-grpc_em_error grpc_em_init(grpc_em *em);
-
-/* Stop polling and cause *em no longer to be initialized.
- Return GRPC_EM_OK if event polling is cleanly stopped.
- Otherwise, return GRPC_EM_ERROR if polling is shutdown with errors.
- Requires: *em initialized; no other concurrent operation on *em. */
-grpc_em_error grpc_em_destroy(grpc_em *em);
-
-/* do some work; assumes em->mu locked; may unlock and relock em->mu */
-int grpc_em_work(grpc_em *em, gpr_timespec deadline);
-
-/* =========================== grpc_em_am ============================== */
-/* Initialize *alarm. When expired or canceled, alarm_cb will be called with
- *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was
- canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once,
- and application code should check the status to determine how it was
- invoked. The application callback is also responsible for maintaining
- information about when to free up any user-level state. */
-grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em,
- grpc_em_cb_func alarm_cb, void *alarm_cb_arg);
-
-/* Note that there is no alarm destroy function. This is because the
- alarm is a one-time occurrence with a guarantee that the callback will
- be called exactly once, either at expiration or cancellation. Thus, all
- the internal alarm event management state is destroyed just before
- that callback is invoked. If the user has additional state associated with
- the alarm, the user is responsible for determining when it is safe to
- destroy that state. */
-
-/* Schedule *alarm to expire at deadline. If *alarm is
- re-added before expiration, the *delay is simply reset to the new value.
- Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure.
- Upon failure, caller should abort further operations on *alarm */
-grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline);
-
-/* Cancel an *alarm.
- There are three cases:
- 1. We normally cancel the alarm
- 2. The alarm has already run
- 3. We can't cancel the alarm because it is "in flight".
-
- In all of these cases, the cancellation is still considered successful.
- They are essentially distinguished in that the alarm_cb will be run
- exactly once from either the cancellation (with status CANCELLED)
- or from the activation (with status SUCCESS)
-
- Requires: cancel() must happen after add() on a given alarm */
-grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm);
-
-/* ========================== grpc_em_fd ============================= */
-
-/* Initialize *em_fd, return GRPM_EM_OK on success, GRPC_EM_ERROR on internal
- errors, or GRPC_EM_INVALID_ARGUMENTS if fd is a blocking file descriptor.
- Upon failure, caller should call grpc_em_fd_destroy() to clean partially
- initialized *em_fd.
- fd is a non-blocking file descriptor.
-
- This takes ownership of closing fd.
-
- Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */
-grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd);
-
-/* Cause *em_fd no longer to be initialized and closes the underlying fd.
- Requires: *em_fd initialized; no outstanding notify_on_read or
- notify_on_write. */
-void grpc_em_fd_destroy(grpc_em_fd *em_fd);
-
-/* Returns the file descriptor associated with *em_fd. */
-int grpc_em_fd_get(grpc_em_fd *em_fd);
-
-/* Returns the event manager associated with *em_fd. */
-grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd);
-
-/* Register read interest, causing read_cb to be called once when em_fd becomes
- readable, on deadline specified by deadline, or on shutdown triggered by
- grpc_em_fd_shutdown.
- Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure.
- Upon Failure, caller should abort further operations on *em_fd except
- grpc_em_fd_shutdown().
- read_cb will be called with read_cb_arg when *em_fd becomes readable.
- read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
- GRPC_CALLBACK_TIMED_OUT if the call timed out,
- and CANCELLED if the call was cancelled.
-
- Requires:This method must not be called before the read_cb for any previous
- call runs. Edge triggered events are used whenever they are supported by the
- underlying platform. This means that users must drain em_fd in read_cb before
- calling notify_on_read again. Users are also expected to handle spurious
- events, i.e read_cb is called while nothing can be readable from em_fd */
-grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
- grpc_em_cb_func read_cb,
- void *read_cb_arg,
- gpr_timespec deadline);
-
-/* Exactly the same semantics as above, except based on writable events. */
-grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *fd,
- grpc_em_cb_func write_cb,
- void *write_cb_arg,
- gpr_timespec deadline);
-
-/* Cause any current and all future read/write callbacks to error out with
- GRPC_CALLBACK_CANCELLED. */
-void grpc_em_fd_shutdown(grpc_em_fd *em_fd);
-
-/* ================== Other functions =================== */
-
-/* This function is called from within a callback or from anywhere else
- and causes the invocation of a callback at some point in the future */
-grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
- void *cb_arg);
-
-/* ========== Declarations related to queue management (non-API) =========== */
-
-/* Forward declarations */
-struct grpc_em_activation_data;
-struct grpc_em_fd_impl;
-
-/* ================== Actual structure definitions ========================= */
-/* gRPC event manager handle.
- The handle is used to initialize both grpc_em_alarm and grpc_em_fd. */
-struct em_thread_arg;
-
-struct grpc_em {
- struct event_base *event_base;
-
- gpr_mu mu;
- gpr_cv cv;
- struct grpc_em_activation_data *q;
- int num_pollers;
- int num_fds;
- gpr_timespec last_poll_completed;
-
- int shutdown_backup_poller;
- gpr_event backup_poller_done;
-
- struct grpc_em_fd_impl *fds_to_free;
-
- struct event *timeout_ev; /* activated to break out of the event loop early */
-};
-
-/* gRPC event manager task "base class". This is pretend-inheritance in C89.
- This should be the first member of any actual grpc_em task type.
-
- Memory warning: expanding this will increase memory usage in any derived
- class, so be careful.
-
- For generality, this base can be on multiple task queues and can have
- multiple event callbacks registered. Not all "derived classes" will use
- this feature. */
-
-typedef enum grpc_em_task_type {
- GRPC_EM_TASK_ALARM,
- GRPC_EM_TASK_FD,
- GRPC_EM_TASK_DO_NOT_USE
-} grpc_em_task_type;
-
-/* Different activity types to shape the callback and queueing arrays */
-typedef enum grpc_em_task_activity_type {
- GRPC_EM_TA_READ, /* use this also for single-type events */
- GRPC_EM_TA_WRITE,
- GRPC_EM_TA_COUNT
-} grpc_em_task_activity_type;
-
-/* Include the following #define for convenience for tasks like alarms that
- only have a single type */
-#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ
-
-typedef struct grpc_em_activation_data {
- struct event *ev; /* event activated on this callback type */
- grpc_em_cb_func cb; /* function pointer for callback */
- void *arg; /* argument passed to cb */
-
- /* Hold the status associated with the callback when queued */
- grpc_em_cb_status status;
- /* Now set up to link activations into scheduler queues */
- struct grpc_em_activation_data *prev;
- struct grpc_em_activation_data *next;
-} grpc_em_activation_data;
-
-typedef struct grpc_em_task {
- grpc_em_task_type type;
- grpc_em *em;
-
- /* Now have an array of activation data elements: one for each activity
- type that could get activated */
- grpc_em_activation_data activation[GRPC_EM_TA_COUNT];
-} grpc_em_task;
-
-/* gRPC alarm handle.
- The handle is used to add an alarm which expires after specified timeout. */
-struct grpc_em_alarm {
- grpc_em_task task; /* Include the base class */
-
- gpr_atm triggered; /* To be used atomically if alarm triggered */
-};
-
-/* =================== Event caching ===================
- In order to not miss or double-return edges in the context of edge triggering
- and multithreading, we need a per-fd caching layer in the eventmanager itself
- to cache relevant events.
-
- There are two types of events we care about: calls to notify_on_[read|write]
- and readable/writable events for the socket from eventfd. There are separate
- event caches for read and write.
-
- There are three states:
- 0. "waiting" -- There's been a call to notify_on_[read|write] which has not
- had a corresponding event. In other words, we're waiting for an event so we
- can run the callback.
- 1. "idle" -- We are neither waiting nor have a cached event.
- 2. "cached" -- There has been a read/write event without a waiting callback,
- so we want to run the event next time the application calls
- notify_on_[read|write].
-
- The high level state diagram:
-
- +--------------------------------------------------------------------+
- | WAITING | IDLE | CACHED |
- | | | |
- | 1. --*-> 2. --+-> 3. --+\
- | | | <--+/
- | | | |
- x+-- 6. 5. <-+-- 4. <-*-- |
- | | | |
- +--------------------------------------------------------------------+
-
- Transitions right occur on read|write events. Transitions left occur on
- notify_on_[read|write] events.
- State transitions:
- 1. Read|Write event while waiting -> run the callback and transition to idle.
- 2. Read|Write event while idle -> transition to cached.
- 3. Read|Write event with one already cached -> still cached.
- 4. notify_on_[read|write] with event cached: run callback and transition to
- idle.
- 5. notify_on_[read|write] when idle: Store callback and transition to
- waiting.
- 6. notify_on_[read|write] when waiting: invalid. */
-
-typedef enum grpc_em_fd_state {
- GRPC_EM_FD_WAITING = 0,
- GRPC_EM_FD_IDLE = 1,
- GRPC_EM_FD_CACHED = 2
-} grpc_em_fd_state;
-
-struct grpc_em_fd_impl;
-
-/* gRPC file descriptor handle.
- The handle is used to register read/write callbacks to a file descriptor */
-struct grpc_em_fd {
- struct grpc_em_fd_impl *impl;
-};
-
-#endif /* __GRPC_INTERNAL_EVENTMANAGER_EM_H__ */
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 6c0a688390..84a97a4783 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -36,8 +36,8 @@
#include <string.h>
#include "src/core/endpoint/endpoint.h"
-#include "src/core/endpoint/resolve_address.h"
-#include "src/core/endpoint/tcp_client.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/tcp_client.h"
#include "src/core/httpcli/format_request.h"
#include "src/core/httpcli/httpcli_security_context.h"
#include "src/core/httpcli/parser.h"
@@ -54,7 +54,6 @@ typedef struct {
grpc_resolved_addresses *addresses;
size_t next_address;
grpc_endpoint *ep;
- grpc_em *em;
char *host;
gpr_timespec deadline;
int have_read_byte;
@@ -200,9 +199,8 @@ static void next_address(internal_request *req) {
return;
}
addr = &req->addresses->addrs[req->next_address++];
- grpc_tcp_client_connect(on_connected, req, req->em,
- (struct sockaddr *)&addr->addr, addr->len,
- req->deadline);
+ grpc_tcp_client_connect(on_connected, req, (struct sockaddr *)&addr->addr,
+ addr->len, req->deadline);
}
static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
@@ -217,7 +215,7 @@ static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
}
void grpc_httpcli_get(const grpc_httpcli_request *request,
- gpr_timespec deadline, grpc_em *em,
+ gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data) {
internal_request *req = gpr_malloc(sizeof(internal_request));
memset(req, 0, sizeof(*req));
@@ -225,7 +223,6 @@ void grpc_httpcli_get(const grpc_httpcli_request *request,
grpc_httpcli_parser_init(&req->parser);
req->on_response = on_response;
req->user_data = user_data;
- req->em = em;
req->deadline = deadline;
req->use_ssl = request->use_ssl;
if (req->use_ssl) {
@@ -238,7 +235,7 @@ void grpc_httpcli_get(const grpc_httpcli_request *request,
void grpc_httpcli_post(const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
- gpr_timespec deadline, grpc_em *em,
+ gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data) {
internal_request *req = gpr_malloc(sizeof(internal_request));
memset(req, 0, sizeof(*req));
@@ -247,7 +244,6 @@ void grpc_httpcli_post(const grpc_httpcli_request *request,
grpc_httpcli_parser_init(&req->parser);
req->on_response = on_response;
req->user_data = user_data;
- req->em = em;
req->deadline = deadline;
req->use_ssl = request->use_ssl;
if (req->use_ssl) {
diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h
index aef0edfdd4..56eebe951e 100644
--- a/src/core/httpcli/httpcli.h
+++ b/src/core/httpcli/httpcli.h
@@ -36,7 +36,6 @@
#include <stddef.h>
-#include "src/core/eventmanager/em.h"
#include <grpc/support/time.h>
/* User agent this library reports */
@@ -90,7 +89,7 @@ typedef void (*grpc_httpcli_response_cb)(void *user_data,
'on_response' is a callback to report results to (and 'user_data' is a user
supplied pointer to pass to said call) */
void grpc_httpcli_get(const grpc_httpcli_request *request,
- gpr_timespec deadline, grpc_em *em,
+ gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data);
/* Asynchronously perform a HTTP POST.
@@ -98,7 +97,7 @@ void grpc_httpcli_get(const grpc_httpcli_request *request,
Does not support ?var1=val1&var2=val2 in the path. */
void grpc_httpcli_post(const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
- gpr_timespec deadline, grpc_em *em,
+ gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data);
#endif /* __GRPC_INTERNAL_HTTPCLI_HTTPCLI_H__ */
diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h
new file mode 100644
index 0000000000..5bd00d9736
--- /dev/null
+++ b/src/core/iomgr/alarm.h
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_ALARM_H__
+#define __GRPC_INTERNAL_IOMGR_ALARM_H__
+
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/port_platform.h>
+#include <grpc/support/time.h>
+
+typedef struct grpc_alarm grpc_alarm;
+
+/* One of the following headers should provide struct grpc_alarm */
+#ifdef GPR_LIBEVENT
+#include "src/core/iomgr/iomgr_libevent.h"
+#endif
+
+/* Initialize *alarm. When expired or canceled, alarm_cb will be called with
+ *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was
+ canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once,
+ and application code should check the status to determine how it was
+ invoked. The application callback is also responsible for maintaining
+ information about when to free up any user-level state. */
+void grpc_alarm_init(grpc_alarm *alarm, grpc_iomgr_cb_func alarm_cb,
+ void *alarm_cb_arg);
+
+/* Note that there is no alarm destroy function. This is because the
+ alarm is a one-time occurrence with a guarantee that the callback will
+ be called exactly once, either at expiration or cancellation. Thus, all
+ the internal alarm event management state is destroyed just before
+ that callback is invoked. If the user has additional state associated with
+ the alarm, the user is responsible for determining when it is safe to
+ destroy that state. */
+
+/* Schedule *alarm to expire at deadline. If *alarm is
+ re-added before expiration, the *delay is simply reset to the new value.
+ Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure.
+ Upon failure, caller should abort further operations on *alarm */
+int grpc_alarm_add(grpc_alarm *alarm, gpr_timespec deadline);
+
+/* Cancel an *alarm.
+ There are three cases:
+ 1. We normally cancel the alarm
+ 2. The alarm has already run
+ 3. We can't cancel the alarm because it is "in flight".
+
+ In all of these cases, the cancellation is still considered successful.
+ They are essentially distinguished in that the alarm_cb will be run
+ exactly once from either the cancellation (with status CANCELLED)
+ or from the activation (with status SUCCESS)
+
+ Requires: cancel() must happen after add() on a given alarm */
+int grpc_alarm_cancel(grpc_alarm *alarm);
+
+#endif /* __GRPC_INTERNAL_IOMGR_ALARM_H__ */
diff --git a/src/core/surface/surface_em.h b/src/core/iomgr/endpoint_pair.h
index 165f42f868..4a97ebf0f6 100644
--- a/src/core/surface/surface_em.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -31,17 +31,16 @@
*
*/
-#ifndef __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__
-#define __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__
+#ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_
+#define __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_
-#include "src/core/eventmanager/em.h"
+#include "src/core/endpoint/endpoint.h"
-/* Returns a global singleton event manager for
- the surface apis, and is passed down to channels and
- transports as needed. */
-grpc_em *grpc_surface_em();
+typedef struct {
+ grpc_endpoint *client;
+ grpc_endpoint *server;
+} grpc_endpoint_pair;
-void grpc_surface_em_init();
-void grpc_surface_em_shutdown();
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size);
-#endif /* __GRPC_INTERNAL_SURFACE_SURFACE_EM_H__ */
+#endif /* __GRPC_INTERNAL_IOMGR_ENDPOINT_PAIR_H_ */
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
new file mode 100644
index 0000000000..f08d1344eb
--- /dev/null
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/endpoint_pair.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include "src/core/iomgr/tcp_posix.h"
+#include <grpc/support/log.h>
+
+static void create_sockets(int sv[2]) {
+ int flags;
+ GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+ flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+}
+
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
+ int sv[2];
+ grpc_endpoint_pair p;
+ create_sockets(sv);
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size);
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size);
+ return p;
+}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
new file mode 100644
index 0000000000..cf39f947bc
--- /dev/null
+++ b/src/core/iomgr/iomgr.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__
+#define __GRPC_INTERNAL_IOMGR_IOMGR_H__
+
+/* Status passed to callbacks for grpc_em_fd_notify_on_read and
+ grpc_em_fd_notify_on_write. */
+typedef enum grpc_em_cb_status {
+ GRPC_CALLBACK_SUCCESS = 0,
+ GRPC_CALLBACK_TIMED_OUT,
+ GRPC_CALLBACK_CANCELLED,
+ GRPC_CALLBACK_DO_NOT_USE
+} grpc_iomgr_cb_status;
+
+/* gRPC Callback definition */
+typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status);
+
+void grpc_iomgr_init();
+void grpc_iomgr_shutdown();
+
+/* This function is called from within a callback or from anywhere else
+ and causes the invocation of a callback at some point in the future */
+void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg);
+
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_H__ */
diff --git a/src/core/surface/surface_em.c b/src/core/iomgr/iomgr_completion_queue_interface.h
index e1785d1a44..3c4efe773a 100644
--- a/src/core/surface/surface_em.c
+++ b/src/core/iomgr/iomgr_completion_queue_interface.h
@@ -31,25 +31,15 @@
*
*/
-#include "src/core/surface/surface_em.h"
-#include <grpc/support/log.h>
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
+#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
-static int initialized = 0;
-static grpc_em em;
+/* Internals of iomgr that are exposed only to be used for completion queue
+ implementation */
-grpc_em *grpc_surface_em() {
- GPR_ASSERT(initialized && "call grpc_init()");
- return &em;
-}
+extern gpr_mu grpc_iomgr_mu;
+extern gpr_cv grpc_iomgr_cv;
-void grpc_surface_em_init() {
- GPR_ASSERT(!initialized);
- initialized = 1;
- grpc_em_init(&em);
-}
+int grpc_iomgr_work(gpr_timespec deadline);
-void grpc_surface_em_shutdown() {
- GPR_ASSERT(initialized);
- grpc_em_destroy(&em);
- initialized = 0;
-}
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */
diff --git a/src/core/eventmanager/em.c b/src/core/iomgr/iomgr_libevent.c
index 0dc6c6a6d0..1af03dcf12 100644
--- a/src/core/eventmanager/em.c
+++ b/src/core/iomgr/iomgr_libevent.c
@@ -31,71 +31,66 @@
*
*/
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include <unistd.h>
#include <fcntl.h>
+#include "src/core/iomgr/alarm.h"
#include <grpc/support/atm.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <event2/event.h>
#include <event2/thread.h>
-int evthread_use_threads(void);
-
-static void grpc_em_fd_impl_destroy(struct grpc_em_fd_impl *impl);
-
#define ALARM_TRIGGER_INIT ((gpr_atm)0)
#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
#define DONE_SHUTDOWN ((void *)1)
#define POLLER_ID_INVALID ((gpr_atm)-1)
-typedef struct grpc_em_fd_impl {
- grpc_em_task task; /* Base class, callbacks, queues, etc */
- int fd; /* File descriptor */
-
- /* Note that the shutdown event is only needed as a workaround for libevent
- not properly handling event_active on an in flight event. */
- struct event *shutdown_ev; /* activated to trigger shutdown */
-
- /* protect shutdown_started|read_state|write_state and ensure barriers
- between notify_on_[read|write] and read|write callbacks */
- gpr_mu mu;
- int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
- grpc_em_fd_state read_state;
- grpc_em_fd_state write_state;
+/* Global data */
+struct event_base *g_event_base;
+gpr_mu grpc_iomgr_mu;
+gpr_cv grpc_iomgr_cv;
+static grpc_libevent_activation_data *g_activation_queue;
+static int g_num_pollers;
+static int g_num_fds;
+static gpr_timespec g_last_poll_completed;
+static int g_shutdown_backup_poller;
+static gpr_event g_backup_poller_done;
+/* activated to break out of the event loop early */
+static struct event *g_timeout_ev;
+static grpc_fd *g_fds_to_free;
- /* descriptor delete list. These are destroyed during polling. */
- struct grpc_em_fd_impl *next;
-} grpc_em_fd_impl;
-
-/* ================== grpc_em implementation ===================== */
+int evthread_use_threads(void);
+static void grpc_fd_impl_destroy(grpc_fd *impl);
/* If anything is in the work queue, process one item and return 1.
Return 0 if there were no work items to complete.
- Requires em->mu locked, may unlock and relock during the call. */
-static int maybe_do_queue_work(grpc_em *em) {
- grpc_em_activation_data *work = em->q;
+ Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
+static int maybe_do_queue_work() {
+ grpc_libevent_activation_data *work = g_activation_queue;
if (work == NULL) return 0;
if (work->next == work) {
- em->q = NULL;
+ g_activation_queue = NULL;
} else {
- em->q = work->next;
- em->q->prev = work->prev;
- em->q->next->prev = em->q->prev->next = em->q;
+ g_activation_queue = work->next;
+ g_activation_queue->prev = work->prev;
+ g_activation_queue->next->prev = g_activation_queue->prev->next =
+ g_activation_queue;
}
work->next = work->prev = NULL;
- gpr_mu_unlock(&em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
work->cb(work->arg, work->status);
- gpr_mu_lock(&em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
return 1;
}
@@ -104,58 +99,59 @@ static void timer_callback(int fd, short events, void *context) {
event_base_loopbreak((struct event_base *)context);
}
-static void free_fd_list(grpc_em_fd_impl *impl) {
+static void free_fd_list(grpc_fd *impl) {
while (impl != NULL) {
- grpc_em_fd_impl *current = impl;
+ grpc_fd *current = impl;
impl = impl->next;
- grpc_em_fd_impl_destroy(current);
+ grpc_fd_impl_destroy(current);
gpr_free(current);
}
}
+static void maybe_free_fds() {
+ if (g_fds_to_free) {
+ free_fd_list(g_fds_to_free);
+ g_fds_to_free = NULL;
+ }
+}
+
/* Spend some time doing polling and libevent maintenance work if no other
thread is. This includes both polling for events and destroying/closing file
descriptor objects.
Returns 1 if polling was performed, 0 otherwise.
- Requires em->mu locked, may unlock and relock during the call. */
-static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
+ Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
+static int maybe_do_polling_work(struct timeval delay) {
int status;
- if (em->num_pollers) return 0;
+ if (g_num_pollers) return 0;
- em->num_pollers = 1;
+ g_num_pollers = 1;
- free_fd_list(em->fds_to_free);
- em->fds_to_free = NULL;
+ maybe_free_fds();
- gpr_mu_unlock(&em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
- event_add(em->timeout_ev, &delay);
- status = event_base_loop(em->event_base, EVLOOP_ONCE);
+ event_add(g_timeout_ev, &delay);
+ status = event_base_loop(g_event_base, EVLOOP_ONCE);
if (status < 0) {
gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status);
}
- event_del(em->timeout_ev);
+ event_del(g_timeout_ev);
- gpr_mu_lock(&em->mu);
- if (em->fds_to_free) {
- free_fd_list(em->fds_to_free);
- em->fds_to_free = NULL;
- }
+ gpr_mu_lock(&grpc_iomgr_mu);
+ maybe_free_fds();
- em->num_pollers = 0;
- gpr_cv_broadcast(&em->cv);
+ g_num_pollers = 0;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
return 1;
}
-int grpc_em_work(grpc_em *em, gpr_timespec deadline) {
+int grpc_iomgr_work(gpr_timespec deadline) {
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
/* poll for no longer than one second */
gpr_timespec max_delay = {1, 0};
struct timeval delay;
- GPR_ASSERT(em);
-
if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) {
return 0;
}
@@ -166,8 +162,8 @@ int grpc_em_work(grpc_em *em, gpr_timespec deadline) {
delay = gpr_timeval_from_timespec(delay_timespec);
- if (maybe_do_queue_work(em) || maybe_do_polling_work(em, delay)) {
- em->last_poll_completed = gpr_now();
+ if (maybe_do_queue_work() || maybe_do_polling_work(delay)) {
+ g_last_poll_completed = gpr_now();
return 1;
}
@@ -175,158 +171,154 @@ int grpc_em_work(grpc_em *em, gpr_timespec deadline) {
}
static void backup_poller_thread(void *p) {
- grpc_em *em = p;
int backup_poller_engaged = 0;
/* allow no pollers for 100 milliseconds, then engage backup polling */
gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000);
- gpr_mu_lock(&em->mu);
- while (!em->shutdown_backup_poller) {
- if (em->num_pollers == 0) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (!g_shutdown_backup_poller) {
+ if (g_num_pollers == 0) {
gpr_timespec now = gpr_now();
gpr_timespec time_until_engage = gpr_time_sub(
- allow_no_pollers, gpr_time_sub(now, em->last_poll_completed));
+ allow_no_pollers, gpr_time_sub(now, g_last_poll_completed));
if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) {
if (!backup_poller_engaged) {
gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller");
backup_poller_engaged = 1;
}
- if (!maybe_do_queue_work(em)) {
+ if (!maybe_do_queue_work()) {
struct timeval tv = {1, 0};
- maybe_do_polling_work(em, tv);
+ maybe_do_polling_work(tv);
}
} else {
if (backup_poller_engaged) {
gpr_log(GPR_DEBUG, "Backup poller disengaged");
backup_poller_engaged = 0;
}
- gpr_mu_unlock(&em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
gpr_sleep_until(gpr_time_add(now, time_until_engage));
- gpr_mu_lock(&em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
}
} else {
if (backup_poller_engaged) {
gpr_log(GPR_DEBUG, "Backup poller disengaged");
backup_poller_engaged = 0;
}
- gpr_cv_wait(&em->cv, &em->mu, gpr_inf_future);
+ gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future);
}
}
- gpr_mu_unlock(&em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
- gpr_event_set(&em->backup_poller_done, (void *)1);
+ gpr_event_set(&g_backup_poller_done, (void *)1);
}
-grpc_em_error grpc_em_init(grpc_em *em) {
+void grpc_iomgr_init() {
gpr_thd_id backup_poller_id;
if (evthread_use_threads() != 0) {
gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
- return GRPC_EM_ERROR;
+ abort();
}
- gpr_mu_init(&em->mu);
- gpr_cv_init(&em->cv);
- em->q = NULL;
- em->num_pollers = 0;
- em->num_fds = 0;
- em->last_poll_completed = gpr_now();
- em->shutdown_backup_poller = 0;
- em->fds_to_free = NULL;
+ gpr_mu_init(&grpc_iomgr_mu);
+ gpr_cv_init(&grpc_iomgr_cv);
+ g_activation_queue = NULL;
+ g_num_pollers = 0;
+ g_num_fds = 0;
+ g_last_poll_completed = gpr_now();
+ g_shutdown_backup_poller = 0;
+ g_fds_to_free = NULL;
- gpr_event_init(&em->backup_poller_done);
+ gpr_event_init(&g_backup_poller_done);
- em->event_base = NULL;
- em->timeout_ev = NULL;
+ g_event_base = NULL;
+ g_timeout_ev = NULL;
- em->event_base = event_base_new();
- if (!em->event_base) {
+ g_event_base = event_base_new();
+ if (!g_event_base) {
gpr_log(GPR_ERROR, "Failed to create the event base");
- return GRPC_EM_ERROR;
+ abort();
}
- if (evthread_make_base_notifiable(em->event_base) != 0) {
+ if (evthread_make_base_notifiable(g_event_base) != 0) {
gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!");
- return GRPC_EM_ERROR;
+ abort();
}
- em->timeout_ev = evtimer_new(em->event_base, timer_callback, em->event_base);
+ g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base);
- gpr_thd_new(&backup_poller_id, backup_poller_thread, em, NULL);
-
- return GRPC_EM_OK;
+ gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL);
}
-grpc_em_error grpc_em_destroy(grpc_em *em) {
+void grpc_iomgr_shutdown() {
gpr_timespec fd_shutdown_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_micros(10 * 1000 * 1000));
+ gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
/* broadcast shutdown */
- gpr_mu_lock(&em->mu);
- while (em->num_fds) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (g_num_fds) {
gpr_log(GPR_INFO,
"waiting for %d fds to be destroyed before closing event manager",
- em->num_fds);
- if (gpr_cv_wait(&em->cv, &em->mu, fd_shutdown_deadline)) {
+ g_num_fds);
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) {
gpr_log(GPR_ERROR,
"not all fds destroyed before shutdown deadline: memory leaks "
"are likely");
break;
- } else if (em->num_fds == 0) {
+ } else if (g_num_fds == 0) {
gpr_log(GPR_INFO, "all fds closed");
}
}
- em->shutdown_backup_poller = 1;
- gpr_cv_broadcast(&em->cv);
- gpr_mu_unlock(&em->mu);
+ g_shutdown_backup_poller = 1;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
- gpr_event_wait(&em->backup_poller_done, gpr_inf_future);
+ gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
/* drain pending work */
- gpr_mu_lock(&em->mu);
- while (maybe_do_queue_work(em))
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (maybe_do_queue_work())
;
- gpr_mu_unlock(&em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
- free_fd_list(em->fds_to_free);
+ free_fd_list(g_fds_to_free);
/* complete shutdown */
- gpr_mu_destroy(&em->mu);
- gpr_cv_destroy(&em->cv);
+ gpr_mu_destroy(&grpc_iomgr_mu);
+ gpr_cv_destroy(&grpc_iomgr_cv);
- if (em->timeout_ev != NULL) {
- event_free(em->timeout_ev);
+ if (g_timeout_ev != NULL) {
+ event_free(g_timeout_ev);
}
- if (em->event_base != NULL) {
- event_base_free(em->event_base);
- em->event_base = NULL;
+ if (g_event_base != NULL) {
+ event_base_free(g_event_base);
+ g_event_base = NULL;
}
-
- return GRPC_EM_OK;
}
-static void add_task(grpc_em *em, grpc_em_activation_data *adata) {
- gpr_mu_lock(&em->mu);
- if (em->q) {
- adata->next = em->q;
+static void add_task(grpc_libevent_activation_data *adata) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ if (g_activation_queue) {
+ adata->next = g_activation_queue;
adata->prev = adata->next->prev;
adata->next->prev = adata->prev->next = adata;
} else {
- em->q = adata;
+ g_activation_queue = adata;
adata->next = adata->prev = adata;
}
- gpr_cv_broadcast(&em->cv);
- gpr_mu_unlock(&em->mu);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
-/* ===============grpc_em_alarm implementation==================== */
+/* ===============grpc_alarm implementation==================== */
/* The following function frees up the alarm's libevent structure and
should always be invoked just before calling the alarm's callback */
-static void alarm_ev_destroy(grpc_em_alarm *alarm) {
- grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+static void alarm_ev_destroy(grpc_alarm *alarm) {
+ grpc_libevent_activation_data *adata =
+ &alarm->task.activation[GRPC_EM_TA_ONLY];
if (adata->ev != NULL) {
/* TODO(klempner): Is this safe to do when we're cancelling? */
event_free(adata->ev);
@@ -335,8 +327,9 @@ static void alarm_ev_destroy(grpc_em_alarm *alarm) {
}
/* Proxy callback triggered by alarm->ev to call alarm->cb */
static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) {
- grpc_em_alarm *alarm = arg;
- grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+ grpc_alarm *alarm = arg;
+ grpc_libevent_activation_data *adata =
+ &alarm->task.activation[GRPC_EM_TA_ONLY];
int trigger_old;
/* First check if this alarm has been canceled, atomically */
@@ -346,26 +339,26 @@ static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) {
/* Before invoking user callback, destroy the libevent structure */
alarm_ev_destroy(alarm);
adata->status = GRPC_CALLBACK_SUCCESS;
- add_task(alarm->task.em, adata);
+ add_task(adata);
}
}
-grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em,
- grpc_em_cb_func alarm_cb, void *alarm_cb_arg) {
- grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+void grpc_alarm_init(grpc_alarm *alarm, grpc_iomgr_cb_func alarm_cb,
+ void *alarm_cb_arg) {
+ grpc_libevent_activation_data *adata =
+ &alarm->task.activation[GRPC_EM_TA_ONLY];
alarm->task.type = GRPC_EM_TASK_ALARM;
- alarm->task.em = em;
gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
adata->cb = alarm_cb;
adata->arg = alarm_cb_arg;
adata->prev = NULL;
adata->next = NULL;
adata->ev = NULL;
- return GRPC_EM_OK;
}
-grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) {
- grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+int grpc_alarm_add(grpc_alarm *alarm, gpr_timespec deadline) {
+ grpc_libevent_activation_data *adata =
+ &alarm->task.activation[GRPC_EM_TA_ONLY];
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
if (adata->ev) {
@@ -373,20 +366,17 @@ grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) {
gpr_log(GPR_INFO, "Adding an alarm that already has an event.");
adata->ev = NULL;
}
- adata->ev = evtimer_new(alarm->task.em->event_base, libevent_alarm_cb, alarm);
+ adata->ev = evtimer_new(g_event_base, libevent_alarm_cb, alarm);
/* Set the trigger field to untriggered. Do this as the last store since
it is a release of previous stores. */
gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
- if (adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0) {
- return GRPC_EM_OK;
- } else {
- return GRPC_EM_ERROR;
- }
+ return adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0;
}
-grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) {
- grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+int grpc_alarm_cancel(grpc_alarm *alarm) {
+ grpc_libevent_activation_data *adata =
+ &alarm->task.activation[GRPC_EM_TA_ONLY];
int trigger_old;
/* First check if this alarm has been triggered, atomically */
@@ -400,25 +390,44 @@ grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) {
if (evtimer_del(adata->ev) != 0) {
/* The delete was unsuccessful for some reason. */
gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful");
- return GRPC_EM_ERROR;
+ return 0;
}
/* Free up the event structure before invoking callback */
alarm_ev_destroy(alarm);
adata->status = GRPC_CALLBACK_CANCELLED;
- add_task(alarm->task.em, adata);
+ add_task(adata);
}
- return GRPC_EM_OK;
+ return 1;
}
-/* ==================== grpc_em_fd implementation =================== */
+static void grpc_fd_impl_destroy(grpc_fd *impl) {
+ grpc_em_task_activity_type type;
+ grpc_libevent_activation_data *adata;
+
+ for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
+ adata = &(impl->task.activation[type]);
+ GPR_ASSERT(adata->next == NULL);
+ if (adata->ev != NULL) {
+ event_free(adata->ev);
+ adata->ev = NULL;
+ }
+ }
+
+ if (impl->shutdown_ev != NULL) {
+ event_free(impl->shutdown_ev);
+ impl->shutdown_ev = NULL;
+ }
+ gpr_mu_destroy(&impl->mu);
+ close(impl->fd);
+}
/* Proxy callback to call a gRPC read/write callback */
-static void em_fd_cb(int fd, short what, void *arg /*=em_fd_impl*/) {
- grpc_em_fd_impl *em_fd = arg;
- grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS;
+static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
+ grpc_fd *em_fd = arg;
+ grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS;
int run_read_cb = 0;
int run_write_cb = 0;
- grpc_em_activation_data *rdata, *wdata;
+ grpc_libevent_activation_data *rdata, *wdata;
gpr_mu_lock(&em_fd->mu);
if (em_fd->shutdown_started) {
@@ -433,35 +442,35 @@ static void em_fd_cb(int fd, short what, void *arg /*=em_fd_impl*/) {
if (what & EV_READ) {
switch (em_fd->read_state) {
- case GRPC_EM_FD_WAITING:
+ case GRPC_FD_WAITING:
run_read_cb = 1;
- em_fd->read_state = GRPC_EM_FD_IDLE;
+ em_fd->read_state = GRPC_FD_IDLE;
break;
- case GRPC_EM_FD_IDLE:
- case GRPC_EM_FD_CACHED:
- em_fd->read_state = GRPC_EM_FD_CACHED;
+ case GRPC_FD_IDLE:
+ case GRPC_FD_CACHED:
+ em_fd->read_state = GRPC_FD_CACHED;
}
}
if (what & EV_WRITE) {
switch (em_fd->write_state) {
- case GRPC_EM_FD_WAITING:
+ case GRPC_FD_WAITING:
run_write_cb = 1;
- em_fd->write_state = GRPC_EM_FD_IDLE;
+ em_fd->write_state = GRPC_FD_IDLE;
break;
- case GRPC_EM_FD_IDLE:
- case GRPC_EM_FD_CACHED:
- em_fd->write_state = GRPC_EM_FD_CACHED;
+ case GRPC_FD_IDLE:
+ case GRPC_FD_CACHED:
+ em_fd->write_state = GRPC_FD_CACHED;
}
}
if (run_read_cb) {
rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
rdata->status = status;
- add_task(em_fd->task.em, rdata);
+ add_task(rdata);
} else if (run_write_cb) {
wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
wdata->status = status;
- add_task(em_fd->task.em, wdata);
+ add_task(wdata);
}
gpr_mu_unlock(&em_fd->mu);
}
@@ -471,41 +480,34 @@ static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
that libevent's handling of event_active() on an event which is already in
flight on a different thread is racy and easily triggers TSAN.
*/
- grpc_em_fd_impl *impl = arg;
+ grpc_fd *impl = arg;
gpr_mu_lock(&impl->mu);
impl->shutdown_started = 1;
- if (impl->read_state == GRPC_EM_FD_WAITING) {
+ if (impl->read_state == GRPC_FD_WAITING) {
event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
}
- if (impl->write_state == GRPC_EM_FD_WAITING) {
+ if (impl->write_state == GRPC_FD_WAITING) {
event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
}
gpr_mu_unlock(&impl->mu);
}
-grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
+grpc_fd *grpc_fd_create(int fd) {
int flags;
- grpc_em_activation_data *rdata, *wdata;
- grpc_em_fd_impl *impl = gpr_malloc(sizeof(grpc_em_fd_impl));
+ grpc_libevent_activation_data *rdata, *wdata;
+ grpc_fd *impl = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_lock(&em->mu);
- em->num_fds++;
-
- gpr_mu_unlock(&em->mu);
-
- em_fd->impl = impl;
+ gpr_mu_lock(&grpc_iomgr_mu);
+ g_num_fds++;
+ gpr_mu_unlock(&grpc_iomgr_mu);
impl->shutdown_ev = NULL;
gpr_mu_init(&impl->mu);
flags = fcntl(fd, F_GETFL, 0);
- if ((flags & O_NONBLOCK) == 0) {
- gpr_log(GPR_ERROR, "File descriptor %d is blocking", fd);
- return GRPC_EM_INVALID_ARGUMENTS;
- }
+ GPR_ASSERT((flags & O_NONBLOCK) != 0);
impl->task.type = GRPC_EM_TASK_FD;
- impl->task.em = em;
impl->fd = fd;
rdata = &(impl->task.activation[GRPC_EM_TA_READ]);
@@ -524,100 +526,57 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
wdata->prev = NULL;
wdata->next = NULL;
- impl->read_state = GRPC_EM_FD_IDLE;
- impl->write_state = GRPC_EM_FD_IDLE;
+ impl->read_state = GRPC_FD_IDLE;
+ impl->write_state = GRPC_FD_IDLE;
impl->shutdown_started = 0;
impl->next = NULL;
/* TODO(chenw): detect platforms where only level trigger is supported,
and set the event to non-persist. */
- rdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ,
+ rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ,
em_fd_cb, impl);
- if (!rdata->ev) {
- gpr_log(GPR_ERROR, "Failed to create read event");
- return GRPC_EM_ERROR;
- }
+ GPR_ASSERT(rdata->ev);
- wdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE,
+ wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE,
em_fd_cb, impl);
- if (!wdata->ev) {
- gpr_log(GPR_ERROR, "Failed to create write event");
- return GRPC_EM_ERROR;
- }
+ GPR_ASSERT(wdata->ev);
impl->shutdown_ev =
- event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, impl);
-
- if (!impl->shutdown_ev) {
- gpr_log(GPR_ERROR, "Failed to create shutdown event");
- return GRPC_EM_ERROR;
- }
-
- return GRPC_EM_OK;
-}
-
-static void grpc_em_fd_impl_destroy(grpc_em_fd_impl *impl) {
- grpc_em_task_activity_type type;
- grpc_em_activation_data *adata;
-
- for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
- adata = &(impl->task.activation[type]);
- GPR_ASSERT(adata->next == NULL);
- if (adata->ev != NULL) {
- event_free(adata->ev);
- adata->ev = NULL;
- }
- }
+ event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl);
+ GPR_ASSERT(impl->shutdown_ev);
- if (impl->shutdown_ev != NULL) {
- event_free(impl->shutdown_ev);
- impl->shutdown_ev = NULL;
- }
- gpr_mu_destroy(&impl->mu);
- close(impl->fd);
+ return impl;
}
-void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
- grpc_em_fd_impl *impl = em_fd->impl;
- grpc_em *em = impl->task.em;
+void grpc_fd_destroy(grpc_fd *impl) {
+ gpr_mu_lock(&grpc_iomgr_mu);
- gpr_mu_lock(&em->mu);
-
- if (em->num_pollers == 0) {
+ if (g_num_pollers == 0) {
/* it is safe to simply free it */
- grpc_em_fd_impl_destroy(impl);
+ grpc_fd_impl_destroy(impl);
gpr_free(impl);
} else {
/* Put the impl on the list to be destroyed by the poller. */
- impl->next = em->fds_to_free;
- em->fds_to_free = impl;
- /* Kick the poller so it closes the fd promptly.
- * TODO(klempner): maybe this should be a different event.
- */
- event_active(em_fd->impl->shutdown_ev, EV_READ, 1);
+ impl->next = g_fds_to_free;
+ g_fds_to_free = impl;
+ /* TODO(ctiller): kick the poller so it destroys this fd promptly
+ (currently we may wait up to a second) */
}
- em->num_fds--;
- gpr_cv_broadcast(&em->cv);
- gpr_mu_unlock(&em->mu);
+ g_num_fds--;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
-int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->impl->fd; }
-
-/* Returns the event manager associated with *em_fd. */
-grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->impl->task.em; }
+int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; }
/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
called when the previously registered callback has not been called yet. */
-grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
- grpc_em_cb_func read_cb,
- void *read_cb_arg,
- gpr_timespec deadline) {
- grpc_em_fd_impl *impl = em_fd->impl;
+int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg, gpr_timespec deadline) {
int force_event = 0;
- grpc_em_activation_data *rdata;
- grpc_em_error result = GRPC_EM_OK;
+ grpc_libevent_activation_data *rdata;
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
struct timeval *delayp =
@@ -629,27 +588,23 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
rdata->cb = read_cb;
rdata->arg = read_cb_arg;
- force_event =
- (impl->shutdown_started || impl->read_state == GRPC_EM_FD_CACHED);
- impl->read_state = GRPC_EM_FD_WAITING;
+ force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED);
+ impl->read_state = GRPC_FD_WAITING;
if (force_event) {
event_active(rdata->ev, EV_READ, 1);
} else if (event_add(rdata->ev, delayp) == -1) {
- result = GRPC_EM_ERROR;
+ gpr_mu_unlock(&impl->mu);
+ return 0;
}
gpr_mu_unlock(&impl->mu);
- return result;
+ return 1;
}
-grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
- grpc_em_cb_func write_cb,
- void *write_cb_arg,
- gpr_timespec deadline) {
- grpc_em_fd_impl *impl = em_fd->impl;
+int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg, gpr_timespec deadline) {
int force_event = 0;
- grpc_em_activation_data *wdata;
- grpc_em_error result = GRPC_EM_OK;
+ grpc_libevent_activation_data *wdata;
gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
struct timeval *delayp =
@@ -661,25 +616,23 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
wdata->cb = write_cb;
wdata->arg = write_cb_arg;
- force_event =
- (impl->shutdown_started || impl->write_state == GRPC_EM_FD_CACHED);
- impl->write_state = GRPC_EM_FD_WAITING;
+ force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED);
+ impl->write_state = GRPC_FD_WAITING;
if (force_event) {
event_active(wdata->ev, EV_WRITE, 1);
} else if (event_add(wdata->ev, delayp) == -1) {
- result = GRPC_EM_ERROR;
+ gpr_mu_unlock(&impl->mu);
+ return 0;
}
gpr_mu_unlock(&impl->mu);
- return result;
+ return 1;
}
-void grpc_em_fd_shutdown(grpc_em_fd *em_fd) {
- event_active(em_fd->impl->shutdown_ev, EV_READ, 1);
+void grpc_fd_shutdown(grpc_fd *em_fd) {
+ event_active(em_fd->shutdown_ev, EV_READ, 1);
}
-/*====================== Other callback functions ======================*/
-
/* Sometimes we want a followup callback: something to be added from the
current callback for the EM to invoke once this callback is complete.
This is implemented by inserting an entry into an EM queue. */
@@ -689,27 +642,23 @@ void grpc_em_fd_shutdown(grpc_em_fd *em_fd) {
the function to use for the followup callback, and the
activation data pointer used for the queues (to free in the CB) */
struct followup_callback_arg {
- grpc_em_cb_func func;
+ grpc_iomgr_cb_func func;
void *cb_arg;
- grpc_em_activation_data adata;
+ grpc_libevent_activation_data adata;
};
-static void followup_proxy_callback(void *cb_arg, grpc_em_cb_status status) {
+static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) {
struct followup_callback_arg *fcb_arg = cb_arg;
/* Invoke the function */
fcb_arg->func(fcb_arg->cb_arg, status);
gpr_free(fcb_arg);
}
-grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
- void *cb_arg) {
- grpc_em_activation_data *adptr;
+void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
+ grpc_libevent_activation_data *adptr;
struct followup_callback_arg *fcb_arg;
fcb_arg = gpr_malloc(sizeof(*fcb_arg));
- if (fcb_arg == NULL) {
- return GRPC_EM_ERROR;
- }
/* Set up the activation data and followup callback argument structures */
adptr = &fcb_arg->adata;
adptr->ev = NULL;
@@ -723,6 +672,5 @@ grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
fcb_arg->cb_arg = cb_arg;
/* Insert an activation data for the specified em */
- add_task(em, adptr);
- return GRPC_EM_OK;
+ add_task(adptr);
}
diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h
new file mode 100644
index 0000000000..77e7b59989
--- /dev/null
+++ b/src/core/iomgr/iomgr_libevent.h
@@ -0,0 +1,207 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
+#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
+
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+typedef struct grpc_fd grpc_fd;
+
+/* gRPC event manager task "base class". This is pretend-inheritance in C89.
+ This should be the first member of any actual grpc_em task type.
+
+ Memory warning: expanding this will increase memory usage in any derived
+ class, so be careful.
+
+ For generality, this base can be on multiple task queues and can have
+ multiple event callbacks registered. Not all "derived classes" will use
+ this feature. */
+
+typedef enum grpc_libevent_task_type {
+ GRPC_EM_TASK_ALARM,
+ GRPC_EM_TASK_FD,
+ GRPC_EM_TASK_DO_NOT_USE
+} grpc_libevent_task_type;
+
+/* Different activity types to shape the callback and queueing arrays */
+typedef enum grpc_em_task_activity_type {
+ GRPC_EM_TA_READ, /* use this also for single-type events */
+ GRPC_EM_TA_WRITE,
+ GRPC_EM_TA_COUNT
+} grpc_em_task_activity_type;
+
+/* Include the following #define for convenience for tasks like alarms that
+ only have a single type */
+#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ
+
+typedef struct grpc_libevent_activation_data {
+ struct event *ev; /* event activated on this callback type */
+ grpc_iomgr_cb_func cb; /* function pointer for callback */
+ void *arg; /* argument passed to cb */
+
+ /* Hold the status associated with the callback when queued */
+ grpc_iomgr_cb_status status;
+ /* Now set up to link activations into scheduler queues */
+ struct grpc_libevent_activation_data *prev;
+ struct grpc_libevent_activation_data *next;
+} grpc_libevent_activation_data;
+
+typedef struct grpc_libevent_task {
+ grpc_libevent_task_type type;
+
+ /* Now have an array of activation data elements: one for each activity
+ type that could get activated */
+ grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT];
+} grpc_libevent_task;
+
+/* Initialize *em_fd.
+ Requires fd is a non-blocking file descriptor.
+
+ This takes ownership of closing fd.
+
+ Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */
+grpc_fd *grpc_fd_create(int fd);
+
+/* Cause *em_fd no longer to be initialized and closes the underlying fd.
+ Requires: *em_fd initialized; no outstanding notify_on_read or
+ notify_on_write. */
+void grpc_fd_destroy(grpc_fd *em_fd);
+
+/* Returns the file descriptor associated with *em_fd. */
+int grpc_fd_get(grpc_fd *em_fd);
+
+/* Register read interest, causing read_cb to be called once when em_fd becomes
+ readable, on deadline specified by deadline, or on shutdown triggered by
+ grpc_fd_shutdown.
+ read_cb will be called with read_cb_arg when *em_fd becomes readable.
+ read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
+ GRPC_CALLBACK_TIMED_OUT if the call timed out,
+ and CANCELLED if the call was cancelled.
+
+ Requires:This method must not be called before the read_cb for any previous
+ call runs. Edge triggered events are used whenever they are supported by the
+ underlying platform. This means that users must drain em_fd in read_cb before
+ calling notify_on_read again. Users are also expected to handle spurious
+ events, i.e read_cb is called while nothing can be readable from em_fd */
+int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg, gpr_timespec deadline);
+
+/* Exactly the same semantics as above, except based on writable events. */
+int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg, gpr_timespec deadline);
+
+/* Cause any current and all future read/write callbacks to error out with
+ GRPC_CALLBACK_CANCELLED. */
+void grpc_fd_shutdown(grpc_fd *em_fd);
+
+/* =================== Event caching ===================
+ In order to not miss or double-return edges in the context of edge triggering
+ and multithreading, we need a per-fd caching layer in the eventmanager itself
+ to cache relevant events.
+
+ There are two types of events we care about: calls to notify_on_[read|write]
+ and readable/writable events for the socket from eventfd. There are separate
+ event caches for read and write.
+
+ There are three states:
+ 0. "waiting" -- There's been a call to notify_on_[read|write] which has not
+ had a corresponding event. In other words, we're waiting for an event so we
+ can run the callback.
+ 1. "idle" -- We are neither waiting nor have a cached event.
+ 2. "cached" -- There has been a read/write event without a waiting callback,
+ so we want to run the event next time the application calls
+ notify_on_[read|write].
+
+ The high level state diagram:
+
+ +--------------------------------------------------------------------+
+ | WAITING | IDLE | CACHED |
+ | | | |
+ | 1. --*-> 2. --+-> 3. --+\
+ | | | <--+/
+ | | | |
+ x+-- 6. 5. <-+-- 4. <-*-- |
+ | | | |
+ +--------------------------------------------------------------------+
+
+ Transitions right occur on read|write events. Transitions left occur on
+ notify_on_[read|write] events.
+ State transitions:
+ 1. Read|Write event while waiting -> run the callback and transition to idle.
+ 2. Read|Write event while idle -> transition to cached.
+ 3. Read|Write event with one already cached -> still cached.
+ 4. notify_on_[read|write] with event cached: run callback and transition to
+ idle.
+ 5. notify_on_[read|write] when idle: Store callback and transition to
+ waiting.
+ 6. notify_on_[read|write] when waiting: invalid. */
+
+typedef enum grpc_fd_state {
+ GRPC_FD_WAITING = 0,
+ GRPC_FD_IDLE = 1,
+ GRPC_FD_CACHED = 2
+} grpc_fd_state;
+
+/* gRPC file descriptor handle.
+ The handle is used to register read/write callbacks to a file descriptor */
+struct grpc_fd {
+ grpc_libevent_task task; /* Base class, callbacks, queues, etc */
+ int fd; /* File descriptor */
+
+ /* Note that the shutdown event is only needed as a workaround for libevent
+ not properly handling event_active on an in flight event. */
+ struct event *shutdown_ev; /* activated to trigger shutdown */
+
+ /* protect shutdown_started|read_state|write_state and ensure barriers
+ between notify_on_[read|write] and read|write callbacks */
+ gpr_mu mu;
+ int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
+ grpc_fd_state read_state;
+ grpc_fd_state write_state;
+
+ /* descriptor delete list. These are destroyed during polling. */
+ struct grpc_fd *next;
+};
+
+/* gRPC alarm handle.
+ The handle is used to add an alarm which expires after specified timeout. */
+struct grpc_alarm {
+ grpc_libevent_task task; /* Include the base class */
+
+ gpr_atm triggered; /* To be used atomically if alarm triggered */
+};
+
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */
diff --git a/src/core/eventmanager/em_posix.c b/src/core/iomgr/iomgr_libevent_use_threads.c
index af449342f0..af449342f0 100644
--- a/src/core/eventmanager/em_posix.c
+++ b/src/core/iomgr/iomgr_libevent_use_threads.c
diff --git a/src/core/endpoint/resolve_address.h b/src/core/iomgr/resolve_address.h
index cc32c47cef..37ec0f0335 100644
--- a/src/core/endpoint/resolve_address.h
+++ b/src/core/iomgr/resolve_address.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__
-#define __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__
+#ifndef __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__
+#define __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__
#include <sys/socket.h>
@@ -64,4 +64,4 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses);
grpc_resolved_addresses *grpc_blocking_resolve_address(
const char *addr, const char *default_port);
-#endif /* __GRPC_INTERNAL_ENDPOINT_RESOLVE_ADDRESS_H__ */
+#endif /* __GRPC_INTERNAL_IOMGR_RESOLVE_ADDRESS_H__ */
diff --git a/src/core/endpoint/resolve_address.c b/src/core/iomgr/resolve_address_posix.c
index 1993b9bdc5..d3ea3780ce 100644
--- a/src/core/endpoint/resolve_address.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -33,7 +33,7 @@
#define _POSIX_SOURCE
-#include "src/core/endpoint/resolve_address.h"
+#include "src/core/iomgr/resolve_address.h"
#include <sys/types.h>
#include <sys/socket.h>
@@ -41,10 +41,11 @@
#include <unistd.h>
#include <string.h>
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
-#include <grpc/support/string.h>
#include <grpc/support/log.h>
+#include <grpc/support/string.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
diff --git a/src/core/iomgr/sockaddr.h b/src/core/iomgr/sockaddr.h
new file mode 100644
index 0000000000..b980b3029f
--- /dev/null
+++ b/src/core/iomgr/sockaddr.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_H_
+#define __GRPC_INTERNAL_IOMGR_SOCKADDR_H_
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WIN32
+#include "src/core/iomgr/sockaddr_win32.h"
+#endif
+
+#ifdef GPR_POSIX_SOCKETADDR
+#include "src/core/iomgr/sockaddr_posix.h"
+#endif
+
+#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_H_ */
diff --git a/src/core/iomgr/sockaddr_posix.h b/src/core/iomgr/sockaddr_posix.h
new file mode 100644
index 0000000000..79ef3ca3cf
--- /dev/null
+++ b/src/core/iomgr/sockaddr_posix.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_
+#define __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_POSIX_H_ */
diff --git a/src/core/endpoint/socket_utils.c b/src/core/iomgr/sockaddr_utils.c
index ef160d7ea4..f709d35162 100644
--- a/src/core/endpoint/socket_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -31,127 +31,17 @@
*
*/
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/sockaddr_utils.h"
#include <arpa/inet.h>
-#include <limits.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <stdio.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#include <string.h>
#include <errno.h>
+#include <string.h>
#include <grpc/support/host_port.h>
#include <grpc/support/string.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
-/* set a socket to non blocking mode */
-int grpc_set_socket_nonblocking(int fd, int non_blocking) {
- int oldflags = fcntl(fd, F_GETFL, 0);
- if (oldflags < 0) {
- return 0;
- }
-
- if (non_blocking) {
- oldflags |= O_NONBLOCK;
- } else {
- oldflags &= ~O_NONBLOCK;
- }
-
- if (fcntl(fd, F_SETFL, oldflags) != 0) {
- return 0;
- }
-
- return 1;
-}
-
-/* set a socket to close on exec */
-int grpc_set_socket_cloexec(int fd, int close_on_exec) {
- int oldflags = fcntl(fd, F_GETFD, 0);
- if (oldflags < 0) {
- return 0;
- }
-
- if (close_on_exec) {
- oldflags |= FD_CLOEXEC;
- } else {
- oldflags &= ~FD_CLOEXEC;
- }
-
- if (fcntl(fd, F_SETFD, oldflags) != 0) {
- return 0;
- }
-
- return 1;
-}
-
-/* set a socket to reuse old addresses */
-int grpc_set_socket_reuse_addr(int fd, int reuse) {
- int val = (reuse != 0);
- int newval;
- socklen_t intlen = sizeof(newval);
- return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) &&
- 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) &&
- newval == val;
-}
-
-/* disable nagle */
-int grpc_set_socket_low_latency(int fd, int low_latency) {
- int val = (low_latency != 0);
- int newval;
- socklen_t intlen = sizeof(newval);
- return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) &&
- 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) &&
- newval == val;
-}
-
-/* This should be 0 in production, but it may be enabled for testing or
- debugging purposes, to simulate an environment where IPv6 sockets can't
- also speak IPv4. */
-int grpc_forbid_dualstack_sockets_for_testing = 0;
-
-static int set_socket_dualstack(int fd) {
- if (!grpc_forbid_dualstack_sockets_for_testing) {
- const int off = 0;
- return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off));
- } else {
- /* Force an IPv6-only socket, for testing purposes. */
- const int on = 1;
- setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
- return 0;
- }
-}
-
-int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
- int protocol, grpc_dualstack_mode *dsmode) {
- int family = addr->sa_family;
- if (family == AF_INET6) {
- int fd = socket(family, type, protocol);
- /* Check if we've got a valid dualstack socket. */
- if (fd >= 0 && set_socket_dualstack(fd)) {
- *dsmode = GRPC_DSMODE_DUALSTACK;
- return fd;
- }
- /* If this isn't an IPv4 address, then return whatever we've got. */
- if (!grpc_sockaddr_is_v4mapped(addr, NULL)) {
- *dsmode = GRPC_DSMODE_IPV6;
- return fd;
- }
- /* Fall back to AF_INET. */
- if (fd >= 0) {
- close(fd);
- }
- family = AF_INET;
- }
- *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
- return socket(family, type, protocol);
-}
-
static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0xff, 0xff};
diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h
new file mode 100644
index 0000000000..753d0c824a
--- /dev/null
+++ b/src/core/iomgr/sockaddr_utils.h
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__
+#define __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__
+
+#include "src/core/iomgr/sockaddr.h"
+
+/* Returns true if addr is an IPv4-mapped IPv6 address within the
+ ::ffff:0.0.0.0/96 range, or false otherwise.
+
+ If addr4_out is non-NULL, the inner IPv4 address will be copied here when
+ returning true. */
+int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr,
+ struct sockaddr_in *addr4_out);
+
+/* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96
+ address to addr6_out and returns true. Otherwise returns false. */
+int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr,
+ struct sockaddr_in6 *addr6_out);
+
+/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to
+ *port_out (if not NULL) and returns true, otherwise returns false. */
+int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out);
+
+/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */
+void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out,
+ struct sockaddr_in6 *wild6_out);
+
+/* Converts a sockaddr into a newly-allocated human-readable string.
+
+ Currently, only the AF_INET and AF_INET6 families are recognized.
+ If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are
+ displayed as plain IPv4.
+
+ Usage is similar to gpr_asprintf: returns the number of bytes written
+ (excluding the final '\0'), and *out points to a string which must later be
+ destroyed using gpr_free().
+
+ In the unlikely event of an error, returns -1 and sets *out to NULL.
+ The existing value of errno is always preserved. */
+int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
+ int normalize);
+
+#endif /* __GRPC_INTERNAL_IOMGR_SOCKADDR_UTILS_H__ */
diff --git a/src/core/eventmanager/em_win32.c b/src/core/iomgr/sockaddr_win32.h
index 4d5c3b5126..751ac3d2e7 100644
--- a/src/core/eventmanager/em_win32.c
+++ b/src/core/iomgr/sockaddr_win32.h
@@ -31,8 +31,7 @@
*
*/
-/* Windows event manager support code. */
-#include <event2/thread.h>
+#ifndef __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_
+#define __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_
-/* Notify LibEvent that Windows thread is used. */
-int evthread_use_threads() { return evthread_use_windows_threads(); }
+#endif // __GRPC_INTERNAL_IOMGR_SOCKADDR_WIN32_H_
diff --git a/src/core/iomgr/socket_utils_common_posix.c b/src/core/iomgr/socket_utils_common_posix.c
new file mode 100644
index 0000000000..0767d6f918
--- /dev/null
+++ b/src/core/iomgr/socket_utils_common_posix.c
@@ -0,0 +1,154 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/socket_utils_posix.h"
+
+#include <arpa/inet.h>
+#include <limits.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+
+#include "src/core/iomgr/sockaddr_utils.h"
+#include <grpc/support/host_port.h>
+#include <grpc/support/string.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+/* set a socket to non blocking mode */
+int grpc_set_socket_nonblocking(int fd, int non_blocking) {
+ int oldflags = fcntl(fd, F_GETFL, 0);
+ if (oldflags < 0) {
+ return 0;
+ }
+
+ if (non_blocking) {
+ oldflags |= O_NONBLOCK;
+ } else {
+ oldflags &= ~O_NONBLOCK;
+ }
+
+ if (fcntl(fd, F_SETFL, oldflags) != 0) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/* set a socket to close on exec */
+int grpc_set_socket_cloexec(int fd, int close_on_exec) {
+ int oldflags = fcntl(fd, F_GETFD, 0);
+ if (oldflags < 0) {
+ return 0;
+ }
+
+ if (close_on_exec) {
+ oldflags |= FD_CLOEXEC;
+ } else {
+ oldflags &= ~FD_CLOEXEC;
+ }
+
+ if (fcntl(fd, F_SETFD, oldflags) != 0) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/* set a socket to reuse old addresses */
+int grpc_set_socket_reuse_addr(int fd, int reuse) {
+ int val = (reuse != 0);
+ int newval;
+ socklen_t intlen = sizeof(newval);
+ return 0 == setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) &&
+ 0 == getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen) &&
+ newval == val;
+}
+
+/* disable nagle */
+int grpc_set_socket_low_latency(int fd, int low_latency) {
+ int val = (low_latency != 0);
+ int newval;
+ socklen_t intlen = sizeof(newval);
+ return 0 == setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) &&
+ 0 == getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen) &&
+ newval == val;
+}
+
+/* This should be 0 in production, but it may be enabled for testing or
+ debugging purposes, to simulate an environment where IPv6 sockets can't
+ also speak IPv4. */
+int grpc_forbid_dualstack_sockets_for_testing = 0;
+
+static int set_socket_dualstack(int fd) {
+ if (!grpc_forbid_dualstack_sockets_for_testing) {
+ const int off = 0;
+ return 0 == setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off));
+ } else {
+ /* Force an IPv6-only socket, for testing purposes. */
+ const int on = 1;
+ setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
+ return 0;
+ }
+}
+
+int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
+ int protocol, grpc_dualstack_mode *dsmode) {
+ int family = addr->sa_family;
+ if (family == AF_INET6) {
+ int fd = socket(family, type, protocol);
+ /* Check if we've got a valid dualstack socket. */
+ if (fd >= 0 && set_socket_dualstack(fd)) {
+ *dsmode = GRPC_DSMODE_DUALSTACK;
+ return fd;
+ }
+ /* If this isn't an IPv4 address, then return whatever we've got. */
+ if (!grpc_sockaddr_is_v4mapped(addr, NULL)) {
+ *dsmode = GRPC_DSMODE_IPV6;
+ return fd;
+ }
+ /* Fall back to AF_INET. */
+ if (fd >= 0) {
+ close(fd);
+ }
+ family = AF_INET;
+ }
+ *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
+ return socket(family, type, protocol);
+}
diff --git a/src/core/endpoint/socket_utils_linux.c b/src/core/iomgr/socket_utils_linux.c
index 479675ec7d..f971cb33bc 100644
--- a/src/core/endpoint/socket_utils_linux.c
+++ b/src/core/iomgr/socket_utils_linux.c
@@ -36,7 +36,7 @@
#ifdef GPR_LINUX
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
#include <sys/types.h>
#include <sys/socket.h>
diff --git a/src/core/endpoint/socket_utils_posix.c b/src/core/iomgr/socket_utils_posix.c
index 262d606af9..262d606af9 100644
--- a/src/core/endpoint/socket_utils_posix.c
+++ b/src/core/iomgr/socket_utils_posix.c
diff --git a/src/core/endpoint/socket_utils.h b/src/core/iomgr/socket_utils_posix.h
index 23fa19284a..5c31e5e6d8 100644
--- a/src/core/endpoint/socket_utils.h
+++ b/src/core/iomgr/socket_utils_posix.h
@@ -31,16 +31,12 @@
*
*/
-#ifndef __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__
-#define __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__
+#ifndef __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__
+#define __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__
#include <unistd.h>
#include <sys/socket.h>
-struct sockaddr;
-struct sockaddr_in;
-struct sockaddr_in6;
-
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen,
int nonblock, int cloexec);
@@ -99,40 +95,4 @@ extern int grpc_forbid_dualstack_sockets_for_testing;
int grpc_create_dualstack_socket(const struct sockaddr *addr, int type,
int protocol, grpc_dualstack_mode *dsmode);
-/* Returns true if addr is an IPv4-mapped IPv6 address within the
- ::ffff:0.0.0.0/96 range, or false otherwise.
-
- If addr4_out is non-NULL, the inner IPv4 address will be copied here when
- returning true. */
-int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr,
- struct sockaddr_in *addr4_out);
-
-/* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96
- address to addr6_out and returns true. Otherwise returns false. */
-int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr,
- struct sockaddr_in6 *addr6_out);
-
-/* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to
- *port_out (if not NULL) and returns true, otherwise returns false. */
-int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out);
-
-/* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */
-void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out,
- struct sockaddr_in6 *wild6_out);
-
-/* Converts a sockaddr into a newly-allocated human-readable string.
-
- Currently, only the AF_INET and AF_INET6 families are recognized.
- If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are
- displayed as plain IPv4.
-
- Usage is similar to gpr_asprintf: returns the number of bytes written
- (excluding the final '\0'), and *out points to a string which must later be
- destroyed using gpr_free().
-
- In the unlikely event of an error, returns -1 and sets *out to NULL.
- The existing value of errno is always preserved. */
-int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
- int normalize);
-
-#endif /* __GRPC_INTERNAL_ENDPOINT_SOCKET_UTILS_H__ */
+#endif /* __GRPC_INTERNAL_IOMGR_SOCKET_UTILS_POSIX_H__ */
diff --git a/src/core/endpoint/tcp_client.h b/src/core/iomgr/tcp_client.h
index 69b1b62f37..a4632d81cf 100644
--- a/src/core/endpoint/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -31,21 +31,18 @@
*
*/
-#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__
-#define __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__
+#ifndef __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__
+#define __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__
-#include "src/core/endpoint/tcp.h"
+#include "src/core/endpoint/endpoint.h"
+#include "src/core/iomgr/sockaddr.h"
#include <grpc/support/time.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
- void *arg, grpc_em *em,
- const struct sockaddr *addr, int addr_len,
- gpr_timespec deadline);
+ void *arg, const struct sockaddr *addr,
+ int addr_len, gpr_timespec deadline);
-#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_CLIENT_H__ */
+#endif /* __GRPC_INTERNAL_IOMGR_TCP_CLIENT_H__ */
diff --git a/src/core/endpoint/tcp_client.c b/src/core/iomgr/tcp_client_posix.c
index c6f470ba88..8d2d7ab081 100644
--- a/src/core/endpoint/tcp_client.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -31,14 +31,17 @@
*
*/
-#include "src/core/endpoint/tcp_client.h"
+#include "src/core/iomgr/tcp_client.h"
#include <errno.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/iomgr/tcp_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -46,7 +49,7 @@
typedef struct {
void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
- grpc_em_fd *fd;
+ grpc_fd *fd;
gpr_timespec deadline;
} async_connect;
@@ -71,12 +74,12 @@ error:
return 0;
}
-static void on_writable(void *acp, grpc_em_cb_status status) {
+static void on_writable(void *acp, grpc_iomgr_cb_status status) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
- int fd = grpc_em_fd_get(ac->fd);
+ int fd = grpc_fd_get(ac->fd);
if (status == GRPC_CALLBACK_SUCCESS) {
do {
@@ -103,7 +106,7 @@ static void on_writable(void *acp, grpc_em_cb_status status) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
- grpc_em_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
return;
} else {
goto error;
@@ -120,20 +123,18 @@ static void on_writable(void *acp, grpc_em_cb_status status) {
error:
ac->cb(ac->cb_arg, NULL);
- grpc_em_fd_destroy(ac->fd);
- gpr_free(ac->fd);
+ grpc_fd_destroy(ac->fd);
gpr_free(ac);
return;
great_success:
- ac->cb(ac->cb_arg, grpc_tcp_create_emfd(ac->fd));
+ ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
gpr_free(ac);
}
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
- void *arg, grpc_em *em,
- const struct sockaddr *addr, int addr_len,
- gpr_timespec deadline) {
+ void *arg, const struct sockaddr *addr,
+ int addr_len, gpr_timespec deadline) {
int fd;
grpc_dualstack_mode dsmode;
int err;
@@ -167,7 +168,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
} while (err < 0 && errno == EINTR);
if (err >= 0) {
- cb(arg, grpc_tcp_create(fd, em));
+ cb(arg,
+ grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
}
@@ -182,7 +184,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->cb = cb;
ac->cb_arg = arg;
ac->deadline = deadline;
- ac->fd = gpr_malloc(sizeof(grpc_em_fd));
- grpc_em_fd_init(ac->fd, em, fd);
- grpc_em_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
+ ac->fd = grpc_fd_create(fd);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
}
diff --git a/src/core/endpoint/tcp.c b/src/core/iomgr/tcp_posix.c
index 482344d265..8f63f75612 100644
--- a/src/core/endpoint/tcp.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/endpoint/tcp.h"
+#include "src/core/iomgr/tcp_posix.h"
#include <errno.h>
#include <stdlib.h>
@@ -40,7 +40,6 @@
#include <sys/socket.h>
#include <unistd.h>
-#include "src/core/eventmanager/em.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
@@ -249,8 +248,7 @@ static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
typedef struct {
grpc_endpoint base;
- grpc_em *em;
- grpc_em_fd *em_fd;
+ grpc_fd *em_fd;
int fd;
size_t slice_size;
gpr_refcount refcount;
@@ -266,25 +264,19 @@ typedef struct {
} grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
- grpc_em_cb_status status);
+ grpc_iomgr_cb_status status);
static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
- grpc_em_cb_status status);
-
-#define DEFAULT_SLICE_SIZE 8192
-grpc_endpoint *grpc_tcp_create(int fd, grpc_em *em) {
- return grpc_tcp_create_dbg(fd, em, DEFAULT_SLICE_SIZE);
-}
+ grpc_iomgr_cb_status status);
static void grpc_tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_em_fd_shutdown(tcp->em_fd);
+ grpc_fd_shutdown(tcp->em_fd);
}
static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
- grpc_em_fd_destroy(tcp->em_fd);
- gpr_free(tcp->em_fd);
+ grpc_fd_destroy(tcp->em_fd);
gpr_free(tcp);
}
}
@@ -317,7 +309,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
- grpc_em_cb_status status) {
+ grpc_iomgr_cb_status status) {
grpc_tcp *tcp = (grpc_tcp *)arg;
int iov_size = 1;
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
@@ -385,8 +377,8 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
- grpc_em_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp,
- tcp->read_deadline);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp,
+ tcp->read_deadline);
}
} else {
/* TODO(klempner): Log interesting errors */
@@ -422,7 +414,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->read_user_data = user_data;
tcp->read_deadline = deadline;
gpr_ref(&tcp->refcount);
- grpc_em_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline);
}
#define MAX_WRITE_IOVEC 16
@@ -469,7 +461,7 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
}
static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
- grpc_em_cb_status status) {
+ grpc_iomgr_cb_status status) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_write_status write_status;
grpc_endpoint_cb_status cb_status;
@@ -494,8 +486,8 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
- grpc_em_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
- tcp->write_deadline);
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
+ tcp->write_deadline);
} else {
slice_state_destroy(&tcp->write_state);
if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
@@ -539,8 +531,8 @@ static grpc_endpoint_write_status grpc_tcp_write(
tcp->write_cb = cb;
tcp->write_user_data = user_data;
tcp->write_deadline = deadline;
- grpc_em_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
- tcp->write_deadline);
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
+ tcp->write_deadline);
}
return status;
@@ -550,12 +542,10 @@ static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read,
grpc_tcp_write, grpc_tcp_shutdown,
grpc_tcp_destroy};
-static grpc_endpoint *grpc_tcp_create_generic(grpc_em_fd *em_fd,
- size_t slice_size) {
+grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
- tcp->fd = grpc_em_fd_get(em_fd);
- tcp->em = grpc_em_fd_get_em(em_fd);
+ tcp->fd = grpc_fd_get(em_fd);
tcp->read_cb = NULL;
tcp->write_cb = NULL;
tcp->read_user_data = NULL;
@@ -569,13 +559,3 @@ static grpc_endpoint *grpc_tcp_create_generic(grpc_em_fd *em_fd,
tcp->em_fd = em_fd;
return &tcp->base;
}
-
-grpc_endpoint *grpc_tcp_create_dbg(int fd, grpc_em *em, size_t slice_size) {
- grpc_em_fd *em_fd = gpr_malloc(sizeof(grpc_em_fd));
- grpc_em_fd_init(em_fd, em, fd);
- return grpc_tcp_create_generic(em_fd, slice_size);
-}
-
-grpc_endpoint *grpc_tcp_create_emfd(grpc_em_fd *em_fd) {
- return grpc_tcp_create_generic(em_fd, DEFAULT_SLICE_SIZE);
-}
diff --git a/src/core/endpoint/tcp.h b/src/core/iomgr/tcp_posix.h
index f6a2a19ec4..8a3c52894c 100644
--- a/src/core/endpoint/tcp.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_H__
-#define __GRPC_INTERNAL_ENDPOINT_TCP_H__
+#ifndef __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__
+#define __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__
/*
Low level TCP "bottom half" implementation, for use by transports built on
top of a TCP connection.
@@ -45,15 +45,12 @@
*/
#include "src/core/endpoint/endpoint.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/iomgr_libevent.h"
-/* Create a tcp from an already connected file descriptor. */
-grpc_endpoint *grpc_tcp_create(int fd, grpc_em *em);
-/* Special version for debugging slice changes */
-grpc_endpoint *grpc_tcp_create_dbg(int fd, grpc_em *em, size_t slice_size);
+#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
-/* Special version for handing off ownership of an existing already created
- eventmanager fd. Must not have any outstanding callbacks. */
-grpc_endpoint *grpc_tcp_create_emfd(grpc_em_fd *em_fd);
+/* Create a tcp endpoint given a file desciptor and a read slice size.
+ Takes ownership of fd. */
+grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size);
-#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_H__ */
+#endif /* __GRPC_INTERNAL_IOMGR_TCP_POSIX_H__ */
diff --git a/src/core/endpoint/tcp_server.h b/src/core/iomgr/tcp_server.h
index d81cdd000e..bd6b46f538 100644
--- a/src/core/endpoint/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -31,14 +31,13 @@
*
*/
-#ifndef __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__
-#define __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__
+#ifndef __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__
+#define __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__
#include <sys/types.h>
#include <sys/socket.h>
-#include "src/core/endpoint/tcp.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/endpoint/endpoint.h"
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
@@ -47,7 +46,7 @@ typedef struct grpc_tcp_server grpc_tcp_server;
typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
/* Create a server, initially not bound to any ports */
-grpc_tcp_server *grpc_tcp_server_create(grpc_em *em);
+grpc_tcp_server *grpc_tcp_server_create();
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb,
@@ -73,4 +72,4 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index);
void grpc_tcp_server_destroy(grpc_tcp_server *server);
-#endif /* __GRPC_INTERNAL_ENDPOINT_TCP_SERVER_H__ */
+#endif /* __GRPC_INTERNAL_IOMGR_TCP_SERVER_H__ */
diff --git a/src/core/endpoint/tcp_server.c b/src/core/iomgr/tcp_server_posix.c
index efd3dede50..22bbd45351 100644
--- a/src/core/endpoint/tcp_server.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -32,7 +32,7 @@
*/
#define _GNU_SOURCE
-#include "src/core/endpoint/tcp_server.h"
+#include "src/core/iomgr/tcp_server.h"
#include <limits.h>
#include <fcntl.h>
@@ -45,7 +45,10 @@
#include <string.h>
#include <errno.h>
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/iomgr/tcp_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -60,13 +63,12 @@ static int s_max_accept_queue_size;
/* one listening port */
typedef struct {
int fd;
- grpc_em_fd *emfd;
+ grpc_fd *emfd;
grpc_tcp_server *server;
} server_port;
/* the overall server */
struct grpc_tcp_server {
- grpc_em *em;
grpc_tcp_server_cb cb;
void *cb_arg;
@@ -82,12 +84,11 @@ struct grpc_tcp_server {
size_t port_capacity;
};
-grpc_tcp_server *grpc_tcp_server_create(grpc_em *em) {
+grpc_tcp_server *grpc_tcp_server_create() {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu);
gpr_cv_init(&s->cv);
s->active_ports = 0;
- s->em = em;
s->cb = NULL;
s->cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
@@ -101,7 +102,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
/* shutdown all fd's */
for (i = 0; i < s->nports; i++) {
- grpc_em_fd_shutdown(s->ports[i].emfd);
+ grpc_fd_shutdown(s->ports[i].emfd);
}
/* wait while that happens */
while (s->active_ports) {
@@ -112,8 +113,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) {
/* delete ALL the things */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- grpc_em_fd_destroy(sp->emfd);
- gpr_free(sp->emfd);
+ grpc_fd_destroy(sp->emfd);
}
gpr_free(s->ports);
gpr_free(s);
@@ -189,7 +189,7 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, grpc_em_cb_status status) {
+static void on_read(void *arg, grpc_iomgr_cb_status status) {
server_port *sp = arg;
if (status != GRPC_CALLBACK_SUCCESS) {
@@ -208,11 +208,7 @@ static void on_read(void *arg, grpc_em_cb_status status) {
case EINTR:
continue;
case EAGAIN:
- if (GRPC_EM_OK != grpc_em_fd_notify_on_read(sp->emfd, on_read, sp,
- gpr_inf_future)) {
- gpr_log(GPR_ERROR, "Failed to register read request with em");
- goto error;
- }
+ grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -220,7 +216,9 @@ static void on_read(void *arg, grpc_em_cb_status status) {
}
}
- sp->server->cb(sp->server->cb_arg, grpc_tcp_create(fd, sp->server->em));
+ sp->server->cb(
+ sp->server->cb_arg,
+ grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
}
abort();
@@ -249,13 +247,11 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity);
}
sp = &s->ports[s->nports++];
- sp->emfd = gpr_malloc(sizeof(grpc_em_fd));
+ sp->emfd = grpc_fd_create(fd);
sp->fd = fd;
sp->server = s;
/* initialize the em desc */
- if (GRPC_EM_OK != grpc_em_fd_init(sp->emfd, s->em, fd)) {
- grpc_em_fd_destroy(sp->emfd);
- gpr_free(sp->emfd);
+ if (sp->emfd == NULL) {
s->nports--;
gpr_mu_unlock(&s->mu);
return 0;
@@ -326,8 +322,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
- grpc_em_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
- gpr_inf_future);
+ grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
+ gpr_inf_future);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 7ff48f9123..bfc2e3361a 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -34,7 +34,7 @@
#include "src/core/security/credentials.h"
#include "src/core/httpcli/httpcli.h"
-#include "src/core/surface/surface_em.h"
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>
@@ -379,7 +379,7 @@ static void compute_engine_get_request_metadata(grpc_credentials *creds,
request.hdr_count = 1;
request.hdrs = &header;
grpc_httpcli_get(
- &request, gpr_time_add(gpr_now(), refresh_threshold), grpc_surface_em(),
+ &request, gpr_time_add(gpr_now(), refresh_threshold),
on_compute_engine_token_response,
grpc_credentials_metadata_request_create(creds, cb, user_data));
} else {
@@ -433,7 +433,8 @@ static int fake_oauth2_has_request_metadata_only(
return 1;
}
-void on_simulated_token_fetch_done(void *user_data, grpc_em_cb_status status) {
+void on_simulated_token_fetch_done(void *user_data,
+ grpc_iomgr_cb_status status) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds;
@@ -448,10 +449,9 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
if (c->is_async) {
- GPR_ASSERT(grpc_em_add_callback(grpc_surface_em(),
- on_simulated_token_fetch_done,
- grpc_credentials_metadata_request_create(
- creds, cb, user_data)) == GRPC_EM_OK);
+ grpc_iomgr_add_callback(
+ on_simulated_token_fetch_done,
+ grpc_credentials_metadata_request_create(creds, cb, user_data));
} else {
cb(user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 335d502217..28b56dd4c9 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -35,12 +35,11 @@
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
-#include "src/core/endpoint/resolve_address.h"
-#include "src/core/endpoint/tcp_server.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/tcp_server.h"
#include "src/core/security/security_context.h"
#include "src/core/security/secure_transport_setup.h"
#include "src/core/surface/server.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -101,7 +100,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- tcp = grpc_tcp_server_create(grpc_surface_em());
+ tcp = grpc_tcp_server_create();
if (!tcp) {
goto error;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index a731c7cab7..1cffe3d32a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -34,12 +34,12 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/channel/metadata_buffer.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/surface/channel.h"
+#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>
-#include "src/core/surface/channel.h"
-#include "src/core/surface/completion_queue.h"
-#include "src/core/surface/surface_em.h"
#include <stdio.h>
#include <stdlib.h>
@@ -184,7 +184,7 @@ struct grpc_call {
void *finished_tag;
pending_read_queue prq;
- grpc_em_alarm alarm;
+ grpc_alarm alarm;
/* The current outstanding send message/context/invoke/end tag (only valid if
have_write == 1) */
@@ -258,7 +258,7 @@ void grpc_call_internal_unref(grpc_call *c) {
void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->read_mu);
if (c->have_alarm) {
- grpc_em_alarm_cancel(&c->alarm);
+ grpc_alarm_cancel(&c->alarm);
c->have_alarm = 0;
}
gpr_mu_unlock(&c->read_mu);
@@ -813,7 +813,7 @@ void grpc_call_recv_finish(grpc_call_element *elem, int is_full_close) {
}
if (is_full_close) {
if (call->have_alarm) {
- grpc_em_alarm_cancel(&call->alarm);
+ grpc_alarm_cancel(&call->alarm);
call->have_alarm = 0;
}
call->received_finish = 1;
@@ -852,7 +852,7 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
return &call->incoming_metadata;
}
-static void call_alarm(void *arg, grpc_em_cb_status status) {
+static void call_alarm(void *arg, grpc_iomgr_cb_status status) {
grpc_call *call = arg;
if (status == GRPC_CALLBACK_SUCCESS) {
grpc_call_cancel(call);
@@ -868,6 +868,6 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
}
grpc_call_internal_ref(call);
call->have_alarm = 1;
- grpc_em_alarm_init(&call->alarm, grpc_surface_em(), call_alarm, call);
- grpc_em_alarm_add(&call->alarm, deadline);
+ grpc_alarm_init(&call->alarm, call_alarm, call);
+ grpc_alarm_add(&call->alarm, deadline);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index ec1c8477fa..7d30b64204 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -43,12 +43,11 @@
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/channel/http_filter.h"
-#include "src/core/endpoint/resolve_address.h"
-#include "src/core/endpoint/tcp.h"
-#include "src/core/endpoint/tcp_client.h"
+#include "src/core/endpoint/endpoint.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/tcp_client.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -74,7 +73,6 @@ struct setup {
const char *target;
grpc_transport_setup_callback setup_callback;
void *setup_user_data;
- grpc_em *em;
};
static int maybe_try_next_resolved(request *r);
@@ -123,8 +121,8 @@ static int maybe_try_next_resolved(request *r) {
if (!r->resolved) return 0;
if (r->resolved_index == r->resolved->naddrs) return 0;
addr = &r->resolved->addrs[r->resolved_index++];
- grpc_tcp_client_connect(on_connect, r, r->setup->em,
- (struct sockaddr *)&addr->addr, addr->len,
+ grpc_tcp_client_connect(on_connect, r, (struct sockaddr *)&addr->addr,
+ addr->len,
grpc_client_setup_request_deadline(r->cs_request));
return 1;
}
@@ -201,13 +199,12 @@ grpc_channel *grpc_channel_create(const char *target,
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
s->target = gpr_strdup(target);
- s->em = grpc_surface_em();
s->setup_callback = complete_setup;
s->setup_user_data = grpc_channel_get_channel_stack(channel);
grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
args, mdctx, initiate_setup, done_setup,
- s, s->em);
+ s);
return channel;
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 2002476777..1f3074fd42 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,10 +36,9 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/iomgr_completion_queue_interface.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/surface/surface_trace.h"
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
@@ -62,7 +61,6 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
- grpc_em *em;
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
@@ -89,7 +87,6 @@ grpc_completion_queue *grpc_completion_queue_create() {
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1);
- cc->em = grpc_surface_em();
cc->allow_polling = 1;
return cc;
}
@@ -100,7 +97,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
- Requires cc->em->mu locked. */
+ Requires grpc_iomgr_mu locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
@@ -126,7 +123,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
- gpr_cv_broadcast(&cc->em->cv);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
return ev;
}
@@ -149,7 +146,7 @@ static void end_op_locked(grpc_completion_queue *cc,
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(&cc->em->cv);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
}
}
@@ -157,11 +154,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
ev->base.data.read = read;
end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
@@ -169,11 +166,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
@@ -181,11 +178,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
@@ -193,11 +190,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.finish_accepted = error;
end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
@@ -206,24 +203,24 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
void *user_data, size_t count,
grpc_metadata *elements) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
user_data);
ev->base.data.client_metadata_read.count = count;
ev->base.data.client_metadata_read.elements = elements;
end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_status status) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ev->base.data.finished = status;
end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -232,7 +229,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements) {
event *ev;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
ev->base.data.server_rpc_new.method = method;
ev->base.data.server_rpc_new.host = host;
@@ -240,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
ev->base.data.server_rpc_new.metadata_count = metadata_count;
ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -257,7 +254,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
@@ -283,15 +280,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_em_work(cc->em, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) {
- gpr_mu_unlock(&cc->em->mu);
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -329,7 +326,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
@@ -338,15 +335,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_em_work(cc->em, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(&cc->em->cv, &cc->em->mu, deadline)) {
- gpr_mu_unlock(&cc->em->mu);
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(&cc->em->mu);
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -355,11 +352,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->refs)) {
- gpr_mu_lock(&cc->em->mu);
+ gpr_mu_lock(&grpc_iomgr_mu);
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(&cc->em->cv);
- gpr_mu_unlock(&cc->em->mu);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
}
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 92c0ac880d..832ec085c7 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -33,14 +33,14 @@
#include <grpc/grpc.h>
#include "src/core/statistics/census_interface.h"
-#include "src/core/surface/surface_em.h"
+#include "src/core/iomgr/iomgr.h"
void grpc_init() {
- grpc_surface_em_init();
+ grpc_iomgr_init();
census_init();
}
void grpc_shutdown() {
- grpc_surface_em_shutdown();
+ grpc_iomgr_shutdown();
census_shutdown();
}
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index f330b83521..3d5727927d 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -43,15 +43,13 @@
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/channel/http_filter.h"
-#include "src/core/endpoint/resolve_address.h"
-#include "src/core/endpoint/tcp.h"
-#include "src/core/endpoint/tcp_client.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/tcp_client.h"
#include "src/core/security/auth.h"
#include "src/core/security/security_context.h"
#include "src/core/security/secure_transport_setup.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
@@ -78,7 +76,6 @@ struct setup {
const char *target;
grpc_transport_setup_callback setup_callback;
void *setup_user_data;
- grpc_em *em;
};
static int maybe_try_next_resolved(request *r);
@@ -139,8 +136,8 @@ static int maybe_try_next_resolved(request *r) {
if (!r->resolved) return 0;
if (r->resolved_index == r->resolved->naddrs) return 0;
addr = &r->resolved->addrs[r->resolved_index++];
- grpc_tcp_client_connect(on_connect, r, r->setup->em,
- (struct sockaddr *)&addr->addr, addr->len,
+ grpc_tcp_client_connect(on_connect, r, (struct sockaddr *)&addr->addr,
+ addr->len,
grpc_client_setup_request_deadline(r->cs_request));
return 1;
}
@@ -230,7 +227,6 @@ grpc_channel *grpc_secure_channel_create_internal(
grpc_channel_args_destroy(args_copy);
s->target = gpr_strdup(target);
- s->em = grpc_surface_em();
s->setup_callback = complete_setup;
s->setup_user_data = grpc_channel_get_channel_stack(channel);
s->security_context =
@@ -238,6 +234,6 @@ grpc_channel *grpc_secure_channel_create_internal(
&context->base);
grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel),
args, mdctx, initiate_setup, done_setup,
- s, s->em);
+ s);
return channel;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index d8d5a7adf1..2c859060eb 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -39,10 +39,10 @@
#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/iomgr/iomgr.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
-#include "src/core/surface/surface_em.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string.h>
@@ -73,7 +73,6 @@ struct grpc_server {
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
grpc_completion_queue *cq;
- grpc_em *em;
gpr_mu mu;
@@ -193,7 +192,7 @@ static void orphan_channel(channel_data *chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, grpc_em_cb_status status) {
+static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_server *server = chand->server;
/*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
@@ -206,7 +205,7 @@ static void destroy_channel(channel_data *chand) {
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
- grpc_em_add_callback(chand->server->em, finish_destroy_channel, chand);
+ grpc_iomgr_add_callback(finish_destroy_channel, chand);
}
static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) {
@@ -254,7 +253,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_mu_unlock(&server->mu);
}
-static void kill_zombie(void *elem, grpc_em_cb_status status) {
+static void kill_zombie(void *elem, grpc_iomgr_cb_status status) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -275,7 +274,7 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) {
/* fallthrough intended */
case NOT_STARTED:
calld->state = ZOMBIED;
- grpc_em_add_callback(chand->server->em, kill_zombie, elem);
+ grpc_iomgr_add_callback(kill_zombie, elem);
break;
case ZOMBIED:
break;
@@ -341,7 +340,7 @@ static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) {
}
}
-static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) {
+static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_channel_op op;
op.type = GRPC_CHANNEL_DISCONNECT;
@@ -354,7 +353,7 @@ static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) {
static void shutdown_channel(channel_data *chand) {
grpc_channel_internal_ref(chand->channel);
- grpc_em_add_callback(chand->server->em, finish_shutdown_channel, chand);
+ grpc_iomgr_add_callback(finish_shutdown_channel, chand);
}
static void init_call_elem(grpc_call_element *elem,
@@ -442,7 +441,6 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
gpr_mu_init(&server->mu);
server->cq = cq;
- server->em = grpc_surface_em();
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index db8924efea..a5fdd03774 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -35,10 +35,9 @@
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
-#include "src/core/endpoint/resolve_address.h"
-#include "src/core/endpoint/tcp_server.h"
+#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/tcp_server.h"
#include "src/core/surface/server.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -83,7 +82,7 @@ int grpc_server_add_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- tcp = grpc_tcp_server_create(grpc_surface_em());
+ tcp = grpc_tcp_server_create();
if (!tcp) {
goto error;
}
diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h
index 37eb84ed02..24f26068bc 100644
--- a/src/core/transport/chttp2_transport.h
+++ b/src/core/transport/chttp2_transport.h
@@ -34,7 +34,7 @@
#ifndef __GRPC_INTERNAL_TRANSPORT_CHTTP2_TRANSPORT_H__
#define __GRPC_INTERNAL_TRANSPORT_CHTTP2_TRANSPORT_H__
-#include "src/core/endpoint/tcp.h"
+#include "src/core/endpoint/endpoint.h"
#include "src/core/transport/transport.h"
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,