aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2015-01-07 12:13:17 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-09 17:23:18 -0800
commite4b409364e4c493a66d4b2a6fe897075aa5c174e (patch)
tree29467626f50aea49e072e15004dd141625146709 /src/core
parent8232204a36712553b9eedb2dacab13b7c38642c6 (diff)
Add a --forever flag, to continuously run tests as things change.
Change on 2015/01/07 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83451760
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/child_channel.c36
-rw-r--r--src/core/channel/child_channel.h3
-rw-r--r--src/core/channel/client_channel.c20
-rw-r--r--src/core/channel/client_setup.c5
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/alarm.c34
-rw-r--r--src/core/iomgr/alarm_internal.h5
-rw-r--r--src/core/iomgr/endpoint.c14
-rw-r--r--src/core/iomgr/endpoint.h17
-rw-r--r--src/core/iomgr/fd_posix.c274
-rw-r--r--src/core/iomgr/fd_posix.h138
-rw-r--r--src/core/iomgr/iomgr.c204
-rw-r--r--src/core/iomgr/iomgr.h11
-rw-r--r--src/core/iomgr/iomgr_completion_queue_interface.h (renamed from src/core/iomgr/iomgr_posix.h)15
-rw-r--r--src/core/iomgr/iomgr_libevent.c652
-rw-r--r--src/core/iomgr/iomgr_libevent.h206
-rw-r--r--src/core/iomgr/iomgr_libevent_use_threads.c (renamed from src/core/iomgr/iomgr_internal.h)35
-rw-r--r--src/core/iomgr/pollset.c (renamed from src/core/iomgr/iomgr_posix.c)7
-rw-r--r--src/core/iomgr/pollset.h21
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c239
-rw-r--r--src/core/iomgr/pollset_posix.c342
-rw-r--r--src/core/iomgr/pollset_posix.h95
-rw-r--r--src/core/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/iomgr/tcp_client_posix.c76
-rw-r--r--src/core/iomgr/tcp_posix.c67
-rw-r--r--src/core/iomgr/tcp_posix.h2
-rw-r--r--src/core/iomgr/tcp_server.h4
-rw-r--r--src/core/iomgr/tcp_server_posix.c38
-rw-r--r--src/core/security/credentials.c5
-rw-r--r--src/core/security/secure_endpoint.c15
-rw-r--r--src/core/security/secure_transport_setup.c12
-rw-r--r--src/core/security/server_secure_chttp2.c4
-rw-r--r--src/core/support/time.c15
-rw-r--r--src/core/surface/call.c4
-rw-r--r--src/core/surface/completion_queue.c66
-rw-r--r--src/core/surface/server.c13
-rw-r--r--src/core/surface/server.h3
-rw-r--r--src/core/surface/server_chttp2.c4
-rw-r--r--src/core/transport/chttp2_transport.c5
39 files changed, 1127 insertions, 1595 deletions
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 3778f4fb88..e67b823697 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -85,19 +85,19 @@ static void lb_channel_op(grpc_channel_element *elem,
grpc_channel_op *op) {
lb_channel_data *chand = elem->channel_data;
grpc_channel_element *back;
- int calling_back = 0;
switch (op->dir) {
case GRPC_CALL_UP:
gpr_mu_lock(&chand->mu);
back = chand->back;
- if (back) {
- chand->calling_back++;
- calling_back = 1;
- }
+ if (back) chand->calling_back++;
gpr_mu_unlock(&chand->mu);
if (back) {
back->filter->channel_op(chand->back, elem, op);
+ gpr_mu_lock(&chand->mu);
+ chand->calling_back--;
+ gpr_cv_broadcast(&chand->cv);
+ gpr_mu_unlock(&chand->mu);
} else if (op->type == GRPC_TRANSPORT_GOAWAY) {
gpr_slice_unref(op->data.goaway.message);
}
@@ -107,27 +107,23 @@ static void lb_channel_op(grpc_channel_element *elem,
break;
}
- gpr_mu_lock(&chand->mu);
switch (op->type) {
case GRPC_TRANSPORT_CLOSED:
+ gpr_mu_lock(&chand->mu);
chand->disconnected = 1;
maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
+ gpr_mu_unlock(&chand->mu);
break;
case GRPC_CHANNEL_GOAWAY:
+ gpr_mu_lock(&chand->mu);
chand->sent_goaway = 1;
+ gpr_mu_unlock(&chand->mu);
break;
case GRPC_CHANNEL_DISCONNECT:
case GRPC_TRANSPORT_GOAWAY:
case GRPC_ACCEPT_CALL:
break;
}
-
- if (calling_back) {
- chand->calling_back--;
- gpr_cv_signal(&chand->cv);
- maybe_destroy_channel(grpc_channel_stack_from_top_element(elem));
- }
- gpr_mu_unlock(&chand->mu);
}
/* Constructor for call_data */
@@ -181,9 +177,7 @@ const grpc_channel_filter grpc_child_channel_top_filter = {
#define LINK_BACK_ELEM_FROM_CALL(call) grpc_call_stack_element((call), 0)
-static void finally_destroy_channel(void *c, int success) {
- /* ignore success or not... this is a destruction callback and will only
- happen once - the only purpose here is to release resources */
+static void finally_destroy_channel(void *c, grpc_iomgr_cb_status status) {
grpc_child_channel *channel = c;
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
/* wait for the initiator to leave the mutex */
@@ -193,7 +187,7 @@ static void finally_destroy_channel(void *c, int success) {
gpr_free(channel);
}
-static void send_farewells(void *c, int success) {
+static void send_farewells(void *c, grpc_iomgr_cb_status status) {
grpc_child_channel *channel = c;
grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
lb_channel_data *chand = lbelem->channel_data;
@@ -227,7 +221,7 @@ static void send_farewells(void *c, int success) {
static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
- !chand->sending_farewell && !chand->calling_back) {
+ !chand->sending_farewell) {
grpc_iomgr_add_callback(finally_destroy_channel, channel);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
@@ -255,16 +249,14 @@ grpc_child_channel *grpc_child_channel_create(
return stk;
}
-void grpc_child_channel_destroy(grpc_child_channel *channel,
- int wait_for_callbacks) {
+void grpc_child_channel_destroy(grpc_child_channel *channel) {
grpc_channel_element *lbelem = LINK_BACK_ELEM_FROM_CHANNEL(channel);
lb_channel_data *chand = lbelem->channel_data;
gpr_mu_lock(&chand->mu);
- while (wait_for_callbacks && chand->calling_back) {
+ while (chand->calling_back) {
gpr_cv_wait(&chand->cv, &chand->mu, gpr_inf_future);
}
-
chand->back = NULL;
chand->destroyed = 1;
maybe_destroy_channel(channel);
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
index 3ba4c1b8a9..9fb2a17e29 100644
--- a/src/core/channel/child_channel.h
+++ b/src/core/channel/child_channel.h
@@ -53,8 +53,7 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel,
grpc_channel_op *op);
grpc_channel_element *grpc_child_channel_get_bottom_element(
grpc_child_channel *channel);
-void grpc_child_channel_destroy(grpc_child_channel *channel,
- int wait_for_callbacks);
+void grpc_child_channel_destroy(grpc_child_channel *channel);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
grpc_call_element *parent);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 46283835a0..fd883a08ca 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -294,6 +294,14 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
}
}
+static void finally_destroy_channel(void *arg, grpc_iomgr_cb_status status) {
+ grpc_child_channel_destroy(arg);
+}
+
+static void destroy_channel_later(grpc_child_channel *channel) {
+ grpc_iomgr_add_callback(finally_destroy_channel, channel);
+}
+
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
@@ -309,7 +317,7 @@ static void channel_op(grpc_channel_element *elem,
gpr_mu_unlock(&chand->mu);
if (child_channel) {
grpc_child_channel_handle_op(child_channel, op);
- grpc_child_channel_destroy(child_channel, 1);
+ destroy_channel_later(child_channel);
} else {
gpr_slice_unref(op->data.goaway.message);
}
@@ -321,7 +329,7 @@ static void channel_op(grpc_channel_element *elem,
chand->active_child = NULL;
gpr_mu_unlock(&chand->mu);
if (child_channel) {
- grpc_child_channel_destroy(child_channel, 1);
+ destroy_channel_later(child_channel);
}
break;
case GRPC_TRANSPORT_GOAWAY:
@@ -336,7 +344,7 @@ static void channel_op(grpc_channel_element *elem,
}
gpr_mu_unlock(&chand->mu);
if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
+ destroy_channel_later(child_channel);
}
gpr_slice_unref(op->data.goaway.message);
break;
@@ -352,7 +360,7 @@ static void channel_op(grpc_channel_element *elem,
}
gpr_mu_unlock(&chand->mu);
if (child_channel) {
- grpc_child_channel_destroy(child_channel, 0);
+ destroy_channel_later(child_channel);
}
break;
default:
@@ -437,7 +445,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_transport_setup_cancel(chand->transport_setup);
if (chand->active_child) {
- grpc_child_channel_destroy(chand->active_child, 1);
+ grpc_child_channel_destroy(chand->active_child);
chand->active_child = NULL;
}
@@ -541,7 +549,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
gpr_free(child_filters);
if (old_active) {
- grpc_child_channel_destroy(old_active, 1);
+ grpc_child_channel_destroy(old_active);
}
return result;
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index ebaf816902..b1194e278d 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -166,7 +166,8 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r) {
return result;
}
-static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
+static void backoff_alarm_done(void *arg /* grpc_client_setup */,
+ grpc_iomgr_cb_status status) {
grpc_client_setup *s = arg;
grpc_client_setup_request *r = gpr_malloc(sizeof(grpc_client_setup_request));
r->setup = s;
@@ -176,7 +177,7 @@ static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
gpr_mu_lock(&s->mu);
s->active_request = r;
s->in_alarm = 0;
- if (!success) {
+ if (status != GRPC_CALLBACK_SUCCESS) {
if (0 == --s->refs) {
gpr_mu_unlock(&s->mu);
destroy_setup(s);
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 2143eeb63d..06d73e40f5 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -101,11 +101,12 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
switch (status) {
case GRPC_ENDPOINT_CB_OK:
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
+ grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future);
break;
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_ERROR:
case GRPC_ENDPOINT_CB_SHUTDOWN:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
if (!req->have_read_byte) {
next_address(req);
} else {
@@ -122,7 +123,7 @@ done:
static void on_written(internal_request *req) {
gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
+ grpc_endpoint_notify_on_read(req->ep, on_read, req, gpr_inf_future);
}
static void done_write(void *arg, grpc_endpoint_cb_status status) {
@@ -135,6 +136,7 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_SHUTDOWN:
case GRPC_ENDPOINT_CB_ERROR:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
next_address(req);
break;
}
@@ -143,8 +145,8 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
- switch (
- grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
+ switch (grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req,
+ gpr_inf_future)) {
case GRPC_ENDPOINT_WRITE_DONE:
on_written(req);
break;
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 2664879323..b7238f716a 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
- gpr_timespec *next, int success);
+static int run_some_expired_alarms(gpr_timespec now,
+ grpc_iomgr_cb_status status);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_alarm_heap_is_empty(&shard->heap)
@@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown() {
int i;
- while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0))
+ while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
gpr_mu_unlock(&shard->mu);
if (triggered) {
- alarm->cb(alarm->cb_arg, 0);
+ alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED);
}
}
@@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now,
return n;
}
-static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
- gpr_timespec *next, int success) {
+static int run_some_expired_alarms(gpr_timespec now,
+ grpc_iomgr_cb_status status) {
size_t n = 0;
size_t i;
grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
@@ -329,35 +329,19 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
note_deadline_change(g_shard_queue[0]);
}
- if (next) {
- *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
- }
-
gpr_mu_unlock(&g_mu);
gpr_mu_unlock(&g_checker_mu);
- } else if (next) {
- gpr_mu_lock(&g_mu);
- *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
- gpr_mu_unlock(&g_mu);
- }
-
- if (n && drop_mu) {
- gpr_mu_unlock(drop_mu);
}
for (i = 0; i < n; i++) {
- alarms[i]->cb(alarms[i]->cb_arg, success);
- }
-
- if (n && drop_mu) {
- gpr_mu_lock(drop_mu);
+ alarms[i]->cb(alarms[i]->cb_arg, status);
}
return n;
}
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
- return run_some_expired_alarms(drop_mu, now, next, 1);
+int grpc_alarm_check(gpr_timespec now) {
+ return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS);
}
gpr_timespec grpc_alarm_list_next_timeout() {
diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h
index 12b6ab4286..e605ff84f9 100644
--- a/src/core/iomgr/alarm_internal.h
+++ b/src/core/iomgr/alarm_internal.h
@@ -34,12 +34,9 @@
#ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
#define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
/* iomgr internal api for dealing with alarms */
-int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next);
+int grpc_alarm_check(gpr_timespec now);
void grpc_alarm_list_init(gpr_timespec now);
void grpc_alarm_list_shutdown();
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 9e5d56389d..f1944bf672 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -34,16 +34,14 @@
#include "src/core/iomgr/endpoint.h"
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
- ep->vtable->notify_on_read(ep, cb, user_data);
+ void *user_data, gpr_timespec deadline) {
+ ep->vtable->notify_on_read(ep, cb, user_data, deadline);
}
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
- return ep->vtable->write(ep, slices, nslices, cb, user_data);
+grpc_endpoint_write_status grpc_endpoint_write(
+ grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
+ return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index ec86d9a146..bbd800bea8 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -48,7 +48,8 @@ typedef enum grpc_endpoint_cb_status {
GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
- GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */
+ GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */
+ GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */
} grpc_endpoint_cb_status;
typedef enum grpc_endpoint_write_status {
@@ -65,10 +66,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data,
struct grpc_endpoint_vtable {
void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
+ void *user_data, gpr_timespec deadline);
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_endpoint_write_cb cb,
- void *user_data);
+ void *user_data, gpr_timespec deadline);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
@@ -76,7 +77,7 @@ struct grpc_endpoint_vtable {
/* When data is available on the connection, calls the callback with slices. */
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
+ void *user_data, gpr_timespec deadline);
/* Write slices out to the socket.
@@ -84,11 +85,9 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
returns GRPC_ENDPOINT_WRITE_DONE.
Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
connection is ready for more data. */
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data);
+grpc_endpoint_write_status grpc_endpoint_write(
+ grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline);
/* Causes any pending read/write callbacks to run immediately with
GRPC_ENDPOINT_CB_SHUTDOWN status */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
deleted file mode 100644
index 3cd2f9a8e0..0000000000
--- a/src/core/iomgr/fd_posix.c
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/fd_posix.h"
-
-#include <assert.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/iomgr_internal.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-enum descriptor_state { NOT_READY, READY, WAITING };
-
-static void destroy(grpc_fd *fd) {
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
- gpr_mu_destroy(&fd->set_state_mu);
- gpr_free(fd->watchers);
- gpr_free(fd);
- grpc_iomgr_unref();
-}
-
-static void ref_by(grpc_fd *fd, int n) {
- gpr_atm_no_barrier_fetch_add(&fd->refst, n);
-}
-
-static void unref_by(grpc_fd *fd, int n) {
- if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) {
- destroy(fd);
- }
-}
-
-static void do_nothing(void *ignored, int success) {}
-
-grpc_fd *grpc_fd_create(int fd) {
- grpc_fd *r = gpr_malloc(sizeof(grpc_fd));
- grpc_iomgr_ref();
- gpr_atm_rel_store(&r->refst, 1);
- gpr_atm_rel_store(&r->readst.state, NOT_READY);
- gpr_atm_rel_store(&r->writest.state, NOT_READY);
- gpr_mu_init(&r->set_state_mu);
- gpr_mu_init(&r->watcher_mu);
- gpr_atm_rel_store(&r->shutdown, 0);
- r->fd = fd;
- r->watchers = NULL;
- r->watcher_count = 0;
- r->watcher_capacity = 0;
- grpc_pollset_add_fd(grpc_backup_pollset(), r);
- return r;
-}
-
-int grpc_fd_is_orphaned(grpc_fd *fd) {
- return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
-}
-
-static void wake_watchers(grpc_fd *fd) {
- size_t i, n;
- gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (i = 0; i < n; i++) {
- grpc_pollset_force_kick(fd->watchers[i]);
- }
- gpr_mu_unlock(&fd->watcher_mu);
-}
-
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
- fd->on_done = on_done ? on_done : do_nothing;
- fd->on_done_user_data = user_data;
- ref_by(fd, 1); /* remove active status, but keep referenced */
- wake_watchers(fd);
- close(fd->fd);
- unref_by(fd, 2); /* drop the reference */
-}
-
-/* increment refcount by two to avoid changing the orphan bit */
-void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
-
-void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *arg;
-} callback;
-
-static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
- int allow_synchronous_callback) {
- if (allow_synchronous_callback) {
- cb(arg, success);
- } else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
- }
-}
-
-static void make_callbacks(callback *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
- size_t i;
- for (i = 0; i < n; i++) {
- make_callback(callbacks[i].cb, callbacks[i].arg, success,
- allow_synchronous_callback);
- }
-}
-
-static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
- void *arg, int allow_synchronous_callback) {
- switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
- case NOT_READY:
- /* There is no race if the descriptor is already ready, so we skip
- the interlocked op in that case. As long as the app doesn't
- try to set the same upcall twice (which it shouldn't) then
- oldval should never be anything other than READY or NOT_READY. We
- don't
- check for user error on the fast path. */
- st->cb = cb;
- st->cb_arg = arg;
- if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
- /* swap was successful -- the closure will run after the next
- set_ready call. NOTE: we don't have an ABA problem here,
- since we should never have concurrent calls to the same
- notify_on function. */
- wake_watchers(fd);
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the READY code below */
- case READY:
- assert(gpr_atm_acq_load(&st->state) == READY);
- gpr_atm_rel_store(&st->state, NOT_READY);
- make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
- return;
- case WAITING:
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
- }
- gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
- abort();
-}
-
-static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
- size_t *ncallbacks) {
- callback *c;
-
- switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
- case NOT_READY:
- if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
- /* swap was successful -- the closure will run after the next
- notify_on call. */
- return;
- }
- /* swap was unsuccessful due to an intervening set_ready call.
- Fall through to the WAITING code below */
- case WAITING:
- assert(gpr_atm_acq_load(&st->state) == WAITING);
- c = &callbacks[(*ncallbacks)++];
- c->cb = st->cb;
- c->arg = st->cb_arg;
- gpr_atm_rel_store(&st->state, NOT_READY);
- return;
- case READY:
- /* duplicate ready, ignore */
- return;
- }
-}
-
-static void set_ready(grpc_fd *fd, grpc_fd_state *st,
- int allow_synchronous_callback) {
- /* only one set_ready can be active at once (but there may be a racing
- notify_on) */
- int success;
- callback cb;
- size_t ncb = 0;
- gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &cb, &ncb);
- gpr_mu_unlock(&fd->set_state_mu);
- success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
-}
-
-void grpc_fd_shutdown(grpc_fd *fd) {
- callback cb[2];
- size_t ncb = 0;
- gpr_mu_lock(&fd->set_state_mu);
- GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
- gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, cb, &ncb);
- set_ready_locked(&fd->writest, cb, &ncb);
- gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
-}
-
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
- void *read_cb_arg) {
- notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
-}
-
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
- void *write_cb_arg) {
- notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
-}
-
-gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask) {
- /* keep track of pollers that have requested our events, in case they change
- */
- gpr_mu_lock(&fd->watcher_mu);
- if (fd->watcher_capacity == fd->watcher_count) {
- fd->watcher_capacity =
- GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
- fd->watchers = gpr_realloc(fd->watchers,
- fd->watcher_capacity * sizeof(grpc_pollset *));
- }
- fd->watchers[fd->watcher_count++] = pollset;
- gpr_mu_unlock(&fd->watcher_mu);
-
- return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
- (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
-}
-
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
- size_t r, w, n;
-
- gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (r = 0, w = 0; r < n; r++) {
- if (fd->watchers[r] == pollset) {
- fd->watcher_count--;
- continue;
- }
- fd->watchers[w++] = fd->watchers[r];
- }
- gpr_mu_unlock(&fd->watcher_mu);
-}
-
-void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
- set_ready(fd, &fd->readst, allow_synchronous_callback);
-}
-
-void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
- set_ready(fd, &fd->writest, allow_synchronous_callback);
-}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
deleted file mode 100644
index 232de0c3e0..0000000000
--- a/src/core/iomgr/fd_posix.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_
-#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_
-
-#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset.h"
-#include <grpc/support/atm.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- gpr_atm state;
-} grpc_fd_state;
-
-typedef struct grpc_fd {
- int fd;
- /* refst format:
- bit0: 1=active/0=orphaned
- bit1-n: refcount
- meaning that mostly we ref by two to avoid altering the orphaned bit,
- and just unref by 1 when we're ready to flag the object as orphaned */
- gpr_atm refst;
-
- gpr_mu set_state_mu;
- gpr_atm shutdown;
-
- gpr_mu watcher_mu;
- grpc_pollset **watchers;
- size_t watcher_count;
- size_t watcher_capacity;
-
- grpc_fd_state readst;
- grpc_fd_state writest;
-
- grpc_iomgr_cb_func on_done;
- void *on_done_user_data;
-} grpc_fd;
-
-/* Create a wrapped file descriptor.
- Requires fd is a non-blocking file descriptor.
- This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd);
-
-/* Releases fd to be asynchronously destroyed.
- on_done is called when the underlying file descriptor is definitely close()d.
- If on_done is NULL, no callback will be made.
- Requires: *fd initialized; no outstanding notify_on_read or
- notify_on_write. */
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data);
-
-/* Begin polling on an fd.
- Registers that the given pollset is interested in this fd - so that if read
- or writability interest changes, the pollset can be kicked to pick up that
- new interest.
- Return value is:
- (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
- i.e. a combination of read_mask and write_mask determined by the fd's current
- interest in said events.
- Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function. */
-gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask);
-/* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset);
-
-/* Return 1 if this fd is orphaned, 0 otherwise */
-int grpc_fd_is_orphaned(grpc_fd *fd);
-
-/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */
-void grpc_fd_shutdown(grpc_fd *fd);
-
-/* Register read interest, causing read_cb to be called once when fd becomes
- readable, on deadline specified by deadline, or on shutdown triggered by
- grpc_fd_shutdown.
- read_cb will be called with read_cb_arg when *fd becomes readable.
- read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
- GRPC_CALLBACK_TIMED_OUT if the call timed out,
- and CANCELLED if the call was cancelled.
-
- Requires:This method must not be called before the read_cb for any previous
- call runs. Edge triggered events are used whenever they are supported by the
- underlying platform. This means that users must drain fd in read_cb before
- calling notify_on_read again. Users are also expected to handle spurious
- events, i.e read_cb is called while nothing can be readable from fd */
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
- void *read_cb_arg);
-
-/* Exactly the same semantics as above, except based on writable events. */
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
- void *write_cb_arg);
-
-/* Notification from the poller to an fd that it has become readable or
- writable.
- If allow_synchronous_callback is 1, allow running the fd callback inline
- in this callstack, otherwise register an asynchronous callback and return */
-void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback);
-void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
-
-/* Reference counting for fds */
-void grpc_fd_ref(grpc_fd *fd);
-void grpc_fd_unref(grpc_fd *fd);
-
-#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
deleted file mode 100644
index 03f56a50a3..0000000000
--- a/src/core/iomgr/iomgr.c
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/iomgr.h"
-
-#include <stdlib.h>
-
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/alarm_internal.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/sync.h>
-
-typedef struct delayed_callback {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- struct delayed_callback *next;
-} delayed_callback;
-
-static gpr_mu g_mu;
-static gpr_cv g_cv;
-static delayed_callback *g_cbs_head = NULL;
-static delayed_callback *g_cbs_tail = NULL;
-static int g_shutdown;
-static int g_refs;
-static gpr_event g_background_callback_executor_done;
-
-/* Execute followup callbacks continuously.
- Other threads may check in and help during pollset_work() */
-static void background_callback_executor(void *ignored) {
- gpr_mu_lock(&g_mu);
- while (!g_shutdown) {
- gpr_timespec deadline = gpr_inf_future;
- if (g_cbs_head) {
- delayed_callback *cb = g_cbs_head;
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, cb->success);
- gpr_free(cb);
- gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
- } else {
- gpr_cv_wait(&g_cv, &g_mu, deadline);
- }
- }
- gpr_mu_unlock(&g_mu);
- gpr_event_set(&g_background_callback_executor_done, (void *)1);
-}
-
-void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); }
-
-void grpc_iomgr_init() {
- gpr_thd_id id;
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
- grpc_alarm_list_init(gpr_now());
- g_refs = 0;
- grpc_iomgr_platform_init();
- gpr_event_init(&g_background_callback_executor_done);
- gpr_thd_new(&id, background_callback_executor, NULL, NULL);
-}
-
-void grpc_iomgr_shutdown() {
- delayed_callback *cb;
- gpr_timespec shutdown_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
-
- grpc_iomgr_platform_shutdown();
-
- gpr_mu_lock(&g_mu);
- g_shutdown = 1;
- while (g_cbs_head || g_refs) {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
- g_cbs_head ? " and executing final callbacks" : "");
- while (g_cbs_head) {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
-
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
- gpr_mu_lock(&g_mu);
- }
- if (g_refs) {
- if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) {
- gpr_log(GPR_DEBUG,
- "Failed to free %d iomgr objects before shutdown deadline: "
- "memory leaks are likely",
- g_refs);
- break;
- }
- }
- }
- gpr_mu_unlock(&g_mu);
-
- gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
-
- grpc_alarm_list_shutdown();
- gpr_mu_destroy(&g_mu);
- gpr_cv_destroy(&g_cv);
-}
-
-void grpc_iomgr_ref() {
- gpr_mu_lock(&g_mu);
- ++g_refs;
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_unref() {
- gpr_mu_lock(&g_mu);
- if (0 == --g_refs) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success) {
- delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
- dcb->cb = cb;
- dcb->cb_arg = cb_arg;
- dcb->success = success;
- gpr_mu_lock(&g_mu);
- dcb->next = NULL;
- if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = dcb;
- } else {
- g_cbs_tail->next = dcb;
- g_cbs_tail = dcb;
- }
- gpr_cv_signal(&g_cv);
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
-}
-
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
- int n = 0;
- gpr_mu *retake_mu = NULL;
- delayed_callback *cb;
- for (;;) {
- /* check for new work */
- if (!gpr_mu_trylock(&g_mu)) {
- break;
- }
- cb = g_cbs_head;
- if (!cb) {
- gpr_mu_unlock(&g_mu);
- break;
- }
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- /* if we have a mutex to drop, do so before executing work */
- if (drop_mu) {
- gpr_mu_unlock(drop_mu);
- retake_mu = drop_mu;
- drop_mu = NULL;
- }
- cb->cb(cb->cb_arg, success && cb->success);
- gpr_free(cb);
- n++;
- }
- if (retake_mu) {
- gpr_mu_lock(retake_mu);
- }
- return n;
-}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 16991a9b90..cf39f947bc 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -34,8 +34,17 @@
#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__
#define __GRPC_INTERNAL_IOMGR_IOMGR_H__
+/* Status passed to callbacks for grpc_em_fd_notify_on_read and
+ grpc_em_fd_notify_on_write. */
+typedef enum grpc_em_cb_status {
+ GRPC_CALLBACK_SUCCESS = 0,
+ GRPC_CALLBACK_TIMED_OUT,
+ GRPC_CALLBACK_CANCELLED,
+ GRPC_CALLBACK_DO_NOT_USE
+} grpc_iomgr_cb_status;
+
/* gRPC Callback definition */
-typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
+typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status);
void grpc_iomgr_init();
void grpc_iomgr_shutdown();
diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_completion_queue_interface.h
index ca5af3e527..3c4efe773a 100644
--- a/src/core/iomgr/iomgr_posix.h
+++ b/src/core/iomgr/iomgr_completion_queue_interface.h
@@ -31,12 +31,15 @@
*
*/
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_
-#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
+#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
-#include "src/core/iomgr/iomgr_internal.h"
+/* Internals of iomgr that are exposed only to be used for completion queue
+ implementation */
-void grpc_pollset_global_init();
-void grpc_pollset_global_shutdown();
+extern gpr_mu grpc_iomgr_mu;
+extern gpr_cv grpc_iomgr_cv;
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */
+int grpc_iomgr_work(gpr_timespec deadline);
+
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */
diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c
new file mode 100644
index 0000000000..6188ab2749
--- /dev/null
+++ b/src/core/iomgr/iomgr_libevent.c
@@ -0,0 +1,652 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/iomgr_libevent.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+
+#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/alarm_internal.h"
+#include <grpc/support/atm.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <event2/event.h>
+#include <event2/thread.h>
+
+#define ALARM_TRIGGER_INIT ((gpr_atm)0)
+#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
+#define DONE_SHUTDOWN ((void *)1)
+
+#define POLLER_ID_INVALID ((gpr_atm)-1)
+
+/* Global data */
+struct event_base *g_event_base;
+gpr_mu grpc_iomgr_mu;
+gpr_cv grpc_iomgr_cv;
+static grpc_libevent_activation_data *g_activation_queue;
+static int g_num_pollers;
+static int g_num_fds;
+static int g_num_address_resolutions;
+static gpr_timespec g_last_poll_completed;
+static int g_shutdown_backup_poller;
+static gpr_event g_backup_poller_done;
+/* activated to break out of the event loop early */
+static struct event *g_timeout_ev;
+/* activated to safely break polling from other threads */
+static struct event *g_break_ev;
+static grpc_fd *g_fds_to_free;
+
+int evthread_use_threads(void);
+static void grpc_fd_impl_destroy(grpc_fd *impl);
+
+void grpc_iomgr_ref_address_resolution(int delta) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ GPR_ASSERT(!g_shutdown_backup_poller);
+ g_num_address_resolutions += delta;
+ if (0 == g_num_address_resolutions) {
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
+/* If anything is in the work queue, process one item and return 1.
+ Return 0 if there were no work items to complete.
+ Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
+static int maybe_do_queue_work() {
+ grpc_libevent_activation_data *work = g_activation_queue;
+
+ if (work == NULL) return 0;
+
+ if (work->next == work) {
+ g_activation_queue = NULL;
+ } else {
+ g_activation_queue = work->next;
+ g_activation_queue->prev = work->prev;
+ g_activation_queue->next->prev = g_activation_queue->prev->next =
+ g_activation_queue;
+ }
+ work->next = work->prev = NULL;
+ /* force status to cancelled from ok when shutting down */
+ if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) {
+ work->status = GRPC_CALLBACK_CANCELLED;
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ work->cb(work->arg, work->status);
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ return 1;
+}
+
+/* Break out of the event loop on timeout */
+static void timer_callback(int fd, short events, void *context) {
+ event_base_loopbreak((struct event_base *)context);
+}
+
+static void break_callback(int fd, short events, void *context) {
+ event_base_loopbreak((struct event_base *)context);
+}
+
+static void free_fd_list(grpc_fd *impl) {
+ while (impl != NULL) {
+ grpc_fd *current = impl;
+ impl = impl->next;
+ grpc_fd_impl_destroy(current);
+ current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS);
+ gpr_free(current);
+ }
+}
+
+static void maybe_free_fds() {
+ if (g_fds_to_free) {
+ free_fd_list(g_fds_to_free);
+ g_fds_to_free = NULL;
+ }
+}
+
+void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); }
+
+/* Spend some time doing polling and libevent maintenance work if no other
+ thread is. This includes both polling for events and destroying/closing file
+ descriptor objects.
+ Returns 1 if polling was performed, 0 otherwise.
+ Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
+static int maybe_do_polling_work(struct timeval delay) {
+ int status;
+
+ if (g_num_pollers) return 0;
+
+ g_num_pollers = 1;
+
+ maybe_free_fds();
+
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ event_add(g_timeout_ev, &delay);
+ status = event_base_loop(g_event_base, EVLOOP_ONCE);
+ if (status < 0) {
+ gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status);
+ }
+ event_del(g_timeout_ev);
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ maybe_free_fds();
+
+ g_num_pollers = 0;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ return 1;
+}
+
+static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) {
+ int r = 0;
+ if (gpr_time_cmp(next, now) < 0) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
+ r = grpc_alarm_check(now);
+ gpr_mu_lock(&grpc_iomgr_mu);
+ }
+ return r;
+}
+
+int grpc_iomgr_work(gpr_timespec deadline) {
+ gpr_timespec now = gpr_now();
+ gpr_timespec next = grpc_alarm_list_next_timeout();
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
+ /* poll for no longer than one second */
+ gpr_timespec max_delay = gpr_time_from_seconds(1);
+ struct timeval delay;
+
+ if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) {
+ return 0;
+ }
+
+ if (gpr_time_cmp(delay_timespec, max_delay) > 0) {
+ delay_timespec = max_delay;
+ }
+
+ /* Adjust delay to account for the next alarm, if applicable. */
+ delay_timespec = gpr_time_min(
+ delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now));
+
+ delay = gpr_timeval_from_timespec(delay_timespec);
+
+ if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) ||
+ maybe_do_polling_work(delay)) {
+ g_last_poll_completed = gpr_now();
+ return 1;
+ }
+
+ return 0;
+}
+
+static void backup_poller_thread(void *p) {
+ int backup_poller_engaged = 0;
+ /* allow no pollers for 100 milliseconds, then engage backup polling */
+ gpr_timespec allow_no_pollers = gpr_time_from_millis(100);
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (!g_shutdown_backup_poller) {
+ if (g_num_pollers == 0) {
+ gpr_timespec now = gpr_now();
+ gpr_timespec time_until_engage = gpr_time_sub(
+ allow_no_pollers, gpr_time_sub(now, g_last_poll_completed));
+ if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) {
+ if (!backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller");
+ backup_poller_engaged = 1;
+ }
+ if (!maybe_do_queue_work()) {
+ gpr_timespec next = grpc_alarm_list_next_timeout();
+ if (!maybe_do_alarm_work(now, next)) {
+ gpr_timespec deadline =
+ gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1)));
+ maybe_do_polling_work(
+ gpr_timeval_from_timespec(gpr_time_sub(deadline, now)));
+ }
+ }
+ } else {
+ if (backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "Backup poller disengaged");
+ backup_poller_engaged = 0;
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+ gpr_sleep_until(gpr_time_add(now, time_until_engage));
+ gpr_mu_lock(&grpc_iomgr_mu);
+ }
+ } else {
+ if (backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "Backup poller disengaged");
+ backup_poller_engaged = 0;
+ }
+ gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future);
+ }
+ }
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ gpr_event_set(&g_backup_poller_done, (void *)1);
+}
+
+void grpc_iomgr_init() {
+ gpr_thd_id backup_poller_id;
+
+ if (evthread_use_threads() != 0) {
+ gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
+ abort();
+ }
+
+ grpc_alarm_list_init(gpr_now());
+
+ gpr_mu_init(&grpc_iomgr_mu);
+ gpr_cv_init(&grpc_iomgr_cv);
+ g_activation_queue = NULL;
+ g_num_pollers = 0;
+ g_num_fds = 0;
+ g_num_address_resolutions = 0;
+ g_last_poll_completed = gpr_now();
+ g_shutdown_backup_poller = 0;
+ g_fds_to_free = NULL;
+
+ gpr_event_init(&g_backup_poller_done);
+
+ g_event_base = NULL;
+ g_timeout_ev = NULL;
+ g_break_ev = NULL;
+
+ g_event_base = event_base_new();
+ if (!g_event_base) {
+ gpr_log(GPR_ERROR, "Failed to create the event base");
+ abort();
+ }
+
+ if (evthread_make_base_notifiable(g_event_base) != 0) {
+ gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!");
+ abort();
+ }
+
+ g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base);
+ g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback,
+ g_event_base);
+
+ event_add(g_break_ev, NULL);
+
+ gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL);
+}
+
+void grpc_iomgr_shutdown() {
+ gpr_timespec fd_shutdown_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
+
+ /* broadcast shutdown */
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (g_num_fds > 0 || g_num_address_resolutions > 0) {
+ gpr_log(GPR_INFO,
+ "waiting for %d fds and %d name resolutions to be destroyed before "
+ "closing event manager",
+ g_num_fds, g_num_address_resolutions);
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) {
+ gpr_log(GPR_ERROR,
+ "not all fds or name resolutions destroyed before shutdown "
+ "deadline: memory leaks "
+ "are likely");
+ break;
+ } else if (g_num_fds == 0 && g_num_address_resolutions == 0) {
+ gpr_log(GPR_INFO, "all fds closed, all name resolutions finished");
+ }
+ }
+
+ g_shutdown_backup_poller = 1;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
+
+ grpc_alarm_list_shutdown();
+
+ /* drain pending work */
+ gpr_mu_lock(&grpc_iomgr_mu);
+ while (maybe_do_queue_work())
+ ;
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ free_fd_list(g_fds_to_free);
+
+ /* complete shutdown */
+ gpr_mu_destroy(&grpc_iomgr_mu);
+ gpr_cv_destroy(&grpc_iomgr_cv);
+
+ if (g_timeout_ev != NULL) {
+ event_free(g_timeout_ev);
+ }
+
+ if (g_break_ev != NULL) {
+ event_free(g_break_ev);
+ }
+
+ if (g_event_base != NULL) {
+ event_base_free(g_event_base);
+ g_event_base = NULL;
+ }
+}
+
+static void add_task(grpc_libevent_activation_data *adata) {
+ gpr_mu_lock(&grpc_iomgr_mu);
+ if (g_activation_queue) {
+ adata->next = g_activation_queue;
+ adata->prev = adata->next->prev;
+ adata->next->prev = adata->prev->next = adata;
+ } else {
+ g_activation_queue = adata;
+ adata->next = adata->prev = adata;
+ }
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
+static void grpc_fd_impl_destroy(grpc_fd *impl) {
+ grpc_em_task_activity_type type;
+ grpc_libevent_activation_data *adata;
+
+ for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
+ adata = &(impl->task.activation[type]);
+ GPR_ASSERT(adata->next == NULL);
+ if (adata->ev != NULL) {
+ event_free(adata->ev);
+ adata->ev = NULL;
+ }
+ }
+
+ if (impl->shutdown_ev != NULL) {
+ event_free(impl->shutdown_ev);
+ impl->shutdown_ev = NULL;
+ }
+ gpr_mu_destroy(&impl->mu);
+ close(impl->fd);
+}
+
+/* Proxy callback to call a gRPC read/write callback */
+static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
+ grpc_fd *em_fd = arg;
+ grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS;
+ int run_read_cb = 0;
+ int run_write_cb = 0;
+ grpc_libevent_activation_data *rdata, *wdata;
+
+ gpr_mu_lock(&em_fd->mu);
+ if (em_fd->shutdown_started) {
+ status = GRPC_CALLBACK_CANCELLED;
+ } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) {
+ status = GRPC_CALLBACK_TIMED_OUT;
+ /* TODO(klempner): This is broken if we are monitoring both read and write
+ events on the same fd -- generating a spurious event is okay, but
+ generating a spurious timeout is not. */
+ what |= (EV_READ | EV_WRITE);
+ }
+
+ if (what & EV_READ) {
+ switch (em_fd->read_state) {
+ case GRPC_FD_WAITING:
+ run_read_cb = 1;
+ em_fd->read_state = GRPC_FD_IDLE;
+ break;
+ case GRPC_FD_IDLE:
+ case GRPC_FD_CACHED:
+ em_fd->read_state = GRPC_FD_CACHED;
+ }
+ }
+ if (what & EV_WRITE) {
+ switch (em_fd->write_state) {
+ case GRPC_FD_WAITING:
+ run_write_cb = 1;
+ em_fd->write_state = GRPC_FD_IDLE;
+ break;
+ case GRPC_FD_IDLE:
+ case GRPC_FD_CACHED:
+ em_fd->write_state = GRPC_FD_CACHED;
+ }
+ }
+
+ if (run_read_cb) {
+ rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
+ rdata->status = status;
+ add_task(rdata);
+ } else if (run_write_cb) {
+ wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
+ wdata->status = status;
+ add_task(wdata);
+ }
+ gpr_mu_unlock(&em_fd->mu);
+}
+
+static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
+ /* TODO(klempner): This could just run directly in the calling thread, except
+ that libevent's handling of event_active() on an event which is already in
+ flight on a different thread is racy and easily triggers TSAN.
+ */
+ grpc_fd *impl = arg;
+ gpr_mu_lock(&impl->mu);
+ impl->shutdown_started = 1;
+ if (impl->read_state == GRPC_FD_WAITING) {
+ event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
+ }
+ if (impl->write_state == GRPC_FD_WAITING) {
+ event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
+ }
+ gpr_mu_unlock(&impl->mu);
+}
+
+grpc_fd *grpc_fd_create(int fd) {
+ int flags;
+ grpc_libevent_activation_data *rdata, *wdata;
+ grpc_fd *impl = gpr_malloc(sizeof(grpc_fd));
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+ g_num_fds++;
+ gpr_mu_unlock(&grpc_iomgr_mu);
+
+ impl->shutdown_ev = NULL;
+ gpr_mu_init(&impl->mu);
+
+ flags = fcntl(fd, F_GETFL, 0);
+ GPR_ASSERT((flags & O_NONBLOCK) != 0);
+
+ impl->task.type = GRPC_EM_TASK_FD;
+ impl->fd = fd;
+
+ rdata = &(impl->task.activation[GRPC_EM_TA_READ]);
+ rdata->ev = NULL;
+ rdata->cb = NULL;
+ rdata->arg = NULL;
+ rdata->status = GRPC_CALLBACK_SUCCESS;
+ rdata->prev = NULL;
+ rdata->next = NULL;
+
+ wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]);
+ wdata->ev = NULL;
+ wdata->cb = NULL;
+ wdata->arg = NULL;
+ wdata->status = GRPC_CALLBACK_SUCCESS;
+ wdata->prev = NULL;
+ wdata->next = NULL;
+
+ impl->read_state = GRPC_FD_IDLE;
+ impl->write_state = GRPC_FD_IDLE;
+
+ impl->shutdown_started = 0;
+ impl->next = NULL;
+
+ /* TODO(chenw): detect platforms where only level trigger is supported,
+ and set the event to non-persist. */
+ rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ,
+ em_fd_cb, impl);
+ GPR_ASSERT(rdata->ev);
+
+ wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE,
+ em_fd_cb, impl);
+ GPR_ASSERT(wdata->ev);
+
+ impl->shutdown_ev =
+ event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl);
+ GPR_ASSERT(impl->shutdown_ev);
+
+ return impl;
+}
+
+static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {}
+
+void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done,
+ void *user_data) {
+ if (on_done == NULL) on_done = do_nothing;
+
+ gpr_mu_lock(&grpc_iomgr_mu);
+
+ /* Put the impl on the list to be destroyed by the poller. */
+ impl->on_done = on_done;
+ impl->on_done_user_data = user_data;
+ impl->next = g_fds_to_free;
+ g_fds_to_free = impl;
+ /* TODO(ctiller): kick the poller so it destroys this fd promptly
+ (currently we may wait up to a second) */
+
+ g_num_fds--;
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
+}
+
+int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; }
+
+/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
+   called when the previously registered callback has not been called yet? */
+int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg, gpr_timespec deadline) {
+ int force_event = 0;
+ grpc_libevent_activation_data *rdata;
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ struct timeval *delayp =
+ gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
+
+ rdata = &impl->task.activation[GRPC_EM_TA_READ];
+
+ gpr_mu_lock(&impl->mu);
+ rdata->cb = read_cb;
+ rdata->arg = read_cb_arg;
+
+ force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED);
+ impl->read_state = GRPC_FD_WAITING;
+
+ if (force_event) {
+ event_active(rdata->ev, EV_READ, 1);
+ } else if (event_add(rdata->ev, delayp) == -1) {
+ gpr_mu_unlock(&impl->mu);
+ return 0;
+ }
+ gpr_mu_unlock(&impl->mu);
+ return 1;
+}
+
+int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg, gpr_timespec deadline) {
+ int force_event = 0;
+ grpc_libevent_activation_data *wdata;
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ struct timeval *delayp =
+ gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
+
+ wdata = &impl->task.activation[GRPC_EM_TA_WRITE];
+
+ gpr_mu_lock(&impl->mu);
+ wdata->cb = write_cb;
+ wdata->arg = write_cb_arg;
+
+ force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED);
+ impl->write_state = GRPC_FD_WAITING;
+
+ if (force_event) {
+ event_active(wdata->ev, EV_WRITE, 1);
+ } else if (event_add(wdata->ev, delayp) == -1) {
+ gpr_mu_unlock(&impl->mu);
+ return 0;
+ }
+ gpr_mu_unlock(&impl->mu);
+ return 1;
+}
+
+void grpc_fd_shutdown(grpc_fd *em_fd) {
+ event_active(em_fd->shutdown_ev, EV_READ, 1);
+}
+
+/* Sometimes we want a followup callback: something to be added from the
+ current callback for the EM to invoke once this callback is complete.
+ This is implemented by inserting an entry into an EM queue. */
+
+/* The following structure holds the field needed for adding the
+ followup callback. These are the argument for the followup callback,
+ the function to use for the followup callback, and the
+ activation data pointer used for the queues (to free in the CB) */
+struct followup_callback_arg {
+ grpc_iomgr_cb_func func;
+ void *cb_arg;
+ grpc_libevent_activation_data adata;
+};
+
+static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) {
+ struct followup_callback_arg *fcb_arg = cb_arg;
+ /* Invoke the function */
+ fcb_arg->func(fcb_arg->cb_arg, status);
+ gpr_free(fcb_arg);
+}
+
+void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
+ grpc_libevent_activation_data *adptr;
+ struct followup_callback_arg *fcb_arg;
+
+ fcb_arg = gpr_malloc(sizeof(*fcb_arg));
+ /* Set up the activation data and followup callback argument structures */
+ adptr = &fcb_arg->adata;
+ adptr->ev = NULL;
+ adptr->cb = followup_proxy_callback;
+ adptr->arg = fcb_arg;
+ adptr->status = GRPC_CALLBACK_SUCCESS;
+ adptr->prev = NULL;
+ adptr->next = NULL;
+
+ fcb_arg->func = cb;
+ fcb_arg->cb_arg = cb_arg;
+
+ /* Insert an activation data for the specified em */
+ add_task(adptr);
+}
diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h
new file mode 100644
index 0000000000..5c088006a0
--- /dev/null
+++ b/src/core/iomgr/iomgr_libevent.h
@@ -0,0 +1,206 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
+#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
+
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+typedef struct grpc_fd grpc_fd;
+
+/* gRPC event manager task "base class". This is pretend-inheritance in C89.
+ This should be the first member of any actual grpc_em task type.
+
+ Memory warning: expanding this will increase memory usage in any derived
+ class, so be careful.
+
+ For generality, this base can be on multiple task queues and can have
+ multiple event callbacks registered. Not all "derived classes" will use
+ this feature. */
+
+typedef enum grpc_libevent_task_type {
+ GRPC_EM_TASK_ALARM,
+ GRPC_EM_TASK_FD,
+ GRPC_EM_TASK_DO_NOT_USE
+} grpc_libevent_task_type;
+
+/* Different activity types to shape the callback and queueing arrays */
+typedef enum grpc_em_task_activity_type {
+ GRPC_EM_TA_READ, /* use this also for single-type events */
+ GRPC_EM_TA_WRITE,
+ GRPC_EM_TA_COUNT
+} grpc_em_task_activity_type;
+
+/* Include the following #define for convenience for tasks like alarms that
+ only have a single type */
+#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ
+
+typedef struct grpc_libevent_activation_data {
+ struct event *ev; /* event activated on this callback type */
+ grpc_iomgr_cb_func cb; /* function pointer for callback */
+ void *arg; /* argument passed to cb */
+
+ /* Hold the status associated with the callback when queued */
+ grpc_iomgr_cb_status status;
+ /* Now set up to link activations into scheduler queues */
+ struct grpc_libevent_activation_data *prev;
+ struct grpc_libevent_activation_data *next;
+} grpc_libevent_activation_data;
+
+typedef struct grpc_libevent_task {
+ grpc_libevent_task_type type;
+
+ /* Now have an array of activation data elements: one for each activity
+ type that could get activated */
+ grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT];
+} grpc_libevent_task;
+
+/* Initialize *em_fd.
+ Requires fd is a non-blocking file descriptor.
+
+ This takes ownership of closing fd.
+
+ Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */
+grpc_fd *grpc_fd_create(int fd);
+
+/* Causes *em_fd to no longer be initialized and closes the underlying fd.
+ on_done is called when the underlying file descriptor is definitely close()d.
+ If on_done is NULL, no callback will be made.
+ Requires: *em_fd initialized; no outstanding notify_on_read or
+ notify_on_write. */
+void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done,
+ void *user_data);
+
+/* Returns the file descriptor associated with *em_fd. */
+int grpc_fd_get(grpc_fd *em_fd);
+
+/* Register read interest, causing read_cb to be called once when em_fd becomes
+ readable, on deadline specified by deadline, or on shutdown triggered by
+ grpc_fd_shutdown.
+ read_cb will be called with read_cb_arg when *em_fd becomes readable.
+   read_cb is called with status of GRPC_CALLBACK_SUCCESS if readable,
+ GRPC_CALLBACK_TIMED_OUT if the call timed out,
+ and CANCELLED if the call was cancelled.
+
+   Requires: This method must not be called before the read_cb for any previous
+ call runs. Edge triggered events are used whenever they are supported by the
+ underlying platform. This means that users must drain em_fd in read_cb before
+ calling notify_on_read again. Users are also expected to handle spurious
+   events, i.e. read_cb is called while nothing can be readable from em_fd */
+int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg, gpr_timespec deadline);
+
+/* Exactly the same semantics as above, except based on writable events. */
+int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg, gpr_timespec deadline);
+
+/* Cause any current and all future read/write callbacks to error out with
+ GRPC_CALLBACK_CANCELLED. */
+void grpc_fd_shutdown(grpc_fd *em_fd);
+
+/* =================== Event caching ===================
+ In order to not miss or double-return edges in the context of edge triggering
+ and multithreading, we need a per-fd caching layer in the eventmanager itself
+ to cache relevant events.
+
+ There are two types of events we care about: calls to notify_on_[read|write]
+ and readable/writable events for the socket from eventfd. There are separate
+ event caches for read and write.
+
+ There are three states:
+ 0. "waiting" -- There's been a call to notify_on_[read|write] which has not
+ had a corresponding event. In other words, we're waiting for an event so we
+ can run the callback.
+ 1. "idle" -- We are neither waiting nor have a cached event.
+ 2. "cached" -- There has been a read/write event without a waiting callback,
+ so we want to run the event next time the application calls
+ notify_on_[read|write].
+
+ The high level state diagram:
+
+ +--------------------------------------------------------------------+
+ | WAITING | IDLE | CACHED |
+ | | | |
+ | 1. --*-> 2. --+-> 3. --+\
+ | | | <--+/
+ | | | |
+ x+-- 6. 5. <-+-- 4. <-*-- |
+ | | | |
+ +--------------------------------------------------------------------+
+
+ Transitions right occur on read|write events. Transitions left occur on
+ notify_on_[read|write] events.
+ State transitions:
+ 1. Read|Write event while waiting -> run the callback and transition to idle.
+ 2. Read|Write event while idle -> transition to cached.
+ 3. Read|Write event with one already cached -> still cached.
+ 4. notify_on_[read|write] with event cached: run callback and transition to
+ idle.
+ 5. notify_on_[read|write] when idle: Store callback and transition to
+ waiting.
+ 6. notify_on_[read|write] when waiting: invalid. */
+
+typedef enum grpc_fd_state {
+ GRPC_FD_WAITING = 0,
+ GRPC_FD_IDLE = 1,
+ GRPC_FD_CACHED = 2
+} grpc_fd_state;
+
+/* gRPC file descriptor handle.
+ The handle is used to register read/write callbacks to a file descriptor */
+struct grpc_fd {
+ grpc_libevent_task task; /* Base class, callbacks, queues, etc */
+ int fd; /* File descriptor */
+
+ /* Note that the shutdown event is only needed as a workaround for libevent
+ not properly handling event_active on an in flight event. */
+ struct event *shutdown_ev; /* activated to trigger shutdown */
+
+ /* protect shutdown_started|read_state|write_state and ensure barriers
+ between notify_on_[read|write] and read|write callbacks */
+ gpr_mu mu;
+ int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
+ grpc_fd_state read_state;
+ grpc_fd_state write_state;
+
+ /* descriptor delete list. These are destroyed during polling. */
+ struct grpc_fd *next;
+ grpc_iomgr_cb_func on_done;
+ void *on_done_user_data;
+};
+
+void grpc_iomgr_ref_address_resolution(int delta);
+
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_libevent_use_threads.c
index 5f72542777..af449342f0 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_libevent_use_threads.c
@@ -31,21 +31,26 @@
*
*/
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_
-#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_
-
-#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/iomgr_internal.h"
+/* Posix grpc event manager support code. */
+#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <event2/thread.h>
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success);
-
-void grpc_iomgr_ref();
-void grpc_iomgr_unref();
-
-void grpc_iomgr_platform_init();
-void grpc_iomgr_platform_shutdown();
+static int error_code = 0;
+static gpr_once threads_once = GPR_ONCE_INIT;
+static void evthread_threads_initialize(void) {
+ error_code = evthread_use_pthreads();
+ if (error_code) {
+ gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
+ }
+}
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */
+/* Notify LibEvent that Posix pthread is used. */
+int evthread_use_threads() {
+ gpr_once_init(&threads_once, &evthread_threads_initialize);
+ /* For Pthreads or Windows threads, Libevent provides simple APIs to set
+ mutexes and conditional variables to support cross thread operations.
+ For other platforms, LibEvent provide callback APIs to hook mutexes and
+ conditional variables. */
+ return error_code;
+}
diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/pollset.c
index ff9195ec1d..62a0019eb3 100644
--- a/src/core/iomgr/iomgr_posix.c
+++ b/src/core/iomgr/pollset.c
@@ -31,8 +31,7 @@
*
*/
-#include "src/core/iomgr/iomgr_posix.h"
+#include "src/core/iomgr/pollset.h"
-void grpc_iomgr_platform_init() { grpc_pollset_global_init(); }
-
-void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); }
+void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; }
+void grpc_pollset_destroy(grpc_pollset *pollset) {}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 7374a4ec13..ba1a9d5429 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -34,31 +34,18 @@
#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_H_
-#include <grpc/support/port_platform.h>
-
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
so that it can find new calls to service
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
-
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_posix.h"
-#endif
+/* Eventually different implementations of iomgr will provide their own
+ grpc_pollset structs. As this is just a dummy wrapper to get the API in,
+ we just define a simple type here. */
+typedef struct { char unused; } grpc_pollset;
void grpc_pollset_init(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset);
-/* Do some work on a pollset.
- May involve invoking asynchronous callbacks, or actually polling file
- descriptors.
- Requires GRPC_POLLSET_MU(pollset) locked.
- May unlock GRPC_POLLSET_MU(pollset) during its execution. */
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
-
-/* Break a pollset out of polling work
- Requires GRPC_POLLSET_MU(pollset) locked. */
-void grpc_pollset_kick(grpc_pollset *pollset);
-
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
deleted file mode 100644
index e482da94f7..0000000000
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
-
-#include "src/core/iomgr/pollset_posix.h"
-
-#include <errno.h>
-#include <poll.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-typedef struct {
- /* all polled fds */
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
- /* fds being polled by the current poller: parallel arrays of pollfd and the
- * grpc_fd* that the pollfd was constructed from */
- size_t pfd_count;
- size_t pfd_capacity;
- grpc_fd **selfds;
- struct pollfd *pfds;
- /* fds that have been removed from the pollset explicitly */
- size_t del_count;
- size_t del_capacity;
- grpc_fd **dels;
-} pollset_hdr;
-
-static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
- size_t i;
- pollset_hdr *h = pollset->data.ptr;
- /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
- for (i = 0; i < h->fd_count; i++) {
- if (h->fds[i] == fd) return;
- }
- if (h->fd_count == h->fd_capacity) {
- h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
- h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
- }
- h->fds[h->fd_count++] = fd;
- grpc_fd_ref(fd);
-}
-
-static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
- grpc_fd *fd) {
- /* will get removed next poll cycle */
- pollset_hdr *h = pollset->data.ptr;
- if (h->del_count == h->del_capacity) {
- h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2);
- h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity);
- }
- h->dels[h->del_count++] = fd;
- grpc_fd_ref(fd);
-}
-
-static void end_polling(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h;
- h = pollset->data.ptr;
- for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(h->selfds[i], pollset);
- }
-}
-
-static int multipoll_with_poll_pollset_maybe_work(
- grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
- int timeout;
- int r;
- size_t i, np, nf, nd;
- pollset_hdr *h;
-
- if (pollset->counter) {
- return 0;
- }
- h = pollset->data.ptr;
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
- return 1;
- }
- }
- if (h->pfd_capacity < h->fd_count + 1) {
- h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
- gpr_free(h->pfds);
- gpr_free(h->selfds);
- h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
- h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity);
- }
- nf = 0;
- np = 1;
- h->pfds[0].fd = grpc_kick_read_fd(pollset);
- h->pfds[0].events = POLLIN;
- h->pfds[0].revents = POLLOUT;
- for (i = 0; i < h->fd_count; i++) {
- int remove = grpc_fd_is_orphaned(h->fds[i]);
- for (nd = 0; nd < h->del_count; nd++) {
- if (h->fds[i] == h->dels[nd]) remove = 1;
- }
- if (remove) {
- grpc_fd_unref(h->fds[i]);
- } else {
- h->fds[nf++] = h->fds[i];
- h->pfds[np].events =
- grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT);
- h->selfds[np] = h->fds[i];
- h->pfds[np].fd = h->fds[i]->fd;
- h->pfds[np].revents = 0;
- np++;
- }
- }
- h->pfd_count = np;
- h->fd_count = nf;
- for (nd = 0; nd < h->del_count; nd++) {
- grpc_fd_unref(h->dels[nd]);
- }
- h->del_count = 0;
- if (h->pfd_count == 0) {
- end_polling(pollset);
- return 0;
- }
- pollset->counter = 1;
- gpr_mu_unlock(&pollset->mu);
-
- r = poll(h->pfds, h->pfd_count, timeout);
- if (r < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
- }
- } else if (r == 0) {
- /* do nothing */
- } else {
- if (h->pfds[0].revents & POLLIN) {
- grpc_kick_drain(pollset);
- }
- for (i = 1; i < np; i++) {
- if (h->pfds[i].revents & POLLIN) {
- grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback);
- }
- if (h->pfds[i].revents & POLLOUT) {
- grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback);
- }
- }
- }
- end_polling(pollset);
-
- gpr_mu_lock(&pollset->mu);
- pollset->counter = 0;
- gpr_cv_broadcast(&pollset->cv);
- return 1;
-}
-
-static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
- size_t i;
- pollset_hdr *h = pollset->data.ptr;
- GPR_ASSERT(pollset->counter == 0);
- for (i = 0; i < h->fd_count; i++) {
- grpc_fd_unref(h->fds[i]);
- }
- for (i = 0; i < h->del_count; i++) {
- grpc_fd_unref(h->dels[i]);
- }
- gpr_free(h->pfds);
- gpr_free(h->selfds);
- gpr_free(h->fds);
- gpr_free(h->dels);
- gpr_free(h);
-}
-
-static const grpc_pollset_vtable multipoll_with_poll_pollset = {
- multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
- multipoll_with_poll_pollset_maybe_work,
- multipoll_with_poll_pollset_destroy};
-
-void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
- size_t i;
- pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
- pollset->vtable = &multipoll_with_poll_pollset;
- pollset->data.ptr = h;
- h->fd_count = nfds;
- h->fd_capacity = nfds;
- h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->pfd_count = 0;
- h->pfd_capacity = 0;
- h->pfds = NULL;
- h->selfds = NULL;
- h->del_count = 0;
- h->del_capacity = 0;
- h->dels = NULL;
- for (i = 0; i < nfds; i++) {
- h->fds[i] = fds[i];
- grpc_fd_ref(fds[i]);
- }
-}
-
-#endif
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
deleted file mode 100644
index ff00e06429..0000000000
--- a/src/core/iomgr/pollset_posix.c
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/pollset_posix.h"
-
-#include <errno.h>
-#include <poll.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "src/core/iomgr/alarm_internal.h"
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/socket_utils_posix.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/useful.h>
-
-/* kick pipes: we keep a sharded set of pipes to allow breaking from poll.
- Ideally this would be 1:1 with pollsets, but we'd like to avoid associating
- full kernel objects with each pollset to keep them lightweight, so instead
- keep a sharded set and allow associating a pollset with one of the shards.
-
- TODO(ctiller): move this out from this file, and allow an eventfd
- implementation on linux */
-
-#define LOG2_KICK_SHARDS 6
-#define KICK_SHARDS (1 << LOG2_KICK_SHARDS)
-
-static int g_kick_pipes[KICK_SHARDS][2];
-static grpc_pollset g_backup_pollset;
-static int g_shutdown_backup_poller;
-static gpr_event g_backup_poller_done;
-
-static void backup_poller(void *p) {
- gpr_timespec delta = gpr_time_from_millis(100);
- gpr_timespec last_poll = gpr_now();
-
- gpr_mu_lock(&g_backup_pollset.mu);
- while (g_shutdown_backup_poller == 0) {
- gpr_timespec next_poll = gpr_time_add(last_poll, delta);
- grpc_pollset_work(&g_backup_pollset, next_poll);
- gpr_mu_unlock(&g_backup_pollset.mu);
- gpr_sleep_until(next_poll);
- gpr_mu_lock(&g_backup_pollset.mu);
- last_poll = next_poll;
- }
- gpr_mu_unlock(&g_backup_pollset.mu);
-
- gpr_event_set(&g_backup_poller_done, (void *)1);
-}
-
-static size_t kick_shard(const grpc_pollset *info) {
- size_t x = (size_t)info;
- return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1);
-}
-
-int grpc_kick_read_fd(grpc_pollset *p) {
- return g_kick_pipes[kick_shard(p)][0];
-}
-
-static int grpc_kick_write_fd(grpc_pollset *p) {
- return g_kick_pipes[kick_shard(p)][1];
-}
-
-void grpc_pollset_force_kick(grpc_pollset *p) {
- char c = 0;
- while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR)
- ;
-}
-
-void grpc_pollset_kick(grpc_pollset *p) {
- if (!p->counter) return;
- grpc_pollset_force_kick(p);
-}
-
-void grpc_kick_drain(grpc_pollset *p) {
- int fd = grpc_kick_read_fd(p);
- char buf[128];
- int r;
-
- for (;;) {
- r = read(fd, buf, sizeof(buf));
- if (r > 0) continue;
- if (r == 0) return;
- switch (errno) {
- case EAGAIN:
- return;
- case EINTR:
- continue;
- default:
- gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
- return;
- }
- }
-}
-
-/* global state management */
-
-grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; }
-
-void grpc_pollset_global_init() {
- int i;
- gpr_thd_id id;
-
- /* initialize the kick shards */
- for (i = 0; i < KICK_SHARDS; i++) {
- GPR_ASSERT(0 == pipe(g_kick_pipes[i]));
- GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1));
- GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1));
- }
-
- /* initialize the backup pollset */
- grpc_pollset_init(&g_backup_pollset);
-
- /* start the backup poller thread */
- g_shutdown_backup_poller = 0;
- gpr_event_init(&g_backup_poller_done);
- gpr_thd_new(&id, backup_poller, NULL, NULL);
-}
-
-void grpc_pollset_global_shutdown() {
- int i;
-
- /* terminate the backup poller thread */
- gpr_mu_lock(&g_backup_pollset.mu);
- g_shutdown_backup_poller = 1;
- gpr_mu_unlock(&g_backup_pollset.mu);
- gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
-
- /* destroy the backup pollset */
- grpc_pollset_destroy(&g_backup_pollset);
-
- /* destroy the kick shards */
- for (i = 0; i < KICK_SHARDS; i++) {
- close(g_kick_pipes[i][0]);
- close(g_kick_pipes[i][1]);
- }
-}
-
-/* main interface */
-
-static void become_empty_pollset(grpc_pollset *pollset);
-static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd);
-
-void grpc_pollset_init(grpc_pollset *pollset) {
- gpr_mu_init(&pollset->mu);
- gpr_cv_init(&pollset->cv);
- become_empty_pollset(pollset);
-}
-
-void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(pollset, fd);
- gpr_cv_broadcast(&pollset->cv);
- gpr_mu_unlock(&pollset->mu);
-}
-
-void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
- gpr_mu_lock(&pollset->mu);
- pollset->vtable->del_fd(pollset, fd);
- gpr_cv_broadcast(&pollset->cv);
- gpr_mu_unlock(&pollset->mu);
-}
-
-int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
- /* pollset->mu already held */
- gpr_timespec now;
- now = gpr_now();
- if (gpr_time_cmp(now, deadline) > 0) {
- return 0;
- }
- if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- return 1;
- }
- if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
- return 1;
- }
- return pollset->vtable->maybe_work(pollset, deadline, now, 1);
-}
-
-void grpc_pollset_destroy(grpc_pollset *pollset) {
- pollset->vtable->destroy(pollset);
- gpr_mu_destroy(&pollset->mu);
- gpr_cv_destroy(&pollset->cv);
-}
-
-/*
- * empty_pollset - a vtable that provides polling for NO file descriptors
- */
-
-static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- become_unary_pollset(pollset, fd);
-}
-
-static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {}
-
-static int empty_pollset_maybe_work(grpc_pollset *pollset,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
- return 0;
-}
-
-static void empty_pollset_destroy(grpc_pollset *pollset) {}
-
-static const grpc_pollset_vtable empty_pollset = {
- empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work,
- empty_pollset_destroy};
-
-static void become_empty_pollset(grpc_pollset *pollset) {
- pollset->vtable = &empty_pollset;
-}
-
-/*
- * unary_poll_pollset - a vtable that provides polling for one file descriptor
- * via poll()
- */
-
-static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
- grpc_fd *fds[2];
- if (fd == pollset->data.ptr) return;
- fds[0] = pollset->data.ptr;
- fds[1] = fd;
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
- grpc_fd_unref(fds[0]);
-}
-
-static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
- if (fd == pollset->data.ptr) {
- grpc_fd_unref(pollset->data.ptr);
- become_empty_pollset(pollset);
- }
-}
-
-static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
- gpr_timespec deadline,
- gpr_timespec now,
- int allow_synchronous_callback) {
- struct pollfd pfd[2];
- grpc_fd *fd;
- int timeout;
- int r;
-
- if (pollset->counter) {
- return 0;
- }
- fd = pollset->data.ptr;
- if (grpc_fd_is_orphaned(fd)) {
- grpc_fd_unref(fd);
- become_empty_pollset(pollset);
- return 0;
- }
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
- if (timeout <= 0) {
- return 1;
- }
- }
- pfd[0].fd = grpc_kick_read_fd(pollset);
- pfd[0].events = POLLIN;
- pfd[0].revents = 0;
- pfd[1].fd = fd->fd;
- pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT);
- pfd[1].revents = 0;
- pollset->counter = 1;
- gpr_mu_unlock(&pollset->mu);
-
- r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
- if (r < 0) {
- if (errno != EINTR) {
- gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
- }
- } else if (r == 0) {
- /* do nothing */
- } else {
- if (pfd[0].revents & POLLIN) {
- grpc_kick_drain(pollset);
- }
- if (pfd[1].revents & POLLIN) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
- }
- if (pfd[1].revents & POLLOUT) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
- }
- }
-
- gpr_mu_lock(&pollset->mu);
- grpc_fd_end_poll(fd, pollset);
- pollset->counter = 0;
- gpr_cv_broadcast(&pollset->cv);
- return 1;
-}
-
-static void unary_poll_pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->counter == 0);
- grpc_fd_unref(pollset->data.ptr);
-}
-
-static const grpc_pollset_vtable unary_poll_pollset = {
- unary_poll_pollset_add_fd, unary_poll_pollset_del_fd,
- unary_poll_pollset_maybe_work, unary_poll_pollset_destroy};
-
-static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) {
- pollset->vtable = &unary_poll_pollset;
- pollset->counter = 0;
- pollset->data.ptr = fd;
- grpc_fd_ref(fd);
-}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
deleted file mode 100644
index f051079f5b..0000000000
--- a/src/core/iomgr/pollset_posix.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_
-#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_
-
-#include <grpc/support/sync.h>
-
-typedef struct grpc_pollset_vtable grpc_pollset_vtable;
-
-/* forward declare only in this file to avoid leaking impl details via
- pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not
- use the struct tag */
-struct grpc_fd;
-
-typedef struct grpc_pollset {
- /* pollsets under posix can mutate representation as fds are added and
- removed.
- For example, we may choose a poll() based implementation on linux for
- few fds, and an epoll() based implementation for many fds */
- const grpc_pollset_vtable *vtable;
- gpr_mu mu;
- gpr_cv cv;
- int counter;
- union {
- int fd;
- void *ptr;
- } data;
-} grpc_pollset;
-
-struct grpc_pollset_vtable {
- void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
- int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
- gpr_timespec now, int allow_synchronous_callback);
- void (*destroy)(grpc_pollset *pollset);
-};
-
-#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
-#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv)
-
-/* Add an fd to a pollset */
-void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-/* Force remove an fd from a pollset (normally they are removed on the next
- poll after an fd is orphaned) */
-void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
-
-/* Force any current pollers to break polling */
-void grpc_pollset_force_kick(grpc_pollset *pollset);
-/* Returns the fd to listen on for kicks */
-int grpc_kick_read_fd(grpc_pollset *p);
-/* Call after polling has been kicked to leave the kicked state */
-void grpc_kick_drain(grpc_pollset *p);
-
-/* All fds get added to a backup pollset to ensure that progress is made
- regardless of applications listening to events. Relying on this is slow
- however (the backup pollset only listens every 100ms or so) - so it's not
- to be relied on. */
-grpc_pollset *grpc_backup_pollset();
-
-/* turn a pollset into a multipoller: platform specific */
-void grpc_platform_become_multipoller(grpc_pollset *pollset,
- struct grpc_fd **fds, size_t fd_count);
-
-#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index c9c2c5378a..a0a04297eb 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -41,7 +41,7 @@
#include <unistd.h>
#include <string.h>
-#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
@@ -201,7 +201,7 @@ static void do_request(void *rp) {
gpr_free(r->default_port);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_unref();
+ grpc_iomgr_ref_address_resolution(-1);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
- grpc_iomgr_ref();
+ grpc_iomgr_ref_address_resolution(1);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index d675c2dcec..88b599b582 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -38,9 +38,7 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/alarm.h"
-#include "src/core/iomgr/iomgr_posix.h"
-#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -51,11 +49,8 @@
typedef struct {
void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
- gpr_mu mu;
grpc_fd *fd;
gpr_timespec deadline;
- grpc_alarm alarm;
- int refs;
} async_connect;
static int prepare_socket(int fd) {
@@ -79,42 +74,21 @@ error:
return 0;
}
-static void on_alarm(void *acp, int success) {
- int done;
- async_connect *ac = acp;
- gpr_mu_lock(&ac->mu);
- if (ac->fd != NULL && success) {
- grpc_fd_shutdown(ac->fd);
- }
- done = (--ac->refs == 0);
- gpr_mu_unlock(&ac->mu);
- if (done) {
- gpr_mu_destroy(&ac->mu);
- gpr_free(ac);
- }
-}
-
-static void on_writable(void *acp, int success) {
+static void on_writable(void *acp, grpc_iomgr_cb_status status) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
- int fd = ac->fd->fd;
- int done;
- grpc_endpoint *ep = NULL;
- void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
- void *cb_arg = ac->cb_arg;
-
- grpc_alarm_cancel(&ac->alarm);
+ int fd = grpc_fd_get(ac->fd);
- if (success) {
+ if (status == GRPC_CALLBACK_SUCCESS) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
- goto finish;
+ goto error;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
/* We will get one of these errors if we have run out of
@@ -132,7 +106,7 @@ static void on_writable(void *acp, int success) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
- grpc_fd_notify_on_write(ac->fd, on_writable, ac);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
return;
} else {
switch (so_error) {
@@ -143,31 +117,27 @@ static void on_writable(void *acp, int success) {
gpr_log(GPR_ERROR, "socket error: %d", so_error);
break;
}
- goto finish;
+ goto error;
}
} else {
- ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
- goto finish;
+ goto great_success;
}
} else {
- gpr_log(GPR_ERROR, "on_writable failed during connect");
- goto finish;
+ gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status);
+ goto error;
}
abort();
-finish:
- gpr_mu_lock(&ac->mu);
- if (!ep) {
- grpc_fd_orphan(ac->fd, NULL, NULL);
- }
- done = (--ac->refs == 0);
- gpr_mu_unlock(&ac->mu);
- if (done) {
- gpr_mu_destroy(&ac->mu);
- gpr_free(ac);
- }
- cb(cb_arg, ep);
+error:
+ ac->cb(ac->cb_arg, NULL);
+ grpc_fd_destroy(ac->fd, NULL, NULL);
+ gpr_free(ac);
+ return;
+
+great_success:
+ ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ gpr_free(ac);
}
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
@@ -206,7 +176,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
} while (err < 0 && errno == EINTR);
if (err >= 0) {
- gpr_log(GPR_DEBUG, "instant connect");
cb(arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
@@ -222,10 +191,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
+ ac->deadline = deadline;
ac->fd = grpc_fd_create(fd);
- gpr_mu_init(&ac->mu);
- ac->refs = 2;
-
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
- grpc_fd_notify_on_write(ac->fd, on_writable, ac);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 657f34aaf9..bc3ce69e47 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -255,14 +255,18 @@ typedef struct {
grpc_endpoint_read_cb read_cb;
void *read_user_data;
+ gpr_timespec read_deadline;
grpc_endpoint_write_cb write_cb;
void *write_user_data;
+ gpr_timespec write_deadline;
grpc_tcp_slice_state write_state;
} grpc_tcp;
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status);
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status);
static void grpc_tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -272,7 +276,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) {
static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
- grpc_fd_orphan(tcp->em_fd, NULL, NULL);
+ grpc_fd_destroy(tcp->em_fd, NULL, NULL);
gpr_free(tcp);
}
}
@@ -304,7 +308,8 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status) {
grpc_tcp *tcp = (grpc_tcp *)arg;
int iov_size = 1;
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
@@ -319,12 +324,18 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
- if (!success) {
+ if (status == GRPC_CALLBACK_CANCELLED) {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
grpc_tcp_unref(tcp);
return;
}
+ if (status == GRPC_CALLBACK_TIMED_OUT) {
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT);
+ grpc_tcp_unref(tcp);
+ return;
+ }
+
/* TODO(klempner): Limit the amount we read at once. */
for (;;) {
allocated_bytes = slice_state_append_blocks_into_iovec(
@@ -366,7 +377,8 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
- grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp,
+ tcp->read_deadline);
}
} else {
/* TODO(klempner): Log interesting errors */
@@ -395,13 +407,14 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
}
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
+ void *user_data, gpr_timespec deadline) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_user_data = user_data;
+ tcp->read_deadline = deadline;
gpr_ref(&tcp->refcount);
- grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline);
}
#define MAX_WRITE_IOVEC 16
@@ -447,24 +460,34 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
};
}
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
+ grpc_iomgr_cb_status status) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_write_status write_status;
grpc_endpoint_cb_status cb_status;
grpc_endpoint_write_cb cb;
- if (!success) {
+ cb_status = GRPC_ENDPOINT_CB_OK;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ cb_status = GRPC_ENDPOINT_CB_SHUTDOWN;
+ } else if (status == GRPC_CALLBACK_TIMED_OUT) {
+ cb_status = GRPC_ENDPOINT_CB_TIMED_OUT;
+ }
+
+ if (cb_status != GRPC_ENDPOINT_CB_OK) {
slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
+ cb(tcp->write_user_data, cb_status);
grpc_tcp_unref(tcp);
return;
}
write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
- grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
+ tcp->write_deadline);
} else {
slice_state_destroy(&tcp->write_state);
if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
@@ -479,11 +502,9 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
}
}
-static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_write_status grpc_tcp_write(
+ grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_write_status status;
@@ -509,15 +530,17 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
gpr_ref(&tcp->refcount);
tcp->write_cb = cb;
tcp->write_user_data = user_data;
- grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
+ tcp->write_deadline = deadline;
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
+ tcp->write_deadline);
}
return status;
}
static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
- grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_pollset_add_fd(pollset, tcp->em_fd);
+ /* tickle the pollset so we crash if things aren't wired correctly */
+ pollset->unused++;
}
static const grpc_endpoint_vtable vtable = {
@@ -527,12 +550,14 @@ static const grpc_endpoint_vtable vtable = {
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
- tcp->fd = em_fd->fd;
+ tcp->fd = grpc_fd_get(em_fd);
tcp->read_cb = NULL;
tcp->write_cb = NULL;
tcp->read_user_data = NULL;
tcp->write_user_data = NULL;
tcp->slice_size = slice_size;
+ tcp->read_deadline = gpr_inf_future;
+ tcp->write_deadline = gpr_inf_future;
slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index c3eef1b4b7..830394d534 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -45,7 +45,7 @@
*/
#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 1968246b75..46fba13f90 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
grpc_tcp_server *grpc_tcp_server_create();
/* Start listening to bound ports */
-void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset,
- grpc_tcp_server_cb cb, void *cb_arg);
+void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb,
+ void *cb_arg);
/* Add a port to the server, returning true on success, or false otherwise.
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 5ed517748a..2abaf15ce4 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -45,7 +45,7 @@
#include <string.h>
#include <errno.h>
-#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -97,8 +97,13 @@ grpc_tcp_server *grpc_tcp_server_create() {
return s;
}
+static void done_destroy(void *p, grpc_iomgr_cb_status status) {
+ gpr_event_set(p, (void *)1);
+}
+
void grpc_tcp_server_destroy(grpc_tcp_server *s) {
size_t i;
+ gpr_event fd_done;
gpr_mu_lock(&s->mu);
/* shutdown all fd's */
for (i = 0; i < s->nports; i++) {
@@ -113,7 +118,9 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) {
/* delete ALL the things */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- grpc_fd_orphan(sp->emfd, NULL, NULL);
+ gpr_event_init(&fd_done);
+ grpc_fd_destroy(sp->emfd, done_destroy, &fd_done);
+ gpr_event_wait(&fd_done, gpr_inf_future);
}
gpr_free(s->ports);
gpr_free(s);
@@ -189,10 +196,10 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, int success) {
+static void on_read(void *arg, grpc_iomgr_cb_status status) {
server_port *sp = arg;
- if (!success) {
+ if (status != GRPC_CALLBACK_SUCCESS) {
goto error;
}
@@ -208,7 +215,7 @@ static void on_read(void *arg, int success) {
case EINTR:
continue;
case EAGAIN:
- grpc_fd_notify_on_read(sp->emfd, on_read, sp);
+ grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -247,10 +254,15 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity);
}
sp = &s->ports[s->nports++];
- sp->server = s;
- sp->fd = fd;
sp->emfd = grpc_fd_create(fd);
- GPR_ASSERT(sp->emfd);
+ sp->fd = fd;
+ sp->server = s;
+ /* initialize the em desc */
+ if (sp->emfd == NULL) {
+ s->nports--;
+ gpr_mu_unlock(&s->mu);
+ return 0;
+ }
gpr_mu_unlock(&s->mu);
return 1;
@@ -307,8 +319,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) {
return (0 <= index && index < s->nports) ? s->ports[index].fd : -1;
}
-void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
- grpc_tcp_server_cb cb, void *cb_arg) {
+void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
+ void *cb_arg) {
size_t i;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
@@ -317,10 +329,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
- if (pollset) {
- grpc_pollset_add_fd(pollset, s->ports[i].emfd);
- }
- grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
+ grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
+ gpr_inf_future);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index c99ac8021d..442d2fa624 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -555,11 +555,12 @@ static int fake_oauth2_has_request_metadata_only(
return 1;
}
-void on_simulated_token_fetch_done(void *user_data, int success) {
+void on_simulated_token_fetch_done(void *user_data,
+ grpc_iomgr_cb_status status) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)r->creds;
- GPR_ASSERT(success);
+ GPR_ASSERT(status == GRPC_CALLBACK_SUCCESS);
r->cb(r->user_data, &c->access_token_md, 1, GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
}
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 7f0fdf73c9..cab09ca49d 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -184,7 +184,8 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
}
static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
- grpc_endpoint_read_cb cb, void *user_data) {
+ grpc_endpoint_read_cb cb, void *user_data,
+ gpr_timespec deadline) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
ep->read_cb = cb;
ep->read_user_data = user_data;
@@ -199,7 +200,7 @@ static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
return;
}
- grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
+ grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep, deadline);
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -216,11 +217,9 @@ static void on_write(void *data, grpc_endpoint_cb_status error) {
secure_endpoint_unref(ep);
}
-static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_write_status endpoint_write(
+ grpc_endpoint *secure_ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
int i = 0;
int output_buffer_count = 0;
tsi_result result = TSI_OK;
@@ -309,7 +308,7 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
/* Need to keep the endpoint alive across a transport */
secure_endpoint_ref(ep);
status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
- output_buffer_count, on_write, ep);
+ output_buffer_count, on_write, ep, deadline);
if (status != GRPC_ENDPOINT_WRITE_PENDING) {
secure_endpoint_unref(ep);
}
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 3df91ed8e7..eb11251912 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -105,7 +105,6 @@ static void check_peer(grpc_secure_transport_setup *s) {
grpc_security_status peer_status;
tsi_peer peer;
tsi_result result = tsi_handshaker_extract_peer(s->handshaker, &peer);
-
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Peer extraction failed with error %s",
tsi_result_to_string(result));
@@ -153,8 +152,9 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- write_status = grpc_endpoint_write(s->endpoint, &to_send, 1,
- on_handshake_data_sent_to_peer, s);
+ write_status =
+ grpc_endpoint_write(s->endpoint, &to_send, 1,
+ on_handshake_data_sent_to_peer, s, gpr_inf_future);
if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
secure_transport_setup_done(s, 0);
@@ -200,7 +200,8 @@ static void on_handshake_data_received_from_peer(
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
grpc_endpoint_notify_on_read(s->endpoint,
- on_handshake_data_received_from_peer, setup);
+ on_handshake_data_received_from_peer, setup,
+ gpr_inf_future);
cleanup_slices(slices, nslices);
return;
} else {
@@ -257,7 +258,8 @@ static void on_handshake_data_sent_to_peer(void *setup,
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
grpc_endpoint_notify_on_read(s->endpoint,
- on_handshake_data_received_from_peer, setup);
+ on_handshake_data_received_from_peer, setup,
+ gpr_inf_future);
} else {
check_peer(s);
}
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 9d7c0e5e5a..28b56dd4c9 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -77,9 +77,9 @@ static void on_accept(void *server, grpc_endpoint *tcp) {
/* Note: the following code is the same with server_chttp2.c */
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, on_accept, server);
+ grpc_tcp_server_start(tcp, on_accept, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further
diff --git a/src/core/support/time.c b/src/core/support/time.c
index 5330092f56..712bdf441c 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -249,18 +249,3 @@ gpr_timespec gpr_timespec_from_timeval(struct timeval t) {
ts.tv_nsec = t.tv_usec * 1000;
return ts;
}
-
-gpr_int32 gpr_time_to_millis(gpr_timespec t) {
- if (t.tv_sec >= 2147483) {
- if (t.tv_sec == 2147483 && t.tv_nsec < 648 * GPR_NS_PER_MS) {
- return 2147483 * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS;
- }
- return 2147483647;
- } else if (t.tv_sec <= -2147483) {
- /* TODO(ctiller): correct handling here (it's so far in the past do we
- care?) */
- return -2147483648;
- } else {
- return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS;
- }
-}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9c5f5064eb..9ed617f665 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -878,9 +878,9 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
return &call->incoming_metadata;
}
-static void call_alarm(void *arg, int success) {
+static void call_alarm(void *arg, grpc_iomgr_cb_status status) {
grpc_call *call = arg;
- if (success) {
+ if (status == GRPC_CALLBACK_SUCCESS) {
grpc_call_cancel(call);
}
grpc_call_internal_unref(call);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b59c36e03a..4837f5b978 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,7 +36,7 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/iomgr_completion_queue_interface.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
@@ -61,7 +61,6 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
- /* TODO(ctiller): see if this can be removed */
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
@@ -101,7 +100,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
+ Requires grpc_iomgr_mu locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
@@ -127,8 +126,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- grpc_pollset_kick(&cc->pollset);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
return ev;
}
@@ -151,7 +149,7 @@ static void end_op_locked(grpc_completion_queue *cc,
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ gpr_cv_broadcast(&grpc_iomgr_cv);
}
}
@@ -159,11 +157,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
ev->base.data.read = read;
end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
@@ -171,11 +169,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
@@ -183,11 +181,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
@@ -195,11 +193,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.finish_accepted = error;
end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
@@ -208,13 +206,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
void *user_data, size_t count,
grpc_metadata *elements) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
user_data);
ev->base.data.client_metadata_read.count = count;
ev->base.data.client_metadata_read.elements = elements;
end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -223,14 +221,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_metadata *metadata_elements,
size_t metadata_count) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ev->base.data.finished.status = status;
ev->base.data.finished.details = details;
ev->base.data.finished.metadata_count = metadata_count;
ev->base.data.finished.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -239,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
ev->base.data.server_rpc_new.method = method;
ev->base.data.server_rpc_new.host = host;
@@ -247,7 +245,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
ev->base.data.server_rpc_new.metadata_count = metadata_count;
ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -264,7 +262,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
@@ -290,16 +288,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
- GRPC_POLLSET_MU(&cc->pollset), deadline)) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -337,7 +334,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
@@ -346,16 +343,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
- GRPC_POLLSET_MU(&cc->pollset), deadline)) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -364,11 +360,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->refs)) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index aa544a97f2..3829e7aa8f 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -52,7 +52,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
+ void (*start)(grpc_server *server, void *arg);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
@@ -192,7 +192,7 @@ static void orphan_channel(channel_data *chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, int success) {
+static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_server *server = chand->server;
/*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
@@ -247,7 +247,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_mu_unlock(&server->mu);
}
-static void kill_zombie(void *elem, int success) {
+static void kill_zombie(void *elem, grpc_iomgr_cb_status status) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -336,7 +336,7 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void finish_shutdown_channel(void *cd, int success) {
+static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_channel_op op;
op.type = GRPC_CHANNEL_DISCONNECT;
@@ -468,7 +468,7 @@ void grpc_server_start(grpc_server *server) {
listener *l;
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, grpc_cq_pollset(server->cq));
+ l->start(server, l->arg);
}
}
@@ -596,8 +596,7 @@ void grpc_server_destroy(grpc_server *server) {
}
void grpc_server_add_listener(grpc_server *server, void *arg,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ void (*start)(grpc_server *server, void *arg),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 61292ebe4e..f0773ab9d5 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -47,8 +47,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
void grpc_server_add_listener(grpc_server *server, void *listener,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ void (*start)(grpc_server *server, void *arg),
void (*destroy)(grpc_server *server, void *arg));
/* Setup a transport - creates a channel stack, binds the transport to the
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index a0961bd449..a5fdd03774 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
}
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, new_transport, server);
+ grpc_tcp_server_start(tcp, new_transport, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 5bf763e76f..a8ae8cc5bc 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -711,7 +711,7 @@ static void unlock(transport *t) {
/* write some bytes if necessary */
while (start_write) {
switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
- finish_write, t)) {
+ finish_write, t, gpr_inf_future)) {
case GRPC_ENDPOINT_WRITE_DONE:
/* grab the lock directly without wrappers since we just want to
continue writes if we loop: no need to check read callbacks again */
@@ -1617,6 +1617,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
case GRPC_ENDPOINT_CB_SHUTDOWN:
case GRPC_ENDPOINT_CB_EOF:
case GRPC_ENDPOINT_CB_ERROR:
+ case GRPC_ENDPOINT_CB_TIMED_OUT:
lock(t);
drop_connection(t);
t->reading = 0;
@@ -1641,7 +1642,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
if (keep_reading) {
- grpc_endpoint_notify_on_read(t->ep, recv_data, t);
+ grpc_endpoint_notify_on_read(t->ep, recv_data, t, gpr_inf_future);
}
}